先前,笔者讲解到ThreadPoolExecutor.addWorker(Runnable firstTask, boolean core),在这个方法中工做线程可能建立成功,也可能建立失败,具体视线程池的边界条件,以及当前内存状况而定。java
那么,若是线程池当前的状态,是容许建立Worker对象的,那么建立Worker的内部流程又是怎样呢?线程池为什么要使用Worker包装Thread来建立一个线程,为什么不直接使用原生的Thread来建立线程?若是建立Worker的firstTask不为空,那么Worker理所固然应该优先执行firstTask任务,若是firstTask为空,那Worker又要如何获取任务来执行呢?咱们还有一堆亟待解决的问题。程序员
首先咱们来解决前两个问题,Worker的建立流程,以及为何不使用原生Thread代替Worker?首先,Doug Lea用Worker包装Thread,意味着Worker比Thread拥有更多的功能。例如:Worker会统计它所对应的线程执行了多少任务、经过Worker能够知道线程是否已启动、线程是否正在执行任务?而这些信息都是原生Thread所没有的,因此须要一个Worker类来扩展Thread。安全
在建立Worker时,会先设置其state的值为-1,表明Worker所对应的线程还没有启动,即尚未调用Worker.thread.start(),以后会进行firstTask的赋值,向线程工厂申请建立线程,建立完毕后,等待外部调用Worker.thread.start()启动一个线程执行Worker.run()方法。多线程
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /* * 当初始化一个Worker时,会向线程工厂申请建立一个Thread对象 * 用来执行任务,为null表明线程工厂建立失败。 */ final Thread thread; //首要执行任务,该字段可能为null。 Runnable firstTask; //thread已完成任务数。 volatile long completedTasks; Worker(Runnable firstTask) { /* * state初始为-1,表明还未调用Worker.thread.start(), * Worker对应的线程还没有被建立,还不能中断。线程启动后, * 若是线程正在执行任务,state为1,若是线程启动后没在 * 执行任务的状态则state为0。 */ setState(-1);//<1> this.firstTask = firstTask; /* * 建立Thread对象的时候,会把Worker对象自己传入,而Worker * 自己实现了Runnable接口,当调用thead.start()启动一个线程 * 执行thread.run()时,会进而调用Worker.run()方法。 */ this.thread = getThreadFactory().newThread(this); } //Worker对象将任务的执行委托给ThreadPoolExecutor.runWorker(Worker w). public void run() { runWorker(this); } //判断Worker是否处于被某个线程持有状态。 protected boolean isHeldExclusively() { return getState() != 0;//<2> } /* * 线程尝试持有worker对象,若是worker没有被某个线程 * 持有,则state为0,则用CAS的方式将worker的state * 改成1,并设置exclusiveOwnerThread为当前线程。 */ protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) {//<3> setExclusiveOwnerThread(Thread.currentThread());//<4> return true; } return false; } /* * 线程释放worker对象,将state改成0,exclusiveOwnerThread * 改成null。 */ protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } /* * 调用父类acquire(int arg)时,会进而调用到Worker自己实现的 * tryAcquire(int unused)。 */ public void lock() { acquire(1);//<5> } public boolean tryLock() { return tryAcquire(1); } /* * 调用父类的release(int arg)时,会进而调用到Worker自己实现的 * tryRelease(int unused)。 */ public void unlock() { release(1);//<6> } public boolean isLocked() { return isHeldExclusively(); } /* * 尝试中断worker的对应线程,若是线程已经启动。建立 * worker时,state为-1,直到调用worker.thread.start() * 后,worker的state为0,若是worker的state>=0,则尝试 * 中断线程。 */ void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
从上面的代码咱们能够注意到,<1>、<2>、<3>、<4>、<5>、<6>处的方法并非Worker自己有的方法,而是Worker继承自父类AbstractQueuedSynchronizer的方法。那么Worker为何须要继承AbstractQueuedSynchronizer(AQS)?AQS又是何方神圣呢?并发
这里先简单介绍下AQS,它定义了若干接口交由程序员实现,诸如:lock()、unlock()、tryAcquire(int arg)、tryRelease(int arg)……等,以保证多个线程不会同时访问同一资源。ThreadPoolExecutor中的字段mainLock为可重入锁ReentrantLock,某种程度上来讲也是实现了AQS,ThreadPoolExecutor经过mainLock的lock()、unlock()以保证线程池内一些非线程安全的对象不会出现并发读写,如:workers、completedTaskCount……等。函数
public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {//...} static final class NonfairSync extends Sync {//...} public ReentrantLock() { sync = new NonfairSync(); } //... }
那么Worker继承了AQS,Worker也实现了lock()、unlock()方法,说明Worker自己也存在多线程访问的可能,那是何时会出现多线程访问Worker呢?这里咱们先按下这个问题,在介绍完Worker.run()以后你就会明白为什么Worker要继承AQS以保证线程访问的顺序性。ui
咱们知道当调用Worker.thread.start()方法时,会进而调用Worker.run()方法,而Worker.run()方法会进而调用ThreadPoolExecutor.runWorker(Worker w)。下面,咱们来看看runWorker(Worker w)的执行流程:this
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; /* * 将worker.firstTask设置为null,由于worker可能会存在较长的一段时间, * 而task可能很快执行完毕,避免worker长时间引用已完成task,以便GC回收 * 已完成task。 */ w.firstTask = null; /* * 新建立的worker对象state为-1,调用worker.unlock()会进而调用 * worker.tryRelease(int unused),将state设置为0,表明worker * 对应工做线程已启动,线程处于可中断状态。 */ w.unlock(); // allow interrupts /* * 标志worker是否异常完成,若是在<1>处task为空,且没法经过 * getTask()从任务队列中获取新的任务,则会跳出循环,并在<3>处 * 赋值为false,表明worker并无异常完成。 * 无论worker是正常完成仍是异常完成,最后都会将completedAbruptly的结果 * 传给processWorkerExit(...),若是是worker是正常退出,则将workerCount-1, * 并将worker从workers集合中移除。若是是异常退出,则不减小workerCount,仅仅 * 是将异常worker从workers集合中移除,并尝试新增一个worker。 */ boolean completedAbruptly = true; try { /* * 若是task不为空,或者调用getTask()能任务队列中获取到新的任务, * 则进入while块的代码。若是任务队列中没有待执行的任务,调用getTask() * 会让当前线程陷入阻塞,直到超时或者有新任务进入任务队列。 */ while (task != null || (task = getTask()) != null) {//<1> /* * 若是能进入循环,表明worker准备开始执行任务,但在执行任务 * 前会先上锁,等到任务执行结束又会在<2>处释放锁,然而线程池 * 又不会让多个线程同时执行同一个任务,那么为何在执行任务前 * 要让worker先上锁,执行完毕再释放锁呢? * 咱们假设有一个线程池有5个线程,其中A、B线程正在执行任务, * C、D、E处于空闲状态。因此咱们能肯定A、B两个worker已经 * 得到了锁,而C、D、E还阻塞在getTask()方法中。如今线程池 * 执行shutdown()方法,该方法会进而调用interruptIdleWorkers() * 中断处于空闲状态的工做线程。而在interruptIdleWorkers()方法中 * 判断一个worker是否处于空闲,会调用worker.tryLock(),若是能 * 成功获取到锁,则表明该worker处于空闲状态,则中断该worker对应的 * 线程。所以,一个worker是有可能被多个线程访问的,好比worker自己 * 对应的线程,又或者关闭线程池的线程。 */ w.lock(); /* * 若是线程池的运行状态>=STOP,则中断当前线程。若是运行状态<STOP, * 则确保线程没有被中断。 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //空方法,在执行任务前执行 beforeExecute(wt, task); try { task.run();//开始执行任务 afterExecute(task, null);//空方法,在执行任务完毕后执行。 } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { /* * 设置当前任务为空,这样就能够在下一次循环中获取新的任务。 * 对worker执行的任务数+1,并释放锁。 */ task = null; w.completedTasks++; w.unlock();//<2> } } completedAbruptly = false;//<3> } finally { processWorkerExit(w, completedAbruptly); } }
在上面的runWorker(Worker w)中若是执行完worker的首要任务,或者首要任务为null,便会调用getTask()尝试从任务队列中获取任务,但调用getTask()可能会使当前线程陷入等待或者阻塞直到有任务入队。getTask()也有可能返回null致使当前worker对应的线程退出,有如下几个缘由可能致使工做线程退出:线程
private Runnable getTask() { //超时标志,默认为false,获取任务若是超时则会在<5>赋值为true。 boolean timedOut = false; for (; ; ) { int c = ctl.get(); /* * 若是线程池处于RUNNING状态,则runStateAtLeast(c, SHUTDOWN) * 为false,不会再判断以后的逻辑为true或者false。 * 若是线程池处于SHUTDOWN状态,且workQueue.isEmpty()为true,即 * 任务队列为空,则直接返回。若是任务队列不为空,则没法进入if分支, * 依然要返回任务,按照SHUTDOWN的要求再也不接受新任务,但仍要处理队列 * 中的任务。 * 若是线程池处于STOP状态,即使任务队列不为空,也再也不处理,则直接进入 * if分支后返回。 * 因此总结一下,只有两种状况不会进入此分支: * 1.线程池处于RUNNING状态。 * 2.线程池处于SHUTDOWN状态且任务队列不为空。 */ if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {//<1> decrementWorkerCount(); return null; } int wc = workerCountOf(c); /* * timed决定是若是任务队列没有任务的话,是以无限期的方式 * 等待任务入队,或者一旦等待时间超过keepAliveTime,则 * 返回null。 * 若是allowCoreThreadTimeOut为true,则核心线程等待 * 任务时间超过keepAliveTime后会被回收。 * 若是当前工做线程数量workerCount大于核心线程数,也会在 * 线程等待任务超过keepAliveTime后回收线程。 */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /* * 咱们先看进入此分支后会作的事,再分析如何进入这个分支。进入 * 此分支后,会用CAS减小workerCount的数量,成功则返回null。 * 不然continue从新开始新一轮的for循环。 * 如今,咱们来分析下进入此分支的逻辑: * 首先是wc > maximumPoolSize,通常workerCount不会大于 * maximumPoolSize,除非线程池运行期间经过 * setMaximumPoolSize(int maximumPoolSize)将线程池 * 的最大线程数改小。 * (timed && timedOut)在第一轮for循环永远为false,由于 * timedOut要为true的条件,首先是timed为true,即线程池内存在空闲后 * 可回收的线程,不论是线程池容许回收核心线程,或者线程数大于核心线程数。 * 只有在<4>获取任务超时后workQueue返回null,才有可能到达<5>处将 * timedOut赋值为true,而且开始新一轮的循环。 * 以后的两个判断wc>1和workQueue.isEmpty(),判断队列为空还好理解, * 为何要判断wc>1?首先咱们要知道maximumPoolSize必须大于等于1,当咱们 * 往线程池传入的maximumPoolSize<=0会抛出异常。其次,若是咱们将<2>处改成: * (wc >= 1 || workQueue.isEmpty()),有可能出现线程池内只有一个线程, * 但任务队列不为null。依旧会进入此分支内部执行<3>的代码以CAS的方式对 * workerCount-1,从而出现一个尴尬的状况,任务队列中有任务,但工做线程数 * 为0。因此<2>处必须保证wc>1。 * 思考一种状况:假设一个线程池核心线程数为3,最大线程数为5, * allowCoreThreadTimeOut为false,线程池当前工做线程数量也为5,5个线程 * 同时完成任务,并执行getTask()获取任务,可想而知timed为true,由于工做 * 线程数(5)大于核心线程数(3),5个工做线程都是调用<4>处workQueue.poll(...) * 等待任务,超时则返回null。若是超时时间到达,3个核心线程如何从新进入等待状态, * 剩余2个线程如何被回收? * 当5个线程超时返回后,会将timedOut赋值为true,而后从新开始新一轮的for循环,一直 * 执行到此分支,此时(timed && timedOut)都为true,队列也都为null,因此5个线程会 * 进入此分支。用CAS成功对workerCount-1的线程将被回收,失败的线程则continue又开始 * 新一轮的for循环,直到wc<=corePoolSize,timed为false,最后剩余的工做线程数调用 * workQueue.take()无限期地等待任务的到来,除非线程被中断。 */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {//<2> if (compareAndDecrementWorkerCount(c))//<3> return null; continue; } try { /* * workQueue.poll(...)和workQueue.take()都有可能使当前线程陷入 * 等待,直到返回任务,只不过前者相比后者多了一个超时时间,到达超时时间 * 若是有任务入队,则r不为null,直接返回任务。 * 线程等待期间,若是线程被中断,则会抛出InterruptedException异常。 * 通常关闭线程池时,会尝试中断空闲线程,而处于等待任务的空闲线程会跳到<6>处, * 从新开始新一轮的for循环,而且在<1>处判断线程池处于SHUTDOWN或者STOP, * 从而退出。 */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://<4> workQueue.take(); if (r != null) return r; /* * 只有等待任务超时控制流才会执行到这里,将超时标志赋值为true, * 从新开始新一轮的for循环。 */ timedOut = true;//<5> } catch (InterruptedException retry) { timedOut = false;//<6> } } }
在runWorker(Worker w)中有两种方式能够进到下面的处理线程退出processWorkerExit(Worker w, boolean completedAbruptly)方法:对象
上面两种方式传递给processWorkerExit(...)的completedAbruptly是不一样的,第一个方式传入的completedAbruptly为true,第二个方式为false,虽然worker是同一个。那么当completedAbruptly为true或者false,processWorkerExit(...)的流程又是怎么走的呢?
private void processWorkerExit(Worker w, boolean completedAbruptly) { /* * 从runWorker(Worker w)传递而来的变量,标志worker是否意外完成, * 当worker执行任务时抛出异常,该变量为true。若是是意外完成,则代表 * workerCount还没有-1。若是worker获取任务超时从而要让线程被回收,在 * getTask()方法中会对workerCount-1。 */ if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /* * 获取可重入锁后,将worker已完成的任务数加到线程池已完成任务数, * 并将worker从workers集合中移除。 */ completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } //尝试终止线程。 tryTerminate(); int c = ctl.get(); //若是线程池运行状态处于RUNNING或SHUTDOWN,则进入此分支。 if (runStateLessThan(c, STOP)) { //若是线程是正常退出,则进入此分支 if (!completedAbruptly) { /* * min为线程池建立核心线程后,容许最小的核心线程数,若是 * allowCoreThreadTimeOut为true则表明核心线程能够被回收, * 则min为0,不然min为核心线程数量。 * 若是线程池容许回收线程,且队列不为空,则判断在移除当前worker * 后,线程池工做线程的数量是否还大于等于1,避免出现队列里有任务 * 但没有线程执行的状况,若是工做线程数大于等于1,则退出线程线程。 * 不然调用addWorker(Runnable firstTask, boolean core) * 尝试增长新的线程。 */ int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && !workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
最后,咱们还须要介绍关闭线程池以后作的操做。关闭线程池会修改线程池运行状态,在advanceRunState(int targetState)会使用CAS自旋的方式,将线程池状态修改成SHUTDOWN。以后调用interruptIdleWorkers()中断空闲线程,这里咱们看到中断的时候调用worker的tryLock()和unlock()。Worker之因此继承AQS就是为了方便区分哪些worker正在执行任务,哪些worker处于空闲中,以便在关闭线程池时中断全部空闲的worker。
/** * 调用此方法后再也不接受新任务,但会执行现有队列任务。 * 若是调用此方法前该方法已被调用,则不会有任何效果。 * 该方法不会等待全部任务完成,须要调用: * awaitTermination(long timeout, TimeUnit unit) */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN);//设置线程池状态为SHUTDOWN interruptIdleWorkers();//中断空闲线程 onShutdown(); //钩子方法,按用户须要实现。 } finally { mainLock.unlock(); } /* * 尝试终止线程池,可能线程池有任务在执行,当前线程终止失败。 * 但随着工做线程逐个退出,最后一个工做线程将成功终止线程池。 */ tryTerminate(); } private void advanceRunState(int targetState) { for (; ; ) { int c = ctl.get(); /* * 若是线程池运行状态>=targetState,则 * runStateAtLeast(c, targetState)为true直接退出。 * 不然调用ctlOf(int rs, int wc),根据运行状态和工做 * 线程数生成的值以CAS自旋的方式set进ctl。 */ if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } //调用shutdown()会进而调用此方法,中断空闲线程。 private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; /* * 若是线程未被中断,则尝试获取worker的锁,若是能 * 成功获取,表明worker线程处于空闲中。 */ if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } //若是onlyOne为true表明最多只中断一个线程 if (onlyOne) break; } } finally { mainLock.unlock(); } }
在中断空闲线程后shutdown()还会调用tryTerminate(),若是查看tryTerminate()的引用,能够发现不仅仅shutdown()有调用,像addWorkerFailed(Worker w)、processWorkerExit(...)……都有调用。这个方法旨在终止线程池,若是当前有线程关闭了线程池,线程池若是没有存活线程,或者线程都处于空闲状态,天然而然执行尝试终止线程池方法;若是关闭线程池时,线程池仍然有线程处于执行任务状态,没法终止线程池,就要靠这些工做线程在退出时终止线程池。
final void tryTerminate() { for (; ; ) { int c = ctl.get(); /* * 若是线程池运行状态处于RUNNING、TIDYING则退出, * 或者运行状态小于STOP(即处于RUNNING、SHUTDOWN) * 且队列不为空,则退出。 * 总结一下,只有当运行状态处于STOP时或者状态处于SHUTDOWN * 但队列为空,才不会进此分支。 */ if (isRunning(c) || runStateAtLeast(c, TIDYING) ||//<1> (runStateLessThan(c, STOP) && !workQueue.isEmpty())) return; //若是工做线程数不为0,则尝试最多中断一个空闲线程后退出。 if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } /* * 在processWorkerExit(...)方法中若是worker是 * 异常退出会对workerCount-1,若是是正常退出,则 * workerCount在getTask()中-1。以后在processWorkerExit(...) * 中移除worker。 * 不论worker是正常退出仍是异常退出,终归workerCount会慢慢回到0。 * 而processWorkerExit(...)中在对workerCount-1后,还会调用 * tryTerminate()。所以一个被关闭的线程池,它的最后一个线程,会 * 执行到此处,处理终止线程池的工做。 */ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /* * 用CAS设置线程池运行状态为TIDYING,可能存在多个线程同时 * 调用shutdown()后并依次执行到这一步(由于要得到可重入锁), * 但只有一个线程能够CAS成功,其余线程CAS失败后返回<1>处后 * 判断运行状态>=TIDYING则退出。 */ if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //空函数,具体由用户实现,主要用于终止线程池后的操做。 terminated(); } finally { //最后设置线程池的状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } } }