/* 这个是用一个int来表示workerCount和runState的,其中runState占int的高3位, 其它29位为workerCount的值。 workerCount:当前活动的线程数; runState:线程池的当前状态。 用AtomicInteger是由于其在并发下使用compareAndSet效率很是高; 当改变当前活动的线程数时只对低29位操做,如每次加一减一,workerCount的值变了, 但不会影响高3位的runState的值。当改变当前状态的时候,只对高3位操做,不会改变低29位的计数值。 这里有一个假设,就是当前活动的线程数不会超过29位能表示的值,即不会超过536870911, 就目前以及可预见的很长一段时间来说,这个值是足够用了 */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //COUNT_BITS,就是用来表示workerCount占用一个int的位数,其值为前面说的29 private static final int COUNT_BITS = Integer.SIZE - 3; /* CAPACITY为29位能表示的最大容量,即workerCount实际能用的最大值。 其值的二进制为:00011111111111111111111111111111(占29位,29个1) */ private static final int CAPACITY = (1 << COUNT_BITS) - 1; /* 如下常量是线程池的状态,状态存储在int的高3位,因此要左移29位。 腾出的低29位来表示workerCount 注意,这5个状态是有大小关系的。RUNNING<shutdown<stop<tidying<terminated 当须要判断多个状态时,只须要用<或="">来判断就能够了 */ /* RUNNING的含义:线程池能接受新任务,而且能够运行队列中的任务 -1的二进制为32个1,移位后为:11100000000000000000000000000000 */ private static final int RUNNING = -1 << COUNT_BITS; /* SHUTDOWN的含义:再也不接受新任务,但仍能够执行队列中的任务 0的二进制为32个0,移位后仍是全0 */ private static final int SHUTDOWN = 0 << COUNT_BITS; /* STOP的含义:再也不接受新任务,再也不执行队列中的任务,并且要中断正在处理的任务 1的二进制为前面31个0,最后一个1,移位后为:00100000000000000000000000000000 */ private static final int STOP = 1 << COUNT_BITS; /* TIDYING的含义:全部任务均已终止,workerCount的值为0, 转到TIDYING状态的线程即将要执行terminated()钩子方法. 2的二进制为00000000000000000000000000000010 移位后01000000000000000000000000000000 */ private static final int TIDYING = 2 << COUNT_BITS; /* TERMINATED的含义:terminated()方法执行结束. 3的二进制为00000000000000000000000000000011 移位后01100000000000000000000000000000 */ private static final int TERMINATED = 3 << COUNT_BITS; 各状态之间可能的转变有如下几种: RUNNING -> SHUTDOWN 调用了shutdown方法,线程池实现了finalize方法,在里面调用了shutdown方法,所以shutdown多是在finalize中被隐式调用的 (RUNNING or SHUTDOWN) -> STOP 调用了shutdownNow方法 SHUTDOWN -> TIDYING 当队列和线程池均为空的时候 STOP -> TIDYING 当线程池为空的时候 TIDYING -> TERMINATED terminated()钩子方法调用完毕 /* 传入的参数为存储runState和workerCount的int值,这个方法用于取出runState的值。 ~为按位取反操做,~CAPACITY值为:11100000000000000000000000000000, 再同参数作&操做,就将低29位置0了,而高3位仍是保持原先的值,也就是runState的值 */ private static int runStateOf(int c) { return c & ~CAPACITY; } /* 传入的参数为存储runState和workerCount的int值,这个方法用于取出workerCount的值。 由于CAPACITY值为:00011111111111111111111111111111,因此&操做将参数的高3位置0了, 保留参数的低29位,也就是workerCount的值。 */ private static int workerCountOf(int c) { return c & CAPACITY; } /* 将runState和workerCount存到同一个int中,这里的rs就是runState, 是已经移位过的值,填充返回值的高3位,wc填充返回值的低29位 */ private static int ctlOf(int rs, int wc) { return rs | wc; }
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ /* *分三步进行: * * 1.若是少于corePoolSize线程正在运行,请尝试 *用给定的命令做为第一个启动一个新的线程 *任务。 对addWorker的调用会自动检查runState和 * workerCount,从而防止将添加的错误警报 *线程,当它不该该经过返回false。 * * 2.若是任务能够成功排队,那么咱们仍然须要 *再次检查咱们是否应该添加一个线程 *(由于现有的自上次检查以来死亡)或者那个 自从进入这个方法以来,池关闭了。 因此咱们 *从新检查状态,若是有必要的话回滚入队 *中止,或者若是没有的话,开始一个新的线程。 * * 3.若是咱们不能排队任务,那么咱们尝试添加一个新的 *线程。 若是失败了,咱们知道咱们已经关闭了,或者已经饱和了 *所以拒绝任务。 */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
处理过程: 1.活动线程数< corePoolSize 小于核心线程数时,直接启动新的线程,而且添加到工做线程中。(addWorker true时,会从新检查workerCount的值) 2.活动线程数 >corePoolSize 时,若是是运行时状态,而且队列未满,添加到队列中, 须要再次检查状态,1.不是running,而且移出失败,则拒绝任务。2。处于RUNNing状态,或者移出任务失败的时候,若是没有活动线程,添加一个空的任务,表示不在接受新的任务。3.隐藏的状况,若是是运行状态,而且能够移出成功,则正常执行。 3.不是运行状态,而且队列已满,启动新的线程失败,则拒绝任务。并发
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
addWorker有两个参数:Runnable类型的firstTask,用于指定新增的线程执行的第一个任务;boolean类型的core,true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前须要判断当前活动线程数是否少于maximumPoolSize。函数
该方法的返回值表明是否成功新增一个线程。oop
// Check if queue empty only if necessary. // 这条语句等价:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || // workQueue.isEmpty()) // 知足下列调价则直接返回false,线程建立失败: // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此时再也不接受新的任务,且全部任务执行结束 // rs = SHUTDOWN:firtTask != null 此时再也不接受任务,可是仍然会执行队列中的任务 // rs = SHUTDOWN:firtTask == null见execute方法的addWorker(null, // false),任务为null && 队列为空 // 最后一种状况也就是说SHUTDONW状态下,若是队列不为空还得接着往下执行,为何?add一个null任务目的究竟是什么? // 看execute方法只有workCount==0的时候firstTask才会为null结合这里的条件就是线程池SHUTDOWN了再也不接受新任务 // 可是此时队列不为空,那么还得建立线程把任务给执行完才行。 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) // 等价实现 rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()),
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);// 当前线程池状态 // Check if queue empty only if necessary. // 这条语句等价:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || // workQueue.isEmpty()) // 知足下列调价则直接返回false,线程建立失败: // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此时再也不接受新的任务,且全部任务执行结束 // rs = SHUTDOWN:firtTask != null 此时再也不接受任务,可是仍然会执行队列中的任务 // rs = SHUTDOWN:firtTask == null见execute方法的addWorker(null, // false),任务为null && 队列为空 // 最后一种状况也就是说SHUTDONW状态下,若是队列不为空还得接着往下执行,为何?add一个null任务目的究竟是什么? // 看execute方法只有workCount==0的时候firstTask才会为null结合这里的条件就是线程池SHUTDOWN了再也不接受新任务 // 可是此时队列不为空,那么还得建立线程把任务给执行完才行。 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; // 走到这的情形: // 1.线程池状态为RUNNING // 2.SHUTDOWN状态,但队列中还有任务须要执行 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c))// 原子操做递增workCount break retry;// 操做成功跳出的重试的循环 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs)// 若是线程池的状态发生变化则重试 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // wokerCount递增成功 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 并发的访问线程池workers对象必须加锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); // RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 将新启动的线程添加到线程池中 workers.add(w); // 更新largestPoolSize int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 启动新添加的线程,这个线程首先执行firstTask,而后不停的从队列中取任务执行 // 当等待keepAlieTime尚未任务执行则该线程结束。见runWoker和getTask方法的代码。 if (workerAdded) { t.start();// 最终执行的是ThreadPoolExecutor的runWoker方法 workerStarted = true; } } } finally { // 线程启动失败,则从wokers中移除w并递减wokerCount if (!workerStarted) // 递减wokerCount会触发tryTerminate方法 addWorkerFailed(w); } return workerStarted; }
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // Worker的构造函数中抑制了线程中断setState(-1),因此这里须要unlock从而容许中断 w.unlock(); // 用于标识是否异常终止,finally中processWorkerExit的方法会有不一样逻辑 // 为true的状况:1.执行任务抛出异常;2.被中断。 boolean completedAbruptly = true; try { // 若是getTask返回null那么getTask中会将workerCount递减,若是异常了这个递减操做会在processWorkerExit中处理 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 任务执行前能够插入一些处理,子类重载该方法 beforeExecute(wt, task); Throwable thrown = null; try { task.run();// 执行用户任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 和beforeExecute同样,留给子类去重载 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 结束线程的一些清理工做 processWorkerExit(w, completedAbruptly); } }