在 java 中,线程池 ThreadPoolExecutor 是一个绕不过去的类,它是享元模式思想的体现,经过在容器中建立必定数量的线程加以重复利用,从而避免频繁建立线程带来的额外开销。一个设置合理的线程池能够提升任务响应的速度,而且避免线程数超过硬件能力带来的意外状况。java
在本文,将深刻线程池源码,了解线程池的底层实现与运行机制。函数
ThreadPoolExecutor 类一共提供了四个构造方法,咱们基于参数最完整构造方法了解一下线程池建立所须要的变量:this
public ThreadPoolExecutor(int corePoolSize, // 核心线程数 int maximumPoolSize, // 最大线程数 long keepAliveTime, // 非核心线程闲置存活时间 TimeUnit unit, // 时间单位 BlockingQueue<Runnable> workQueue, // 工做队列 ThreadFactory threadFactory, // 建立线程使用的线程工厂 RejectedExecutionHandler handler // 拒绝策略) { }
线程池拥有一个 AtomicInteger 类型的成员变量 ctl ,经过位运算分别使用 ctl 的高位低位以便在一个值中存储线程数量以及线程池状态。线程
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 29(32-3) private static final int COUNT_BITS = Integer.SIZE - 3; // 容许的最大工做线程(2^29-1 约5亿) private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 运行状态。线程池接受并处理新任务 private static final int RUNNING = -1 << COUNT_BITS; // 关闭状态。线程池不能接受新任务,处理完剩余任务后关闭。调用shutdown()方法会进入该状态。 private static final int SHUTDOWN = 0 << COUNT_BITS; // 中止状态。线程池不能接受新任务,而且尝试中断旧任务。调用shutdownNow()方法会进入该状态。 private static final int STOP = 1 << COUNT_BITS; // 整理状态。由关闭状态转变,线程池任务队列为空时进入该状态,会调用terminated()方法。 private static final int TIDYING = 2 << COUNT_BITS; // 终止状态。terminated()方法执行完毕后进入该状态,线程池完全中止。 private static final int TERMINATED = 3 << COUNT_BITS;
这里比较很差理解的是上述-1的位运算,下面咱们来分析一下:日志
在计算机中,二进制负数通常用补码表示,即源码取反再加一。但又有这种说法,即将最高位做为符号位,0为正数,1为负数。实际上二者是能够结合在一块儿看的。假如数字是单字节数,1 字节对应8 bit,即八位,如今,咱们要计算 - 1。code
按照第二种说法,最高位为符号位,则有 1/000 0001,而后按第一种说法取反后+1,而且符号位不变,则有 1/111 1110 + 1,即 1/111 1111。对象
如今回到 -1 << COUNT_BITS
这行代码:继承
一个 int 是 4 个字节,对应 32 bit,按上述过程 -1 转为二进制即为 1/111......1111(32个1), COUNT_BITS
是 29,-1 左移 29 位,最终获得 111.0...0000。队列
同理,计算其余的几种状态,可知分别是:rem
状态 | 二进制 |
---|---|
RUNNING | 111...0....00 |
SHUTDOWN | 000...0....00 |
STOP | 001...0....00 |
TIDYING | 010...0....00 |
TERMINATED | 011...0....00 |
其中,咱们能够知道 SHUTDOWN 状态转为十进制也是 0 ,而 RUNNING 做为有符号数,它的最高位是 1,说明转为十进制之后是个负数,其余的状态最高位都是 0,转为十进制以后都是正数,也就是说,咱们能够这么认为:
小于 SHUTDOWN 的就是 RUNNING,大于 SHUTDOWN 就是中止或者中止中。
这也是后面状态计算的一些写法的基础。好比 isRunning()
方法:
private static boolean isRunning(int c) { return c < SHUTDOWN; }
// 根据当前运行状态和工做线程数获取当前的 ctl private static int ctlOf(int rs, int wc) { return rs | wc; } // 获取运行状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取工做线程数 private static int workerCountOf(int c) { return c & CAPACITY; }
前面获取状态的时候调用了 ctlOf()
方法,根据前面,咱们能够知道,CAPACITY
其实是 29 位,而线程状态用的是 32 - 30 共 3 位,也就是说,ctl 共 32 位,高3 位用于表示线程池状态,而低 29 位表示工做线程的数量。
这样上述三个方法就很好理解了:
ctlOf()
:获取 ctl。
将工做线程数量与运行状态进行于运算,假如咱们处于 RUNNING,而且有 1 个工做线程,那么 ctl = 111....000 | 000.... 001,最终获得 111 ..... 001;
runStateOf()
:获取运行状态。
继续根据上文的数据,~CAPACITY
取反即为 111....000,与运行状态 111...0000 与运算,最终获得 111....000,至关于低位掩码,消去低 29 位;
workerCountOf()
:获取工做线程数。
同理,c & CAPACITY
里的 CAPACITY 至关于高位掩码,用于消去高 3 位,最终获得 00...001,即工做线程数。
同理,若是要增长工做线程数,就直接经过 CAS 去递增 ctl,好比新建线程中使用的公共方法:
private boolean compareAndIncrementWorkerCount(int expect) { // 经过 CAS 递增 ctl return ctl.compareAndSet(expect, expect + 1); }
要改变线程池状态,就根据当前工做线程和要改变的状态去合成新的 ctl,而后 CAS 改变 ctl,好比 shutdown()
中涉及的相关代码:
private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || // 经过 CAS 改变 ctl ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
线程池任务提交方法是 execute()
,根据代码可知,当一个任务进来时,分四种状况:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 1.当前工做线程数小于核心线程数,启动新线程 if (workerCountOf(c) < corePoolSize) { // 添加任务 if (addWorker(command, true)) return; c = ctl.get(); } // 2. 当前工做线程数大于核心线程数,可是未大于最大线程数,尝试添加到工做队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 若是当前线程处于非运行态,而且移除当前任务成功,则拒绝任务(防止添加到一半就shutdown) if (! isRunning(recheck) && remove(command)) reject(command); // 若是当前没有工做线程了,就启动新线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3.当前线程池核心线程和队列都满了,尝试建立新非核心线程 else if (!addWorker(command, false)) // 4.线程池完全满了,执行拒绝策略 reject(command); }
添加任务依靠 addWorker()
方法,这个方法很长,可是主要就干了两件事:
private boolean addWorker(Runnable firstTask, boolean core) { retry: // 1.改变 ctl 使工做线程+1 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 若是当前不处于运行状态,传入任务为空,而且任务队列为空的时候拒绝添加新任务 // 即线程池 shutdown 时不让添加新任务,可是运行继续跑完任务队列里的任务。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 线程不容许超过最大线程数,核心线程不容许超过最大核心线程数 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS 递增工做线程数 if (compareAndIncrementWorkerCount(c)) // 失败了就从新回到上面的retry处继续往下执行 break retry; // 更新 ctl c = ctl.get(); // 若是运行状态改变了就所有历来 if (runStateOf(c) != rs) continue retry; } } // 2.启动新线程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 建立新线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // 若是线程池处于运行状态,或者没有新任务的SHUTDOWN状态(即SHUTDOW之后还在消费工做队列里的任务) if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 线程是否在未启动前就已经启动了 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); // 若是集合中的工做线程数大于最大线程数,则将池中最大线程数改成当前工做线程数 if (s > largestPoolSize) largestPoolSize = s; // 线程建立完成 workerAdded = true; } } finally { mainLock.unlock(); } // 若是线程成功建立,就启动线程,而且更改启动状态为成功 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 若是线程启动不成功,就执行失败策略 if (! workerStarted) // 启动失败策略,从当前工做线程队列移除当前启动失败的线程,递减工做线程数,而后尝试关闭线程池(若是当前任务就是线程池最后一个任务) addWorkerFailed(w); } return workerStarted; }
根据上文,不难发现,在线程池中线程每每以 Worker 对象的方式存在,那么这个 Worker 又是何方神圣?
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 工做线程 final Thread thread; // 要执行的任务 Runnable firstTask; // 线程执行过的任务数 volatile long completedTasks; // 经过线程工厂建立工做线程 Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } // 执行任务 public void run() { runWorker(this); } ... ... }
这个 Worker 类继承了 AQS,也就是说,他自己就至关于一个同步队列,结合他的成员变量 thread 和 firstTask,能够知道他实际上就是咱们线程池中所说的“线程”。除了父类 AQS 自己提供的独占锁之外,Worker 还提供了一些检查任务线程运行状态以及中断线程相关的方法。
此外,线程池中还有一个工做队列 workers,用于保存当前所有的 Worker:
private final HashSet<Worker> workers = new HashSet<Worker>();
当调用 Worker.run()
的时候,其实调用的是 runWorker()
方法。
runWorker()
方法实际上就是调用线程执行任务的方法,他的逻辑大题是这样的:
getTask()
中循环等待任务);若是整个流程执行完毕,就删除当前的 Worker。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 新建立的Worker默认state为-1,AQS的unlock方法会将其改成0,此后容许使用interruptIfStarted()方法进行中断 // 完成任务之后是否须要移除当前Worker,即当前任务是否意外退出 boolean completedAbruptly = true; try { // 循环获取任务 while (task != null || (task = getTask()) != null) { // 加锁,防止 shundown 时中断正在运行的任务 w.lock(); // 若是线程池状态为 STOP 或更后面的状态,中断线程任务 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 钩子方法,默认空实现,须要本身提供 beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 钩子方法 afterExecute(task, thrown); } } finally { task = null; // 任务执行完毕 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 根据completedAbruptly决定是否要移除意外退出的Worker,并补充新的Worker // 也就是说,若是上述过程顺利完成,工做线程没有挂掉,就不删除,下次继续用,不然就干掉它再补充一个。 processWorkerExit(w, completedAbruptly); } }
在 runWorker()
方法中,经过 getTask()
方法去获取任务。值得注意的是,超时处理也在此处,简单的来讲,整套流程是这样的:
runWorker()
中的processWorkerExit()
方法去删除;换句话说,runWorker()
方法一旦执行完毕,必然会删除当前的 Worker,而经过 getTask()
拿任务的 Worker,在线程池正常运行的状态下,核心线程只会一直在 for 循环中等待直到拿到任务,而非核心线程超时之后拿不到任务就会返回一个 null,而后回到 runWorker()
中走完processWorkerExit()
方法被删除。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 若是线程池关闭了,而且工做队列里的任务都完成了,或者线程池直接进入了 STOP 或更进一步的状态,就不返回新任务 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 获取当前工做线程 int wc = workerCountOf(c); // 核心线程是否超时(默认false)或当前是否存在非核心线程,即判断当前当前是否须要进行超时控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 判断线程是否超过最大线程数或存在非核心线程 if ((wc > maximumPoolSize || (timed && timedOut)) // 而且除非任务队列为空,不然池中最少有一个线程 && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 获取任务 Runnable r = timed ? // 阻塞 keepaliveTime 以获取任务,若是在 keepaliveTime 时间内没有获取到任务,则返回 null. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 若是获取不到任务,说明非核心线程超时了,下一轮判断确认是否退出循环。 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
线程池的中断方法分为三种:
shutdown()
:中断线程池,再也不添加新任务,同时等待当前进行和队列中的任务完成;shutdownNow()
:当即中断线程池,再也不添加新任务,同时中断全部工做中的任务,再也不处理任务队列中任务。shutdown 是有序关闭。主要干了三件事:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; // 加锁 mainLock.lock(); try { checkShutdownAccess(); // 改变当前线程池状态 advanceRunState(SHUTDOWN); // 中断当前线程 interruptIdleWorkers(); // 钩子函数,默认空实现 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
其中,interruptIdleWorkers()
方法以下:
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 遍历工做队列中的所有 Worker for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { // 标记为中断 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
shutdownNow()
与 shutdown()
流程相似,可是会直接将状态转为 STOP,在 addWorker()
或者getTask()
等处理任务的相关方法里,会针对 STOP 或更进一步的状态作区分,将不会再处理任务队列中的任务,配合drainQueue()
方法以删除任务队列中的任务。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 改变当前线程池状态 advanceRunState(STOP); // 中断当前线程 interruptWorkers(); // 删除任务队列中的任务 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
当任务队列已满,而且线程池中线程也到达最大线程数的时候,就会调用拒绝策略。也就是reject()
方法
final void reject(Runnable command) { handler.rejectedExecution(command, this); }
拒绝策略共分四种:
咱们能够简单的了解一下他们的实现:
AbortPolicy
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
CallerRunsPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
DiscardOldestPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 弹出队头元素 e.getQueue().poll(); e.execute(r); } }
DiscardPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // Does nothing }
和 HashMap 与 LinkedHashMap 中的行为有点相似,在线程池的代码中,有些方法调用了一些具备空实现的方法,这些方法是提供给用户去继承并重写的钩子函数,主要包括三个:
beforeExecute()
:在执行任务以前回调afterExecute()
:在任务执行完后回调terminated()
:在线程池中的全部任务执行完毕后回调经过继承 ThreadPoolExecutor 类,并重写以上三个方法,咱们能够进行监控或者输出日志,更方便的了解线程池的状态。
值得一提的是,afterExecute()
方法的入参类型是(Runnable r, Throwable t)
,也就是说,若是线程运行中抛出异常,咱们也能够经过该方法去捕获异常并做出相应的处理。
线程池提供了四个构造方法,参数最全的构造方法参数按顺序有:核心线程数,最大线程数,非核心线程闲置存活时间,存活时间单位,任务队列,线程工厂,拒绝策略。
线程池共有五种状态,分别是:RUNNING,SHUTDOWN,STOP,TYDYING,TERMINATED,它们与工做线程数量一同记录在成员变量 ctl 中,其中高 3 位用于记录状态,低 29 位用于记录工做线程数,实际使用中经过位运算去获取。
线程池中任务线程以继承了 AQS 的 Worker 类的实例形式存在。当添加任务时,会有四种状况:核心线程不满,优先建立核心线程;核心线程满,优先添加任务队列;核心线程与队列都满,建立非核心线程;线程和队列都满,则执行拒绝策略。
其中,拒绝策略分为四类,默认的拒绝策略 AbortPolicy;调用者运行策略 CallerRunsPolicy;弃老策略 DiscardOldestPolicy;丢弃策略 DiscardPolicy。
线程池的中断有两个方法:shutdown()
与 shutdownNow()
,二者都会让线程池再也不接受新任务,可是 shutdown()
会等待当前与任务队列中的任务执行完毕,而 shutdownNow()
会直接中断当前任务,忽略并删除任务队列中的任务。
线程池提供了beforeExecute()
,afterExecute()
,terminated()
三个钩子函数,其中,afterExecute()
的入参含有抛出的异常,所以能够借由该方法处理线程池中线程抛出的异常。