多线程的设计方法确实能够最大限度的发挥多核处理器的计算能力,提升吞吐量和性能。可是若是不加控制随意使用线程,对系统的性能反而会产生不利。java
和进程相比,线程虽然是一种轻量级的,可是建立和关闭依然须要花费时间,若是每个小的任务都建立一个线程,则会颇有可能出现建立和销毁线程占用的时间大于该线程任务所消耗的时间。其次线程自己也是须要占用内存空间的,大量的线程会抢占宝贵的内存资源。编程
所以线程的使用须要掌握一个度,再有限的范围内增长线程的数量能够提升系统的性能,一旦超过这个范围,大量的线程只会拖垮整个系统。bash
为了不系统频繁的建立和销毁线程,咱们可让建立的线程复用。咱们可使用一个线程池维护一些线程,当你须要使用线程的时候,能够从池子中随便拿一个空闲线程,当完成工做时,并不急着关闭线程,而是将这些线程退回到线程池中,方便下次使用。多线程
简而言之,再使用线程池后,建立线程编程了从线程池中得到空闲线程,关闭线程变为想线程池归还线程。并发
线程池的成员都在java.util.concurrent包中,是JDK并发包的核心。其中ThreadPoolExecutor表示一个线程池。Executors类则是一个线程工厂的角色,经过Executors能够取得一个拥有特定功能的线程池,经过Executors能够取得一个特定功能的线程池。ide
该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中如有空闲线程,则当即执行。若没有则新的任务会暂存在一个任务队列中,待有线程空闲时,便处理任务队列中的队列。函数
该方法返回一个只有一个线程的线程池。如有多余的任务被提交到线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。性能
该方法返回一个可根据实际状况调整线程数量的线程池。线程池的线程数量不肯定,但如有空闲线程能够复用,则会优先使用可复用的线程。若全部的线程都在工做,又有新的任务提交,则会建立新的线程处理任务。全部线程在当前任务执行完毕后,将返回线程池进行复用。ui
该方法返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService接口上扩展了在给定时间执行某任务的功能,如在某个固定的延时后执行,或者周期性执行某个任务。this
该方法会返回一个ScheduledExecutorService对象,但该线程池能够执行线程数量。
建立固定大小的线程池
public class ThreadPoolThread {
public static class MyTask implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID: " + Thread.currentThread().getId());
try{
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
MyTask myTask = new MyTask();
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0;i< 10 ;i++){
executorService.submit(myTask);
}
}
}
}
复制代码
1562554721820:Thread ID: 12
1562554721820:Thread ID: 15
1562554721820:Thread ID: 16
1562554721820:Thread ID: 13
1562554721820:Thread ID: 14
1562554722821:Thread ID: 15
1562554722821:Thread ID: 16
1562554722821:Thread ID: 12
1562554722821:Thread ID: 13
1562554722821:Thread ID: 14
复制代码
计划执行任务
newScheduledThreadPool()方法返回一个ScheduledExecutorService对象,能够根据时间须要对线程进行调度。主要方法以下
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
复制代码
和其余线程不一样,ScheduledExecutorService不必定会当即安排执行任务。他实际上是起到了计划任务的做用,会在指定的时间对任务进行调度。
schedule()会在给定时间对任务进行一次调度。scheduleAtFixedRate()和scheduleWithFixedDelay()方法会对任务进行周期性调度,可是两者仍是有区别的。scheduleAtFixedRate()方法的任务调度频率是必定的,它是以上一个任务开始执行的时间为起点,再在规定的时间调度下一次任务。而scheduleWithFixedDelay()方法是以上一个任务的结束后再通过规定时间进行任务调度。
public class ScheduleExecutorServiceDemo {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},0,2, TimeUnit.SECONDS);
}
}
复制代码
1562555518798
1562555520798
1562555522798
1562555524799
1562555526800
复制代码
能够看出任务每两秒被调度一次。
若是任务的执行时间大于调度时间,则任务就会在上一个任务结束后当即被调用。
将代码修改成8秒
Thread.sleep(8000);
复制代码
1562555680333
1562555688333
1562555696333
1562555704333
复制代码
调度程序实际上并不保证任务会无限期的持续调用,若是任务自己抛出异常,那么后续全部执行都会中断。
对于几个核心的线程池,虽然看着建立的线程池有着不一样的功能特色,可是其内部都是使用了ThreadPoolExecutor类。
看几个线程池的建立源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
复制代码
能够看出他们都是ThreadPoolExecutor类的封装,来看一下ThreadPoolExecutor类的构造:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
复制代码
参数workQueue指被提交但未执行的任务队列,他是一个BlockingQueue接口的对象,仅用于存放Runnable对象。在ThreadPoolExecutor构造中可使用一下几种BlockingQueue接口:
拒绝策略是当任务数量超过系统实际承载能力时执行的策略。拒绝策略能够说时系统超负荷运行时的补救措施。
JDK内置了四种拒绝策略:
上面的策略都实现了RejectedExecutionHandler接口,若是以上策略没法知足实际开发,能够本身扩展。
RejectedExecutionHandler接口构造:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
复制代码
自定义拒绝策略:
//拒绝策略demo
public class RejectThreadPoolDemo {
public static class MyTask implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ": Thread ID : " + Thread.currentThread().getId());
try{
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
MyTask myTask = new MyTask();
ThreadPoolExecutor es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.privilegedThreadFactory(), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + "被拒绝");
}
});
for (int i = 0;i<Integer.MAX_VALUE;i++){
es.submit(myTask);
Thread.sleep(10);
}
}
}
复制代码
1562575292467: Thread ID : 14
1562575292478: Thread ID : 15
1562575292489: Thread ID : 16
java.util.concurrent.FutureTask@b4c966a被拒绝
java.util.concurrent.FutureTask@2f4d3709被拒绝
java.util.concurrent.FutureTask@4e50df2e被拒绝
复制代码
ThreadFactory是一个接口,他只有一个用来建立线程的方法。
Thread newThread(Runnable r);
复制代码
经过自定义线程建立咱们能够跟踪线程池在什么时候建立了多少线程,自定义线程名等。
public class ThreadFactoryDemo {
static volatile int i = 0;
public static class TestTask implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
public static void main(String[] args) {
TestTask testTask = new TestTask();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,"test--" + i);
i++;
return thread;
}
});
for (int i = 0;i<5;i++){
threadPoolExecutor.submit(testTask);
}
}
}
复制代码
test--0
test--1
test--4
test--2
test--3
复制代码
虽然JDK已经帮咱们实现了稳定的线程池,但若是咱们想要对线程池进行一些扩展,好比监控任务执行的开始和结束时间怎么办呢。
ThreadPoolExecutor是一个能够扩展的线程池,它提供了beforExecutor(),afterExecutor()和terminated()三个接口来对其进行扩展。
public class ThreadFactoryDemo {
static volatile int i = 0;
public static class TestTask implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
public static void main(String[] args) {
TestTask testTask = new TestTask();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,"test--" + i);
i++;
return thread;
}
}){
@Override
protected void beforeExecute(Thread t,Runnable r){
System.out.println("task-----准备执行");
}
};
for (int i = 0;i<5;i++){
threadPoolExecutor.submit(testTask);
}
}
}
复制代码
task-----准备执行
task-----准备执行
test--2
task-----准备执行
test--1
task-----准备执行
test--4
task-----准备执行
test--3
test--0
复制代码
execute提交的方式只能提交一个Runnable的对象,且该方法的返回值是void,也便是提交后若是线程运行后,和主线程就脱离了关系了,固然能够设置一些变量来获取到线程的运行结果。而且当线程的执行过程当中抛出了异常一般来讲主线程也没法获取到异常的信息的,只有经过ThreadFactory主动设置线程的异常处理类才能感知到提交的线程中的异常。
sumbit()方法有三种形式:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
复制代码
sumbit方法会返回一个Future对象,这个Future对象表明这线程的执行结果,当主线程调用Future的get方法的时候会获取到从线程中返回的结果数据。若是在线程的执行过程当中发生了异常,get会获取到异常的信息。