在关闭线程池章节中,查看源码实际上会发现线程池有许多状态:java
/** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indicating the effective number of threads * runState, indicating whether running, shutting down etc * * In order to pack them into one int, we limit workerCount to * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 * billion) otherwise representable. If this is ever an issue in * the future, the variable can be changed to be an AtomicLong, * and the shift/mask constants below adjusted. But until the need * arises, this code is a bit faster and simpler using an int. * * The workerCount is the number of workers that have been * permitted to start and not permitted to stop. The value may be * transiently different from the actual number of live threads, * for example when a ThreadFactory fails to create a thread when * asked, and when exiting threads are still performing * bookkeeping before terminating. The user-visible pool size is * reported as the current size of the workers set. * * The runState provides the main lifecycle control, taking on values: * * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed * * The numerical order among these values matters, to allow * ordered comparisons. The runState monotonically increases over * time, but need not hit each state. The transitions are: * * RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() * SHUTDOWN -> TIDYING * When both queue and pool are empty * STOP -> TIDYING * When pool is empty * TIDYING -> TERMINATED * When the terminated() hook method has completed * * Threads waiting in awaitTermination() will return when the * state reaches TERMINATED. * * Detecting the transition from SHUTDOWN to TIDYING is less * straightforward than you'd like because the queue may become * empty after non-empty and vice versa during SHUTDOWN state, but * we can only terminate if, after seeing that it is empty, we see * that workerCount is 0 (which sometimes entails a recheck -- see * below). */
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
复制代码
将注释翻译下来就是对着几个线程池状态的具体描述:git
其中五个状态:程序员
RUNNING:接收新的任务,处理队列中的任务;github
SHUTDOWN:不接收新的任务,但处理队列中的任务;编程
STOP:不接收新的任务,不处理队列中的任务,中断正在执行的任务;数组
TIDYING:全部任务都终止,有效线程数为0, 线程过分到TIDYING时会调用terminated钩子方法;缓存
TERMINATED:terminated()方法执行完毕后进入该状态;安全
状态之间的转换:markdown
RUNNING -> SHUTDOWN:调用shutdown方法;网络
(RUNNING or SHUTDOWN) -> STOP:调用shutdownNow方法;
SHUTDOWN -> TIDYING:当线程池和任务队列都为空(队列中没有未执行的任务了,而且全部线程都完成了工做处于赋闲状态);
STOP -> TIDYING:当线程池中工做线程数量为0(其实就是变为stop状态,对全部正在执行任务的线程执行中断,也再也不处理队列中未处理的任务,一旦中断所有完成,全部工做线程数量就为0了,直接进入tidying状态,也无论队列中的任务了);
TIDYING -> TERMINATED:当terminated方法执行完毕;
状态转换示意图
线程池状态是由命名为ctl的AtomicIntegr的成员变量持有的(共32位),包含如下两个信息:
线程池状态-最高3位
线程池中线程数量-低29位
// 初始化线程池状态-RUNNING 0工做线程
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 设置位数 高3位与低29位分别表示线程池状态与线程池工做线程数量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程的最大数量大概是5亿多
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000
private static final int STOP = 1 << COUNT_BITS; // 001
private static final int TIDYING = 2 << COUNT_BITS; // 010
private static final int TERMINATED = 3 << COUNT_BITS; // 011
// Packing and unpacking ctl
// 根据ctl获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 根据ctl获取线程池中工做线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 使用runstate与workercount组装ctl,初始状态下rs 为RUNNING wc为0
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 注意大小比较关系
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
复制代码
生产者消费者模型
,将线程和任务二者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分红两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收ThreadPoolExecutor
构造方法(针对参数最多的学习)public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
复制代码
ThreadPoolExecutor 3 个最重要的参数:
allowCoreThreadTimeOut
,若是设置为true,则在超过keepAliveTime
以后,空闲的核心线程也会被回收
corePoolSize
个线程,也是按需求来的,最早分配任务的corePoolSize
自动成为核心线程execute
方法提交的Runnable
任务,而不是其余的什么复杂的结构ThreadPoolExecutor其余常见参数:
getTask
函数中调用的阻塞队列的poll函数的超时设置饱和策略(对应的是任务拒绝模块)
定义:饱和就是当任务队列满了,而且线程池当前同时运行的线程数量已经达到设定的最大值时的状态,更准确地定义应该是任务拒绝策略,而不只仅是饱和策略,由于线程池饱和的时候会执行拒绝,线程池状态不是running状态时,也要对新提交的任务执行拒绝策略
任务的拒绝是经过reject函数完成的, 默认提供4个拒绝策略,固然也能够实现本身的拒绝策略
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
复制代码
ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理,此policy也是使用Executors工具类建立线程池以及咱们不指定饱和策略使用ThreadPoolExecutor构造函数时的默认的饱和策略
ThreadPoolExecutor.CallerRunsPolicy:调用执行本身的线程运行任务,也就是直接在调用execute方法的线程(通常是主线程)中运行(run)被拒绝的任务,若是执行程序(线程池)已关闭,则会丢弃该任务。所以这种策略会下降对于新任务提交速度,影响程序的总体性能(由于Main线程去处理新提交的任务去了,就没法处理新的请求了)。若是您的应用程序能够承受此延迟而且你要求任何一个任务请求都要被执行的话,你能够选择这个策略
ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。(直接丢弃掉,甚至不会抛出异常)
ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最先的未处理的任务请求(所谓的抛弃最先的未处理的任务请求,就是抛弃下一个待处理的任务,处于头部的任务),丢弃后再次尝试提交新的任务
对于以上几种饱和策略的理解补充:
DiscardPolicy
策略)判断线程是否是shutdown状态,若是是会直接忽略新的任务任务队列(线程安全的阻塞队列)
BlockingQueue<Runnable>
,可是这个是一个接口类,真正可使用的实现类有以下几种(具体的描述看本身总结的线程安全的容器这个文章):
Integr.MAX_VALUE
,使用工具类建立线程池时,newFixedThreadPool与SingleThreadExecutor都是使用的此类型的队列线程工厂
线程工厂
,默认状况下的线程池建立线程的过程都是其内部的DefaultThreadFactory
,可是若是要用自定义的方式建立线程,以实现对于线程池建立的线程的监控与控制的话,就须要用到这个线程工厂的参数任务调度是整个线程池的入口,是整个线程池的核心所在,而这个任务调度对应的实际上就是execute
方法
execute
做为任务调度方法的大体运做流程是根据线程池的运行状态,工做线程的数量与运行策略来决定新提交的任务的三种可能的去向:
线程池中最重要的方法必定是任务的提交执行方法,又因为submit内部实际调用了execute方法,因此直接查看ThreadPoolExecutor的execute方法
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 获取当前线程池内正在运行的线程数量
private static int workerCountOf(int c) {
return c & CAPACITY;
}
// 任务队列
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) {
// 首先确定是检查的任务的有效性,若是为null就要报空指针异常
if (command == null)
throw new NullPointerException();
/* \* Proceed in 3 steps: * \* 1. If fewer than corePoolSize threads are running, try to \* start a new thread with the given command as its first \* task. The call to addWorker atomically checks runState and \* workerCount, and so prevents false alarms that would add \* threads when it shouldn't, by returning false. * \* 2. If a task can be successfully queued, then we still need \* to double-check whether we should have added a thread \* (because existing ones died since last checking) or that \* the pool shut down since entry into this method. So we \* recheck state and if necessary roll back the enqueuing if \* stopped, or start a new thread if there are none. * \* 3. If we cannot queue task, then we try to add a new \* thread. If it fails, we know we are shut down or saturated \* and so reject the task. */
// 检查完提交的任务的有效性以后就要执行如上英文注释的三个步骤的处理了
// 得到线程池的状态与当前运行的线程的数量的记录(ThreadPoolExecutor类中定义了五种线程池状态)
// ctl更像是一个线程池的运行时上下文的状态维护变量
int c = ctl.get();
// 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize
// 若是小于的话,经过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;而后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// addWorker失败,从新得到线程池状态,以进行下一步判断
c = ctl.get();
}
// 2.若是当前之行的线程数量大于等于 corePoolSize 或者addWorker失败(失败的缘由多是有效线程的数量已经大于corePoolSize,因此须要缓存任务)后就会走到这里
// 经过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会将任务加入到任务队列中,而且判断是否能加入到任务队列中
if (isRunning(c) && workQueue.offer(command)) {
// 成功将任务添加到任务队列中
// 再次检查线程池中的线程状态,并再次检查线程池中是否有可用的线程,由于自从上一次检查后,可能有线程已经完成了工做或者线程池已经shutdown了
int recheck = ctl.get();
// 若是线程池状态不是running状态,就要从任务队列中移除任务,至关于一次回滚,并执行构造函数中参数指定的饱和策略
if (! isRunning(recheck) && remove(command))
// 执行回滚
reject(command);
// 若是当前线程池是running状态而且工做线程为0(以前运行的工做线程被回收了,corePoolSize也有可能被回收)就新建立一个线程,其中worker的初始任务为null
//
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 任务队列已经满了,或者线程池已经不是running状态了 所以作最后的尝试,在建立一个新的线程试试,若是建立失败,表示线程池已经满了,所以执行饱和策略
else if (!addWorker(command, false))
reject(command);
}
复制代码
事实上,源代码的注释上就已经说的很清楚了。
reject
函数corePoolSize
说明还能够无脑添加新的线程并使其执行新的任务(这也说明corePoolSize数量的线程也是懒建立的,不是默认就自动维护这么多数量的线程)maximumPoolSize
若是小于,则建立新的线程并用来执行新的任务;若是有效线程数量已经大于maximumPoolSize
,只能去执行拒绝策略了线程池为了方便的掌握线程的状态与维护线程的周期,设计了工做线程对象Worker
,Worker起做用的最关键的就是实现了Runnable
的接口(使得Worker能够做为线程任务被执行,至关于将提交的任务作了包装)和继承了AQS
类(控制线程的中断,维护线程的生命周期)
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
// ...
}
复制代码
Worker中最重要的两个成员变量
thread,在Worker的构造函数中被建立,使用的是ThreadPoolExecutor建立时传入的threadFactory去执行线程的构建
firstTask,firstTask用来保存传入的第一个任务,这个任务能够有也能够为null。若是这个值是非空的,那么线程就会在启动初期当即执行这个任务,也就对应核心线程建立时的状况,执行完firstTask
后再去队列中取后续的任务;若是这个值是null,那么就须要建立一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的建立
firstTask
也是为null
的addWorker函数(任务调度时建立核心线程执行任务或者建立非核心线程)
// 全局锁,并发操做必备
private final ReentrantLock mainLock = new ReentrantLock();
// 跟踪线程池的最大大小,应该只有在持有全局锁mainLock的前提下才访问此属性
private int largestPoolSize;
// 工做线程集合,存放线程池中全部的(活跃的)工做线程,只有在持有全局锁mainLock的前提下才能访问此集合
private final HashSet<Worker> workers = new HashSet<>();
//获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//判断线程池的状态是否为 Running
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// 返回true表示建立并启动线程成功
// firstaTask就是这个线程的初始任务
// 第二个参数为true表示新的worker也就是工做线程是尝试加到corePool中仍是maximumPool中
private boolean addWorker(Runnable firstTask, boolean core) {
// retry标志位经常使用于多循环嵌套的流程控制
retry:
for (;;) {
// 获取线程池状态
int c = ctl.get();
int rs = runStateOf(c);
// 若是状态>= SHUTDOWN 表示线程池是正在关闭(SHUTDOWN)或者已经关闭(>SHUTDOWN)状态的处置方法:
// 若是此时是shutdown状态,而且没有分配初始任务,而且任务队列不为空,则是容许建立新的worker的(此时新建立的worker用来在SHUTDOWN状态下,执行任务队列中剩余的任务),违反任一则是不容许的,好比线程池已经关闭(>SHUTDOWN)或者是SHUTDOWN状态,可是附加了本身的初始任务,是不容许的,只能执行队列中剩余的任务,或者队列已经为空了,再也不须要新的worker了,也会建立失败
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程池中的线程数量
int wc = workerCountOf(c);
// 判断当前线程池的数量与哪一个值比较,若是已经达到最终的最大值CAPACITY,当即返回false
// 不然根据参数判断与哪一个值比较,是最小值仍是最大值比较,若是目标是建立核心线程,就和corePoolSize比较,若是已经达到设计大小了,就建立失败,若是目标是建立非核心线程,就和maximumPoolSize比较
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 上一步判断后还能够添加worker,就使用CAS增长worker计数
if (compareAndIncrementWorkerCount(c))
// 跳出整个循环
break retry;
// CAS 失败
c = ctl.get();
// 若是线程池状态发生了改变,
if (runStateOf(c) != rs)
// 从头开始执行整个外部的for循环,从新根据线程池状态进行判决
continue retry;
// 省略的else表示的就是CAS失败的缘由是线程数量被同步修改了,只须要从新执行内部的for循环,根据线程数量进行判决便可
}
}
// 线程数量成功更新,
// 初始化工做线程启动成功标志
boolean workerStarted = false;
// 初始化工做线程建立成功标志
boolean workerAdded = false;
Worker w = null;
try {
// 建立worker实例
w = new Worker(firstTask);
// 得到worker持有的线程实例
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 获取线程池状态
int rs = runStateOf(ctl.get());
// < shutdown也就是running状态下执行操做
// 或者是在SHUTDOWN状态下,而且firstTask为空时执行下述操做
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 判断线程是否已经启动了,若已经启动了,抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将新建立的worker实例添加到worker集合中,是一个HashSet集合
workers.add(w);
// 更新largestPoolSize
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 设置worker添加成功标志位
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
if (workerAdded) {
// 启动worker
t.start();
// 设置worker启动成功标志位
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// worker启动失败,要执行回滚
// 从工做线程集合中移除新添加的Worker实例
// 线程池状态中线程池数量-1
// tryTerminate
addWorkerFailed(w);
}
// 返回worker是否启动的状态
return workerStarted;
}
复制代码
addWorker
源码中,也能看见线程池状态与线程池数量共同决定流程走向的场景,这就是为何要把这两个状态维护在一个变量中的缘由workers
这个集合来维护线程不被回收,当须要回收时,只须要将其引用消除,也就是将Worker
对象消除便可,jvm
会完成后续的回收(详见线程回收小节)runWorker函数,worker开始执行任务
/* * 如何addWorker中启动线程的语句t.start()转到了runWorker方法呢? */
// Worker的线程实例是在Worker构造函数中完成初始化的,注意,传入newThread的是this,也就是Worker实例自己被当作一个Runnable任务提交到了线程中,因此调用线程实例的start方法时,就会执行Runnable任务也就是Worker实例的run方法
Worker(Runnable firstTask) {
setState(-1); // 将AQS计数设置为-1,目的是为了在worker初始化致使runWorker被执行的期间内不被中断
this.firstTask = firstTask;
// thread成员变量由默认的或者指定的线程工厂建立,传入的Runnable参数是Worker实例自己
this.thread = getThreadFactory().newThread(this);
}
// 新线程启动后执行此方法
public void run() {
runWorker(this);
}
// addWorker:
// 建立worker实例
w = new Worker(firstTask);
final Thread t = w.thread;
// 开启新线程后执行Runnbale参数的run方法,也就是Worker实例的run方法
t.start()
/* * 上述代码是addWorker中的启动线程的代码 * 下边的代码是runWorker中的代码 */
// 实际在新线程中执行的方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 首先保存初始任务。再清空初始任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // worker 容许中断,此时worker的状态是空闲状态,能够被回收(中断)
// 初始化线程异常退出标志位
boolean completedAbruptly = true;
try {
// 执行初始化任务,或者while循环不断的阻塞以从任务队列得到新的任务,除非getTask返回null,表示已经没法得到任务,须要执行线程回收
while (task != null || (task = getTask()) != null) {
// 成功的获取了任务
// 开始执行任务,不容许中断,此时线程是非空闲状态
w.lock();
// 执行recheck 若是线程池已经关闭了,而且当前线程尚未中断,就要执行对当前线程的中断,不然要保证当前线程不是中断状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 钩子函数,默认的钩子函数的函数体为空,能够去构造ThreadPoolExecutor的子类去复写此钩子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子函数与beforeExecute同理
afterExecute(task, thrown);
}
} finally {
task = null;
// worker的完成的任务数量加1,注意此时是线程安全的
w.completedTasks++;
// 释放锁
w.unlock();
}
}
// 线程不是由于异常退出的,而是由于没法得到任务致使退出的
completedAbruptly = false;
} finally {
// while循环已没法经过getTask得到新的任务了,具体的缘由参考后续的getTask方法
// 执行线程回收
// 若是是由于task执行时出现异常,completedAbruptly为true,不然为false
processWorkerExit(w, completedAbruptly);
}
}
复制代码
addWorker
中启动线程的语句t.start()
转到了runWorker
方法呢?这一块比较绕,能够直接看代码的注释
Worker
类实现Runnable
接口的做用,就是将本身包装为可执行任务geTask
方法getTask方法,也就是从队列中获取任务的方法也是很重要的,主要功能是核心线程获取任务或保持阻塞,非核心线程获取任务,或超时返回null,进而线程生命周期结束
private Runnable getTask() {
// 获取任务超时的标志位
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//线程池状态是STOP以后的状态,表示已经不处理任务了,或者是SHUTDOWN时,任务队列已经为空,想处理也没的处理了,就直接返回null,worker会被直接回收
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 工做线程数量-1
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//是否超时控制,allowCoreThreadTimeOut默认false,表明不对核心线程作超时限制,对于超出核心线程的线程须要控制超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//当线程数大于最大线程数,即线程池已经满了,或者须要作超时控制且上次获取任务就已经超时这两个任一的条件下
//且线程数大于1或者队列为空,尝试将线程数减一并返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
// 失败重试,从新根据线程池状态与线程池中线程数量作判断
continue;
}
try {
//当须要超时控制时,在keepAliveTime时间内没有获取到任务的话会设置超时标志位,若是没有超时限制,则调用take获取任务,此时线程是阻塞等待获取任务的
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
// 阻塞等待获取任务时,整个worker并无加锁,也就是被认为是空闲状态,可能会被回收掉
timedOut = false;
}
}
}
复制代码
这里须要补充的就是任务队列的poll与take方法虽然名称差别比较大,可是惟一的差别在于前者是加了超时时间,后者是阻塞
getTask这部分进行了屡次判断,为的是控制线程的数量,使其符合线程池的状态。若是线程池如今不该该持有那么多线程,则会返回null值。工做线程Worker会不断接收新任务去执行,而当工做线程Worker接收不到任务的时候,就会开始被回收
好比在下边这段代码中
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
// 失败重试,从新根据线程池状态与线程池中线程数量作判断
continue;
}
复制代码
decrementWorkerCount
与compareAndDecrementWorkerCount
两者的区别是什么
decrementWorkerCount
内部是在循环调用compareAndDecrementWorkerCount
,换句话说就是,必需要尝试将工做线程数量-1,由于确实不须要此线程了,而compareAndDecrementWorkerCount
直接拿来用,只是尝试一次将工做线程-1,若是失败的话,就要从新根据状态作出可能与以前不一样的判断线程回收,processWorkerExit
实际上,关于线程回收,是有两种场景的:1. 主动的线程回收,好比processWorkerExit
函数这样的 2. 探查式的回收,或者说是被动的回收,好比interruptIdleWorkers
主动回收:在runWorker
函数中,若是没法再得到任务,就会跳出执行此线程回收函数,实际上线程池中线程的回收依赖的是JVM的自动回收,线程池要作的只是把线程的引用消除而已
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 一个标志位,是不是由于发生线程异常,因此进入的此方法
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// 工做线程数-1
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
// 加锁,由于要进行审计计数了
mainLock.lock();
try {
// 统计此worker的完成的任务数目
completedTaskCount += w.completedTasks;
// 从线程池中移除此线程
// 执行remove方法完毕后,实际上已经完成了线程的回收,可是因为引发线程销毁的可能性有不少,线程池还要判断是什么引起了此次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,从新分配线程-----即所谓的线程状态自适应的过程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试中断、回收空闲线程
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 线程池状态是RUNNING或SHUTDOWN状态而且并不是由于异常致使线程关闭的状况下
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 若是线程够用,就直接返回,不然还要添加一个worker到线程池
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 若是由于线程异常致使的线程关闭的话,还须要再向线程池中补充一个worker
// 或者是此时线程数量不能知足最小要求时也要再添加一个worker
addWorker(null, false);
}
}
复制代码
被动回收:上述代码中提到的tryTerminate方法,也就是在某worker结束生命周期后判断线程池是否要关闭以及回收空闲线程,以便有效的管理线程池的生命周期,在全部可能致使线程池终止的地方都调用了此方法
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//当线程池状态是RUNNING(状态正常)或者已经TIDYING或者已经TERMINATED(线程已经快关闭了)或者SHUTDOWN且还有任务没有被执行(SHUTDOWN状态须要处理完队列中的任务),直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 当前线程池状态是STOP状态或是SHUTDOWN状态但任务列表为空时,若是线程数量不为0,须要最多终止1个空闲的线程,上边所述的stop状态或者shutdown状态而且queue为空统称为终止流程开始的状态
// 若是线程数不为0,则中断一个阻塞等待任务的空闲的工做线程
if (workerCountOf(c) != 0) {
// 尝试中断最多一个阻塞等待任务的空闲的工做线程
interruptIdleWorkers(ONLY_ONE);
return;
}
// 若是当前工做线程数量为0就准备关闭线程池
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 尝试设置线程池状态为tidying状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 若是设置成功调用此钩子方法
terminated();
} finally {
// 钩子方法执行完毕后,设置状态为TERMINATED,并设置线程数量为0
ctl.set(ctlOf(TERMINATED, 0));
// 通知调用awaitTermination的主线程,已经进入了TERMINATION状态
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// CAS失败的话,就从新根据状态进行判断
}
}
复制代码
调用了tryTerminate
方法的地方有
addWorkerFailed
processWorkerExit
shutdown
shutdownNow
remove
从队列中移除某任务purge
从队列中移除全部被取消的任务在被动回收过程当中,最重要的就是能了解线程的当前状态,在主动回收中尚且能够知道线程是须要回收的,可是被动回收时实际上并不清楚线程池中线程的状态,Worker经过继承AQS,使用AQS来实现不可重入的独占锁(使用AQS的独占模式)这个功能
interruptIdleWorkers,中断空闲线程,使其再也不阻塞等待任务
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 判断线程是否已经被中断,是的话就什么都不作
// 若未被中断,还要尝试获取worker的锁,此时若是worker若是已经经过lock方法获取了锁,则由于其不可重入的特性,致使此处为false,即对该worker不作任务处理
// 使用tryLock方法来判断线程池中的线程是不是空闲状态
if (!t.isInterrupted() && w.tryLock()) {
try {
// 执行线程中断
t.interrupt();
} catch (SecurityException ignore) {
} finally {
// worker释放锁
w.unlock();
}
}
// 若是未true,最多只会中断一个空闲线程,也可能一个线程也没有中断
// 若是为false,则会持续遍历所有的worker,并尝试中断全部的空闲的线程
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
// shutdownNow函数中调用的中断全部工做线程的方法
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 粗暴的中断全部线程
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
// 定义在worker类中,粗暴的打断全部的已经执行过runWorker方法的worker
void interruptIfStarted() {
Thread t;
// getState() >= 0即state != -1,也就是否是刚初始化的Worker,而是已经运行runWorker的Worker
// 直接在线程层面执行中断,而无论worker此时是不是正在运行的状态(不用去获取worker的锁)
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
// shutdown中调用了中断全部空闲线程的方法
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
复制代码
只有在线程池终止流程开始状态下(线程池状态准备转入TIDYING状态,可是还有空闲线程的时候),传入的参数为true,其他调用都是false,也就是中断全部的空闲线程
为何仅在tryTerminate
方法中,传入的参数为true,也就是最多中断一个空闲的线程呢?(解释的不是很清除,本身不是很懂.......)
当前线程池状态是STOP状态或是SHUTDOWN状态但任务列表为空时,若是线程数量还不为0,这说明,有多是剩余的全部线程都是阻塞,而不能传递shutdown的指令,在线程池终止流程开始的状态下,必须最多使一个阻塞在等待获取任务的线程中断,才能传播shutdown信号,以避免全部的线程陷入等待而没法关闭线程池
中断一个空闲线程,也能保证在线程池已是SHUTDOWN
状态后,新来的Worker也能最终退出
综上,为了保证线程池将来最终可以终止,老是仅中断一个空闲的工做程序就足够了,可是shutdown
会中断全部空闲的工做程序,以便多余的工做程序迅速退出
参考interruptIdleWorkers
的注释
Interrupts threads that might be waiting for tasks (as indicated by not being locked) so they can check for termination or configuration changes. Ignores SecurityExceptions (in which case some threads may remain uninterrupted). Params: onlyOne – If true, interrupt at most one worker. This is called only from tryTerminate when termination is otherwise enabled but there are still other workers. In this case, at most one waiting worker is interrupted to propagate shutdown signals in case all threads are currently waiting. Interrupting any arbitrary thread ensures that newly arriving workers since shutdown began will also eventually exit. To guarantee eventual termination, it suffices to always interrupt only one idle worker, but shutdown() interrupts all idle workers so that redundant workers exit promptly, not waiting for a straggler task to finish.
从AQS的角度理解Worker的生命周期
Worker
使用的是AQS
的独占模式,使用独占的特性来判断Worker自己是空闲状态(未上锁)仍是工做状态(上锁)
//1. worker初始化
Worker(Runnable firstTask) {
setState(-1); // 设置AQS计数标志为-1,其目的是为了防止初始化到runWorker执行这段时间内被中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// 2. runWorker函数
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 至此worker是被以被中断的,也就是进入了空闲状态
w.unlock();
// ...
w.lock();
}
// worker释放锁
public void unlock() { release(1); }
// 独占模式下释放资源
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
// 设置独占的线程为null
setExclusiveOwnerThread(null);
// 设置状态为0
setState(0);
return true;
}
// worker上锁
public void lock() { acquire(1); }
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// worker的实现,其实根本没有用到参数--1 由于规定就是状态1为上锁的状态,因此直接用的常量1
protected boolean tryAcquire(int unused) {
// 尝试得到worker的锁,必须保证锁状态的旧状态是0,才能设置状态为1
if (compareAndSetState(0, 1)) {
// 设置当前线程为独占线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// interruptIdleWorkers函数执行时尝试中断空闲的线程,会经过尝试获取锁的方法来判断线程的状态
// 在tryAcquire方法中尝试设置状态为1,可是状态的当前值应是0(即执行unlock()以后),才能设置成功
// 这一点也保证了,在Worker初始化设置状态为-1到runWorker的状态设置为0时,是可以保证不被中断的
public boolean tryLock() { return tryAcquire(1); }
复制代码
线程池数量的肯定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。
不少人甚至可能都会以为把线程池配置过大一点比较好!我以为这明显是有问题的。就拿咱们生活中很是常见的一例子来讲:并非人多就能把事情作好,增长了沟通交流成本。你原本一件事情只须要 3 我的作,你硬是拉来了 6 我的,会提高作事效率嘛?我想并不会。 线程数量过多的影响也是和咱们分配多少人作事情同样,对于多线程这个场景来讲主要是增长了上下文切换成本。不清楚什么是上下文切换的话,能够看我下面的介绍。
上下文切换:
多线程编程中通常线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能获得有效执行,CPU 采起的策略是为每一个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会从新处于就绪状态让给其余线程使用,这个过程就属于一次上下文切换。归纳来讲就是:当前任务在执行完 CPU 时间片切换到另外一个任务以前会先保存本身的状态,以便下次再切换回这个任务时,能够再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。
上下文切换一般是计算密集型的。也就是说,它须要至关可观的处理器时间,在每秒几十上百次的切换中,每次切换都须要纳秒量级的时间。因此,上下文切换对系统来讲意味着消耗大量的 CPU 时间,事实上,多是操做系统中时间消耗最大的操做。
Linux 相比与其余操做系统(包括其余类 Unix 系统)有不少的优势,其中有一项就是,其上下文切换和模式切换的时间消耗很是少。
类比于实现世界中的人类经过合做作某件事情,咱们能够确定的一点是线程池大小设置过大或者太小都会有问题,合适的才是最好。
若是咱们设置的线程池数量过小的话,若是同一时间有大量任务/请求须要处理,可能会致使大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了以后任务/请求没法处理的状况,或者大量任务堆积在任务队列致使 OOM。这样很明显是有问题的! CPU 根本没有获得充分利用。可是,若是咱们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会致使大量的上下文切换,从而增长线程的执行时间,影响了总体执行效率。
有一个简单而且适用面比较广的公式:
如何判断是 CPU 密集任务仍是 IO 密集任务?
CPU 密集型简单理解就是利用 CPU 计算能力的任务好比你在内存中对大量数据进行排序。单凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特色是 CPU 计算耗费时间相比于等待 IO 操做完成的时间来讲不多,大部分时间都花在了等待 IO 操做完成上。