派生体系
java.util.concurrent
ThreadPoolExecutor
AbstractExecutorService
ExecutorService
Executor
这个类是Executor框的核心实现,它的名字向咱们代表,它是使用thread pool实现的。这个thread pool主要解决了两个问题:
- 执行大量的单个异步任务,通常状况下,它能提高总体的性能。
- 执行由多个任务组成的任务集合,经过Future列表返回每一个任务的执行结果。
设计原理

重要概念
为了可以在更多上下文环境中使用,ThreadPool定义了一些概念,这些概念都直接或间接对应着可调节参数,若是不了解这些概念含义,很难正确地使用这些参数。下面来看一下这些概念及其含义:
当前,核心,最小,最大线程数(poolSize, corePoolSize, minimumPoolSize, maximumPoolSize)
poolSize: 当前处于运行和空闲状态的总线程数。
corePoolSize: 核心线程数, 当poolSize<=corePoolSize时,存在的线程称为coreThread。
minimumPoolSize: 最小线程数,当minimumPoolsize = allowCoreThreadTimeOut ? 0 : corePoolSize ,
maximunPoolSize: 最大线程数。
ThreadPool在运行过程当中会自动的调节线程数量(poolSize), 通常来讲,poolSize处于[corePoolSize maximumPoolSize]区间以内。
当用户调用execute提交一个任务时,若是poolSize<corePoolSize, 会建立一个新线程处理这个任务。若是若是poolSize处于[corePoolSize maximumPoolSize]区间内,只有队列尽是才会建立新线程。不管如何,poolSize不会大于maximumPoolSize。
默认状况下,ThreadPool没有收到任何任务时pooSize = 0, 只有当ThreadPool开始收到任务以后才会建立线程。可是能够经过覆盖prestartCoreThread或prestartAllCoreThreads方法改变这种行为,提早建立线程。
线程工厂--ThreadFactory
ThreadPool使用实现了ThreadFactory接口的实现建立新线程。Executors工厂类提供了defaultThreadFactory方法,该方法返回一个默认的ThreadFactory实例。使用这个实例建立的线程具备相同的优先级,是非后台线程,命名上使用相同的前缀。若是不满意这这些行为,能够本身实现一个ThreadFactory交给ThreadPool使用。
保持存活时间(keepAliveTime)
若是poolSize > corePoolSize, 当一个线程的空闲时间大于keepAliveTime, 它会被终止掉。默认状况下当poolSize <= corePoolSize时,keepAliveTime不会有影响,若是调用 allowCoreThreadTimeOut(true), 可让keepAliveTime在这个时间也起做用。
任务排队(queuing)
任何BlockingQueue的实例均可以用于保存排队的任务。不一样的BlockingQueue实现决定了不一样的排队策略:
SynchronousQueue: 同步队列,当提交一个任务时,要求ThreadPool中当前有至少一个空闲线程,或者能够建立新的线程(poolSize < maximumPoolSize)当即执行这个任务,不然ThreadPool会拒绝这个任务。
LinkedBlockingQueue: 无限制的队列(只受限于可以使用的内存), 不会处于full状态, offer方法不会返回false,这意味这ThreadPool的pooSize<=corePoolSize, 不会建立大于corePoolSize的线程数。
ArrayBlockingQueue: 有限制的队列, 受限于它的capacity。当poolSize == corePoolSize且队列没满时, 新提交的任务会追加到队列中排队执行。 当poolSize在[corePoolSize maximumPooSize)区间同时队被填列满时,将会建立新的线程。直到poolSize == maximumPoolSize位置。 若是队列被填满同时pooSize == maximumPoolSize,新的任务会被拒绝。
拒绝任务(rejected tasks)
当ThreadPool遇到如下两种状况时会触发拒绝任务策略:
- 正常状况下BlockingQueue被填满,同时poolSize == maximumPoolSize。
- 被关闭
ThreadPool使用RejectedExecutionHandler处理丢弃动做,默认定义了4中丢弃策略:
ThreadPoolExecutor.AbortPolicy: 抛出RejectedExecutionException异常。
ThreadPoolExecutor.CallerRunsPolicy: 本身执行这个被抛弃的任务。
ThreadPoolExecutor.DiscardPolicy: 悄无声息的丢弃掉这人任务。
状态
ThreadPool定义了5状态
RUNNING: 接受新提交的任务,执行队列中发任务。
SHUTDOWN: 不接受新提交的任务,但仍然会执行队列中的人。
STOP: 不接受新提交的任务,不执行队列中的任务,并且会打断正在执行中的任务。
TIDYING: 全部的任务都终止了,而且线程数为0,当全部的线程都过渡到TIDYING状态后会调用treminated方法。
TERMINATED: treminated方法调用已经完成。
状态之间的转换关系
RUNNING --> SHUTDOWN
调用shutdown()
(RUNNING或SHUTDOWN) -- > STOP
调用shutdownNow()
SHUTDOWN --> TIDYING
队列为空,同时线程数为0
TIDYING --> TREMINATED
treminated()执行完成。
向ThreadPool提交任务: execute
ThreadPoolExecutor实例建立以后,在没有调用execute提交任务以前,ThreadPool中是没有线程的,线程的建立是依赖exeute来驱动的。能够说,exeute是ThreadPoolExecutor运行的触发器,全部我选择先从exeute方法开始分析代码。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //
若是线程数小于 corePoolSize, 建立一个新线程。
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //
若是处于RUNNGIN状态把任务放到队列中
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) /
/再次检查线程状态,若是不是RUNNING状态,把任务从队列删除,而后拒绝这个任务
reject(command);
else if (workerCountOf(recheck) == 0) //
若是线程数为0,建立一个新线程
addWorker(null, false);
}
/*
若是运行到这里说明当前不是出于RUNNING状态,或处于RUNNING状态但队列已经被填满
*尝试建立新的线程执行这个任务,若是失败,拒绝这任务
*/
else if (!addWorker(command, false))
reject(command);
}
以上就是exeute代码,它很简单,但其中ctl成员变量比较费解。ctl是AtomicInteger类型,它被用来打包保存ThreadPoolExecutor的状态和线程数。
AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
它初始化时,把状态设置成RUNNING,下面来看看它的结构
高位 ---- > 低位
运算状态(run state)
|
线程数(workerCount)
|
31 -- 29
|
28 -- 0
|
状态位
RUNNING
SHUTDOWN
STOP
TIDYING
TREMINATED
知道了这些数据的保存方式,把他们取出来,只须要一些简单的位运算就能够了。
状态的大小关系 RUNNING < SHUTDOWN < STOP < TIDYING < TREMINATED,
runStateOf(clt.get()) < SHUTDOWN RUNNING状态
runStateOf(clt.get()) >= SHUTDOWN 非RUNNING状态
这个大小关系要记住,这样理解代码会更快。
建立新线程
ThreadPool把线程封装成Worker对对象,添加worker就是添加线程,addWorker方法作的事情就是添加线程。
private boolean addWorker(Runnable firstTask, boolean core) {
/*
这段代码的做用是确保知足一下条件的任意一个时才建立新线程
*1. 处于RUNNING 状态, 能够接受新任务,能够继续执行队列中的任务
*2. 处于SHUTDOWN状态, 队列不是空,且当前没有提交新任务
*/
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && //
非RUNNINGG状态
! (rs == SHUTDOWN &&
firstTask == null && //
当前提交的新任务
! workQueue.isEmpty())) //
队列不是空
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) //
若是当前调用建立的是core线程, 确保当前线程数 <corePoolSize, 不然确保当前线程数< maximumPoolSize
return false;
if (compareAndIncrementWorkerCount(c)) //
原子操做,增长线程数
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//
执行到这里表示已经经过检查能够建立新线程,而且线程数已经加1
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { //
再次检查,确保当前仍然知足容许建立线程的条件
if (t.isAlive()) //
确保Thread尚未调用start()
throw new IllegalThreadStateException();
workers.add(w); //
把worker线程放进HashSet中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); //
启动新线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
线程的主循环
Worker实现了Runnable接口
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
构造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
建立线程时把Worker实例自己当作线程的Runnable产生,因此当线程启动后,将会调用Worker的run方法。
public void run() {
runWorker(this);
}
线程的主循环就在runWorker方法中实现
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; //
若是firstTask!=null, 先执行firstTask
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { //
若是没有firstTask, 从队列中取出一个task, 若是没有取到,退出线程
w.lock();
//
若是处于状态>=STOP(前面已经讲过状态直接的大小关系), 确保线程处于interrupted状态
//
不然清除线程的interrupted状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); //
执行任务前调用的方法,默认什么都没干,用户能够根据须要覆盖它
Throwable thrown = null;
try {
task.run(); //
执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //
任务完成以后调用的方法, 默认什么都没干,用户能够根据须要覆盖它
}
} finally {
task = null; //
把当前task置空,这样才能调用getTask从队列里取出任务
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//
正常退出线程 completedAbruptly是true, 异常致使的线程退出为false
processWorkerExit(w, completedAbruptly);
}
}
从队列中获得排队的任务
在runWorker主循环中,除了第一次的任务从worker的firsTask(在它不是null的状况下)取以外, 后面每次都是调用getTask从队列中取出一个任务。
下面是getTask的代码分析
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); //
获得当前状态
//
若是当前状态 > SHUTDOWN 退出线程
//
若是当前状态 == SHUTDOWN 且 队列为空,退出线程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); //减小当前线程数
return null;
}
int wc = workerCountOf(c); //
获得当前的线程数
//
线程是否容许超时的条件: 设置容许coreThread超时,或者当前线程数 > corePoolSize
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//
线程退出须要同时知足如下两个条件条件:
//
1. 当前线程数>maximumPooSize 或 容许超时同时检查到已经超时
//
2. 当前线程数>1 或 队列为空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) //
减小当前线程数, 这个方法确保多线程环境下不会过多地结束线程。
return null;
continue;
}
try {
//
取出一个任务。若是容许超时,调用poll,不然调用take
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true; //
已经超时,运行到这里代表poll超时返回
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask的功能除了取出一个任务之外,它还负责在条件知足的状况下正常地结束一个线程
线程结束
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) //
若是线程是因为异常缘由结束的,这里要纠正线程数
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w); //
把线程从HashSet中删除
} finally {
mainLock.unlock();
}
tryTerminate(); //
尝试终止整个ThreadPool
int c = ctl.get();
if (runStateLessThan(c, STOP)) { //
若是当前状态<STOP
if (!completedAbruptly) { //
若是不是异常结束
//
计算最小线程数min
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min) //
若是当前线程数>=min直接返回
return; // replacement not needed
}
//
建立新线程, 条件:
//
当前线程正常结束
//
当前线程异常结束,但当前线程数小于最小线程数
addWorker(null, false);
}
}
上面的代码实现了线程的生命周期的管理,线程只有在ThreadPoolExecutor的状态处于RUNNGIN或SHUTDOWN时才能够存在。下面是这两种状态下线程的生存状态:
RUNNING:
容许coreThread超时: 线程空闲(意味着队列为空)时间超过 keepAliveTime, 线程会被结束, 直到线程数为0。
不容许coreThread超时: 线程空闲时间超过 keepAliveTime, 线程会被结束,直到线程数为corePoolSize。
SHUDOWN:
当线程把已经在队列里的全部任务执行完毕后,全部线程都会进入退出流程,最终退出。
整个ThreadPoolExecutor的状态变迁
前面已经讲过,ThreadPool的状态和线程数被打包方进一个32整数中:
AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
初始化把状态设置成RUNNING, 线程为0
调用shutdown时把状态从RUNNING置为SHUTDOWN, 随后过渡到TIDYING->TREMINATED。
当调用shutdownNow时把状态从(RUNNING 或 SHUTDOWN) 设置为STOP, 随后过渡到TIDYING->TREMINATED。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); //
只有当前状态<SHUTDOWN时才执行状态设置的动做
interruptIdleWorkers(); //
打断全部空闲的的线程,让这些线程有机会本身结束
onShutdown(); //
回调方法,默认什么都没作,子类能够覆盖
} finally {
mainLock.unlock();
}
tryTerminate(); //
尝试执行ThreadPool的结束操做
}
shutdownNow和shutdown的操做大体同样,不一样的是它把状态设置成STOP,还会返回队列中没有来得及执行的任务list。
tryTerminate方法做用是尝试结束整个ThreadPool, 它不必定会执行真正的结束动做。它在三个地方被调用, worker线程结束时,shudown中,shutdownNow中。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//知足如下三个条件中的任何一个就当即返回
//1. 处于RUNNGING状态
//2. 状态>= TIDYING
//3. 处于SHUTDOWN状态,且队列不是空
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//
若是处于STOP状态,且线程数不为0,通知一个处于空闲的线程结束本身
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//
执行到这里表示目前状态>=SHUTDOWN,线程数已是0
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //
总有一个线程会运行到这里,把状态置为 TIDYING
try {
terminated(); //
调用回调方面,默认什么都没干,子类能够覆盖
} finally {
ctl.set(ctlOf(TERMINATED, 0)); //
把状态置为TREMINATED, 自此整个ThreadPool才算终结
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
tryTerminate之因此要在三个地方调用,是为了保证当调用shutdown或shutdownNow以后,总有一个线程会完成最后的终结工做。
参数设置
分析完前面代码后,再来使用它,它的参数怎么设置天然就了然于心。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
public void allowCoreThreadTimeOut(boolean value)
public void setCorePoolSize(int corePoolSize)
public void setKeepAliveTime(long time, TimeUnit unit)
public void setMaximumPoolSize(int maximumPoolSize)
public void setRejectedExecutionHandler(RejectedExecutionHandler handler)
public void setThreadFactory(ThreadFactory threadFactory)