ThreadPoolExecutor 是用来处理异步任务的一个接口,能够将其理解成为一个线程池和一个任务队列,提交到 ExecutorService 对象的任务会被放入任务队或者直接被线程池中的线程执行。ThreadPoolExecutor 支持经过调整构造参数来配置不一样的处理策略,本文主要介绍经常使用的策略配置方法以及应用场景。异步
首先看一下 ThreadPoolExecutor 构造函数的定义:ide
public ThreadPoolExecutor(int corePoolSize, //线程池核心线程数量 int maximumPoolSize, //线程池最大线程数量 long keepAliveTime, //线程KeepAlive时间,当线程池数量超过核心线程数量之后,idle时间超过这个值的线程会被终止 TimeUnit unit, //线程KeepAlive时间单位 BlockingQueue<Runnable> workQueue, //任务队列 ThreadFactory threadFactory, //建立线程的工厂对象 RejectedExecutionHandler handler) //任务被拒绝后调用的handler
ThreadPoolExecutor 对线程池和队列的使用方式以下:函数
从线程池中获取可用线程执行任务,若是没有可用线程则使用ThreadFactory建立新的线程,直到线程数达到corePoolSize限制spa
线程池线程数达到corePoolSize之后,新的任务将被放入队列,直到队列不能再容纳更多的任务线程
当队列不能再容纳更多的任务之后,会建立新的线程,直到线程数达到maxinumPoolSize限制日志
线程数达到maxinumPoolSize限制之后新任务会被拒绝执行,调用 RejectedExecutionHandler 进行处理code
Executors 是提供了一组工厂方法用于建立经常使用的 ExecutorService ,分别是 FixedThreadPool,CachedThreadPool 以及 SingleThreadExecutor。这三种ThreadPoolExecutor都是调用 ThreadPoolExecutor 构造函数进行建立,区别在于参数不一样。对象
下面是 Executors 类 newFixedThreadPool 方法的源码:接口
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
能够看到 corePoolSize 和 maximumPoolSize 设置成了相同的值,此时不存在线程数量大于核心线程数量的状况,因此KeepAlive时间设置不会生效。任务队列使用的是不限制大小的 LinkedBlockingQueue ,因为是无界队列因此容纳的任务数量没有上限。队列
所以,FixedThreadPool的行为以下:
从线程池中获取可用线程执行任务,若是没有可用线程则使用ThreadFactory建立新的线程,直到线程数达到nThreads
线程池线程数达到nThreads之后,新的任务将被放入队列
FixedThreadPool的优势是可以保证全部的任务都被执行,永远不会拒绝新的任务;同时缺点是队列数量没有限制,在任务执行时间无限延长的这种极端状况下会形成内存问题。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
这个工厂方法中使用无界LinkedBlockingQueue,并的将线程数设置成1,除此之外还使用FinalizableDelegatedExecutorService类进行了包装。这个包装类的主要目的是为了屏蔽ThreadPoolExecutor中动态修改线程数量的功能,仅保留ExecutorService中提供的方法。虽然是单线程处理,一旦线程由于处理异常等缘由终止的时候,ThreadPoolExecutor会自动建立一个新的线程继续进行工做。
SingleThreadExecutor 适用于在逻辑上须要单线程处理任务的场景,同时无界的LinkedBlockingQueue保证新任务都可以放入队列,不会被拒绝;缺点和FixedThreadPool相同,当处理任务无限等待的时候会形成内存问题。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
SynchronousQueue是一个只有1个元素的队列,入队的任务须要一直等待直到队列中的元素被移出。核心线程数是0,意味着全部任务会先入队列;最大线程数是Integer.MAX_VALUE,能够认为线程数量是没有限制的。KeepAlive时间被设置成60秒,意味着在没有任务的时候线程等待60秒之后退出。CachedThreadPool对任务的处理策略是提交的任务会当即分配一个线程进行执行,线程池中线程数量会随着任务数的变化自动扩张和缩减,在任务执行时间无限延长的极端状况下会建立过多的线程。
类型 | 核心线程数 | 最大线程数 | Keep Alive 时间 | 任务队列 | 任务处理策略 |
---|---|---|---|---|---|
FixedThreadPool | 固定大小 | 固定大小(与核心线程数相同) | 0 | LinkedBlockingQueue | 线程池大小固定,没有可用线程的时候任务会放入队列等待,队列长度无限制 |
SingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue | 与 FixedThreadPool 相同,区别在于线程池的大小为1,适用于业务逻辑上只容许1个线程进行处理的场景 |
CachedThreadPool | 0 | Integer.MAX_VALUE | 1分钟 | SynchronousQueue | 线程池的数量无限大,新任务会直接分配或者建立一个线程进行执行 |
咱们也能够经过修改 ThreadPoolExecutor 的构造函数来自定义任务处理策略。例如面对的业务是将数据异步写入HBase,当HBase严重超时的时候容许写入失败并记录日志以便过后补写。对于这种应用场景,若是使用FixedThreadPool,在HBase服务严重超时的时候会致使队列无限增加,引起内存问题;若是使用CachedThreadPool,会致使线程数量无限增加。对于这种场景,咱们能够设置ExecutorService使用带有长度限制的队列以及限定最大线程个数的线程池,同时经过设置RejectedExecutionHandler处理任务被拒绝的状况。
首先定义 RejectedExecutionHandler:
public class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 处理任务被拒绝的状况,例如记录日志等 } }
建立 ThreadPoolExecutor:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 10, //核心线程数设置成10 30, //线程池最大线程数为30 30, TimeUnit.SECONDS, //超过核心线程数量的线程idle 30秒以后会退出 new ArrayBlockingQueue<Runnable>(100), //队列长度为100 new MyRejectedExecutionHandler() //任务被拒绝之后的处理类 );
这样设置之后,若是任务处理函数出现长时间挂起的状况,会依次发生下列现象:
线程池线程数量达到核心线程数,向ArrayBlockingQueue中放入任务
ArrayBlockingQueue达到上限,建立新的线程进行处理
线程池中的线程数量达到30个,调用MyRejectedExecutionHandler处理新提交的任务
对于须要保证全部提交的任务都要被执行的状况,使用FixedThreadPool
若是限定只能使用一个线程进行任务处理,使用SingleThreadExecutor
若是但愿提交的任务尽快分配线程执行,使用CachedThreadPool
若是业务上容许任务执行失败,或者任务执行过程可能出现执行时间过长进而影响其余业务的应用场景,能够经过使用限定线程数量的线程池以及限定长度的队列进行容错处理。