在web开发中,服务器须要接受并处理请求,因此会为一个请求来分配一个线程来进行处理。若是每次请求都新建立一个线程的话实现起来很是简便,可是存在一个问题: 若是并发的请求数量很是多,但每一个线程执行的时间很短,这样就会频繁的建立和销毁线程,如此一来会大大下降系统的效率。可能出现服务器在为每一个请求建立新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。那么有没有一种办法使执行完一个任务,并不被销毁,而是能够继续执行其余的任务呢?这就是线程池的目的了java
在实际使用中,线程是很占用系统资源的,若是对线程管理不善很容易致使系统问题。所以,在大多数并发框架中都会使用线程池来管理线程,使用线程池管理线程主要有以下好处:web
下降资源消耗。经过复用已存在的线程和下降线程关闭的次数来尽量下降系统性能损耗;编程
提高系统响应速度。经过复用线程,省去建立线程的过程,所以总体上提高了系统的响应速度:bash
提升线程的可管理性。线程是稀缺资源,若是无限制的建立,不只会消耗系统资源,还会下降系统的稳定性,所以,须要使用线程池来管理线程。服务器
何时使用线程池?多线程
单个任务处理时间比较短
须要处理的任务数量很大
复制代码
2.线程池的实现原理并发
当向线程池提交一个任务以后,线程池是如何处理这个任务的呢?能够看看以下步骤:框架
步骤1.线程池判断核心线程里的线程是否都在执行任务。分为两种状况,若是有空闲的核心线程,那么建立一个新的工做线程。则直接执行该任务。若是设置10条核心线程都在执行任务,那么进入下一个流程。ide
步骤2.线程池会判断,当前工做队列是否已经满了,若是没有满,则将该提交的线程任务存储在这个工做队列里面,等待核心线程有空闲,就会从该队列取线程出来执行。若是工做队列满了,则继续执行下一个流程。oop
步骤3.线程池会判断线程池里的线程(这里是线程池容许的线程总数)是否都处于执行状态,若是不是,则建立一个新的线程来执行任务。若是满了,则交给饱和策略来处理这个任务。
步骤4.按照饱和策略处理没法执行的任务。
如图所示:
3.线程池内部
先来关注如下主要的几个属性:
//记录线程池状态和线程数量(总共32位,前三位表示线程池状态,后29位表示线程数量)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程数量统计位数29 Integer.SIZE=32
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//线程池的运行状态
// runState存储在高位中
//接受新任务而且处理阻塞队列里的任务
private static final int RUNNING = -1 << COUNT_BITS;
//关闭状态,再也不接受新提交的任务,但却能够继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,
//调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程当中也会调用shutdown()方法进入该状态);
private static final int SHUTDOWN = 0 << COUNT_BITS;
//不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或
// SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
private static final int STOP = 1 << COUNT_BITS;
//若是全部的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用
// terminated() 方法进入TERMINATED 状态。
private static final int TIDYING = 2 << COUNT_BITS;
//在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有作。
private static final int TERMINATED = 3 << COUNT_BITS;
复制代码
进入TERMINATED的条件以下: 线程池不是RUNNING状态; 线程池状态不是TIDYING状态或TERMINATED状态; 若是线程池状态是SHUTDOWN而且workerQueue为空; workerCount为0; 设置TIDYING状态成功。
线程池状态转换
RUNNING -> SHUTDOWN
显式调用shutdown()方法, 或者隐式调用了finalize()方法
(RUNNING or SHUTDOWN) -> STOP
显式调用shutdownNow()方法
SHUTDOWN -> TIDYING
当线程池和任务队列都为空的时候
STOP -> TIDYING
当线程池为空的时候
TIDYING -> TERMINATED 当 terminated() hook 方法执行完成时候。
复制代码
ctl相关方法 这里还有几个对ctl进行计算的方法:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码
runStateOf:获取运行状态; workerCountOf:获取活动线程数; ctlOf:获取运行状态和活动线程数的值。
建立线程池主要是ThreadPoolExecutor类来完成,ThreadPoolExecutor的有许多重载的构造方法,经过参数最多的构造方法来理解建立线程池有哪些须要配置的参数。看源码ThreadPoolExecutor的构造方法为:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue, //执行任务以前用于保存任务的队列
//如下两个若是不传入,会设置默认的线程工厂生成以及饱和处理策略
ThreadFactory threadFactory, //当执行程序建立新线程时,使用的自定义的工厂
//去建立个性化线程
RejectedExecutionHandler handler) { //饱和策略的获取,通常咱们
//能够传入一个实现了RejectedExecutionHandler 的子类,实例化成
RejectedExecutionHandler 传入。
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
复制代码
下面对参数进行说明: 构造方法中的字段含义以下:
corePoolSize:核心线程数量,当有新任务在execute()方法提交时,会执行如下判断:
若是运行的线程少于 corePoolSize,则建立新线程来处理任务,即便线程池中的其余线程是空闲的;
若是线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当workQueue满时才建立新的线程
去处理任务;
若是设置的corePoolSize 和 maximumPoolSize相同,则建立的线程池的大小是固定的,这时若是有新任务提交,
若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;
若是运行的线程数量大于等于maximumPoolSize,这时若是workQueue已经满了,则经过handler所指定的策略来处理任务;
因此,任务提交时,判断的顺序为 corePoolSize –> workQueue –> maximumPoolSize。
复制代码
maximumPoolSize:最大线程数量;
workQueue:等待队列,当任务提交时,若是线程池中的线程数量大于等于corePoolSize而且等待队列是有界的,非满的时候,把该任务封装成一个Worker对象放入等待队列;
workQueue:保存等待执行的任务的阻塞队列,当提交一个新的任务到线程池之后, 线程池会根据当前线程池中正在运行着的线程的数量来决定对该任务的处理方式,主要有如下几种处理方式:
直接切换:这种方式经常使用的队列是SynchronousQueue,但如今尚未研究过该队列,这里暂时还无法介绍;
使用无界队列:通常使用基于链表的阻塞队列LinkedBlockingQueue。使用这种方式,那么线程池中可以建立的最大线程数就是corePoolSize,而maximumPoolSize就不会起做用了(后面也会说到)。当线程池中全部的核心线程都是RUNNING状态时,这时一个新的任务提交就会放入等待队列中。
3.使用有界队列:通常使用ArrayBlockingQueue。使用该方式能够将线程池的最大线程数量限制为maximumPoolSize,这样可以下降资源的消耗,但同时这种方式也使得线程池对线程的调度变得更困难,由于线程池和队列的容量都是有限的值,因此要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,而且还要尽量的下降线程池对资源的消耗,就须要合理的设置这两个数量。
keepAliveTime:线程池维护线程所容许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,若是这时没有新的任务提交,核心线程外的线程不会当即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
threadFactory:它是ThreadFactory类型的变量,用来建立新线程。默认使用Executors.defaultThreadFactory() 来建立线程。使用默认的ThreadFactory来建立线程时,会使新建立的线程具备相同的NORM_PRIORITY优先级而且是非守护线程,同时也设置了线程的名称。
handler:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。若是阻塞队列满了而且没有空闲的线程,这时若是继续提交任务,就须要采起一种策略处理该任务。线程池提供了4种策略:
下面就来看一下核心的几个执行的方法
1.submit方法用于提交须要返回值的任务。线程池会返回一个future类型的对象,经过这个future对象能够判断任务是否执行成功,而且能够经过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后当即返回,这时候有可能任务没有执行完。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
复制代码
流程步骤以下 1.调用submit方法,传入Runnable或者Callable对象 2.判断传入的对象是否为null,为null则抛出异常,不为null继续流程 3.将传入的对象转换为RunnableFuture对象 4.执行execute方法,传入RunnableFuture对象 5.返回RunnableFuture对象
二、execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* clt记录着runState和workerCount
*/
int c = ctl.get();
/*
* workerCountOf方法取出低29位的值,表示当前活动的线程数;
* 若是当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;
* 并把任务添加到该线程中。
*/
if (workerCountOf(c) < corePoolSize) {
/*
* addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断仍是maximumPoolSize来判断;
* 若是为true,根据corePoolSize来判断;
* 若是为false,则根据maximumPoolSize来判断
*/
if (addWorker(command, true))
return;
/*
* 若是添加失败,则从新获取ctl值
*/
c = ctl.get();
}
/*
* 若是当前线程池是运行状态而且任务添加到队列成功
*/
if (isRunning(c) && workQueue.offer(command)) {
// 从新获取ctl值
int recheck = ctl.get();
// 再次判断线程池的运行状态,若是不是运行状态,因为以前已经把command添加到workQueue中了,
// 这时须要移除该command
// 执行事后经过handler使用拒绝策略对该任务进行处理,整个方法返回
if (! isRunning(recheck) && remove(command))
reject(command);
/*
* 获取线程池中的有效线程数,若是数量是0,则执行addWorker方法
* 这里传入的参数表示:
* 1. 第一个参数为null,表示在线程池中建立一个线程,但不去启动;
* 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
* 若是判断workerCount大于0,则直接返回,在workQueue中新增的command会在未来的某个时刻被执行。
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 若是执行到这里,有两种状况:
* 1. 线程池已经不是RUNNING状态;
* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize而且workQueue已满。
* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
* 若是失败则拒绝该任务
*/
else if (!addWorker(command, false))
reject(command);
}
复制代码
简单来讲,在执行execute()方法时若是状态一直是RUNNING时,的执行过程以下:
若是workerCount < corePoolSize,则建立并启动一个线程来执行新提交的任务;
若是workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
若是workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则建立并启动一个线程来执行新提交的任务;
若是workerCount >= maximumPoolSize,而且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
这里要注意一下addWorker(null, false);,也就是建立一个线程,但并无传入任务,由于任务已经被添加到workQueue中了,因此worker在执行的时候,会直接从workQueue中获取任务。因此,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必需要有一个线程来执行任务。
execute方法执行流程以下:
下面来看下execute方法中调用的几个重要方法:
addWorker方法
addWorker方法的主要工做是在线程池中建立一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前须要判断当前活动线程数是否少于maximumPoolSize,实质就是判断 若是为true,则使用corePoolSize绑定,不然为* maximumPoolSize代码以下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取运行状态
int rs = runStateOf(c);
/*
* 这个if判断
* 若是rs >= SHUTDOWN,则表示此时再也不接收新任务;
* 接着判断如下3个条件,只要有1个不知足,则返回false:
* 1. rs == SHUTDOWN,这时表示关闭状态,再也不接受新提交的任务,但却能够继续处理阻塞队列中已保存的任务
* 2. firsTask为空
* 3. 阻塞队列不为空
*
* 首先考虑rs == SHUTDOWN的状况
* 这种状况下不会接受新提交的任务,因此在firstTask不为空的时候会返回false;
* 而后,若是firstTask为空,而且workQueue也为空,则返回false,
* 由于队列中已经没有任务了,不须要再添加线程了
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程数
int wc = workerCountOf(c);
// 若是wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
// 这里的core是addWorker方法的第二个参数,若是为true表示根据corePoolSize来比较,
// 若是为false则根据maximumPoolSize来比较。
//
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增长workerCount,若是成功,则跳出第一个for循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 若是增长workerCount失败,则从新获取ctl的值
c = ctl.get(); // Re-read ctl
// 若是当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根据firstTask来建立Worker对象
w = new Worker(firstTask);
// 每个Worker对象都会建立一个线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态;
// 若是rs是RUNNING状态或者rs是SHUTDOWN状态而且firstTask为null,向线程池中添加线程。
// 由于在SHUTDOWN时不会在添加新的任务,但仍是会执行workQueue中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一个HashSet
workers.add(w);
int s = workers.size();
// largestPoolSize记录着线程池中出现过的最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
复制代码
注意一下这里的t.start()这个语句,启动时会调用Worker类中的run方法,Worker自己实现了Runnable接口,因此一个Worker类型的对象也是一个线程。
Worker类 线程池中的每个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,看一下Worker的定义:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** 此线程用来存储进入线程池的线程 */
final Thread thread;
/** 要运行的初始化任务,可能为null */
Runnable firstTask;
/** 线程的任务计数器数量 */
volatile long completedTasks;
/**
* 使用给定的第一个任务和ThreadFactory中的线程建立。
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // 在runWorker以前禁止中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 委托主线程运行到外部的 runWorker */
public void run() {
runWorker(this);
}
// 锁定方法
//
// 值为0表示为未锁定状态
// 值为1表示为锁定状态
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//中断运行中的线程,是线程池调用shutdown或者tryTerminate方法时,最终会调用这个方法去中断线程。
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
复制代码
Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时经过ThreadFactory来建立的线程,是用来处理任务的线程。
在调用构造方法时,须要把任务传入,这里经过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,由于Worker自己继承了Runnable接口,也就是一个线程,因此一个Worker对象在启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为何不使用ReentrantLock来实现呢?能够看到tryAcquire方法,它是不容许重入的,而ReentrantLock是容许重入的:
一、lock方法一旦获取了独占锁,表示当前线程正在执行任务中; 二、若是正在执行任务,则不该该中断线程; 三、若是该线程如今不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时能够对该线程进行中断; 四、线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是不是空闲状态;
之因此设置为不可重入,是由于咱们不但愿任务在调用像setCorePoolSize这样的线程池控制方法时从新获取锁。若是使用ReentrantLock,它是可重入的,这样若是在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。 因此,Worker继承自AQS,用于判断线程是否空闲以及是否能够被中断。
此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为何这么作呢?是由于AQS中默认的state是0,若是刚建立了一个Worker对象,尚未执行任务时,这时就不该该被中断,看一下tryAquire方法:
tryAcquire方法是根据state是不是0来判断的,因此,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断。
正由于如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0.
runWorker方法
在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码以下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
// 容许中断
w.unlock(); // 容许中断
// 是否由于异常退出循环
boolean completedAbruptly = true;
try {
// 若是task为空,则经过getTask来获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 若是线程池状态为中止,能够中断;
// 若是不是,确保线程不能够被中断
// 在第二种状况下须要从新检查以在清除中断时处理
//runStateAtLeast判断当前状态是否为TIDYING或TERMINATED,来控制中断线程.
STOP 值为 1,>stop状态的,只能为中断和结束状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//原来执行已废弃,如今执行的实际上是空方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); //容许线程
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++; //任务数量加一
w.unlock();
}
}
completedAbruptly = false; //执行到这一步,说明没有异常
} finally {
processWorkerExit(w, completedAbruptly);
}
}
复制代码
这里说明一下第一个if判断,目的是:
若是线程池正在中止,那么要保证当前线程是中断状态; 若是不是的话,则要保证当前线程不是中断状态; 这里要考虑在执行该if语句期间可能也执行了shutdownNow方法,shutdownNow方法会把状态设置为STOP,回顾一下STOP状态值为1:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态。
STOP状态要中断线程池中的全部线程,而这里使用Thread.interrupted()来判断是否中断是为了确保在RUNNING或者SHUTDOWN状态时线程是非中断状态的,由于Thread.interrupted()方法会复位中断的状态。
总结一下runWorker方法的执行过程:
while循环不断地经过getTask()方法获取任务; getTask()方法从阻塞队列中取任务; 若是线程池正在中止,那么要保证当前线程是中断状态,不然要保证当前线程不是中断状态; 调用task.run()执行任务; 若是task为null则跳出循环,执行processWorkerExit()方法; runWorker方法执行完毕,也表明着Worker中的run方法执行完毕,销毁线程。 这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。
completedAbruptly变量来表示在执行任务过程当中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。
下面来看看如何getTask()
private Runnable getTask() {
// timeOut变量的值表示上次从阻塞队列中取任务时是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 若是线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行如下判断:
* 1. rs >= STOP,线程池是否正在stop;
* 2. 阻塞队列是否为空。
* 若是以上条件知足,则将workerCount减1并返回null。
* 由于若是当前线程池状态的值是SHUTDOWN或以上时,不容许再向阻塞队列中添加任务。
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed变量用于判断是否须要进行超时控制。
// allowCoreThreadTimeOut默认是false,也就是核心线程不容许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,须要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* wc > maximumPoolSize的状况是由于可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
* timed && timedOut 若是为true,表示当前操做须要进行超时控制,而且上次从阻塞队列中获取任务发生了超时
* 接下来判断,若是有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
* 若是减1失败,则返回重试。
* 若是wc == 1时,也就说明当前线程是线程池中惟一的一个线程了。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*
* 根据timed来判断,若是为true,则经过阻塞队列的poll方法进行超时控制,若是在keepAliveTime时间内没有获取到任务,则返回null;
* 不然经过take方法,若是这时队列为空,则take方法会阻塞直到队列不为空。
*
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 若是 r == null,说明已经超时,timedOut设置为true
timedOut = true;
} catch (InterruptedException retry) {
// 若是获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
timedOut = false;
}
}
}
复制代码
这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析能够知道,在执行execute方法时,若是当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,而且workQueue已满时,则能够增长工做线程,但这时若是超时没有获取到任务,也就是timedOut为true的状况,说明workQueue已经为空了,也就说明了当前线程池中不须要那么多线程来执行任务了,能够把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize便可。
何时会销毁?固然是runWorker方法执行完以后,也就是Worker中的run方法执行完,由JVM自动回收。
getTask方法返回null时,在runWorker方法中会跳出while循环,而后会执行processWorkerExit方法。
最后看下processWorkerExit方法,这是execute源码解析中的最后一个方法了。
/*
* 此方法从工做集中删除线程,而且*若是因为用户任务异常退出,或者若是正在*运行少于
* corePoolSize工做者或者队列非空但*没有工做者,则可能终止池或替换工做者
* 第二个参数表示,若是工做线程是否因为用户线程异常而终止的
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 若是completedAbruptly值为true,则说明线程执行时出现了异常,须要
// 将workerCount减1;
// 若是线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操做,这里就没必要再减了。
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //统计完成的任务数 completedTaskCount += w.completedTasks; // 从workers中移除,也就表示着从线程池中移除了一个工做线程 workers.remove(w); } finally { mainLock.unlock(); } // 根据线程池状态进行判断是否结束线程池 tryTerminate(); int c = ctl.get(); /* * 当线程池是RUNNING或SHUTDOWN状态时,若是worker是异常结束,那么会直接addWorker; * 若是allowCoreThreadTimeOut=true,而且等待队列有任务,至少保留一个worker; * 若是allowCoreThreadTimeOut=false,workerCount很多于corePoolSize。 */ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } } 复制代码
至此,processWorkerExit执行完以后,工做线程被销毁,以上就是整个工做线程的生命周期,从execute方法开始,Worker使用ThreadFactory建立新的工做线程,runWorker经过getTask获取任务,而后执行任务,若是getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:
下面还有第二篇,主要讲如何关闭线程池和终止的几个方法。将紧接着第一篇继续讲。juejin.im/post/5d6886…
喜欢欢迎关注我哦
摘自: 《Java并发编程的艺术》 JDK8源码 深刻理解Java线程池 www.ideabuffer.cn/2017/04/04/…