threadpoolexecutor源码分析
文章转自:threadpoolexecutor源码分析
前段时间学习java.util.concurrent的源码,学习线程池这一块的时候发现了一篇不错的文章,就记录下来。同时,文章之中加入了本身的一些看法。废话很少说,直接开始。html
ThreadPoolExecutor做为Java.util.concurrent包中核心的类,先看下类型的结构: java
核心的接口实际上是Executor,它只有一个execute方法抽象为对任务(Runnable接口)的执行, ExecutorService接口在Executor的基础上提供了对任务执行的生命周期的管理,主要是submit和shutdown方法, AbstractExecutorService对ExecutorService一些方法作了默认的实现,主要是submit和invoke方法,而真正的任务执行 的Executor接口execute方法是由子类实现,就是ThreadPoolExecutor,它实现了基于线程池的任务执行框架,因此要了解 JDK的线程池,那么就得先看这个类。并发
再看execute方法以前须要先介几个变量或类。框架
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
这个变量是整个类的核心,AtomicInteger保证了对这个变量的操做是原子的,经过巧妙的操做,ThreadPoolExecutor用这一个变量保存了两个内容:函数
全部有效线程的数量oop
各个线程的状态(runState)源码分析
低29位存线程数,高3位存runState,这样runState有5个值:学习
RUNNING:-536870912ui
SHUTDOWN:0this
STOP:536870912
TIDYING:1073741824
TERMINATED:1610612736
线程池中各个状态间的转换比较复杂,主要记住下面内容就能够了:
RUNNING状态:线程池正常运行,能够接受新的任务并处理队列中的任务;
SHUTDOWN状态:再也不接受新的任务,可是会执行队列中的任务;
STOP状态:再也不接受新任务,不处理队列中的任务
围绕rtc有一些操做和变量:
/** * 这个方法用于取出runState的值 由于CAPACITY值为:00011111111111111111111111111111 * ~为按位取反操做,则~CAPACITY值为:11100000000000000000000000000000 * 再同参数作&操做,就将低29位置0了,而高3位仍是保持原先的值,也就是runState的值 * * @param c * 该参数为存储runState和workerCount的int值 * @return runState的值 */ private static int runStateOf(int c) { return c & ~CAPACITY; } /** * 这个方法用于取出workerCount的值 * 由于CAPACITY值为:00011111111111111111111111111111,因此&操做将参数的高3位置0了 * 保留参数的低29位,也就是workerCount的值 * * @param c * ctl, 存储runState和workerCount的int值 * @return workerCount的值 */ private static int workerCountOf(int c) { return c & CAPACITY; } /** * 将runState和workerCount存到同一个int中 * “|”运算的意思是,假设rs的值是101000,wc的值是000111,则他们位或运算的值为101111 * * @param rs * runState移位事后的值,负责填充返回值的高3位 * @param wc * workerCount移位事后的值,负责填充返回值的低29位 * @return 二者或运算事后的值 */ private static int ctlOf(int rs, int wc) { return rs | wc; } // 只有RUNNING状态会小于0 private static boolean isRunning(int c) { return c < SHUTDOWN; }
核心线程池大小,活动线程小于corePoolSize则直接建立,大于等于则先加到workQueue中,队列满了才建立新的线程。
线程从队列中获取任务的超时时间,也就是说若是线程空闲超过这个时间就会终止。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable ...
内部类Worker是对任务的封装,全部submit的Runnable都被封装成了Worker,它自己也是一个Runnable, 而后利用AQS框架(关于AQS能够看我这篇文章)实现了一个简单的非重入的互斥锁, 实现互斥锁主要目的是为了中断的时候判断线程是在空闲仍是运行,能够看后面shutdown和shutdownNow方法的分析。
// state只有0和1,互斥 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true;// 成功得到锁 } // 线程进入等待队列 return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; }
之因此不用ReentrantLock是为了不任务执行的代码中修改线程池的变量,如setCorePoolSize,由于ReentrantLock是可重入的。
execute方法主要三个步骤:
活动线程小于corePoolSize的时候建立新的线程;
活动线程大于corePoolSize时都是先加入到任务队列当中;
任务队列满了再去启动新的线程,若是线程数达到最大值就拒绝任务。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 活动线程数 < corePoolSize if (workerCountOf(c) < corePoolSize) { // 直接启动新的线程。第二个参数true:addWorker中会从新检查workerCount是否小于corePoolSize if (addWorker(command, true)) // 添加成功返回 return; c = ctl.get(); } // 活动线程数 >= corePoolSize // runState为RUNNING && 队列未满 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // double check // 非RUNNING状态 则从workQueue中移除任务并拒绝 if (!isRunning(recheck) && remove(command)) reject(command);// 采用线程池指定的策略拒绝任务 // 线程池处于RUNNING状态 || 线程池处于非RUNNING状态可是任务移除失败 else if (workerCountOf(recheck) == 0) // 这行代码是为了SHUTDOWN状态下没有活动线程了,可是队列里还有任务没执行这种特殊状况。 // 添加一个null任务是由于SHUTDOWN状态下,线程池再也不接受新任务 addWorker(null, false); // 两种状况: // 1.非RUNNING状态拒绝新的任务 // 2.队列满了启动新的线程失败(workCount > maximumPoolSize) } else if (!addWorker(command, false)) reject(command); }
其中比较难理解的应该是addWorker(null, false);这一行,这要结合addWorker一块儿来看。 主要目的是防止HUTDOWN状态下没有活动线程了,可是队列里还有任务没执行这种特殊状况。
/** * @param firstTask:新增一个线程并执行这个任务,可空,增长的线程从队列获取任务; * * @param core:是否使用corePoolSize做为上限,不然使用maxmunPoolSize **/ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);// 当前线程池状态 // Check if queue empty only if necessary. // 这条语句等价:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || // workQueue.isEmpty()) // 知足下列调价则直接返回false,线程建立失败: // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此时再也不接受新的任务,且全部任务执行结束 // rs = SHUTDOWN:firtTask != null 此时再也不接受任务,可是仍然会执行队列中的任务 // rs = SHUTDOWN:firtTask == null见execute方法的addWorker(null, // false),任务为null && 队列为空 // 最后一种状况也就是说SHUTDONW状态下,若是队列不为空还得接着往下执行,为何?add一个null任务目的究竟是什么? // 看execute方法只有workCount==0的时候firstTask才会为null结合这里的条件就是线程池SHUTDOWN了再也不接受新任务 // 可是此时队列不为空,那么还得建立线程把任务给执行完才行。 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; // 走到这的情形: // 1.线程池状态为RUNNING // 2.SHUTDOWN状态,但队列中还有任务须要执行 for (;;) { int wc = workerCountOf(c); //判断条件有点难理解,实际上是非运行状态下(>=SHUTDOWN)或者SHUTDOWN状态下任务非空(新提交任务)、任务队列为空, //就不能够再新增线程了(return false),即SHUTDOWN状态是能够新增线程去执行队列中的任务; if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c))// 原子操做递增workCount break retry;// 操做成功跳出的重试的循环 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs)// 若是线程池的状态发生变化则重试 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // wokerCount递增成功 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 并发的访问线程池workers对象必须加锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); // RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 将新启动的线程添加到线程池中 workers.add(w); // 更新largestPoolSize int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 启动新添加的线程,这个线程首先执行firstTask,而后不停的从队列中取任务执行 // 当等待keepAlieTime尚未任务执行则该线程结束。见runWoker和getTask方法的代码。 if (workerAdded) { t.start();// 最终执行的是ThreadPoolExecutor的runWoker方法 workerStarted = true; } } } finally { // 线程启动失败,则从wokers中移除w并递减wokerCount if (!workerStarted) // 递减wokerCount会触发tryTerminate方法 addWorkerFailed(w); } return workerStarted; }
任务添加成功后实际执行的是runWorker这个方法,这个方法很是重要,简单来讲它作的就是:
第一次启动会执行初始化传进来的任务firstTask;
而后会从workQueue中取任务执行,若是队列为空则等待keepAliveTime这么长时间。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // Worker的构造函数中抑制了线程中断setState(-1),因此这里须要unlock从而容许中断 w.unlock(); // 用于标识是否异常终止,finally中processWorkerExit的方法会有不一样逻辑 // 为true的状况:1.执行任务抛出异常;2.被中断。 boolean completedAbruptly = true; try { // 若是getTask返回null那么getTask中会将workerCount递减,若是异常了这个递减操做会在processWorkerExit中处理 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 任务执行前能够插入一些处理,子类重载该方法 beforeExecute(wt, task); Throwable thrown = null; try { task.run();// 执行用户任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 和beforeExecute同样,留给子类去重载 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 结束线程的一些清理工做 processWorkerExit(w, completedAbruptly); } }
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 1.rs > SHUTDOWN 因此rs至少等于STOP,这时再也不处理队列中的任务 // 2.rs = SHUTDOWN 因此rs>=STOP确定不成立,这时还须要处理队列中的任务除非队列为空 // 这两种状况都会返回null让runWoker退出while循环也就是当前线程结束了,因此必需要decrement // wokerCount if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 递减workerCount值 decrementWorkerCount(); return null; } // 标记从队列中取任务时是否设置超时时间 boolean timed; // Are workers subject to culling? // 1.RUNING状态 // 2.SHUTDOWN状态,但队列中还有任务须要执行 for (;;) { int wc = workerCountOf(c); // 1.core thread容许被超时,那么超过corePoolSize的的线程一定有超时 // 2.allowCoreThreadTimeOut == false && wc > // corePoolSize时,通常都是这种状况,core thread即便空闲也不会被回收,只要超过的线程才会 timed = allowCoreThreadTimeOut || wc > corePoolSize; // 从addWorker能够看到通常wc不会大于maximumPoolSize,因此更关心后面半句的情形: // 1. timedOut == false 第一次执行循环, 从队列中取出任务不为null方法返回 或者 // poll出异常了重试 // 2.timeOut == true && timed == // false:看后面的代码workerQueue.poll超时时timeOut才为true, // 而且timed要为false,这两个条件相悖不可能同时成立(既然有超时那么timed确定为true) // 因此超时不会继续执行而是return null结束线程。(重点:线程是如何超时的???) if (wc <= maximumPoolSize && !(timedOut && timed)) break; // workerCount递减,结束当前thread if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl // 须要从新检查线程池状态,由于上述操做过程当中线程池可能被SHUTDOWN if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { // 1.以指定的超时时间从队列中取任务 // 2.core thread没有超时 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true;// 超时 } catch (InterruptedException retry) { timedOut = false;// 线程被中断重试 } } }
线程退出会执行这个方法作一些清理工做。
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 正常的话再runWorker的getTask方法workerCount已经被减一了 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 累加线程的completedTasks completedTaskCount += w.completedTasks; // 从线程池中移除超时或者出现异常的线程 workers.remove(w); } finally { mainLock.unlock(); } // 尝试中止线程池 tryTerminate(); int c = ctl.get(); // runState为RUNNING或SHUTDOWN if (runStateLessThan(c, STOP)) { // 线程不是异常结束 if (!completedAbruptly) { // 线程池最小空闲数,容许core thread超时就是0,不然就是corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 若是min == 0可是队列不为空要保证有1个线程来执行队列中的任务 if (min == 0 && !workQueue.isEmpty()) min = 1; // 线程池还不为空那就不用担忧了 if (workerCountOf(c) >= min) return; // replacement not needed } // 1.线程异常退出 // 2.线程池为空,可是队列中还有任务没执行,看addWoker方法对这种状况的处理 addWorker(null, false); } }
processWorkerExit方法中会尝试调用tryTerminate来终止线程池。这个方法在任何可能致使线程池终止的动做后执行:好比减小wokerCount或SHUTDOWN状态下从队列中移除任务。
final void tryTerminate() { for (;;) { int c = ctl.get(); // 如下状态直接返回: // 1.线程池还处于RUNNING状态 // 2.SHUTDOWN状态可是任务队列非空 // 3.runState >= TIDYING 线程池已经中止了或在中止了 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) return; // 只能是如下情形会继续下面的逻辑:结束线程池。 // 1.SHUTDOWN状态,这时再也不接受新任务并且任务队列也空了 // 2.STOP状态,当调用了shutdownNow方法 // workerCount不为0则还不能中止线程池,并且这时线程都处于空闲等待的状态 // 须要中断让线程“醒”过来,醒过来的线程才能继续处理shutdown的信号。 if (workerCountOf(c) != 0) { // Eligible to terminate // runWoker方法中w.unlock就是为了能够被中断,getTask方法也处理了中断。 // ONLY_ONE:这里只须要中断1个线程去处理shutdown信号就能够了。 interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 进入TIDYING状态 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 子类重载:一些资源清理工做 terminated(); } finally { // TERMINATED状态 ctl.set(ctlOf(TERMINATED, 0)); // 继续awaitTermination termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
shutdown这个方法会将runState置为SHUTDOWN,会终止全部空闲的线程。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 线程池状态设为SHUTDOWN,若是已经至少是这个状态那么则直接返回 advanceRunState(SHUTDOWN); // 注意这里是中断全部空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit → // tryTerminate方法中会保证队列中剩余的任务获得执行。 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
shutdownNow方法将runState置为STOP。和shutdown方法的区别,这个方法会终止全部的线程。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // STOP状态:再也不接受新任务且再也不执行队列中的任务。 advanceRunState(STOP); // 中断全部线程 interruptWorkers(); // 返回队列中尚未被执行的任务。 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
主要区别在于shutdown调用的是interruptIdleWorkers这个方法,而shutdownNow实际调用的是Worker类的interruptIfStarted方法:
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // w.tryLock能获取到锁,说明该线程没有在运行,由于runWorker中执行任务会先lock, // 所以保证了中断的确定是空闲的线程。 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
void interruptIfStarted() { Thread t; // 初始化时state == -1 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
这就是前面提到的Woker类实现AQS的主要做用。
注意:shutdown方法可能会在finalize被隐式的调用。