原文地址:https://www.xilidou.com/2018/02/09/thread-corepoolsize/java
最近在看《Java并发编程的艺术》回顾线程池的原理和参数的时候发现一个问题,若是 corePoolSize = 0 且 阻塞队列是无界的。线程池将如何工做?编程
咱们先回顾一下书里面描述线程池execute()
工做的逻辑:微信
看了这四个步骤,其实描述上是有一个漏洞的。若是核心线程数是0,阻塞队列也是无界的,会怎样?若是按照上文的逻辑,应该没有线程会被运行,而后线程无限的增长到队列里面。而后呢?并发
因而我作了一下试验看看到底会怎样?oop
public class threadTest { private final static ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(); while (true) { executor.execute(() -> { System.out.println(atomicInteger.getAndAdd(1)); }); } } }
结果里面的System.out.println(atomicInteger.getAndAdd(1));
语句执行了,与上面的描述矛盾了。到底发生了什么?线程池建立线程的逻辑是什么?咱们仍是从源码来看看到底线程池的逻辑是什么?学习
要了解线程池,咱们首先要了解的线程池里面的状态控制的参数 ctl。ui
这个ctl包含两个参数 :this
它的低29位用于存放当前的线程数, 所以一个线程池在理论上最大的线程数是 536870911; 高 3 位是用于表示当前线程池的状态, 其中高三位的值和状态对应以下:atom
为了可以使用 ctl 线程池提供了三个方法:spa
// Packing and unpacking ctl // 获取线程池的状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取线程池的工做线程数 private static int workerCountOf(int c) { return c & CAPACITY; } // 根据工做线程数和线程池状态获取 ctl private static int ctlOf(int rs, int wc) { return rs | wc; }
外界经过 execute 这个方法来向线程池提交任务。
先看代码:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //若是工做线程数小于核心线程数, if (workerCountOf(c) < corePoolSize) { //执行addWork,提交为核心线程,提交成功return。提交失败从新获取ctl if (addWorker(command, true)) return; c = ctl.get(); } //若是工做线程数大于核心线程数,则检查线程池状态是不是正在运行,且将新线程向阻塞队列提交。 if (isRunning(c) && workQueue.offer(command)) { //recheck 须要再次检查,主要目的是判断加入到阻塞队里中的线程是否能够被执行 int recheck = ctl.get(); //若是线程池状态不为running,将任务从阻塞队列里面移除,启用拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 若是线程池的工做线程为零,则调用addWoker提交任务 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //添加非核心线程失败,拒绝 else if (!addWorker(command, false)) reject(command); }
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); //获取线程池状态 int rs = runStateOf(c); // Check if queue empty only if necessary. // 判断是否能够添加任务。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //获取工做线程数量 int wc = workerCountOf(c); //是否大于线程池上限,是否大于核心线程数,或者最大线程数 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS 增长工做线程数 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl //若是线程池状态改变,回到开始从新来 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; //上面的逻辑是考虑是否可以添加线程,若是能够就cas的增长工做线程数量 //下面正式启动线程 try { //新建worker w = new Worker(firstTask); //获取当前线程 final Thread t = w.thread; if (t != null) { //获取可重入锁 final ReentrantLock mainLock = this.mainLock; //锁住 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // rs < SHUTDOWN ==> 线程处于RUNNING状态 // 或者线程处于SHUTDOWN状态,且firstTask == null(多是workQueue中仍有未执行完成的任务,建立没有初始任务的worker线程执行) if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 当前线程已经启动,抛出异常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //workers 是一个 HashSet 必须在 lock的状况下操做。 workers.add(w); int s = workers.size(); //设置 largeestPoolSize 标记workAdded if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //若是添加成功,启动线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { //启动线程失败,回滚。 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
先看看 addWork()
的两个参数,第一个是须要提交的线程 Runnable firstTask,第二个参数是 boolean 类型,表示是否为核心线程。
execute() 中有三处调用了 addWork()
咱们逐一分析。
if (workerCountOf(c) < corePoolSize)
这个很好理解,工做线程数少于核心线程数,提交任务。因此 addWorker(command, true)
。workerCountOf(recheck) == 0
若是worker的数量为0,那就 addWorker(null,false)
。为何这里是 null
?以前已经把 command 提交到阻塞队列了 workQueue.offer(command)
。因此提交一个空线程,直接从阻塞队列里面取就能够了。addWorker(command,false)
,很好理解,对应的就是,阻塞队列满了,将任务提交到,非核心线程池。与最大线程池比较。至此,从新概括execute()
的逻辑应该是:
回顾我开始提出的问题:
若是 corePoolSize = 0 且 阻塞队列是无界的。线程池将如何工做?
这个问题应该就不难回答了。
《Java并发编程的艺术》是一本学习 java 并发编程的好书,在这里推荐给你们。
同时,但愿你们在阅读技术数据的时候要仔细思考,结合源码,发现,提出问题,解决问题。这样的学习才能高效且透彻。
欢迎关注个人微信公众号