这篇文章是我在阅读源码时整理的一些笔记,对源码的关键点进行了比较详细的注释,而后加上一些本身对线程池机制的理解。最终目的是要弄清楚下面这些问题:安全
首先须要介绍一下线程池的两个重要成员:bash
AtomicInteger 类型。高3位存储线程池状态,低29位存储当前线程数量。workerCountOf(c) 返回当前线程数量。runStateOf(c) 返回当前线程池状态。 线程池有以下状态:测试
这个线程在线程池中的包装类。一个 Worker 表明一个线程。线程池用一个 HashSet 管理这些线程。ui
须要注意的是,Worker 自己并不区分核心线程和非核心线程,核心线程只是概念模型上的叫法,特性是依靠对线程数量的判断来实现的 Worker 特性以下:this
submit 返回一个 Future 对象,咱们能够调用其 get 方法获取任务执行的结果。代码很简单,就是将 Runnable 包装成 FutureTask 而已。能够看到,最终仍是调用 Execute 方法:spa
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
复制代码
FutureTask 的代码就不贴了,简述一下原理:线程
这个机制你们应该都很熟了,再简述一遍:code
具体的代码分析以下:对象
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //小于核心线程数
if (addWorker(command, true)) //启动核心线程并执行任务
return;
c = ctl.get(); //执行失败时从新获取值
}
if (isRunning(c) && workQueue.offer(command)) { //检查运行状态并将任务添加到队列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //从新检查,防止状态有变化。若是有,移出队列并拒绝任务
reject(command);
else if (workerCountOf(recheck) == 0) //若是线程数为0,建立非核心线程,第一个参数为空时会从队列中取任务执行
addWorker(null, false);
}
else if (!addWorker(command, false)) //添加到队列失败,说明队列已满,建立非核心线程执行任务
reject(command); //执行失败说明达到最大线程数,拒绝任务
复制代码
线程池使用 addWorker 方法新建线程,第一个参数表明要执行的任务,线程会将这个任务执行完毕后再从队列取任务执行。第二参数是核心线程的标志,它并非 Worker 自己的属性,在这里只用来判断工做线程数量是否超标。继承
这个方法能够分红两部分,第一部分进行一些前置判断,并使用循环 CAS 结构将线程数量加1。代码以下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //这个语法不经常使用,用于给外层 for 循环命名。方便嵌套 for 循环中,break 和 continue 指定是外层仍是内层循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// firstTask 不为空表明这个方法用于添加任务,为空表明新建线程。SHUTDOWN 状态下不接受新任务,但处理队列中的任务。这就是第二个判断的逻辑。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 使用循环 CAS 自旋,增长线程数量直到成功为止
for (;;) {
int wc = workerCountOf(c);
//判断是否超过线程容量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//使用 CAS 将线程数量加1
if (compareAndIncrementWorkerCount(c))
break retry;
//修改不成功说明线程数量有变化
//从新判断线程池状态,有变化时跳到外层循环从新获取线程池状态
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
//到这里说明状态没有变化,从新尝试增长线程数量
}
}
... ...
}
复制代码
第二部分负责新建并启动线程,并将 Worker 添加至 Hashset 中。代码很简单,没什么好注释的,用了 ReentrantLock 确保线程安全。
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; //这个参数是测试用的,不用管它
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); //添加失败时移除 Worker 并将线程数量减 1
}
return workerStarted;
}
复制代码
在 addWorker 方法中,线程会被启动。新建线程时,Worker 将自身传入,因此线程启动后会执行 Worker 的 run 方法,这个方法调用了 ThreadPoolExecutor 的 runWorker 方法执行任务,runWorker 中会循环取任务执行,执行逻辑以下:
具体代码分析以下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
//task 为咱们传给 execute 的任务。task 为空时从队列中取任务执行
try {
while (task != null || (task = getTask()) != null) {
w.lock();
//这段逻辑很是绕。实际上它实现了如下逻辑:
//1.若是线程池已中止且线程未中断,条件成立,中断线程
//2.若是线程池未中止,线程为中断状态,将线程状态重置,并从新进行1的判断
//3.若是线程池未中止,线程不为中断状态,条件不成立
//Thread.interrupted() 会重置中断状态,保证
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
//beforeExecute 和 afterExecute 为空方法,交给子类实现
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); //执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//执行到这里时说明线程执行完毕,此方法将线程从 HashSet 中移出。线程终止且没有引用,会被自动回收。
processWorkerExit(w, completedAbruptly);
}
}
复制代码
在 runWorker 方法中 getTask 方法返回 null 以后会致使线程执行完毕,被移除出 HashSet,从而被系统销毁。 线程的超时机制也是在这个方法实现的,借助于 BlockingQueue 的 poll 和 take 方法。
超时机制实现原理以下:
具体代码以下:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 容许核心线程超时或者线程数大于核心线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// timed && timedOut 这两个参数结合起来控制超时机制
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 队列为空时,poll 方法会阻塞等待,超过 keepAliveTime 时返回空值。take 方法会直接返回异常。
// 当 allowCoreThreadTimeOut 为 true 时,核心线程和非核心线程没有区别,一概调用poll方法
// 当 allowCoreThreadTimeOut 为 false 时,线程数量超过核心线程数才会进入超时机制,若是不超过,则将当前线程看成核心线程处理,调用 take,抛出异常后进入下一次循环。若是队列为空,此处会一直循环。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
复制代码