好久没更新了,缘由并非没有学习,而是学完了不知道怎么写出来,同时还有一股声音在耳边告诉我,如今公众号满天飞,写公众号的人比看公众号多,同 topic 的文章太多了......。但后面我本身想通了,虽然相似的文章不少,但它不是我写的,本身写完有助于对相关知识的梳理,若是恰好能给你们带来一些帮助,那就更好了,因此白牙仍是鼓励你们多输出,经过输出倒逼输入,其中的收获只有作了才知道
经过类图可知,ThreadPoolExecutor 是一个 ExecutorService,能够经过池中的线程来执行任务java
// 线程池中重要的变量 ctl,类型为 AtomicInteger,一个变量同时记录线程池状态和线程个数 // Integer 的位数为 32 位,其中高 3 位表示线程池的状态,低 29 位表示线程的个数。默认为 RUNNING 状态,线程个数为 0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 线程个数掩码位数,去掉高 3 位表明线程个数的 bit 位 private static final int COUNT_BITS = Integer.SIZE - 3; // 线程最大个数,低 29 位 00011111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
不得不佩服写 JDK 源码这些大佬,把一个变量用的这么好,ctl 变量经过位运算同时表达了线程池状态和线程个数,下面看下与之相关的方法面试
// 计算线程池的状态 ~CAPACITY 为:11100000000000000000000000000000,经过让 ctl 与 ~CAPACITY 相与,至关于取高 3 位的值(前面说了 ctl 高 3 位表示线程池状态) private static int runStateOf(int c) { return c & ~CAPACITY; } // 计算线程个数 CAPACITY 为:00011111111111111111111111111111,经过让 ctl 与 CAPACITY 相与,至关于取低 29 位的值(前面说了 ctl 低 29 位表示线程个数) private static int workerCountOf(int c) { return c & CAPACITY; } // 计算 ct l的值,用线程池状态和线程个数进行或运算 private static int ctlOf(int rs, int wc) { return rs | wc; }
在源码中常常看到这些方法,是否是用的很巧妙?redis
// 默认状态,接收新任务并处理阻塞队列里的任务 private static final int RUNNING = -1 << COUNT_BITS; // 拒绝新任务可是处理阻塞队列里的任务 private static final int SHUTDOWN = 0 << COUNT_BITS; // 拒绝新任务而且抛弃阻塞队列里的任务,同时中断正在处理的任务 private static final int STOP = 1 << COUNT_BITS; // 全部任务都已经执行完成,线程数是 0,将调用 terminated() 方法 private static final int TIDYING = 2 << COUNT_BITS; // 终止状态,调用完 terminated() 方法后的状态 private static final int TERMINATED = 3 << COUNT_BITS;
备注编程
若是想查看上面变量的二进制表现形式,能够经过方法 Integer.toBinaryString(int i) 查看
RUNNING -> SHUTDOWN:当调用 shutdown() 方法时,也可能隐式的调用 finalize() 方法时(由于 finalize() 方法也是调用的 shutdown() 方法) On invocation of shutdown(), perhaps implicitly in finalize() (RUNNING or SHUTDOWN) -> STOP:当调用 shutdownNow() 方法时 On invocation of shutdownNow() SHUTDOWN -> TIDYING:当队列和线程池都空时 When both queue and pool are empty STOP -> TIDYING:当线程池为空时 When pool is empty TIDYING -> TERMINATED:当 terminated() 方法完成时 When the terminated() hook method has completed
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
建立线程池有如上经常使用参数,下面简单介绍下segmentfault
corePoolSize:线程池中的核心线程数,即便他们处于空闲状态,除非 allowCoreThreadTimeOut 被设置了 maximumPoolSize:线程池中的最大线程数 workQueue:存放还未被执行任务的阻塞队列 threadFactory:建立线程的工厂类 rejectHandle:拒绝策略,当线程个数达到最大线程数,同时任务队列满了。就会执行拒绝策略。拒绝策略有:AbortPolicy(直接抛出异常)、CallerRunsPolicy(调用者所在线程来执行任务)、DiscardOldestPolicy(从任务队列中移除一个待执行的任务(最先提交的),而后再次执行任务)、DiscardPolicy(直接抛弃任务) keepAliveTime:存活时间,当线程个数大于了核心线程数,且处于空闲状态,这些空闲线程可存活的最大时间
使用线程池时,咱们通常是调用 ThreadPoolExecutor.submit(task) 方法,直接把任务交给线程池去处理,而后返回给咱们一个 Future,后面能够经过 Future.get() 方法获取任务结果
public Future<?> submit(Runnable task) { // 任务为空,直接抛异常 if (task == null) throw new NullPointerException(); // 把任务封装成 RunnableFuture RunnableFuture<Void> ftask = newTaskFor(task, null); // 执行封装后的任务 execute(ftask); return ftask; }
newTaskFor 方法是把任务封装成一个 RunnableFuture 类,能够经过 get 方法获取结果
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } public FutureTask(Runnable runnable, V result) { // 这里把 Runnable 适配成 Callable 类型的任务,result 是当任务成功完成时返回的结果,若是须要特殊结 果,就用 null 就好了 this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); // 这里经过 RunnableAdapter 适配任务和任务的结果 return new RunnableAdapter<T>(task, result); } // 适配类 RunnableAdapter,这种写法,咱们能够借鉴下 static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
尝试把任务交给线程池执行,执行任务的线程多是新建的,也多是复用了线程池中的。若是线程池不能执行该任务(可能缘由有两个,1.线程池已经关闭了,2.线程达到了最大容量)就会执行拒绝策略
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps:留着原汁原味的总结 * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 获取表示线程池状态和线程个数的组合变量 ctl int c = ctl.get(); // 判断线程个数是否小于核心线程数,若是小于就新建一个核心线程来执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 若是线程池处于 RUNNAING 状态,就把任务添加到阻塞队列中(代码运行到这里说明要么线程个数>=核心线程数,要么执行 addWorder 方法失败) if (isRunning(c) && workQueue.offer(command)) { // 再次获取组合变量 ctl,作二次检查(由于可能在此以前,线程池的状态已经发生了改变) int recheck = ctl.get(); // 若是线程池状态不是 RNUUAING 状态,就把该任务从阻塞任务队列中移除,并执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 若是线程池中线程个数为 0,就新建一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是阻塞任务队列满了,新建线程,若是建立线程失败(即线程个数达到了最大线程个数),执行拒绝策略 else if (!addWorker(command, false)) reject(command); }
当任务被提交到线程池时,判断运行的线程的个数是否小于 corePoolSize,若是小于就新建一个线程来处理这个任务即便有其余线程处于空闲状态。而后再来任务,若是核心线程有空闲的,就直接执行任务。若是核心线程都在忙,那么就把待执行的任务添加到任务队列中。微信
若是任务队列满了,且运行的线程个数大于 corePoolSize 且小于 maximumPoolSize,那就新建线程来执行任务。网络
首先会根据当前线程池的状态和线程数的边界(核心线程数仍是最大线程数)检查是否能够新建一个 worker 线程。若是能够就新建一个 worker 线程并启动,而后执行传过来的任务
/** * 建立新的worker * * @param firstTask 提交给线程的任务,要最早执行,能够为 null * @param core 若是为 true,表示以核心线程数为界建立线程 为 false 表示以最大线程数为界建立线程 * @return */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { // 获取 ctl int c = ctl.get(); // 获取线程池的状态 int rs = runStateOf(c); // 这里的判断条件有点多,拆成 rs>=SHUTDOWN 和 !(rs == SHUTDOWN && firstTask == null &&!workQueue.isEmpty()) // !(rs == SHUTDOWN && firstTask == null &&!workQueue.isEmpty()) 逆着考虑 ,以下: // rs!=SHUTDOWN 也就是为大于 shutdown,为 stop,tidying,terminated // firstTask != null // workQueue.isEmpty() // 若是线程池处于关闭状态,且知足下面条件之一的,不建立 worker // 线程池处于 stop,tidying,terminated 状态 // firstTask != null // workQueue.isEmpty() // 注意:若是线程池处于 shutdown,且 firstTask 为 null,同时队列不为空,容许建立 worker // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 获取工做线程数 int wc = workerCountOf(c); // 工做线程数大于最大容量或者工做线程数超过线程数的边界(根据 core 的值取不一样的值) 时 不建立worker if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 工做线程数 +1 经过 CAS // 这里若是失败,表示有并发操做 if (compareAndIncrementWorkerCount(c)) // 调出循环,执行真正的建立 worker 逻辑 break retry; // 由于存在并发,须要再读取 ctl 值进行状态判断 // Re-read ctl c = ctl.get(); // 若是线程状态发生了变化,回到外部循环 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 校验已经都经过,开始建立 worker // 是否已经启动了 worker boolean workerStarted = false; // 是否已经添加了 worker boolean workerAdded = false; Worker w = null; try { // 把 task 封装成 worker,经过线程工厂建立线程,最后会把任务设置到 Thread 的 target 属性上,后续在执行线程的 start 方法时,就会执行对应的任务的 run 方法 w = new Worker(firstTask); // 获取 worker 中的线程 final Thread t = w.thread; if (t != null) { // 获取对象锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 若是线程池处于 Running 状态 或者 线程池处于 shutdown 状态且任务为 null(执行任务队列中的任务) if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // precheck that t is startable // 检查线程是否为启动状态,若是为启动状态抛异常 if (t.isAlive()) throw new IllegalThreadStateException(); // 把新建的 worker 添加到 worker 集中 workers.add(w); int s = workers.size(); // largestPoolSize 记录 workers 中个数存在过的最大值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 新建的 worker 添加成功就启动线程,后续有分析 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 线程没有启动成功,对上面建立线程的过程作回滚操做 if (! workerStarted) // 回滚操做,好比把 worker 从 workers 中移除,把线程数减一 addWorkerFailed(w); } return workerStarted; }
回滚以前的 worker 线程建立操做
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) // 从 workers 中移除 worker workers.remove(w); // 把线程数减一 decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
在介绍 addWorker 方法时,有一个逻辑是若是新建立的 woker 线程成功添加到 woker 线程集后,会调用线程的 start 方法,其实最后就会执行到 Worker 的 run 方法。由于 Woker 的构造器是经过线程工厂建立的线程,分析以下架构
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 这里默认的线程工厂是 DefaultThreadFactory this.thread = getThreadFactory().newThread(this); }
经过线程工厂建立线程,最后调用 Thread 的构造器,Thread(... Runnable target ...),要执行的任务做为 target 参数建立线程,而后调用 Thread.start 方法后会执行 run 方法,而后会执行 target.run,相似代理并发
@Override public void run() { if (target != null) { // 这个 target 就是建立线程时传递过来的那个任务 target.run(); } }
而后就能够执行咱们的 worker 的 run 方法了,run 方法又会调用 runWorker 方法,下面咱们看下 这个方法异步
不断从任务队列中取任务并执行
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 把 status 设置为 0,容许中断 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 从任务队列中获取任务并执行 while (task != null || (task = getTask()) != null) { // 这里加锁是为了不任务运行期间,其余线程调用 shutdown 方法关闭线程池中正在执行任务的线程 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 若是线程池状态大于或等于 stop,即 runStateAtLeast(ctl.get(), STOP) 为 true,这个时候就要确保线程是中断的 // 不用看||后面的条件,直接判断 !wt.isInterrupted(),由于线程池状态为暂停,要确保线程中断,若是没有中断,就要手动中断线程,即执行 wt.interrupt() // 若是线程池状态不是 stop,即 runStateAtLeast(ctl.get(), STOP) 为 false,就要确保线程没有中断,这样才能在后面执行任务 // 这时候须要看 || 后面的 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) ,由于要确保线程没有中断,调用Thread.interrupted()清除中断状态, // 这里须要再次进行验证线程池的状态,由于可能会有 shutdownNow 的状况 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 空方法体,子类能够实现,作一些特殊化处理工做 beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 空方法体,子类能够实现,作一些特殊化处理工做 afterExecute(task, thrown); } } finally { task = null; // 统计当前 worker 完成了多少任务 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 执行清理工做 processWorkerExit(w, completedAbruptly); } }
在 runWorker 方法中,有个 getTask 方法,下面简单介绍下
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 知足下面两种状况任意一个就返回 null 同时把线程个数减 1 // 1.线程池已经处于关闭状态 // 2.线程池处于 shutdown,且队列为空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 判断线程是否有时效性,前面说过,若是把 allowCoreThreadTimeOut 设为 false,那么核心线程数之内的线程是不会关闭的。若是设为 true 就只会存活 keepAliveTime 这么长时间 // 若是线程数大于核心线程数的线程都有时效性 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 这个判断逻辑就是下面方法总结的第①点和第④点(这段代码我看了足足有半个小时,结合方法的注释终于弄明白了,感受本身好笨) // 超时且超时的 worker 线程须要终止。 // 若是任务队列非空,要保证当前 worker 线程不是线程池中最后一个线程(若是任务为空,当前线程是线程池中的最后一个线程也无妨,毕竟任务队列为空,当前 worker 线程关闭就关闭了,没影响) // 这里的判断条件能够当作 if (wc > maximumPoolSize || ((timed && timedOut) && (wc > 1 || workQueue.isEmpty())) if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 从队列中取任务,分为有超时限制和非超时限制两种状况 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { // 这里会抛出中断异常是由于可能会调用 setMaximumPoolSize 方法,把线程的最大数设置小了,那可能存在当前线程数大于新的最大线程数 // 这样就得关闭多余的线程,因此从新进入 for 循环,并返回 null timedOut = false; } } }
1.返回 task
2.返回 null,这种状况是该 worker 线程须要退出,由于线程的数量减小了,发生这种状况的可能缘由有以下 4 个
①线程池中的线程个数大于最大线程数了(由于能够经过 setMaximumPoolSize 方法进行设置)
②线程池关闭了【既拒绝新任务,又不执行任务队列中的任务】
③线程池处于 shutdown,同时任务队列为空【拒绝新任务】
④超时且超时的 worker 线程须要终止。若是任务队列非空,要保证当前 worker 线程不是线程池中最后一个线程(若是任务为空,当前线程是线程池中的最后一个线程也无妨,毕竟任务队列为空,当前 worker 线程关闭就关闭了,没影响)
runWorker 方法中还有个执行清理工做的 processWorkerExit 方法,下面简单介绍下
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 若是是忽然完成,须要调整线程数 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 计算线程池完成的任务个数,并从 worke r线程集中删除当前 worker 线程 completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } // 尝试把线程池的状态设置为 TERMINATED,该方法在后面分析 tryTerminate(); int c = ctl.get(); // 线程池状态至少为 STOP if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } // 新建 worker 线程的条件为:当前线程数小于核心线程数或者任务队列不为空但没有运行的线程了(容许核心线程超时的状况下) addWorker(null, false); } }
final void tryTerminate() { for (;;) { int c = ctl.get(); // 若是处于下面三种任意一种状况,就不能把线程池的状态设为 TERMINATED if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 代码执行到这里,说明有资格终止了。可是若是这个时候线程个数非 0,就中断一个空闲的线程来确保 shutdown 信号传播 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 设置线程池状态为 TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { // 设置线程池状态为 TERMINATED ctl.set(ctlOf(TERMINATED, 0)); // 激活调用线程池中因调用 awaitTermination 系列方法而阻塞的线程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
若是认真结合源码看这篇文章到这里的话,应该对线程池的执行原理有点小感受了吧?下面剩下最后的内容了,就是线程池关闭方法
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 权限校验 checkShutdownAccess(); // 设置线程池状态为 SHUTDOWN advanceRunState(SHUTDOWN); // 中断空闲线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试设置线程池状态为 TERMINATED tryTerminate(); }
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 权限校验 checkShutdownAccess(); // 设置线程池状态为 STOP advanceRunState(STOP); // 中断全部线程 interruptWorkers(); // 将任务队列中的任务移动到 tasks 中 tasks = drainQueue(); } finally { mainLock.unlock(); } // 尝试设置线程池状态为 TERMINATED tryTerminate(); return tasks; }
这篇文章是假期写的,算是对线程池这块知识的一个小梳理,固然里面也许有问题,白牙但愿你能批判的继承,发现问题能够去公众号【天天晒白牙】留言指出,也能够加我微信【dingaiminIT】,咱们一块儿交流讨论。
三面阿里被挂,幸获内推名额,历经 5 面终获口碑 offer
老年代又占用100%了,顺便发现了vertx-redis-client 的bug
- https://juejin.im/entry/59b23...
- 《Java异步编程实战》
欢迎关注公众号 【天天晒白牙】,获取最新文章,咱们一块儿交流,共同进步!