如下全部内容以及源码分析都是基于JDK1.8的,请知悉。java
我写博客就真的比较没有顺序了,这可能跟个人学习方式有关,我本身也以为这样挺很差的,可是没办法说服本身去改变,因此也只能这样想到什么学什么了。编程
池化技术真的是一门在我看来很是牛逼的技术,由于它作到了在有限资源内实现了资源利用的最大化,这让我想到了一门课程,那就是运筹学,当时在上运筹学的时候就常常作这种相似的问题。数组
言归正传吧,我接下来会进行一次线程池方面知识点的学习,也会记录下来分享给你们。并发
线程池的内容当中有涉及到AQS同步器的知识点,若是对AQS同步器知识点感受有点薄弱,能够去看个人上一篇文章。ide
既然说到线程池了,并且大多数的大牛也都会建议咱们使用池化技术来管理一些资源,那线程池确定也是有它的好处的,要否则怎么会那么出名而且让你们使用呢?函数
咱们就来看看它究竟有什么优点?oop
资源可控性:使用线程池能够避免建立大量线程而致使内存的消耗源码分析
提升响应速度:线程池地建立其实是很消耗时间和性能的,由线程池建立好有任务就运行,提高响应速度。性能
便于管理:池化技术最突出的一个特色就是能够帮助咱们对池子里的资源进行管理。由线程池统一分配和管理。学习
咱们要用线程池来统一分配和管理咱们的线程,那首先咱们要建立一个线程池出来,仍是有不少大牛已经帮咱们写好了不少方面的代码的,Executors的工厂方法就给咱们提供了建立多种不一样线程池的方法。由于这个类只是一个建立对象的工厂,并无涉及到不少的具体实现,因此我不会过于详细地去说明。
老规矩,仍是直接上代码吧。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
这里也就举出一个方法的例子来进行以后的讲解吧,咱们能够看出,Executors只是个工厂而已,方法也只是来实例化不一样的对象,实际上实例化出来的关键类就是ThreadPoolExecutor
。如今咱们就先来简单地对ThreadPoolExecutor
构造函数内的每一个参数进行解释一下吧。
corePoolSize(核心线程池大小):当提交一个任务到线程池时,线程池会建立一个线程来执行任务,即便其余空闲的基本线程可以执行新任务也会建立线程,当任务数大于核心线程数的时候就不会再建立。在这里要注意一点,线程池刚建立的时候,其中并无建立任何线程,而是等任务来才去建立线程,除非调用了prestartAllCoreThreads()
或者prestartCoreThread()
方法 ,这样才会预先建立好corePoolSize
个线程或者一个线程。
maximumPoolSize(线程池最大线程数):线程池容许建立的最大线程数,若是队列满了,而且已建立的线程数小于最大线程数,则线程池会再建立新的线程执行任务。值得注意的是,若是使用了无界队列,此参数就没有意义了。
keepAliveTime(线程活动保持时间):此参数默认在线程数大于corePoolSize
的状况下才会起做用, 当线程的空闲时间达到keepAliveTime
的时候就会终止,直至线程数目小于corePoolSize
。不过若是调用了allowCoreThreadTimeOut
方法,则当线程数目小于corePoolSize
的时候也会起做用.
unit(keelAliveTime的时间单位):keelAliveTime的时间单位,一共有7种,在这里就不列举了。
workQueue(阻塞队列):阻塞队列,用来存储等待执行的任务,这个参数也是很是重要的,在这里简单介绍一下几个阻塞队列。
ArrayBlockingQueue:这是一个基于数组结构的有界阻塞队列,此队列按照FIFO的原则对元素进行排序。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按照FIFO排序元素,吞吐量一般要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()
就是使用了这个队列。
SynchronousQueue:一个不存储元素的阻塞队列。每一个插入操做必须等到另外一个线程调用移除操做,不然插入操做一直处于阻塞状态。吞吐量一般要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()
就使用了这个队列。
PriorityBlockingQueue:一个具备优先级的无阻塞队列。
handler(饱和策略);当线程池和队列都满了,说明线程池已经处于饱和状态了,那么必须采起一种策略来处理还在提交过来的新任务。这个饱和策略默认状况下是AbortPolicy
,表示没法处理新任务时抛出异常。共有四种饱和策略提供,固然咱们也能够选择本身实现饱和策略。
AbortPolicy:直接丢弃而且抛出RejectedExecutionException
异常
CallerRunsPolicy:只用调用者所在线程来运行任务。
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
DiscardPolicy:丢弃任务而且不抛出异常。
线程池的执行流程就用参考资料里的图介绍一下了,具体咱们仍是经过代码去讲解。
在上面咱们简单的讲解了一下Executors
这个工厂类里的工厂方法,而且讲述了一下建立线程池的一些参数以及它们的做用,固然上面的讲解并非很深刻,由于想要弄懂的话是须要持续地花时间去看去理解的,而博主本身也仍是没有彻底弄懂,不过博主的学习方法是先学了个大概,再回头来看看以前的知识点,可能会更加好理解,因此咱们接着往下面讲吧。
在上面咱们就发现了,Executors
的工厂方法主要就返回了ThreadPoolExecutor
对象,至于另外一个在这里暂时不讲,也就是说,要学习线程池,其实关键的仍是得学会分析ThreadPoolExecutor
这个对象里面的源码,咱们接下来就会对ThreadPoolExecutor
里的关键代码进行分析。
ctl
是主要的控制状态,是一个复合类型的变量,其中包括了两个概念。
workerCount:表示有效的线程数目
runState:线程池里线程的运行状态
咱们来分析一下跟ctl
有关的一些源代码吧,直接上代码
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//用来表示线程池数量的位数,很明显是29,Integer.SIZE=32
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池最大数量,2^29 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//咱们能够看出有5种runState状态,证实至少须要3位来表示runState状态
//因此高三位就是表示runState了
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//用于存放线程任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
//重入锁
private final ReentrantLock mainLock = new ReentrantLock();
//线程池当中的线程集合,只有当拥有mainLock锁的时候,才能够进行访问
private final HashSet<Worker> workers = new HashSet<Worker>();
//等待条件支持终止
private final Condition termination = mainLock.newCondition();
//建立新线程的线程工厂
private volatile ThreadFactory threadFactory;
//饱和策略
private volatile RejectedExecutionHandler handler;
复制代码
CAPACITY
在这里咱们讲一下这个线程池最大数量的计算吧,由于这里涉及到源码以及位移之类的操做,我感受大多数人都仍是不太会这个,由于我一开始看的时候也是不太会的。
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
复制代码
从代码咱们能够看出,是须要1往左移29位
,而后再减去1,那个1往左移29位
是怎么计算的呢?
1 << COUNT_BITS
1的32位2进制是
00000000 00000000 00000000 00000001
左移29位的话就是
00100000 00000000 00000000 00000000
再进行减一的操做
000 11111 11111111 11111111 11111111
也就是说线程池最大数目就是
000 11111 11111111 11111111 11111111
复制代码
2.runState
正数的原码、反码、补码都是同样的 在计算机底层,是用补码来表示的
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
复制代码
能够接受新任务而且处理已经在阻塞队列的任务 高3位所有是1的话,就是RUNNING状态
-1 << COUNT_BITS
这里是-1往左移29位,稍微有点不同,-1的话须要咱们本身算出补码来
-1的原码
10000000 00000000 00000000 00000001
-1的反码,负数的反码是将原码除符号位之外所有取反
11111111 11111111 11111111 11111110
-1的补码,负数的补码就是将反码+1
11111111 11111111 11111111 11111111
关键了,往左移29位,因此高3位全是1就是RUNNING状态
111 00000 00000000 00000000 00000000
复制代码
不接受新任务,可是处理已经在阻塞队列的任务 高3位全是0,就是SHUTDOWN状态
0 << COUNT_BITS
0的表示
00000000 00000000 00000000 00000000
往左移29位
00000000 00000000 00000000 00000000
复制代码
不接受新任务,也不处理阻塞队列里的任务,而且会中断正在处理的任务 因此高3位是001,就是STOP状态
1 << COUNT_BITS
1的表示
00000000 00000000 00000000 00000001
往左移29位
00100000 00000000 00000000 00000000
复制代码
全部任务都被停止,workerCount是0,线程状态转化为TIDYING而且调用terminated()钩子方法 因此高3位是010,就是TIDYING状态
2 << COUNT_BITS
2的32位2进制
00000000 00000000 00000000 00000010
往左移29位
01000000 00000000 00000000 00000000
复制代码
terminated()钩子方法已经完成 因此高3位是110,就是TERMINATED状态
3 << COUNT_BITS
3的32位2进制
00000000 00000000 00000000 00000011
往左移29位
11000000 00000000 00000000 00000000
复制代码
3.部分方法介绍
实时获取runState的方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
复制代码
~CAPACITY
~是按位取反的意思
&是按位与的意思
而CAPACITY是,高位3个0,低29位都是1,因此是
000 11111 11111111 11111111 11111111
取反的话就是
111 00000 00000000 00000000 00000000
传进来的c参数与取反的CAPACITY进行按位与操做
1、低位29个0进行按位与,仍是29个0
2、高位3个1,既保持c参数的高3位
既高位保持原样,低29位都是0,这也就得到了线程池的运行状态runState
复制代码
获取线程池的当前有效线程数目
private static int workerCountOf(int c) { return c & CAPACITY; }
复制代码
CAPACITY的32位2进制是
000 11111 11111111 11111111 11111111
用入参c跟CAPACITY进行按位与操做
1、低29位都是1,因此保留c的低29位,也就是有效线程数
2、高3位都是0,因此c的高3位也是0
这样获取出来的即是workerCount的值
复制代码
原子整型变量ctl的初始化方法
//结合这几句代码来看
private static final int RUNNING = -1 << COUNT_BITS;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码
RUNNING是
111 00000 00000000 00000000 00000000
ctlOf是将rs和wc进行按位或的操做
初始化的时候是将RUNNING和0进行按位或
0的32位2进制是
00000000 00000000 00000000 00000000
因此初始化的ctl是
111 00000 00000000 00000000 00000000
复制代码
public void execute(Runnable command) {
//须要执行的任务command为空,抛出空指针异常
if (command == null) // 1
throw new NullPointerException();
/* *执行的流程实际上分为三步 *一、若是运行的线程小于corePoolSize,以用户给定的Runable对象新开一个线程去执行 * 而且执行addWorker方法会以原子性操做去检查runState和workerCount,以防止当返回false的 * 时候添加了不该该添加的线程 *二、 若是任务可以成功添加到队列当中,咱们仍须要对添加的线程进行双重检查,有可能添加的线程在前 * 一次检查时已经死亡,又或者在进入该方法的时候线程池关闭了。因此咱们须要复查状态,并有有必 * 要的话须要在中止时回滚入列操做,或者在没有线程的时候新开一个线程 *三、若是任务没法入列,那咱们须要尝试新增一个线程,若是新建线程失败了,咱们就知道线程可能关闭了 * 或者饱和了,就须要拒绝这个任务 * */
//获取线程池的控制状态
int c = ctl.get(); // 2
//经过workCountOf方法算workerCount值,小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
//添加任务到worker集合当中
if (addWorker(command, true))
return; //成功返回
//失败的话再次获取线程池的控制状态
c = ctl.get();
}
/* *判断线程池是否正处于RUNNING状态 *是的话添加Runnable对象到workQueue队列当中 */
if (isRunning(c) && workQueue.offer(command)) { // 3
//再次获取线程池的状态
int recheck = ctl.get();
//再次检查状态
//线程池不处于RUNNING状态,将任务从workQueue队列中移除
if (! isRunning(recheck) && remove(command))
//拒绝任务
reject(command);
//workerCount等于0
else if (workerCountOf(recheck) == 0) // 4
//添加worker
addWorker(null, false);
}
//加入阻塞队列失败,则尝试以线程池最大线程数新开线程去执行该任务
else if (!addWorker(command, false)) // 5
//执行失败则拒绝任务
reject(command);
}
复制代码
咱们来讲一下上面这个代码的流程:
一、首先判断任务是否为空,空则抛出空指针异常 二、不为空则获取线程池控制状态,判断小于corePoolSize,添加到worker集合当中执行,
- 如成功,则返回
- 失败的话再接着获取线程池控制状态,由于只有状态变了才会失败,因此从新获取 三、判断线程池是否处于运行状态,是的话则添加command到阻塞队列,加入时也会再次获取状态而且检测 状态是否不处于运行状态,不处于的话则将command从阻塞队列移除,而且拒绝任务 四、若是线程池里没有了线程,则建立新的线程去执行获取阻塞队列的任务执行 五、若是以上都没执行成功,则须要开启最大线程池里的线程来执行任务,失败的话就丢弃
有时候再多的文字也不如一个流程图来的明白,因此仍是画了个execute的流程图给你们方便理解。
2.addWorker(Runnable firstTask, boolean core)
private boolean addWorker(Runnable firstTask, boolean core) {
//外部循环标记
retry:
//外层死循环
for (;;) {
//获取线程池控制状态
int c = ctl.get();
//获取runState
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/** *1.若是线程池runState至少已是SHUTDOWN *2\. 有一个是false则addWorker失败,看false的状况 * - runState==SHUTDOWN,即状态已经大于SHUTDOWN了 * - firstTask为null,即传进来的任务为空,结合上面就是runState是SHUTDOWN,可是 * firstTask不为空,表明线程池已经关闭了还在传任务进来 * - 队列为空,既然任务已经为空,队列为空,就不须要往线程池添加任务了 */
if (rs >= SHUTDOWN && //runState大于等于SHUTDOWN,初始位RUNNING
! (rs == SHUTDOWN && //runState等于SHUTDOWN
firstTask == null && //firstTask为null
! workQueue.isEmpty())) //workQueue队列不为空
return false;
//内层死循环
for (;;) {
//获取线程池的workerCount数量
int wc = workerCountOf(c);
//若是workerCount超出最大值或者大于corePoolSize/maximumPoolSize
//返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//经过CAS操做,使workerCount数量+1,成功则跳出循环,回到retry标记
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS操做失败,再次获取线程池的控制状态
c = ctl.get(); // Re-read ctl
//若是当前runState不等于刚开始获取的runState,则跳出内层循环,继续外层循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
//CAS因为更改workerCount而失败,继续内层循环
}
}
//经过以上循环,能执行到这是workerCount成功+1了
//worker开始标记
boolean workerStarted = false;
//worker添加标记
boolean workerAdded = false;
//初始化worker为null
Worker w = null;
try {
//初始化一个当前Runnable对象的worker对象
w = new Worker(firstTask);
//获取该worker对应的线程
final Thread t = w.thread;
//若是线程不为null
if (t != null) {
//初始线程池的锁
final ReentrantLock mainLock = this.mainLock;
//获取锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//获取锁后再次检查,获取线程池runState
int rs = runStateOf(ctl.get());
//当runState小于SHUTDOWN或者runState等于SHUTDOWN而且firstTask为null
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//线程已存活
if (t.isAlive()) // precheck that t is startable
//线程未启动就存活,抛出IllegalThreadStateException异常
throw new IllegalThreadStateException();
//将worker对象添加到workers集合当中
workers.add(w);
//获取workers集合的大小
int s = workers.size();
//若是大小超过largestPoolSize
if (s > largestPoolSize)
//从新设置largestPoolSize
largestPoolSize = s;
//标记worker已经被添加
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
//若是worker添加成功
if (workerAdded) {
//启动线程
t.start();
//标记worker已经启动
workerStarted = true;
}
}
} finally {
//若是worker没有启动成功
if (! workerStarted)
//workerCount-1的操做
addWorkerFailed(w);
}
//返回worker是否启动的标记
return workerStarted;
}
复制代码
咱们也简单说一下这个代码的流程吧,还真的是挺难的,博主写的时候都停了好屡次,想砸键盘的说:
一、获取线程池的控制状态,进行判断,不符合则返回false,符合则下一步 二、死循环,判断workerCount是否大于上限,或者大于corePoolSize/maximumPoolSize,没有的话则对workerCount+1操做, 三、若是不符合上述判断或+1操做失败,再次获取线程池的控制状态,获取runState与刚开始获取的runState相比,不一致则跳出内层循环继续外层循环,不然继续内层循环 四、+1操做成功后,使用重入锁ReentrantLock来保证往workers当中添加worker实例,添加成功就启动该实例。
接下来看看流程图来理解一下上面代码的一个执行流程
3.addWorkerFailed(Worker w)
addWorker方法添加worker失败,而且没有成功启动任务的时候,就会调用此方法,将任务从workers中移除,而且workerCount作-1操做。
private void addWorkerFailed(Worker w) {
//重入锁
final ReentrantLock mainLock = this.mainLock;
//获取锁
mainLock.lock();
try {
//若是worker不为null
if (w != null)
//workers移除worker
workers.remove(w);
//经过CAS操做,workerCount-1
decrementWorkerCount();
tryTerminate();
} finally {
//释放锁
mainLock.unlock();
}
}
复制代码
4.tryTerminate()
当对线程池执行了非正常成功逻辑的操做时,都会须要执行tryTerminate尝试终止线程池
final void tryTerminate() {
//死循环
for (;;) {
//获取线程池控制状态
int c = ctl.get();
/* *线程池处于RUNNING状态 *线程池状态最小大于TIDYING *线程池==SHUTDOWN而且workQUeue不为空 *直接return,不能终止 */
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//若是workerCount不为0
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//获取线程池的锁
final ReentrantLock mainLock = this.mainLock;
//获取锁
mainLock.lock();
try {
//经过CAS操做,设置线程池状态为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//设置线程池的状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//发送释放信号给在termination条件上等待的线程
termination.signalAll();
}
return;
}
} finally {
//释放锁
mainLock.unlock();
}
// else retry on failed CAS
}
}
复制代码
5.runWorker(Worker w)
该方法的做用就是去执行任务
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
//获取worker里的任务
Runnable task = w.firstTask;
//将worker实例的任务赋值为null
w.firstTask = null;
/* *unlock方法会调用AQS的release方法 *release方法会调用具体实现类也就是Worker的tryRelease方法 *也就是将AQS状态置为0,容许中断 */
w.unlock(); // allow interrupts
//是否忽然完成
boolean completedAbruptly = true;
try {
//worker实例的task不为空,或者经过getTask获取的不为空
while (task != null || (task = getTask()) != null) {
//获取锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/* *获取线程池的控制状态,至少要大于STOP状态 *若是状态不对,检查当前线程是否中断并清除中断状态,而且再次检查线程池状态是否大于STOP *若是上述知足,检查该对象是否处于中断状态,不清除中断标记 */
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//中断改对象
wt.interrupt();
try {
//执行前的方法,由子类具体实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行完后调用的方法,也是由子类具体实现
afterExecute(task, thrown);
}
} finally {//执行完后
//task设置为null
task = null;
//已完成任务数+1
w.completedTasks++;
//释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
//处理并退出当前worker
processWorkerExit(w, completedAbruptly);
}
}
复制代码
接下来咱们用文字来讲明一下执行任务这个方法的具体逻辑和流程。
- 首先在方法一进来,就执行了w.unlock(),这是为了将AQS的状态改成0,由于只有getState() >= 0的时候,线程才能够被中断;
- 判断firstTask是否为空,为空则经过getTask()获取任务,不为空接着往下执行
- 判断是否符合中断状态,符合的话设置中断标记
- 执行beforeExecute(),task.run(),afterExecute()方法
- 任何一个出异常都会致使任务执行的终止;进入processWorkerExit来退出任务
- 正常执行的话会接着回到步骤2
附上一副简单的流程图:
6.getTask()
在上面的runWorker方法当中咱们能够看出,当firstTask为空的时候,会经过该方法来接着获取任务去执行,那咱们就看看获取任务这个方法究竟是怎么样的?
private Runnable getTask() {
//标志是否获取任务超时
boolean timedOut = false; // Did the last poll() time out?
//死循环
for (;;) {
//获取线程池的控制状态
int c = ctl.get();
//获取线程池的runState
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/* *判断线程池的状态,出现如下两种状况 *一、runState大于等于SHUTDOWN状态 *二、runState大于等于STOP或者阻塞队列为空 *将会经过CAS操做,进行workerCount-1并返回null */
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取线程池的workerCount
int wc = workerCountOf(c);
// Are workers subject to culling?
/* *allowCoreThreadTimeOut:是否容许core Thread超时,默认false *workerCount是否大于核心核心线程池 */
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/* *一、wc大于maximumPoolSize或者已超时 *二、队列不为空时保证至少有一个任务 */
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
/* *经过CAS操做,workerCount-1 *能进行-1操做,证实wc大于maximumPoolSize或者已经超时 */
if (compareAndDecrementWorkerCount(c))
//-1操做成功,返回null
return null;
//-1操做失败,继续循环
continue;
}
try {
/* *wc大于核心线程池 *执行poll方法 *小于核心线程池 *执行take方法 */
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//判断任务不为空返回任务
if (r != null)
return r;
//获取一段时间没有获取到,获取超时
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
复制代码
仍是文字解说一下上面的代码逻辑和流程:
- 获取线程池控制状态和runState,判断线程池是否已经关闭或者正在关闭,是的话则workerCount-1操做返回null
- 获取workerCount判断是否大于核心线程池
- 判断workerCount是否大于最大线程池数目或者已经超时,是的话workerCount-1,-1成功则返回null,不成功则回到步骤1从新继续
- 判断workerCount是否大于核心线程池,大于则用poll方法从队列获取任务,不然用take方法从队列获取任务
- 判断任务是否为空,不为空则返回获取的任务,不然回到步骤1从新继续
接下来依然有一副流程图:
7.processWorkerExit
明显的,在执行任务当中,会去获取任务进行执行,那既然是执行任务,确定就会有执行完或者出现异常中断执行的时候,那这时候确定也会有相对应的操做,至于具体操做是怎么样的,咱们仍是直接去看源码最实际。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/* *completedAbruptly:在runWorker出现,表明是否忽然完成的意思 *也就是在执行任务过程中出现异常,就会忽然完成,传true * *若是是忽然完成,须要经过CAS操做,workerCount-1 *不是忽然完成,则不须要-1,由于getTask方法当中已经-1 * *下面的代码注释貌似与代码意思相反了 */
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
//生成重入锁
final ReentrantLock mainLock = this.mainLock;
//获取锁
mainLock.lock();
try {
//线程池统计的完成任务数completedTaskCount加上worker当中完成的任务数
completedTaskCount += w.completedTasks;
//从HashSet<Worker>中移除
workers.remove(w);
} finally {
//释放锁
mainLock.unlock();
}
//由于上述操做是释听任务或线程,因此会判断线程池状态,尝试终止线程池
tryTerminate();
//获取线程池的控制状态
int c = ctl.get();
//判断runState是否小鱼STOP,便是RUNNING或者SHUTDOWN
//若是是RUNNING或者SHUTDOWN,表明没有成功终止线程池
if (runStateLessThan(c, STOP)) {
/* *是否忽然完成 *如若不是,表明已经没有任务可获取完成,由于getTask当中是while循环 */
if (!completedAbruptly) {
/* *allowCoreThreadTimeOut:是否容许core thread超时,默认false *min-默认是corePoolSize */
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//容许core thread超时而且队列不为空
//min为0,即容许core thread超时,这样就不须要维护核心核心线程池了
//若是workQueue不为空,则至少保持一个线程存活
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//若是workerCount大于min,则表示知足所需,能够直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//若是是忽然完成,添加一个空任务的worker线程--这里我也不太理解
addWorker(null, false);
}
}
复制代码
- 首先判断线程是否忽然终止,若是是忽然终止,经过CAS,workerCount-1
- 统计线程池完成任务数,并将worker从workers当中移除
- 判断线程池状态,尝试终止线程池
- 线程池没有成功终止
- 判断是否忽然完成任务,不是则进行下一步,是则进行第三步
- 如容许核心线程超时,队列不为空,则至少保证一个线程存活
- 添加一个空任务的worker线程
咱们在上面已经算是挺详细地讲了线程池执行任务execute
的执行流程和一些细节,在上面频繁地出现了一个字眼,那就是worker实例,那么这个worker到底是什么呢?里面都包含了一些什么信息,以及worker这个任务到底是怎么执行的呢?
咱们就在这个部分来介绍一下吧,仍是直接上源码:
咱们能够看到Worker内部类继承AQS同步器而且实现了Runnable接口,因此Worker很明显就是一个可执行任务而且又能够控制中断、起到锁效果的类。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */
private static final long serialVersionUID = 6138294804551838833L;
/** 工做线程,若是工厂失败则为空. */
final Thread thread;
/** 初始化任务,有可能为空 */
Runnable firstTask;
/** 已完成的任务计数 */
volatile long completedTasks;
/** * 建立并初始化第一个任务,使用线程工厂来建立线程 * 初始化有3步 *一、设置AQS的同步状态为-1,表示该对象须要被唤醒 *二、初始化第一个任务 *三、调用ThreadFactory来使自身建立一个线程,并赋值给worker的成员变量thread */
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//重写Runnable的run方法
/** Delegates main run loop to outer runWorker */
public void run() {
//调用ThreadPoolExecutor的runWorker方法
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
//表明是否独占锁,0-非独占 1-独占
protected boolean isHeldExclusively() {
return getState() != 0;
}
//重写AQS的tryAcquire方法尝试获取锁
protected boolean tryAcquire(int unused) {
//尝试将AQS的同步状态从0改成1
if (compareAndSetState(0, 1)) {
//若是改变成,则将当前独占模式的线程设置为当前线程并返回true
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
//不然返回false
return false;
}
//重写AQS的tryRelease尝试释放锁
protected boolean tryRelease(int unused) {
//设置当前独占模式的线程为null
setExclusiveOwnerThread(null);
//设置AQS同步状态为0
setState(0);
//返回true
return true;
}
//获取锁
public void lock() { acquire(1); }
//尝试获取锁
public boolean tryLock() { return tryAcquire(1); }
//释放锁
public void unlock() { release(1); }
//是否被独占
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
复制代码
写这个线程池就真的是不容易了,历时两个星期,中途有不少的地方不懂,并且《Java并发编程的艺术》的这本书当中对线程池的介绍其实并不算多,因此本身看起来也挺痛苦的,还常常会看了这个方法就不知道为何要调用这个以及调用这个方法是出何用意。并且在这学习的过程中,有在怀疑本身的学习方法对不对,由于也有人跟我说不须要一句句去看去分析源码,只须要知道流程就能够了,可是后来仍是想一想按照本身的学习路线走,多读源码老是有好处的,在这里我也给程序猿一些建议,有本身的学习方法的时候,按照本身的方式坚决走下去。
方腾飞:《Java并发编程的艺术》
如需转载,请务必注明出处,毕竟一块块搬砖也不是容易的事情。