微信公众号「后端进阶」,专一后端技术分享:Java、Golang、WEB框架、分布式中间件、服务治理等等。
老司机倾囊相授,带你一路进阶,来不及解释了快上车!java
在讲解完线程池的构造参数和一些不经常使用的设置以后,有些同窗仍是想继续深刻地了解线程池的原理,因此这篇文章我会带你们深刻源码,从底层吃透线程池的运行原理。面试
在深刻源码以前先来看看J.U.C包中的线程池类图:后端
它们的最顶层是一个Executor接口,它只有一个方法:安全
public interface Executor {
void execute(Runnable command);
}
复制代码
它提供了一个运行新任务的简单方法,Java线程池也称之为Executor框架。微信
ExecutorService扩展了Executor,添加了操控线程池生命周期的方法,如shutDown(),shutDownNow()等,以及扩展了可异步跟踪执行任务生成返回值Future的方法,如submit()等方法。数据结构
ThreadPoolExecutor继承自AbstractExecutorService,同时实现了ExecutorService接口,也是Executor框架默认的线程池实现类,也是这篇文章重点分析的对象,通常咱们使用线程池,如没有特殊要求,直接建立ThreadPoolExecutor,初始化一个线程池,若是须要特殊的线程池,则直接继承ThreadPoolExecutor,并实现特定的功能,如ScheduledThreadPoolExecutor,它是一个具备定时执行任务的线程池。多线程
下面咱们开始ThreadPoolExecutor的源码分析了(如下源码为JDK8版本):并发
ctl是一个Integer值,它是对线程池运行状态和线程池中有效线程数量进行控制的字段,Integer值一共有32位,其中高3位表示"线程池状态",低29位表示"线程池中的任务数量"。咱们看看Doug Lea大神是如何实现的:框架
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 经过位运算获取线程池运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 经过位运算获取线程池中有效的工做线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 初始化ctl变量值
private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码
线程池一共有状态5种状态,分别是:异步
咱们再来看看位运算:
COUNT_BITS表示ctl变量中表示有效线程数量的位数,这里COUNT_BITS=29;
CAPACITY表示最大有效线程数,根据位运算得出COUNT_MASK=11111111111111111111111111111,这算成十进制大约是5亿,在设计之初就已经想到不会开启超过5亿条线程,因此彻底够用了;
线程池状态的位运算获得如下值:
这里简单解释一下Doug Lea大神为何使用一个Integer变量表示两个值:
不少人会想,一个变量表示两个值,就节省了存储空间,可是这里很显然不是为了节省空间而设计的,即便将这辆个值拆分红两个Integer值,一个线程池也就多了4个字节而已,为了这4个字节而去大费周章地设计一通,显然不是Doug Lea大神的初衷。
在多线程的环境下,运行状态和有效线程数量每每须要保证统一,不能出现一个改而另外一个没有改的状况,若是将他们放在同一个AtomicInteger中,利用AtomicInteger的原子操做,就能够保证这两个值始终是统一的。
Doug Lea大神牛逼!
Worker类继承了AQS,并实现了Runnable接口,它有两个重要的成员变量:firstTask和thread。firstTask用于保存第一次新建的任务;thread是在调用构造方法时经过ThreadFactory来建立的线程,是用来处理任务的线程。
线程池要执行任务,那么必须先添加任务,execute()虽然说是执行任务的意思,但里面也包含了添加任务的步骤在里面,下面源码:
java.util.concurrent.ThreadPoolExecutor#execute:
public void execute(Runnable command) {
// 若是添加订单任务为空,则空指针异常
if (command == null)
throw new NullPointerException();
// 获取ctl值
int c = ctl.get();
// 1.若是当前有效线程数小于核心线程数,调用addWorker执行任务(即建立一条线程执行该任务)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.若是当前有效线程大于等于核心线程数,而且当前线程池状态为运行状态,则将任务添加到阻塞队列中,等待空闲线程取出队列执行
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.若是阻塞队列已满,则调用addWorker执行任务(即建立一条线程执行该任务)
else if (!addWorker(command, false))
// 若是建立线程失败,则调用线程拒绝策略
reject(command);
}
复制代码
能够发现,源码的解读对应「你都了解线程池的参数吗?」里面那道面试题的解析是同样的,我在这里画一下execute执行任务的流程图:
继续往下看,addWorker添加任务,方法源码有点长,我按照逻辑拆分红两部分讲解:
java.util.concurrent.ThreadPoolExecutor#addWorker:
retry:
for (;;) {
int c = ctl.get();
// 获取线程池当前运行状态
int rs = runStateOf(c);
// 若是rs大于SHUTDOWN,则说明此时线程池不在接受新任务了
// 若是rs等于SHUTDOWN,同时知足firstTask为空,且阻塞队列若是有任务,则继续执行任务
// 也就说明了若是线程池处于SHUTDOWN状态时,能够继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取有效线程数量
int wc = workerCountOf(c);
// 若是有效线程数大于等于线程池所容纳的最大线程数(基本不可能发生),不能添加任务
// 或者有效线程数大于等于当前限制的线程数,也不能添加任务
// 限制线程数量有任务是否要核心线程执行决定,core=true使用核心线程执行任务
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用AQS增长有效线程数量
if (compareAndIncrementWorkerCount(c))
break retry;
// 若是再次获取ctl变量值
c = ctl.get(); // Re-read ctl
// 再次对比运行状态,若是不一致,再次循环执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
复制代码
这里特别强调,firstTask是开启线程执行的首个任务,以后常驻在线程池中的线程执行的任务都是从阻塞队列中取出的,须要注意。
以上for循环代码主要做用是判断ctl变量当前的状态是否能够添加任务,特别说明了若是线程池处于SHUTDOWN状态时,能够继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了;同时增长工做线程数量使用了AQS做同步,若是同步失败,则继续循环执行。
// 任务是否已执行
boolean workerStarted = false;
// 任务是否已添加
boolean workerAdded = false;
// 任务包装类,咱们的任务都须要添加到Worker中
Worker w = null;
try {
// 建立一个Worker
w = new Worker(firstTask);
// 获取Worker中的Thread值
final Thread t = w.thread;
if (t != null) {
// 操做workers HashSet 数据结构须要同步加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 获取当前线程池的运行状态
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态;
// 若是rs是RUNNING状态或者rs是SHUTDOWN状态而且firstTask为null,向线程池中添加线程。
// 由于在SHUTDOWN时不会在添加新的任务,但仍是会执行workQueue中的任务
// rs是RUNNING状态时,直接建立线程执行任务
// 当rs等于SHUTDOWN时,而且firstTask为空,也能够建立线程执行任务,也说说明了SHUTDOWN状态时再也不接受新任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动线程执行任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
复制代码
以上源码主要的做用是建立一个Worker对象,并将新的任务装进Worker中,开启同步将Worker添加进workers中,这里须要注意workers的数据结构为HashSet,非线程安全,因此操做workers须要加同步锁。添加步骤作完后就启动线程来执行任务了,继续往下看。
咱们注意到上面的代码中:
// 启动线程执行任务
if (workerAdded) {
t.start();
workerStarted = true;
}
复制代码
这里的t是w.thread获得的,便是Worker中用于执行任务的线程,该线程由ThreadFactory建立,咱们再看看生成Worker的构造方法:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
复制代码
newThread传的参数是Worker自己,而Worker实现了Runnable接口,因此当咱们执行t.start()时,执行的是Worker的run()方法,找到入口了:
java.util.concurrent.ThreadPoolExecutor.Worker#run:
public void run() {
runWorker(this);
}
复制代码
java.util.concurrent.ThreadPoolExecutor#runWorker:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环从workQueue阻塞队列中获取任务并执行
while (task != null || (task = getTask()) != null) {
// 加同步锁的目的是为了防止同一个任务出现多个线程执行的问题
w.lock();
// 若是线程池正在关闭,须确保中断当前线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务前能够作一些操做
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务后能够作一些操做
afterExecute(task, thrown);
}
} finally {
// 将task置为空,让线程自行调用getTask()方法从workQueue阻塞队列中获取任务
task = null;
// 记录Worker执行了多少次任务
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 线程回收过程
processWorkerExit(w, completedAbruptly);
}
}
复制代码
这一步是执行任务的核心方法,首次执行不为空的firstTask任务,以后便一直从workQueue阻塞队列中获取任务并执行,若是你想在任务执行先后作点啥不可告人的小动做,你能够实现ThreadPoolExecutor如下两个方法:
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
复制代码
这样一来,咱们就能够对任务的执行进行实时监控了。
这里还须要注意,在finally块中,将task置为空,目的是为了让线程自行调用getTask()方法从workQueue阻塞队列中获取任务。
咱们以前已经知道线程池中可维持corePoolSize数量的常驻核心线程,那么它们是如何保证执行完任务而不被线程池回收的呢?在前面的章节中你可能已经到从workQueue队列中会阻塞式地获取任务,若是没有获取任务,那么就会一直阻塞下去,很聪明,你已经知道答案了,如今咱们来看Doug Lea大神是如何实现的。
java.util.concurrent.ThreadPoolExecutor#getTask:
private Runnable getTask() {
// 超时标记,默认为false,若是调用workQueue.poll()方法超时了,会标记为true
// 这个标记很是之重要,下面会说到
boolean timedOut = false;
for (;;) {
// 获取ctl变量值
int c = ctl.get();
int rs = runStateOf(c);
// 若是当前状态大于等于SHUTDOWN,而且workQueue中的任务为空或者状态大于等于STOP
// 则操做AQS减小工做线程数量,而且返回null,线程被回收
// 也说明假设状态为SHUTDOWN的状况下,若是workQueue不为空,那么线程池仍是能够继续执行剩下的任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 操做AQS将线程池中的线程数量减一
decrementWorkerCount();
return null;
}
// 获取线程池中的有效线程数量
int wc = workerCountOf(c);
// 若是开发者主动开启allowCoreThreadTimeOut而且获取当前工做线程大于corePoolSize,那么该线程是能够被超时回收的
// allowCoreThreadTimeOut默认为false,即默认不容许核心线程超时回收
// 这里也说明了在核心线程之外的线程都为“临时”线程,随时会被线程池回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 这里说明了两点销毁线程的条件:
// 1.原则上线程池数量不可能大于maximumPoolSize,但可能会出现并发时操做了setMaximumPoolSize方法,若是此时将最大线程数量调少了,极可能会出现当前工做线程大于最大线程的状况,这时就须要线程超时回收,以维持线程池最大线程小于maximumPoolSize,
// 2.timed && timedOut 若是为true,表示当前操做须要进行超时控制,这里的timedOut为true,说明该线程已经从workQueue.poll()方法超时了
// 以上两点知足其一,均可以触发线程超时回收
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 尝试用AQS将线程池线程数量减一
if (compareAndDecrementWorkerCount(c))
// 减一成功后返回null,线程被回收
return null;
// 不然循环重试
continue;
}
try {
// 若是timed为true,阻塞超时获取任务,不然阻塞获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 若是poll超时获取任务超时了, 将timeOut设置为true
// 继续循环执行,若是碰巧开发者开启了allowCoreThreadTimeOut,那么该线程就知足超时回收了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
复制代码
我把我对getTask()方法源码的深度解析写在源码对应的地方了,该方法就是实现默认的状况下核心线程不被销毁的核心实现,其实现思路大体是:
呕心沥血的一篇源码解读到此结束,但愿能助同窗们完全吃透线程池的底层原理,之后遇到面试官问你线程池的问题,你就说看过「后端进阶」的线程池源码解读,面试官这时就会夸你:
这同窗基础真扎实!