(手机横屏看源码更方便)java
注:java源码分析部分如无特殊说明均基于 java8 版本。ide
注:线程池源码部分如无特殊说明均指ThreadPoolExecutor类。oop
前面咱们一块儿学习了Java中线程池的体系结构、构造方法和生命周期,本章咱们一块儿来学习线程池中普通任务究竟是怎么执行的。源码分析
建议学习本章前先去看看彤哥以前写的《死磕 java线程系列之本身动手写一个线程池》那两章,有助于理解本章的内容,且那边的代码比较短小,学起来相对容易一些。学习
(1)线程池中的普通任务是怎么执行的?this
(2)任务又是在哪里被执行的?线程
(3)线程池中有哪些主要的方法?翻译
(4)如何使用Debug模式一步一步调试线程池?debug
咱们建立一个线程池,它的核心数量为5,最大数量为10,空闲时间为1秒,队列长度为5,拒绝策略打印一句话。调试
若是使用它运行20个任务,会是什么结果呢?
public class ThreadPoolTest01 { public static void main(String[] args) { // 新建一个线程池 // 核心数量为5,最大数量为10,空闲时间为1秒,队列长度为5,拒绝策略打印一句话 ExecutorService threadPool = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(currentThreadName() + ", discard task"); } }); // 提交20个任务,注意观察num for (int i = 0; i < 20; i++) { int num = i; threadPool.execute(()->{ try { System.out.println(currentThreadName() + ", "+ num + " running, " + System.currentTimeMillis()); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }); } } private static String currentThreadName() { return Thread.currentThread().getName(); } }
构造方法的7个参数咱们就不详细解释了,有兴趣的能够看看《死磕 java线程系列之线程池深刻解析——构造方法》那章。
咱们一块儿来看看一次运行的结果:
pool-1-thread-1, 0 running, 1572678434411 pool-1-thread-3, 2 running, 1572678434411 pool-1-thread-2, 1 running, 1572678434411 pool-1-thread-4, 3 running, 1572678434411 pool-1-thread-5, 4 running, 1572678434411 pool-1-thread-6, 10 running, 1572678434412 pool-1-thread-7, 11 running, 1572678434412 pool-1-thread-8, 12 running, 1572678434412 main, discard task main, discard task main, discard task main, discard task main, discard task // 【本文由公从号“彤哥读源码”原创】 pool-1-thread-9, 13 running, 1572678434412 pool-1-thread-10, 14 running, 1572678434412 pool-1-thread-3, 5 running, 1572678436411 pool-1-thread-1, 6 running, 1572678436411 pool-1-thread-6, 7 running, 1572678436412 pool-1-thread-2, 8 running, 1572678436412 pool-1-thread-7, 9 running, 1572678436412
注意,观察num值的打印信息,先是打印了0~4,再打印了10~14,最后打印了5~9,居然不是按顺序打印的,为何呢?
让咱们一步一步debug进去查看。
execute()方法是线程池提交任务的方法之一,也是最核心的方法。
// 提交任务,任务并不是当即执行,因此翻译成执行任务彷佛不太合适 public void execute(Runnable command) { // 任务不能为空 if (command == null) throw new NullPointerException(); // 控制变量(高3位存储状态,低29位存储工做线程的数量) int c = ctl.get(); // 1. 若是工做线程数量小于核心数量 if (workerCountOf(c) < corePoolSize) { // 就添加一个工做线程(核心) if (addWorker(command, true)) return; // 从新获取下控制变量 c = ctl.get(); } // 2. 若是达到了核心数量且线程池是运行状态,任务入队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次检查线程池状态,若是不是运行状态,就移除任务并执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 容错检查工做线程数量是否为0,若是为0就建立一个 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3. 任务入队列失败,尝试建立非核心工做线程 else if (!addWorker(command, false)) // 非核心工做线程建立失败,执行拒绝策略 reject(command); }
关于线程池状态的内容,咱们这里不拿出来细讲了,有兴趣的能够看看《死磕 java线程系列之线程池深刻解析——生命周期》那章。
提交任务的过程大体以下:
(1)工做线程数量小于核心数量,建立核心线程;
(2)达到核心数量,进入任务队列;
(3)任务队列满了,建立非核心线程;
(4)达到最大数量,执行拒绝策略;
其实,就是三道坎——核心数量、任务队列、最大数量,这样就比较好记了。
流程图大体以下:
任务流转的过程咱们知道了,可是任务是在哪里执行的呢?继续往下看。
这个方法主要用来建立一个工做线程,并启动之,其中会作线程池状态、工做线程数量等各类检测。
private boolean addWorker(Runnable firstTask, boolean core) { // 判断有没有资格建立新的工做线程 // 主要是一些状态/数量的检查等等 // 这段代码比较复杂,能够先跳过 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 线程池状态检查 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 工做线程数量检查 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 数量加1并跳出循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 若是上面的条件知足,则会把工做线程数量加1,而后执行下面建立线程的动做 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 建立工做线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 再次检查线程池的状态 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 添加到工做线程队列 workers.add(w); // 还在池子中的线程数量(只能在mainLock中使用) int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 标记线程添加成功 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 线程添加成功以后启动线程 t.start(); workerStarted = true; } } } finally { // 线程启动失败,执行失败方法(线程数量减1,执行tryTerminate()方法等) if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
这里其实还没到任务执行的地方,上面咱们能够看到线程是包含在Worker这个类中的,那么,咱们就跟踪到这个类中看看。
Worker内部类能够看做是对工做线程的包装,通常地,咱们说工做线程就是指Worker,但其实是指其维护的Thread实例。
// Worker继承自AQS,自带锁的属性 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 真正工做的线程 final Thread thread; // 第一个任务,从构造方法传进来 Runnable firstTask; // 完成任务数 volatile long completedTasks; // 构造方法// 【本文由公从号“彤哥读源码”原创】 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 使用线程工厂生成一个线程 // 注意,这里把Worker自己做为Runnable传给线程 this.thread = getThreadFactory().newThread(this); } // 实现Runnable的run()方法 public void run() { // 调用ThreadPoolExecutor的runWorker()方法 runWorker(this); } // 省略锁的部分 }
这里要可以看出来工做线程Thread启动的时候实际是调用的Worker的run()方法,进而调用的是ThreadPoolExecutor的runWorker()方法。
runWorker()方法是真正执行任务的地方。
final void runWorker(Worker w) { // 工做线程 Thread wt = Thread.currentThread(); // 任务 Runnable task = w.firstTask; w.firstTask = null; // 强制释放锁(shutdown()里面有加锁) // 这里至关于无视那边的中断标记 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 取任务,若是有第一个任务,这里先执行第一个任务 // 只要能取到任务,这就是个死循环 // 正常来讲getTask()返回的任务是不可能为空的,由于前面execute()方法是有空判断的 // 那么,getTask()何时才会返回空任务呢? while (task != null || (task = getTask()) != null) { w.lock(); // 检查线程池的状态 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 钩子方法,方便子类在任务执行前作一些处理 beforeExecute(wt, task); Throwable thrown = null; try { // 真正任务执行的地方 task.run(); // 异常处理 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 钩子方法,方便子类在任务执行后作一些处理 afterExecute(task, thrown); } } finally { // task置为空,从新从队列中取 task = null; // 完成任务数加1 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 到这里确定是上面的while循环退出了 processWorkerExit(w, completedAbruptly); } }
这个方法比较简单,忽略状态检测和锁的内容,若是有第一个任务,就先执行之,以后再从任务队列中取任务来执行,获取任务是经过getTask()来进行的。
从队列中获取任务的方法,里面包含了对线程池状态、空闲时间等的控制。
private Runnable getTask() { // 是否超时 boolean timedOut = false; // 死循环 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 线程池状态是SHUTDOWN的时候会把队列中的任务执行完直到队列为空 // 线程池状态是STOP时当即退出 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 工做线程数量// 【本文由公从号“彤哥读源码”原创】 int wc = workerCountOf(c); // 是否容许超时,有两种状况: // 1. 是容许核心线程数超时,这种就是说全部的线程均可能超时 // 2. 是工做线程数大于了核心数量,这种确定是容许超时的 // 注意,非核心线程是必定容许超时的,这里的超时实际上是指取任务超时 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 超时判断(还包含一些容错判断) if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 超时了,减小工做线程数量,并返回null if (compareAndDecrementWorkerCount(c)) return null; // 减小工做线程数量失败,则重试 continue; } try { // 真正取任务的地方 // 默认状况下,只有当工做线程数量大于核心线程数量时,才会调用poll()方法触发超时调用 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 取到任务了就正常返回 if (r != null) return r; // 没取到任务代表超时了,回到continue那个if中返回null timedOut = true; } catch (InterruptedException retry) { // 捕获到了中断异常 // 中断标记是在调用shutDown()或者shutDownNow()的时候设置进去的 // 此时,会回到for循环的第一个if处判断状态是否要返回null timedOut = false; } } }
注意,这里取任务会根据工做线程的数量判断是使用BlockingQueue的poll(timeout, unit)方法仍是take()方法。
poll(timeout, unit)方法会在超时时返回null,若是timeout<=0,队列为空时直接返回null。
take()方法会一直阻塞直到取到任务或抛出中断异常。
因此,若是keepAliveTime设置为0,当任务队列为空时,非核心线程取不出来任务,会当即结束其生命周期。
默认状况下,是不容许核心线程超时的,可是能够经过下面这个方法设置使核心线程也可超时。
public void allowCoreThreadTimeOut(boolean value) { if (value && keepAliveTime <= 0) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); if (value != allowCoreThreadTimeOut) { allowCoreThreadTimeOut = value; if (value) interruptIdleWorkers(); } }
至此,线程池中任务的执行流程就结束了。
观察num值的打印信息,先是打印了0~4,再打印了10~14,最后打印了5~9,居然不是按顺序打印的,为何呢?
线程池的参数:核心数量5个,最大数量10个,任务队列5个。
答:执行前5个任务执行时,正好还不到核心数量,因此新建核心线程并执行了他们;
执行中间的5个任务时,已达到核心数量,因此他们先入队列;
执行后面5个任务时,已达核心数量且队列已满,因此新建非核心线程并执行了他们;
再执行最后5个任务时,线程池已达到满负荷状态,因此执行了拒绝策略。
本章经过一个例子并结合线程池的重要方法咱们一块儿分析了线程池中普通任务执行的流程。
(1)execute(),提交任务的方法,根据核心数量、任务队列大小、最大数量,分红四种状况判断任务应该往哪去;
(2)addWorker(),添加工做线程的方法,经过Worker内部类封装一个Thread实例维护工做线程的执行;
(3)runWorker(),真正执行任务的地方,先执行第一个任务,再源源不断从任务队列中取任务来执行;
(4)getTask(),真正从队列取任务的地方,默认状况下,根据工做线程数量与核心数量的关系判断使用队列的poll()仍是take()方法,keepAliveTime参数也是在这里使用的。
核心线程和非核心线程有什么区别?
答:实际上并无什么区别,主要是根据corePoolSize来判断任务该去哪里,二者在执行任务的过程当中并无任何区别。有可能新建的时候是核心线程,而keepAliveTime时间到告终束了的也多是刚开始建立的核心线程。
Worker继承自AQS有何意义?
前面咱们看了Worker内部类的定义,它继承自AQS,天生自带锁的特性,那么,它的锁是用来干什么的呢?跟任务的执行有关系吗?
答:既然是跟锁(同步)有关,说明Worker类跨线程使用了,此时咱们查看它的lock()方法发现只在runWorker()方法中使用了,可是其tryLock()倒是在interruptIdleWorkers()方法中使用的。
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
interruptIdleWorkers()方法的意思是中断空闲线程的意思,它只会中断BlockingQueue的poll()或take()方法,而不会中断正在执行的任务。
通常来讲,interruptIdleWorkers()方法的调用不是在本工做线程,而是在主线程中调用的,还记得《死磕 java线程系列之线程池深刻解析——生命周期》中说过的shutdown()和shutdownNow()方法吗?
观察两个方法中中断线程的方法,shutdown()中就是调用了interruptIdleWorkers()方法,这里tryLock()获取到锁了再中断,若是没有获取到锁则不中断,没获取到锁只有一种状况,也就是lock()所在的地方,也就是有任务正在执行。
而shutdownNow()中中断线程则很暴力,并无tryLock(),而是直接中断了线程,因此调用shutdownNow()可能会中断正在执行的任务。
因此,Worker继承自AQS实际是要使用其锁的能力,这个锁主要是用来控制shutdown()时不要中断正在执行任务的线程。
欢迎关注个人公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一块儿畅游源码的海洋。