最近在项目中遇到一个须要用线程池来处理任务的需求,因而我用ThreadPoolExecutor
来实现,可是在实现过程当中我发现提交大量任务时它的处理逻辑是这样的(提交任务还有一个submit
方法内部也调用了execute
方法):java
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
注释中已经写的很是明白:浏览器
corePoolSize
,直接建立新线程处理任务corePoolSize
,尝试将任务放到等待队列里maximumPoolSIze > corePoolSize
)可是在个人项目中一个线程启动须要10s左右的时间(须要启动一个浏览器对象),所以我但愿实现一个更精细的逻辑提高资源的利用率:安全
corePoolSize
个线程确保有新任务到来时能够当即获得执行threshold
时,说明堆积的任务已经太多了,这个时候开始建立非核心线程直到线程数量已经等于maximumPoolSize
maximumPoolSize
,再将新来的任务放回到任务队列中等待(直到队列满后开始拒绝任务)当我研究了常见的CachedThreadPool
、FixedThreadPool
以及尝试本身配置ThreadPoolExecutor
的构造函数后,发现不管如何都不能实现上面提到的逻辑,由于默认的实现只有在workQueue
达到容量上限后才会开始建立非核心线程,所以须要经过继承的方法实现一个新的类来完成需求。多线程
怎么实如今workQueue
到达容量上限前就建立非核心线程?还要回顾下execute
函数的代码ide
//尝试将任务插入等待队列,若是返回false //说明队列已经到达容量上限,进入else if逻辑 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //尝试建立非核心线程 else if (!addWorker(command, false)) reject(command);
那么只要改变workQueue.offer()
的逻辑,在线程数量还小于maximumPoolSize
的时候就返回false拒绝插入,让线程池调用addWoker
,等不能再建立更多线程时再容许添加到队列便可。函数
能够经过子类重写offer
方法来实现添加逻辑的改变oop
@Override public boolean offer(E e) { if (threadPoolExecutor == null) { throw new NullPointerException(); } //当调用该方法时,已经肯定了workerCountOf(c) > corePoolSize //当数量小于threshold,在队列里等待 if (size() < threshold) { return super.offer(e); //当数量大于等于threshold,说明堆积的任务太多,返回false //让线程池来建立新线程处理 } else { //此处可能会由于多线程致使错误的拒绝 if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) { return false; //线程池中的线程数量已经到达上限,只能添加到任务队列中 } else { return super.offer(e); } } }
这样就实现了基本实现了我须要的功能,可是在写代码的过程当中我找到了一个可能出错的地方:ThreadPoolExecutor
是线程安全的,那么重写的offer
方法也可能遇到多线程调用的状况测试
//设想当poolSize = maximumPoolSize-1时,两个任务到达此处同时返回false if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) { return false; }
因为添加到队列返回false
,execute
方法进入到else if (!addWorker(command, false))
ui
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //添加到队列失败后进入addWorker方法中 else if (!addWorker(command, false)) reject(command); }
再来看一下addWorker
方法的代码,这里只截取须要的一部分this
for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || //两个线程都认为还能够建立再建立一个新线程 wc >= (core ? corePoolSize : maximumPoolSize)) return false; //两个线程同时调用cas方法只有一个可以成功 //成功的线程break retry;进入后面的建立线程的逻辑 //失败的线程从新回到上面的检查并返回false if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop }
最终,在竞争中失败的线程因为addWorker
方法返回了false
最终调用了reject(command)
。在前面写的要实现的逻辑里提到了,只有在等待队列容量达到上限没法再插入时才拒绝任务,可是因为多线程的缘由,这里只是超过了threshold
但没有超过capacity
的时候就拒绝任务了,因此要对拒绝策略的触发作出修改:第一次触发Reject
时,尝试从新添加到任务队列中(不进行poolSize
的检测),若是仍然不能添加,再拒绝任务。
这里经过对execute
方法进行重写来实现重试
@Override public void execute(Runnable command) { try { super.execute(command); } catch (RejectedExecutionException e) { /* 这里参考源码中将任务添加到任务队列的实现 可是其中经过(workerCountOf(recheck) == 0) 检查当任务添加到队列后是否还有线程存活的部分 因为是private权限的,没法实现相似的逻辑,所以须要作必定的特殊处理 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } */ if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) { if (this.isShutdown() && remove(command)) //二次检查 realRejectedExecutionHandler.rejectedExecution(command, this); } else { //插入失败,队列已经满了 realRejectedExecutionHandler.rejectedExecution(command, this); } } } }
这里有两个小问题:
RejectedExecutionHandler
不必定会抛出异常(事实上,ThreadPoolExecutor
本身实现的4中拒绝策略中只有AbortPolicy
可以抛出异常并被捕捉到),所以须要在初始化父类时传入AbortPolicy
拒绝策略并将构造函数中传入的自定义拒绝策略保存下来,在重试失败后才调用本身的rejectedExecution
。corePoolSize = 0
的极端状况下,可能出现一个任务刚被插入队列的同时,全部的线程都结束任务而后被销毁了,此使这个被加入的任务就没法被执行,在ThreadPoolExecutor
中是经过else if (workerCountOf(recheck) == 0) addWorker(null, false);在添加后再检查工做线程是否为0来确保任务能够被执行,可是其中使用的方法是私有的,没法在子类中实现相似的逻辑,所以在初始化时只能强制
corePoolSize
至少为1来解决这个问题。所有代码以下
public class MyThreadPool extends ThreadPoolExecutor { private RejectedExecutionHandler realRejectedExecutionHandler; public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, int queueCapacity) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, queueCapacity, new AbortPolicy()); } public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, int queueCapacity, RejectedExecutionHandler handler) { super(corePoolSize == 0 ? 1 : corePoolSize, maximumPoolSize, keepAliveTime, unit, new MyLinkedBlockingQueue<>(queueCapacity), new AbortPolicy()); ((MyLinkedBlockingQueue)this.getQueue()).setThreadPoolExecutor(this); realRejectedExecutionHandler = handler; } @Override public void execute(Runnable command) { try { super.execute(command); } catch (RejectedExecutionException e) { if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) { if (this.isShutdown() && remove(command)) //二次检查 realRejectedExecutionHandler.rejectedExecution(command, this); } else { //插入失败,队列已经满了 realRejectedExecutionHandler.rejectedExecution(command, this); } } } } public class MyLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> { private int threshold = 20; private ThreadPoolExecutor threadPoolExecutor = null; public MyLinkedBlockingQueue(int queueCapacity) { super(queueCapacity); } public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) { this.threadPoolExecutor = threadPoolExecutor; } @Override public boolean offer(E e) { if (threadPoolExecutor == null) { throw new NullPointerException(); } //当调用该方法时,已经肯定了workerCountOf(c) > corePoolSize //当数量小于threshold,在队列里等待 if (size() < threshold) { return super.offer(e); //当数量大于等于threshold,说明堆积的任务太多,返回false //让线程池来建立新线程处理 } else { //此处可能会由于多线程致使错误的拒绝 if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) { return false; //线程池中的线程数量已经到达上限,只能添加到任务队列中 } else { return super.offer(e); } } } public boolean offerWithoutCheck(E e) { return super.offer(e); } }
最后进行简单的测试
corePoolSize:2 maximumPoolSize:5 queueCapacity:10 threshold:7 任务2 线程数量:2 等待队列大小:0 等待队列大小小于阈值,继续等待。 任务3 线程数量:2 等待队列大小:1 等待队列大小小于阈值,继续等待。 任务4 线程数量:2 等待队列大小:2 等待队列大小小于阈值,继续等待。 任务5 线程数量:2 等待队列大小:3 等待队列大小小于阈值,继续等待。 任务6 线程数量:2 等待队列大小:4 等待队列大小小于阈值,继续等待。 任务7 线程数量:2 等待队列大小:5 等待队列大小小于阈值,继续等待。 任务8 线程数量:2 等待队列大小:6 等待队列大小小于阈值,继续等待。 任务9 线程数量:2 等待队列大小:7 等待队列大小大于等于阈值,线程数量小于MaximumPoolSize,建立新线程处理。 任务10 线程数量:3 等待队列大小:7 等待队列大小大于等于阈值,线程数量小于MaximumPoolSize,建立新线程处理。 任务11 线程数量:4 等待队列大小:7 等待队列大小大于等于阈值,线程数量小于MaximumPoolSize,建立新线程处理。 任务12 线程数量:5 等待队列大小:7 等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。 任务13 线程数量:5 等待队列大小:8 等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。 任务14 线程数量:5 等待队列大小:9 等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。 任务15 线程数量:5 等待队列大小:10 等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。 队列已满 任务16 线程数量:5 等待队列大小:10 等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。 队列已满
再从新复习一遍要实现的功能:
corePoolSize
个线程确保有新任务到来时能够当即获得执行threshold
时,说明堆积的任务已经太多了,这个时候开始建立非核心线程直到线程数量已经等于maximumPoolSize
maximumPoolSize
,再将新来的任务放回到任务队列中等待(直到队列满后开始拒绝任务)能够看出,线程池运行的逻辑和要实现的目标是相同的。