一块儿共读,共同阅步。java
源码阅读本就是要贯注全神、戒骄戒躁的沉浸式过程。我本着浮躁至极的心态,单刀直入,从入口方法先杀入“敌军”内部,让你们短期内享受到最大的学习成就感,而后再横向铺开,带你们一窥源码的究竟。有不对之处,轻喷、指出。安全
java1.5引入的线程池的标准类,ThreadPoolExecutor。bash
咱们一般是经过Executors这个工厂类来建立具体的实例,如:多线程
Executors.newCachedThreadPool(...)
Executors.newScheduledThreadPool(...)
复制代码
前者建立的就是咱们要讲的ThreadPoolExecutor实例。后者是有延迟功能的线程池,ScheduledThreadPoolExecutor,有机会再讲吧。ThreadPoolExecutor
这个线程池实例,内部维护了一个线程的集合,用来存放线程;有一个存放待执行任务的队列,在池内线程数达到最大值时,任务就暂时入队,等待线程取走运行。因此,目前来看,ThreadPoolExecutor
的结构以下: 函数
我会先列一下该源码涉及到的重要的逻辑方法,而后按使用时一般的调用顺序,挨个讲解,最后合并总结。oop
execute
:执行任务方法,内部封装了新建线程、任务入队等重要逻辑addWorker
:新建线程方法getTask
:从任务队列内获取一个任务runWorker
:池内线程的主循环逻辑。提醒一下,多线程都会调用这同一个方法,因此尤为注意同步问题。workQueue
[BlockingQueue
],任务队列,是一个BlockingQueue对象,线程安全ctl
[Integer
],记录了线程池的运行状态值跟池内的线程数workers
[HashSet<Worker>
],具体存放线程的set
对象corePoolSize
[volatile int
],线程池核心线程数配置,低于这个数值时,新进来的任务一概以新启动线程处理ctl
基础知识准备ThreadPoolExecutor实例表明一个线程池,相应地,这个池子就有一些运行状态,以及池子内线程数量的配置。这二者的实现以下:学习
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码
线程池状态跟池内线程数的统计都记录在ctl
这个AtomicInteger
(它是谁,先本身查吧。后面估计会专门写下,这里只要记住它的加减是线程安全的就好)中。具体实现是: java内一个整型量是32位,这里AtomicInteger
也是。源码做者将ctl
的32位中高3位用来记录线程池状态,低29位用来记录线程数量。 验证来看,COUNT_BITS
的值是29,方法 runStateOf(int c) {return c & ~CAPACITY;}
这里的c
就是ctl
变量,而CAPACITY
就是一个mask面纱,用来从 ctl
中提取上面两个变量的,它是这样的: ui
因此,runState
就是ctl
与取反后的 CAPACITY
相与,也就是只有高4位有效,正好对应线程池状态的记录位。 因此,各类状态下,ctl
的值以下: RUNNING
:1001 x(28个),-1,最高位是符号位,这个<<
位移操做是忽略符号位的位移 SHUTDOWN
:0000 x(28), 0 STOP
: 0001 x(28), 1 TIDYING
: 0010 x(28), 2 TERMINATED
: 0011 x(28), 3this
execute(Runnable command)
用过线程池的人应该都用过这个入口函数,它就是用来将一个Runnable
任务体放入线程池,下面让咱们来具体看看它的逻辑(代码块无法高亮了,你们看下代码段中注释的翻译部分):atom
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
* 主要的逻辑注释,在这
* 1. If fewer than corePoolSize threads are running, try to
* 若是当前池内线程数小于corePoolSize,那就尝试去新建一
* start a new thread with the given command as its first
* 条线程,传进来的command参数做为该线程的第一个任务
* task. The call to addWorker atomically checks runState and
* 体。调用addWorker函数会自动检查线程池的状态和池内活跃的线程数
* workerCount, and so prevents false alarms that would add
* 若是在不应或不能新建线程时新建了,那不会抛出异常,会返回false
* threads when it shouldn't, by returning false. * * 即便任务体成功入队,咱们仍须要再去检查一遍,咱们是否应该是新建线程而不是入队, * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (由于可能自第一次检查后,原池内活跃的线程 * (because existing ones died since last checking) or that * 又死掉啦)又或者线程池的状态发生了变化 * the pool shut down since entry into this method. So we * 因此咱们去二次检查池内状态,若线程池状态变为中止,就去回滚,或者线程池此时一个活跃线程都没有的话, * recheck state and if necessary roll back the enqueuing if * 新建一个线程去执行任务体。(PS:由于ThreadPoolExecutor * stopped, or start a new thread if there are none. * 主要是经过addWorker来建立线程,因此,若是池内一个活跃线程都没有, * 这时咱们任务体入队了,也没有线程去跑...固然为何只检查一遍?我是想,可 * 能就只是做者单纯地在这里想检查一遍,稍微确保下。由于即便这个二次检查 * 没问题,后续的,到池内线程确切地去跑这个任务体以前的代码,每一行 * 代码,都仍有发生这种状况的可能。这,就是多线程...) * 3. If we cannot queue task, then we try to add a new * 若是咱们任务体入队失败,那我尝试新建线程,若是还失败 * 那就说明线程池已经被shutdown了,或者整个池子已经满了,那咱们 * 就去拒绝这个任务体。这个拒绝,就会用到所谓的RejectPolicy对象 * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 获取ctl对象 int c = ctl.get(); // 若是池内活跃线程数小于corePoolSize if (workerCountOf(c) < corePoolSize) { // 新建线程,第二个参数true能够先忽略 if (addWorker(command, true)) return; // 新建线程失败,那咱们获取最新的线程池状态变量ctl c = ctl.get(); } // 若是当前线程池仍在运行并且任务体入队成功。 // (workQueue就是ThreadPoolExecutor具体的任务队列。 // 而这里就是咱们上面注释提到的那段二次检查的逻辑) if (isRunning(c) && workQueue.offer(command)) { // 二次检查。获取最新的线程池状态字段 int recheck = ctl.get(); // 若是线程不在运行状态 而且也成功把入队的任务体删除了 // 那就菜哦用拒绝策略来拒绝 if (! isRunning(recheck) && remove(command)) reject(command); // 或者,在线程池内活跃的线程数为0时,新建一个线程 // 这里传参跟上面不同,先忽略。记录这个新启动一个线程就够了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是上面的If失败了,就尝试新启动线程,启动失败了,那说明 // 上面的失败,是isRunning形成的,因此拒绝任务体。启动成功了,那就是成功了。 else if (!addWorker(command, false)) reject(command); } 复制代码
这里涉及到ThreadPoolExecutor
线程池增长线程的一个判断逻辑: 每当ThreadPoolExecutor.execute
执行一个任务时,先判断corePoolSize
,当池内线程数小于这个时,直接新增线程,若大于这个,则向workQueue
任务队列入队,队列满了时,则以maximumPoolSize
为界开始继续新建线程,当超过maximumPoolSize
时,就采用最后的RejectPolicy进行拒绝处理。
addWorker(Runnable firstTask, boolean core)
这个函数主要逻辑是新启动一个线程,firstTask
是新启动线程的第一个任务,能够为null
,为null
时,就是单纯地启动一个线程,记得咱们以前在execute(Runnable command)
方法中,在线程池内没有有效线程时,调用firstTask
为null
的方法来启动一条线程。 第二个参数core
是用来辨别,启动一个新线程时,是以corePoolSize
这个线程数配置量来做为限制,仍是以maximumPoolSize
这个线程数配置量做为限制。 看下源码(逻辑主要放注释里了):
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// 获取到线程池的运行状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 检查任务队列在它必要的时候才能为空。这里
// 只能直接翻译下,具体的,if里的判断逻辑也能推断出来
// 可是目前我也不能肯定说出在这种状况下要false退出的
// 缘由。若是想搞清它,可能只能完全把线程池的运行状态、
// 线程池内的线程数、任务队列内的任务数三者全部可能的状况的
// 前提下才能肯定。这里待大神指出来了。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 第二个参数的做用就在这里产生了!
// 这里在确保池内线程数不超过ctl极限CAPACITY
// 以及不超过相应的xxxPoolSize的状况下,经过
// CAI操做去给线程数加1,成功了,则跳出retry标记后
// 的循环。至于CAI是什么?先记住它是线程安全的给数值+1的操做就好
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 到此位置,线程池内的线程数标记字段已经加1了
// 接下来的,就是具体添加一个线程的操做了
//
// 这里就不可避免的涉及到了ThreadPoolSize中的
// Worker这个内部类了,这个类就是具体的ThreadPoolSize
// 内部用来表明一个线程的封装对象,他封装了一个线程实例
// ,用来跑具体的任务;封装了一个Runnable 实例,表明具体的任务;
// 同时,它继承、实现了AQS(AbstractQueuedSynchronizer)跟Runnable,因此,这个
// Worker实例能够理解成一个小劳工,有本身的运行线程,有
// 本身的具体的执行任务体,同时,本身也有同步机制AQS。
// 这里涉及到AQS,你们伙就暂且理解成AQS赋予Worker同步的性质便可(调用AQS的方法就能实现)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 初始化一个Worker劳工,同时指定给他的任务。这个任务
// 能够为null,空。表示什么也不作。同时,初始化的时候,也会
// 初始化Worker体内的线程对象,这条线程的对象的启动,是
// 在worker对象的Runnable.run实现方法里
w = new Worker(firstTask);
final Thread t = w.thread;
// 这个mainLock是ThreadPoolExecutor用来同步对
// workers线程队列的CRUD操做的
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 当线程池处于RUNNING状态,则能够继续操做;
// 或者当线程池处于SHUTDOWN,可是firstTask 为null
// 也就是说,这里是为了增长一个线程的,因此,也能够放行
// 由于SHUTDOWN状态,是容许启动线程将任务队列内的任务跑完的
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将新生成的线程劳工加入到线程集合中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 若是线程劳工添加成功,则启动它
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 若是线程启动失败,则运行回滚操做
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
复制代码
Worker
对象的线程start()
这里讲述的方法是接上面addWorker
时,成功调用的t.start()
,这里启动了Worker
封装的线程。这个线程是Worker
构造函数里生成的,以下:
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 这里设置了AQS的state,用来抑制interrupt直到
// runWorker方法
setState(-1); // inhibit interrupts until runWorker
// 这里传递了线程的任务体,能够为null
this.firstTask = firstTask;
// 初始化线程时,给线程指定了worker实例自身这个Runnable,所以,线程在start后,
// 就是在运行worker当前实例自身的run方法
this.thread = getThreadFactory().newThread(this);
}
复制代码
看完上面代码的注释,接着看worker
实例自身的run
方法
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
复制代码
能够看到这里调用了runWorker
方法,传参是worker
自身。runWorker
是同一个ThreadPoolExecutor
实例的方法,因此,线程池实例下的全部Worker
线程都是在跑这同一个runWorker
方法。
runWorker(Worker worker)
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them...
*/
final void runWorker(Worker w) {
// 这里获取到线程对象,其实就是参数Worker对象内封装的Thread对象。
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// allow interrupts,容许Interrupt打断,记得Worker对象的构造函数嘛?
// 构造函数一开始就调用了setState(-1)去抑制interrupt。这里就是去释放它。
// 固然,这里具体的抑制interrupt的含义,要结合AQS来了解了,我后面再加吧。
w.unlock();
boolean completedAbruptly = true;
try {
// 若是Worker中的firstTask对象不是空的,则
// 直接跑它;若否则,调用getTask从队列中获取一条任务来执行。这里
// 会一直while循环,因此worker们在任务队列中有任务时
// 会一直在这个runWorker中循环while取任务执行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 这里有一个很巧妙的判断逻辑,这段代码也主要是实现了
// 上面英文注释中的逻辑,可是可能比较难理解,稍等
// 在后面我会专门讲一下
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 任务体的前置函数,如今默认是空函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 任务体的执行函数
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// 后置函数
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 这个参数记录worker运行是不是被打断的,若是不是,代码
// 会安全地走到这里,而后置字段为false。
// 不然,异常状况下就直接跳到finally中了,值仍为初始化时的true
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
复制代码
这段源码上方我放了一段注释,翻译过来就是: worker
对象的主要Loop循环体。从队列(workQueue
)中获取任务体,而后执行。
getTask
函数private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 这个for(;;)也是个无限循环的实现,它比while(true)的好处是
// 在字节码层面上,它比while(true)少两行字节码代码,因此
// 效率更高
for (;;) {
// 获取线程池最新状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// allowCoreThreadTimeOut表示coreThread,实际上是判断
// 线程数在这个coreThreadPoolSize范围内时,线程是否能够超时。
// 这里的判断逻辑也很巧妙,若是allowxxxTimeOut为true,coreThread
// 能够超时,则 || 后面判断coreThread的逻辑也就无所谓了,是吧。
// 但若是allowxxxxTimeOut为false,coreThread不容许超时,
// 则须要去判断在判断的线程是否实在coreThread范围内,是的话,
// 则最终结果也为false,符合coreThread不能超时的逻辑;若是大于,
// 则说明当前方法的线程不是在coreThread,
// 注意去理解这个是否是coreThread这个概念
// 因此,timed为true,也就是能够超时,符合逻辑
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 这里我把代码格式化了下,方便你们去看
// 这里的判断就是,在符合了一些逻辑后,就去直接
// wokerCount减一,表明当前这个woker就直接干掉了,
// 而在方法内返回null这个逻辑,在调用getTask的代码处
// 确实也是去干掉当前的worker实例。可是,woker不能
// 瞎干掉,必需要确保线程池能正常产生做用,这个正常做用
// 的实现,要么就是干掉当前的worker还剩下至少一个,
// 要么就是任务队列空了,这个逻辑就在(wc > 1 || workQueue.isEmpty)
// 实现了。再来看 && 以前,在当前线程数大于
// maximumPoolSize限制时,或者当前woker能够超时,
// 即timed为true,同时,上一次获取任务体时也超时了(timedOut)
// 则,当前的worker就干掉。这段逻辑有一个timedOut
// 判断,即上一次当前worker获取任务体时就超时了。
// 我猜想,加这个逻辑,可能就是纯粹的统计学上的效率
// 提升。固然,欢迎更多想法。
//
// 在符合上述条件后,CAS操做来减小workerCount数
// 再返回null,去干掉当前worker实例。
if (
(wc > maximumPoolSize || (timed && timedOut))
&&
(wc > 1 || workQueue.isEmpty())
) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 根据当前的worker是否能够超时,调用BlockingQueue
// 的不一样方法来获取任务体。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 获取到任务体,则返回
if (r != null)
return r;
// 超时了,记录标记位
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
复制代码
这里咱们总结下:
ThreadPoolExecutor的实际逻辑图
workers
线程集合中的Worker
对象,在runWorker
中循环自workQueue
中获取Runnable
任务体进行执行。对workers
线程集合的访问要通过mainLock
这个锁。