Java 中的线程池是运用场景最多的并发框架,几乎全部须要异步或并发执行任务的程序均可以使用线程池。在开发过程当中,合理地使用线程池可以带来 3 个好处。java
ThreadPoolExecutor 执行 execute 方法分下面 4 种状况。编程
1)若是当前运行的线程少于 corePoolSize,则建立新线程来执行任务(注意,执行这一步骤须要获取全局锁)。
2)若是运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue。
3)若是没法将任务加入 BlockingQueue(队列已满),则建立新的线程来处理任务(注意,执行这一步骤须要获取全局锁)。
4)若是建立新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution() 方法。并发
ThreadPoolExecutor 采起上述步骤的整体设计思路,是为了在执行 execute() 方法时,尽量地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在 ThreadPoolExecutor 完成预热以后(当前运行的线程数大于等于 corePoolSize),几乎全部的 execute() 方法调用都是执行步骤2,而步骤2不须要获取全局锁。框架
上面的流程分析让咱们很直观地了解了线程池的工做原理,让咱们再经过源代码来看看是如何实现的,线程池执行任务的方法以下。咱们从 execute 入手分析源码。异步
private static final int RUNNING = -1 << COUNT_BITS; // 运行状态 private static final int SHUTDOWN = 0 << COUNT_BITS; // 拒绝提交新任务,会执行完已提交的任务后关闭线程池 private static final int STOP = 1 << COUNT_BITS; // 拒绝提交新任务,也拒绝执行已提交的任务 private static final int TIDYING = 2 << COUNT_BITS; // 关闭线程池后,线程所有关闭后的状态,以后回调 terminated private static final int TERMINATED = 3 << COUNT_BITS; // 回调 terminated 方法后状态变为 TERMINATED
线程池用 ctl 的低 29 位表示线程池中的线程数,高 3 位表示当前线程状态。oop
// ctl 高3位表示线程池状态,低29位表示当前工做线程数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 低29位表示工做线程数 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 最大线程数 0x1fffffff // 获取线程池状态、线程总数、构造 ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
// 全局锁,建立工做线程等操做时须要获取全局锁 private final ReentrantLock mainLock = new ReentrantLock(); private final Condition termination = mainLock.newCondition(); // 工做线程 private final HashSet<Worker> workers = new HashSet<Worker>(); private int largestPoolSize; private volatile int corePoolSize;
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // ctl 高3位表示线程池状态,低29位表示当前工做线程数 int c = ctl.get(); // 1. 小于核心线程数,建立新的线程执行任务。须要获取全局锁 if (workerCountOf(c) < corePoolSize) { // addWorker 建立新的工做线程,true 表示核心线程数,false 表示最大线程数 if (addWorker(command, true)) return; c = ctl.get(); } // 2. 核心线程已满,将任务提交到队列中。不须要获取全局锁 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 2.1 恰好此时线程池关闭了,则须要将任务从队列中踢除 if (!isRunning(recheck) && remove(command)) reject(command); // 任务被踢除后回滚,执行拒绝任务 // 2.2 线程池工做线程为0,建立一个新的工做线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3. 队列满后且线程数小于最大线程数,则建立新的线程执行任务。须要获取全局锁 // 4. 超出最大线程拒绝任务 else if (!addWorker(command, false)) reject(command); }
工做线程:线程池建立线程时,会将线程封装成工做线程 Worker,Worker 在执行完任务后,还会循环获取工做队列里的任务来执行。咱们能够从 Worker 类的 run() 方法里看到这点。源码分析
// Worker 是对线程 Thread 的包装,实现了 AbstractQueuedSynchronizer private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; // 包装的线程 Runnable firstTask; // 线程初始化时的任务,能够为 null Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } }
思考:Worker 为何要继承 AbstractQueuedSynchronizer 实现本身的锁,而不使用 ReentrantLock 呢?ui
实际上 ReentrantLock 是可重入锁,而 Worker 实现的是独占锁,只有三种状 -1(初始化)、0(释放锁)、1(占有锁)。Worker 之因此实现独占锁是为了不在线程执行的时候被 interrupted 中断(下面会讲到)。this
// addWorker 建立一个新的工做线程 // firstTask 线程初始化任务,能够为 null;core 表示是核心线程仍是最大线程 private boolean addWorker(Runnable firstTask, boolean core) { // 1. 经过自旋线程数+1 compareAndIncrementWorkerCount retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 1.1 1、STOP 不能建立新线程 // 2、SHUTDOWN 时 workQueue 为空,也不能建立新线程 // firstTask 表示线程初始化任务,是新提交的任务,SHUTDOWN 时拒绝新提交的任务 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; // 1.2 自旋使线程数+1 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 不断检查线程池状态变化 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 2. 建立线程 Worker boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 2.1 初始化工做线程 Worker,使用全局锁添加到 workers 队列中 w = new Worker(firstTask); final Thread t = w.thread; // threadFactory 可能建立线程失败,返回 null if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // 2.2 1、RUNNING能够建立新线程 // 2、SHUTDOWN不接收新任务,但会执行完 workQueue 的任务 ,所以能够建立空任务的线程 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) // largestPoolSize 表示线程池运行过程当中达到的最大线程数 largestPoolSize = s; workerAdded = true; // 工做线程添加到 workers 成功 } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; // 启动线程成功 } } } finally { // 2.3 建立工做线程失败,回滚 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
总结:spa
addWorker 的 4 种调用方式:
addWorker(command, true)
线程数 < coreSize 时,则建立新线程addWorker(command, false)
当①阻塞队列已满,②线程数 < maximumPoolSize 时,则建立新线程addWorker(null, true)
同 1。只是线程初始化任务为 null,至关于建立一个新的线程。实际的使用是在 prestartCoreThread() 等方法,有兴趣的读者能够自行阅读,在此不作详细赘述。addWorker(null, false)
同 2。只是线程初始化任务为 null,至关于建立一个新的线程,没立马分配任务;在 addWorker 建立线程后调用 t.start() 启动线程,run 方法主要干了一件事,调用 runWorker(this),接下来咱们来看看 runWorker 的具体实现。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; // 线程初始化任务 task w.firstTask = null; // 1. Worker 是独占锁,此时状态由 -1 -> 0,也就是其它线程才能获取w的锁,进而interrupt w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 2. 循环经过 getTask 获取任务,若是不能获取任务了,退出循环,关闭线程池 // 也就是说 getTask 返回 null 时线程就关闭了 while (task != null || (task = getTask()) != null) { w.lock(); // 获取锁,这样在线程执行过程当中不能中断线程(interrupt) // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // // 3.1 线程池已经STOP,若是线程尚未被中断(wt.isInterrupted=false),则调用wt.interrupt中断线程 // 3.2 若是runStateAtLeast(ctl.get(), STOP)=false,则说明线程池处于RUNNING或SHUTDOWN状态 // 调用 Thread.interrupted() 后会清空线程的 interrupted 状态 // Thread.interrupted()&& false 结果始终为 false,这里仅仅是为了调用Thread.interrupted() // 实际上就是:一若是线程已经STOP,则必定要将线程 interrupt // 二若是线程处于运行状态(包括SHUTDOWN),则必定不能 interrupt(也就是要清除 interrupt 标记) if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); // 执行前 Throwable thrown = null; try { task.run(); // 执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); // 执行后 } } finally { task = null; w.completedTasks++; // 统计执行的任务数 w.unlock(); // 释放锁,能够被中断了 } } completedAbruptly = false; // true时表示正常退出,false表示异常退出 } finally { processWorkerExit(w, completedAbruptly); } }
总结,runWoker 具体实现:
// 注意 getTask 前 worker 释放了锁,也就是可能被 interrupt 唤醒 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { // 自旋获取任务 int c = ctl.get(); int rs = runStateOf(c); // 1. ①STOP直接销毁线程,②SHUTDOWN时任务队列为空时也直接销毁线程 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); // 原子性更新,工做线程数-1 return null; } int wc = workerCountOf(c); // 当前工做线程数 // 2.1 timed表示是否能够销毁线程。timed=true表示超时获取任务,则可能返回null // 当线程数大于核心线程数或容许销毁核心线程时 timed=true boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 2.2 一是超过了最大线程数,当线程池启动后手动修改最大线程数可能会出现这种状况 // 二是当容许销毁线程时,获取任务超时 // 2.3 三是线程池中至少有一个工做线程或任务队列为空,则能够销毁线程 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) // 失败重试,此时线程数已经-1 return null; continue; } try { // 3. 获取任务,无限等待则不会返回 null,也就不会销毁线程。而限时等待则可能返回 null Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; // 其它线程唤醒等待的线程 } } }
总结,整个 getTask 循环实现:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 权限检查 advanceRunState(SHUTDOWN); // 更新线程池状态为 SHUTDOWN interruptIdleWorkers(); // 关闭全部的空闲线程 onShutdown(); // 子类实现,如 ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); // 尝试中止线程池 } public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 权限检查 advanceRunState(STOP); // 更新线程池状态为 SHUTDOWN interruptWorkers(); // 关闭全部的线程 tasks = drainQueue(); // 返回还未执行的任务 } finally { mainLock.unlock(); } tryTerminate(); // 尝试中止线程池 return tasks; }
总结,shutdown 和 shutdownNow 区别:
// 关闭全部的空闲线程 private void interruptIdleWorkers() { interruptIdleWorkers(false); } // 中断线程其实是调用 t.interrupt(),须要获取线程锁 w.tryLock private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // 只有空闲线程才能获取锁,正在执行的线程没法获取锁,也就没法中断 // 这也就是为何 Worker 要实现独占锁的缘由。 if (!t.isInterrupted() && w.tryLock()) { // 须要获取w的独占锁 try { t.interrupt(); // 其实是调用 t.interrupt() 中断线程 // 其实是给能线程设置一个标记位 } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
interruptIdleWorkers 只会尝试获取锁,所以只会中断空闲线程。而 interruptWorkers 不须要获取锁,强行中断线程。实际上业务线程必须对 interrupt 作出响应才能中断线程,不然会一直等线程执行结束才会销毁。
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } // 调用Worker#interruptIfStarted 不须要获取锁 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
而 interruptIdleWorkers 和 interruptWorkers 都是 interrupt 全部线程, 所以大部分线程将马上被中断。之因此是大部分,而不是所有,是由于 interrupt() 方法能力有限。 若是线程中没有 sleep 、wait、Condition、定时锁等应用, interrupt() 方法是没法中断当前的线程的。因此,ShutdownNow() 并不表明线程池就必定当即就能退出,它可能必需要等待全部正在执行的任务都执行完成了才能退出。 以下面这个线程永远不会中断,由于该线程没有响应 Thread.interrupted() 或者是直接将 InterruptedException 异常 catch 了。
// 没法响应 interrupted,线程永远没法停止。 executorService.submit(() -> { while (true) System.out.println("go go go"); }); executorService.shutdownNow();
final void tryTerminate() { for (;;) { int c = ctl.get(); // 1. RUNNING或SHUTDOWN还有任务执行时不能关闭,TIDYING则已经关闭 if (isRunning(c) || // 1.1 正在运行,不能中断 runStateAtLeast(c, TIDYING) || // 1.2 已经中断,不须要执行 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 1.3 SHUTDOWN时还有任务执行 return; // 2. 还有线程则关闭空闲线程 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 3. 工做线程数为0时,能够关闭线程池了,设置线程状态为TIDYING, // 并回调terminated后,线程的状态最终变为TERMINATED // 4. 线程状态设置失败,则 CAS 自旋 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
除了 shutdown 和 shutdownNow 外,addWorkerFailed、processWorkerExit、remove 等方法也会调用 tryTerminate 方法。
参考:
天天用心记录一点点。内容也许不重要,但习惯很重要!