线程池通常包含三个主要部分:java
调度器: 决定由哪一个线程来执行任务, 执行任务所可以的最大耗时等安全
线程队列: 存放并管理着一系列线程, 这些线程都处于阻塞状态或休眠状态多线程
任务队列: 存放着用户提交的须要被执行的任务. 通常任务的执行 FIFO 的, 即先提交的任务先被执行ide
调度器并不是是必须的, 例如 Java 中实现的 ThreadPoolExecutor 就没有调度器, 而是全部的线程都不断从任务队列中取出任务, 而后执行.
线程池模型能够用下图简单地表示:oop
ThreadPoolExecutor 有两个参数用于控制线程池中线程的个数: corePoolSize
和 maximumPoolSize
, 根据这两个参数, ThreadPoolExecutor 会自适应地调整线程个数, 以适应不一样的任务数.
当咱们经过 execute(Runnable)
提交一个任务时:ui
若是此时线程池中线程个数小于 corePoolSize
, 则此任务不会插入到任务队列中, 而是直接建立一个新的线程来执行此任务, 即便当前线程池中有空闲的线程.this
若是线程数大于 corePoolSize
可是小于 maximumPoolSize
:spa
若是任务队列还未满, 则会将此任务插入到任务队列末尾;线程
若是此时任务队列已满, 则会建立新的线程来执行此任务.code
若是线程数等于 maximumPoolSize
:
若是任务队列还未满, 则会将此任务插入到任务队列末尾;
若是此时任务队列已满, 则会又 RejectedExecutionHandler
处理, 默认状况下是抛出 RejectedExecutionException
异常.
在建立一个线程池时, 咱们能够指定线程池中的线程的最大空闲(Idle)时间, 线程池会根据咱们设置的这个值来动态的减小没必要要的线程, 释放系统资源.
当咱们的线程池中的线程数大于 corePoolSize
时, 若是此时有线程处于空闲(Idle)状态超过指定的时间(keepAliveTime
), 那么线程池会将此线程销毁.
工做队列(WorkQueue)
是 一个 BlockingQueue
, 它时用于存放那些已经提交的, 可是尚未空余线程来执行的任务. 例如咱们在前面 线程池大小 一节中讨论的状况, 若是当前的线程数大于 corePoolSize
而且工做队列的还有剩余空间, 那么新提交的任务就会先放到工做队列中.
根据 Java Docs, 有三种常见的工做队列的使用场景:
直接切换(Direct handoffs): 一个不错而且是默认的工做队列的选择时
无界队列(Unbounded queues)
有界队列(Bounded queues)
由于线程池中维护有一个工做队列, 咱们天然地会想到, 当线程池中的工做队列满了, 不能再添加新的任务了, 此时线程池会怎么处理呢?
通常来讲, 当咱们提交一个任务到线程池中, 若是此时线程池不能再添加任务了, 那么一般会返回一个错误, 或者是调用咱们预先设置的一个错误处理 handler, 例如在 Java ThreadPoolExecutor 中, 咱们能够经过以下方式实例化一个带有任务提交失败 handler 的线程池:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 100, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("Task " + r.toString() + " failed!"); } });
ThreadPoolExecutor 中有一个名为 ctl
的字段, 它是一个 AtomicInteger 类型, ThreadPoolExecutor 复用了此字段来表示两个信息:
当前活跃的线程数
线程池状态
ctl
是一个 AtomicInteger 类型, 它的 低29位
用于存放当前的线程数, 所以一个线程池在理论上最大的线程数是 536870911
; 高 3 位是用于表示当前线程池的状态, 其中高三位的值和状态对应以下:
111: RUNNING
000: SHUTDOWN
001: STOP
010: TIDYING
110: TERMINATED
前面咱们提到, 一个线程池中有 corePoolSize
, maximumPoolSize
, keepAliveTime
, workQueue
之类的概念, 这些属性咱们必须在实例化线程池时经过构造器传入. Java 线程池实现类 ThreadPoolExecutor
中提供了很多构造方法, 咱们来看一下其中两个经常使用的构造器:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
能够看到, 在实例化一个 ThreadPoolExecutor
线程池时, 咱们须要指定一些线程池的基本属性, 而且可选地, 咱们还能够指定当任务提交失败时的处理 handler.
例如咱们能够经过以下方式实例化一个带有任务提交失败 handler 的线程池:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 100, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("Task " + r.toString() + " failed!"); } });
固然, 除了上述使用构造器来直接建立线程池, Java 还提供了几个简便地建立线程池的方法:
Executors.newCachedThreadPool
Executors.newFixedThreadPool
Executors.newWorkStealingPool
Executors.newSingleThreadExecutor
Executors.newScheduledThreadPool
例如咱们想建立一个有五个线程的线程池, 那么能够调用 Executors.newFixedThreadPool
, 这个方法等效于:
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
提交任务到线程池中比较简单, 若是是 ThreadPoolExecutor
类型的线程池, 咱们直接调用它的 execute 方法便可, 例如:
ExecutorService executorService = ... executorService.execute(new Runnable() { @Override public void run() { System.out.println("OK, thread name: " + Thread.currentThread().getName()); } });
若是咱们获取到一个 ScheduledThreadPoolExecutor
类型的线程池, 那么除了调用 execute 方法外, 咱们还能够经过调用 schedule
方法提交一个定时任务, 例如:
ScheduledExecutorService executorService = xxx executorService.schedule(new Runnable() { @Override public void run() { System.out.println("OK, thread name: " + Thread.currentThread().getName()); } }, 1, TimeUnit.SECONDS);
上面代代码就会在1秒后执行咱们的定时任务.
Java 线程池提供了两个方法用于关闭一个线程池, 一个是 shutdownNow()
, 另外一个是 shutdown()
. 咱们能够看一下这两个方法的签名:
void shutdown(); List<Runnable> shutdownNow();
这两个方法除了名字不同外(废话), 它们的返回值也不太同样.
那么这两个方法到底有什么区别呢? 它们的区别有:
当线程池调用该方法时,线程池的状态则马上变成 SHUTDOWN
状态. 咱们不能再往线程池中添加任何任务, 不然将会抛出RejectedExecutionException异常; 可是, 此时线程池不会马上退出, 直到添加到线程池中的任务都已经处理完成后才会退出.
当执行该方法, 线程池的状态马上变成STOP状态, 并试图中止全部正在执行的线程, 再也不处理还在池队列中等待的任务, 并以返回值的形式返回那些未执行的任务.
此方法会经过调用 Thread.interrupt()
方法来试图中止正在运行的 Worker 线程, 可是这种方法的做用有限, 若是线程中没有 sleep 、wait、Condition、定时锁
等操做时, interrupt() 方法是没法中断当前的线程的. 因此, ShutdownNow() 并不表明线程池就必定当即就能退出
, 可能必需要等待全部正在执行的任务都执行完成了才能退出.
废话了一大堆, 咱们来看一下具体的例子吧:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); executorService.schedule(new Runnable() { @Override public void run() { System.out.println("OK, thread name: " + Thread.currentThread().getName()); } }, 1, TimeUnit.SECONDS); // 调用此方法关闭线程时, 咱们提交的定时任务不会被执行 // executorService.shutdownNow(); executorService.shutdown();
能够看到, 若是咱们调用的是 executorService.shutdownNow()
, 那么原先提交的未执行的定时任务并不会再被执行, 可是若是咱们调用的是 executorService.shutdown()
, 那么此调用会阻塞住, 直到全部提交的任务都执行完毕才会返回.
在开始深刻了解 ThreadPoolExecutor
代码以前, 咱们先来简单地看一下 ThreadPoolExecutor
类中到底有哪些重要的字段.
public class ThreadPoolExecutor extends AbstractExecutorService { // 这个是一个复用字段, 它复用地表示了当前线程池的状态, 当前线程数信息. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 用于存放提交到线程池中, 可是还未执行的那些任务. private final BlockingQueue<Runnable> workQueue; // 线程池内部锁, 对线程池内部操做加锁, 防止竞态条件 private final ReentrantLock mainLock = new ReentrantLock(); // 一个 Set 结构, 包含了当前线程池中的全部工做线程. // 对 workers 字段的操做前, 须要获取到这个锁. private final HashSet<Worker> workers = new HashSet<Worker>(); // 条件变量, 用于支持 awaitTermination 操做 private final Condition termination = mainLock.newCondition(); // 记录线程池中曾经到达过的最大的线程数. // 这个字段在获取 mainLock 锁的前提下才能操做. private int largestPoolSize; // 记录已经完成的任务数. 仅仅当工做线程结束时才更新此字段. // 这个字段在获取 mainLock 锁的前提下才能操做. private long completedTaskCount; // 线程工厂. 当须要一个新的线程时, 这里生成. private volatile ThreadFactory threadFactory; // 任务提交失败后的处理 handler private volatile RejectedExecutionHandler handler; // 空闲线程的等待任务时间, 以纳秒为单位. // 当当前线程池中的线程数大于 corePoolSize 时, // 或者 allowCoreThreadTimeOut 为真时, 线程才有 idle 等待超时时间, // 若是超时则此线程会中止.; // 反之线程会一直等待新任务到来. private volatile long keepAliveTime; // 默认为 false. // 当为 false 时, keepAliveTime 不起做用, 线程池中的 core 线程会一直存活, // 即便这些线程是 idle 状态. // 当为 true 时, core 线程使用 keepAliveTime 做为 idle 超时 // 时间来等待新的任务. private volatile boolean allowCoreThreadTimeOut; // 核心线程数. private volatile int corePoolSize; // 最大线程数. private volatile int maximumPoolSize; }
ThreadPoolExecutor 中, 使用到 ctl
这个字段来维护线程池中当前线程数和线程池的状态. ctl
是一个 AtomicInteger 类型, 它的 低29位
用于存放当前的线程数, 所以一个线程池在理论上最大的线程数是 536870911
; 高 3 位是用于表示当前线程池的状态, 其中高三位的值和状态对应以下:
111: RUNNING
000: SHUTDOWN
001: STOP
010: TIDYING
110: TERMINATED
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
上面的代码有三个步骤, 首先第一步是检查当前线程池的线程数是否小于 corePoolSize, 若是小于, 那么由咱们前面提到的规则, 线程池会建立一个新的线程来执行此任务, 所以在第一个 if 语句中, 会调用 addWorker(command, true)
来建立一个新 Worker 线程, 并执行此任务. addWorker 的第二个参数是一个 boolean 类型的, 它的做用是用于标识是否须要使用 corePoolSize 字段, 若是它为真, 则添加新任务时, 须要考虑到 corePoolSize 字段的影响. 这里至于 addWorker
内部的实现细节咱们暂且无论, 先把整个提交任务的大致脉络理清了再说.
若是前面的判断不知足, 那么会将此任务插入到工做队列中, 即 workQueue.offer(command)
. 固然, 为了健壮性考虑, 当插入到 workQueue 后, 咱们还须要再次检查一下此时线程池是否仍是 RUNNING
状态, 若是不是的话就会将原来插入队列中的那个任务删除, 而后调用 reject 方法拒绝此任务的提交; 接着考虑到在咱们插入任务到 workQueue 中的同时, 若是此时线程池中的线程都执行完毕并终止了, 在这样的状况下刚刚插入到 workQueue 中的任务就永远不会获得执行了. 为了不这样的状况, 所以咱们由再次检查一下线程池中的线程数, 若是为零, 则调用 addWorker(null, false)
来添加一个线程.
若是前面所分析的状况都不知足, 那么就会进入到第三个 if 判断, 在这里会调用 addWorker(command, false)
来将此任务提交到线程池中. 注意到这个方法的第二个参数是 false
, 表示咱们在这次调用 addWorker 时, 不考虑 corePoolSize 的影响, 即忽略 corePoolSize 字段.
前面咱们大致分析了一下 execute
提交任务的流程, 不过省略了一个关键步骤, 即 addWorker
方法. 如今咱们就来揭开它的神秘面纱吧.
首先看一下 addWorker 方法的签名:
private boolean addWorker(Runnable firstTask, boolean core)
这个方法接收两个参数, 第一个是一个 Runnable 类型的, 通常来讲是咱们调用 execute 方法所传输的参数, 不过也有多是 null 值, 这样的状况咱们在前面一小节中也见到过.
那么第二个参数是作什么的呢? 第二个参数是一个 boolean 类型的变量, 它的做用是标识是否使用 corePoolSize 属性. 咱们知道, ThreadPoolExecutor 中, 有一个 corePoolSize 属性, 用于动态调整线程池中的核心线程数. 那么当 core 这个参数是 true 时, 则表示在添加新任务时, 须要考虑到 corePoolSzie 的影响(例如若是此时线程数已经大于 corePoolSize 了, 那么就不能再添加新线程了); 当 core 为 false 时, 就不考虑 corePoolSize 的影响(其实代码中是以 maximumPoolSize 做为 corePoolSize 来作判断条件的), 一有新任务, 就对应地生成一个新的线程.
说了这么多, 还不如来看一下 addWorker
的源码吧:
private boolean addWorker(Runnable firstTask, boolean core) { // 这里一大段的 for 语句, 其实就是判断和处理 core 参数的. // 当通过判断, 若是当前的线程大于 corePoolSize 或 maximumPoolSize 时(根据 core 的值来判断), // 则表示不能新建新的 Worker 线程, 此时返回 false. retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 当 core 为真, 那么就判断当前线程是否大于 corePoolSize // 当 core 为假, 那么就判断当前线程数是否大于 maximumPoolSize // 这里的 for 循环是一个自旋CAS(CompareAndSwap)操做, 用于确保多线程环境下的正确性 if (wc >= CAPACITY || wc >= (core ? corePoolSize : ma)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
首先在 addWorker 的一开始, 有一个 for 循环, 用于判断当前是否能够添加新的 Worker 线程. 它的逻辑以下:
若是传入的 core 为真, 那么判断当前的线程数是否大于 corePoolSize, 若是大于, 则不能新建 Worker 线程, 返回 false.
若是传入的 core 为假, 那么判断当前的线程数是否大于 maximumPoolSize, 若是大于, 则不能新建 Worker 线程, 返回 false.
若是条件符合, 那么在 for 循环内, 又有一个自旋CAS 更新逻辑, 用于递增当前的线程数, 即 compareAndIncrementWorkerCount(c)
, 这个方法会原子地更新 ctl 的值, 将当前线程数的值递增一.
addWorker
接下来有一个 try...finally
语句块, 这里就是实际上的建立线程、启动线程、添加线程到线程池中的工做了.
首先能够看到 w = new Worker(firstTask);
这里是实例化一个 Worker 对象, 这个类其实就是 ThreadPoolExecutor 中对工做线程的封装. Worker 类继承于 AbstractQueuedSynchronizer
并实现了 Runnable
接口, 咱们来看一下它的构造器:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
它会把咱们提交的任务(firstTask) 设置为本身的内部属性 firstTask, 而后呢, 使用 ThreadPoolExecutor 中的 threadFactory
来建立一个新的线程, 并保存在 thread 字段中, 并且注意到, 建立线程时, 咱们传递给新线程城的 Runnable 实际上是 Worker 对象自己(this), 所以当这个线程启动时, 实际上运行的是 Worker.run() 中的代码.
回过头来再看一下 addWorker 方法. 当建立好 Worker 线程后, 就会将这个 worker 线程存放在 workers
这个 HashSet<Worker>
类型的字段中. 并且注意到, 正如咱们在前面所提到的, mainLock
是 ThreadPoolExecutor 的内部锁, 咱们对 ThreadPoolExecutor 中的字段进行操做时, 为了保证线程安全, 所以都须要在获取到 mainLock
的前提下才能操做的.
最后别忘啦, 新建了一个线程后, 须要调用它的 start()
方法后, 这个线程才真正地运行, 所以咱们能够看到, 在 addWorker 方法的最后, 调用了 t.start();
来启动这个新建的线程.
咱们已经分析了工做线程的建立和任务插入到 wokerQuque 的过程, 那么根据本文最开头的线程池工做模型可知, 光有工做线程和工做队列还不行啊, 还须要有一个调度器, 把任务和工做线程关联起来才是一个真正的线程池.
在 ThreadPoolExecutor 中, 调度器的实现很简单, 其实就是每一个工做线程在执行完一个任务后, 会再次中 workQueue 中拿出下一个任务, 若是获取到了任务, 那么就再次执行.
咱们来看一下具体的代码实现吧.
在前面一小节中, 咱们讲到 addWorker 中会新建一个 Worker 对象来表明一个 worker 线程, 接着会调用线程的 start()
来启动这个线程, 咱们也提到了当启动这个线程后, 会运行到 Worker 中的 run 方法, 那么这里咱们就来看一下 Worker.run
有什么玄机吧:
public void run() { runWorker(this); }
Worker.run
方法很简单, 只是调用了 ThreadPoolExecutor.runWorker
方法而已.
runWorker 方法比较关键, 它是整个线程池任务分配的核心:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
runWorker
方法是整个工做线程的核心循环, 在这个循环中, 工做线程会不断的从 workerQuque 中获取新的 task, 而后执行它.
咱们注意到在 runWorker
一开始, 有一个 w.unlock();
, 咦, 这是为何呢? 其实这是 Worker 类玩的一个小把戏. 回想一下, Worker 类继承于 AbstractQueuedSynchronizer
并实现了 Runnable
接口, 它的构造器以下:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
setState(-1);
方法是 AbstractQueuedSynchronizer
提供的, 初始化 Worker 时, 会先设置 state 为 -1
, 根据注释, 这样作的缘由是为了抑制工做线程的 interrupt 信号, 直到此工做线程正是开始执行 task. 那么在 addWorker
中的 w.unlock();
就是容许 Worker 的 interrupt 信号.
接着在 addWorker
中会进入一个 while 循环, 在这里此工做线程会不断地从 workQueue 中取出一个任务, 而后调用 task.run()
来执行这个任务, 所以就执行到了用户所提交的 Runnable 中的 run()
方法了.
工做线程的 idle 超出处理在底层依赖于 BlockingQueue 带超时的 poll 方法, 即工做线程会不断地从 workQueue 这个 BlockingQueue 中获取任务, 若是 allowCoreThreadTimeOut
字段为 true, 或者当前的工做线程数大于 corePoolSize, 那么线程的 idle 超时机制就生效了, 此时工做线程会以带超时的 poll 方式从 workQueue 中获取任务. 当超时了尚未获取到任务, 那么咱们就知道此线程一个到达 idle 超时时间, 所以终止此工做线程.
具体源码以下:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
从源码中就能够看到, 一开始会判断当前的线程池状态, 若是不是 SHUTDOWN
或 STOP
之类的状态, 那么接着获取当前的工做线程数, 而后判断工做线程数量是否已经大于了 corePoolSize. 当 allowCoreThreadTimeOut
字段为 true, 或者当前的工做线程数大于 corePoolSize, 那么线程的 idle 超时机制就生效, 此时工做线程会以带超时的 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
方式从 workQueue 中获取任务; 反之会以 workQueue.take()
方式阻塞等待任务, 直到获取一个新的任务.
当从 workQueue 获取新任务超时时, 那么就会调用 compareAndDecrementWorkerCount
将当前的工做线程数减一, 并返回 null. getTask 方法返回 null 后, 那么 runWorker 中的 while 循环天然也就结束了, 所以也致使了 runWorker 方法的返回, 最后天然整个工做线程的 run()
方法执行完毕, 工做线程天然就终止了.