线程池的使用主要是解决两个问题:①当执行大量异步任务的时候线程池可以提供更好的性能,在不使用线程池时候,每当须要执行异步任务的时候直接new一个线程来运行的话,线程的建立和销毁都是须要开销的。而线程池中的线程是可复用的,不须要每次执行异步任务的时候从新建立和销毁线程;②线程池提供一种资源限制和管理的手段,好比能够限制线程的个数,动态的新增线程等等。程序员
在下面的分析中,咱们能够看到,线程池使用一个Integer的原子类型变量来记录线程池状态和线程池中的线程数量,经过线程池状态来控制任务的执行,每一个工做线程Worker线程能够处理多个任务。sql
一、咱们先简单看一下关于ThreadPoolExecutor的一些成员变量以及其所表示的含义数组
ThreadPoolExecutor继承了AbstractExecutorService,其中的成员变量ctl是一个Integer类型的原子变量,用来记录线程池的状态和线程池中的线程的个数,相似于前面讲到的读写锁中使用一个变量保存两种信息。这里(Integer看作32位)ctl高三位表示线程池的状态,后面的29位表示线程池中的线程个数。以下所示是ThreadPoolExecutor源码中的成员变量安全
1 //(高3位)表示线程池状态,(低29位)表示线程池中线程的个数;
2 // 默认状态是RUNNING,线程池中线程个数为0
3 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
4
5 //表示具体平台下Integer的二进制位数-3后的剩余位数表示的数才是线程的个数;
6 //其中Integer.SIZE=32,-3以后的低29位表示的就是线程的个数了
7 private static final int COUNT_BITS = Integer.SIZE - 3;
8
9 //线程最大个数(低29位)00011111111111111111111111111111(1<<29-1)
10 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
11
12 //线程池状态(高3位表示线程池状态)
13 //111 00000000000000000000000000000
14 private static final int RUNNING = -1 << COUNT_BITS;
15
16 //000 00000000000000000000000000000
17 private static final int SHUTDOWN = 0 << COUNT_BITS;
18
19 //001 00000000000000000000000000000
20 private static final int STOP = 1 << COUNT_BITS;
21
22 //010 00000000000000000000000000000
23 private static final int TIDYING = 2 << COUNT_BITS;
24
25 //011 00000000000000000000000000000
26 private static final int TERMINATED = 3 << COUNT_BITS;
27
28 //获取高3位(运行状态)==> c & 11100000000000000000000000000000
29 private static int runStateOf(int c) { return c & ~CAPACITY; }
30
31 //获取低29位(线程个数)==> c & 00011111111111111111111111111111
32 private static int workerCountOf(int c) { return c & CAPACITY; }
33
34 //计算原子变量ctl新值(运行状态和线程个数)
35 private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码
下面咱们简单解释一下上面的线程状态的含义:bash
①RUNNING:接受新任务并处理阻塞队列中的任务多线程
②SHUTDOWN:拒绝新任务可是处理阻塞队列中的任务架构
③STOP:拒绝新任务并抛弃阻塞队列中的任务,同时会中断当前正在执行的任务并发
④TIDYING:全部任务执行完以后(包含阻塞队列中的任务)当前线程池中活跃的线程数量为0,将要调用terminated方法异步
⑥TERMINATED:终止状态。terminated方法调用以后的状态分布式
二、下面初步了解一下ThreadPoolExecutor的参数以及实现原理
①corePoolSize:线程池核心现车个数
②workQueue:用于保存等待任务执行的任务的阻塞队列(好比基于数组的有界阻塞队列ArrayBlockingQueue、基于链表的无界阻塞队列LinkedBlockingQueue等等)
③maximumPoolSize:线程池最大线程数量
④ThreadFactory:建立线程的工厂
⑤RejectedExecutionHandler:拒绝策略,表示当队列已满而且线程数量达到线程池最大线程数量的时候对新提交的任务所采起的策略,主要有四种策略:AbortPolicy(抛出异常)、CallerRunsPolicy(只用调用者所在线程来运行该任务)、DiscardOldestPolicy(丢掉阻塞队列中最近的一个任务来处理当前提交的任务)、DiscardPolicy(不作处理,直接丢弃掉)
⑥keepAliveTime:存活时间,若是当前线程池中的数量比核心线程数量多,而且当前线程是闲置状态,该变量就是这些线程的最大生存时间
⑦TimeUnit:存活时间的时间单位。
根据上面的参数介绍,简单了解一下线程池的实现原理,以提交一个新任务为开始点,分析线程池的主要处理流程
三、关于一些线程池的使用类型
①newFixedThreadPool:建立一个核心线程个数和最大线程个数均为nThreads的线程池,而且阻塞队列长度为Integer.MAX_VALUE,keepAliveTime=0说明只要线程个数比核心线程个数多而且当前空闲即回收。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
②newSingleThreadExecutor:建立一个核心线程个数和最大线程个数都为1 的线程池,而且阻塞队列长度为Integer.MAX_VALUE,keepAliveTime=0说明只要线程个数比核心线程个数多而且当前线程空闲即回收该线程。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
复制代码
③newCachedThreadPoolExecutor:建立一个按需建立线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE,而且阻塞队列为同步队列(最多只有一个元素),keepAliveTime=60说明只要当前线程在60s内空闲则回收。这个类型的线程池的特色就是:加入同步队列的任务会被立刻执行,同步队列中最多只有一个任务
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
复制代码
四、ThreadPoolExecutor中的其余成员
其中的ReentrantLock可参考前面写到的Java中的锁——Lock和synchronized,其中降到了ReentrantLock的具体实现原理;
关于AQS部分可参考前面说到的Java中的队列同步器AQS,也讲到了关于AQS的具体实现原理分析;
关于条件队列的相关知识可参考前面写的Java中的线程协做之Condition,里面说到了关于Java中线程协做Condition的实现原理;
//独占锁,用来控制新增工做线程Worker操做的原子性
private final ReentrantLock mainLock = new ReentrantLock();
//工做线程集合,Worker继承了AQS接口和Runnable接口,是具体处理任务的线程对象
//Worker实现AQS,并本身实现了简单不可重入独占锁,其中state=0表示当前锁未被获取状态,state=1表示锁被获取,
//state=-1表示Work建立时候的默认状态,建立时候设置state=-1是为了防止runWorker方法运行前被中断
private final HashSet<Worker> workers = new HashSet<Worker>();
//termination是该锁对应的条件队列,在线程调用awaitTermination时候用来存放阻塞的线程
private final Condition termination = mainLock.newCondition();
复制代码
executor方法的做用是提交任务command到线程池执行,能够简单的按照下面的图进行理解,ThreadPoolExecutor的实现相似于一个生产者消费者模型,当用户添加任务到线程池中至关于生产者生产元素,workers工做线程则直接执行任务或者从任务队列中获取任务,至关于消费之消费元素。
1 public void execute(Runnable command) {
2 //(1)首先检查任务是否为null,为null抛出异常,不然进行下面的步骤
3 if (command == null)
4 throw new NullPointerException();
5 //(2)ctl值中包含了当前线程池的状态和线程池中的线程数量
6 int c = ctl.get();
7 //(3)workerCountOf方法是获取低29位,即获取当前线程池中的线程个数,若是小于corePoolSize,就开启新的线程运行
8 if (workerCountOf(c) < corePoolSize) {
9 if (addWorker(command, true))
10 return;
11 c = ctl.get();
12 }
13 //(4)若是线程池处理RUNNING状态,就添加任务到阻塞队列中
14 if (isRunning(c) && workQueue.offer(command)) {
15 //(4-1)二次检查,获取ctl值
16 int recheck = ctl.get();
17 //(4-2)若是当前线程池不是出于RUNNING状态,就从队列中删除任务,并执行拒绝策略
18 if (! isRunning(recheck) && remove(command))
19 reject(command);
20 //(4-3)不然,若是线程池为空,就添加一个线程
21 else if (workerCountOf(recheck) == 0)
22 addWorker(null, false);
23 }
24 //(5)若是队列满,则新增线程,若是新增线程失败,就执行拒绝策略
25 else if (!addWorker(command, false))
26 reject(command);
27 }
复制代码
咱们在看一下上面代码的执行流程,按照标记的数字进行分析:
①步骤(3)判断当前线程池中的线程个数是否小于corePoolSize,若是小于核心线程数,会向workers里面新增一个核心线程执行任务。
②若是当前线程池中的线程数量大于核心线程数,就执行(4)。(4)首先判断当前线程池是否处于RUNNING状态,若是处于该状态,就添加任务到任务队列中,这里须要判断线程池的状态是由于线程池可能已经处于非RUNNING状态,而在非RUNNING状态下是须要抛弃新任务的。
③若是想任务队列中添加任务成功,须要进行二次校验,由于在添加任务到任务队列后,可能线程池的状态发生了变化,因此这里须要进行二次校验,若是当前线程池已经不是RUNNING状态了,须要将任务从任务队列中移除,而后执行拒绝策略;若是二次校验经过,则执行4-3代码从新判断当前线程池是否为空,若是线程池为空没有线程,那么就须要新建立一个线程。
④若是上面的步骤(4)建立添加任务失败,说明队列已满,那么(5)会尝试再开启新的线程执行任务(类比上图中的thread3和thread4,即不是核心线程的那些线程),若是当前线程池中的线程个数已经大于最大线程数maximumPoolSize,表示不能开启新的线程。这就属于线程池满而且任务队列满,就须要执行拒绝策略了。
下面咱们在看看addWorker方法的实现
1 private boolean addWorker(Runnable firstTask, boolean core) {
2 retry:
3 for (;;) {
4 int c = ctl.get();
5 int rs = runStateOf(c);
6
7 //(6)检查队列是否只在必要时候为空
8 if (rs >= SHUTDOWN &&
9 ! (rs == SHUTDOWN &&
10 firstTask == null &&
11 ! workQueue.isEmpty()))
12 return false;
13
14 //(7)使用CAS增长线程个数
15 for (;;) {
16 //根据ctl值得到当前线程池中的线程数量
17 int wc = workerCountOf(c);
18 //(7-1)若是线程数量超出限制,返回false
19 if (wc >= CAPACITY ||
20 wc >= (core ? corePoolSize : maximumPoolSize))
21 return false;
22 //(7-2)CAS增长线程数量,同时只有一个线程能够成功
23 if (compareAndIncrementWorkerCount(c))
24 break retry;
25 c = ctl.get(); // 从新读取ctl值
26 //(7-3)CAS失败了,须要查看当前线程池状态是否发生变化,若是发生变化须要跳转到外层循环尝试从新获取线程池状态,不然内层循环从新进行CAS增长线程数量
27 if (runStateOf(c) != rs)
28 continue retry;
29 }
30 }
31
32 //(8)执行到这里说明CAS增长新线程个数成功了,咱们须要开始建立新的工做线程Worker
33 boolean workerStarted = false;
34 boolean workerAdded = false;
35 Worker w = null;
36 try {
37 //(8-1)建立新的worker
38 w = new Worker(firstTask);
39 final Thread t = w.thread;
40 if (t != null) {
41 final ReentrantLock mainLock = this.mainLock;
42 //(8-2)加独占锁,保证workers的同步,可能线程池中的多个线程调用了线程池的execute方法
43 mainLock.lock();
44 try {
45 // (8-3)从新检查线程池状态,以避免在获取锁以前调用shutdown方法改变线程池状态
46 int rs = runStateOf(ctl.get());
47
48 if (rs < SHUTDOWN ||
49 (rs == SHUTDOWN && firstTask == null)) {
50 if (t.isAlive()) // precheck that t is startable
51 throw new IllegalThreadStateException();
52 //(8-4)添加新任务
53 workers.add(w);
54 int s = workers.size();
55 if (s > largestPoolSize)
56 largestPoolSize = s;
57 workerAdded = true;
58 }
59 } finally {
60 mainLock.unlock();
61 }
62 //(8-6)添加新任务成功以后,启动任务
63 if (workerAdded) {
64 t.start();
65 workerStarted = true;
66 }
67 }
68 } finally {
69 if (! workerStarted)
70 addWorkerFailed(w);
71 }
72 return workerStarted;
73 }
复制代码
简单再分析说明一下上面的代码,addWorker方法主要分为两部分,第一部分是使用CAS线程安全的添加线程数量,第二部分则是建立新的线程而且将任务并发安全的添加到新的workers之中,而后启动线程执行。
①代码(6)中检查队列是否只在必要时候为空,只有线程池状态符合条件才可以进行下面的步骤,从(6)中的判断条件来看,下面的集中状况addWorker会直接返回false
( I )当前线程池状态为STOP,TIDYING或者TERMINATED ; (I I)当前线程池状态为SHUTDOWN而且已经有了第一个任务; (I I I)当前线程池状态为SHUTDOWN而且任务队列为空
②外层循环中判断条件经过以后,在内层循环中使用CAS增长线程数,当CAS成功就退出双重循环进行(8)步骤代码的执行,若是失败须要查看当前线程池的状态是否发生变化,若是发生变化须要进行外层循环从新判断线程池状态而后在进入内层循环从新进行CAS增长线程数,若是线程池状态没有发生变化可是上一次CAS失败就继续进行CAS尝试。
③执行到(8)代码处,代表当前已经成功增长 了线程数,可是尚未线程执行任务。ThreadPoolExecutor中使用全局独占锁mainLock来控制将新增的工做线程Worker线程安全的添加到工做者线程集合workers中。
④(8-2)获取了独占锁,可是在获取到锁以后,还须要进行从新检查线程池的状态,这是为了不在获取全局独占锁以前其余线程调用了shutDown方法关闭了线程池。若是线程池已经关闭须要释放锁。不然将新增的线程添加到工做集合中,释放锁启动线程执行任务。
上面的addWorker方法最后几行中,会判断添加工做线程是否成功,若是失败,会执行addWorkerFailed方法,将任务从workers中移除,而且workerCount作-1操做。
1 private void addWorkerFailed(Worker w) {
2 final ReentrantLock mainLock = this.mainLock;
3 //获取锁
4 mainLock.lock();
5 try {
6 //若是worker不为null
7 if (w != null)
8 //workers移除worker
9 workers.remove(w);
10 //经过CAS操做,workerCount-1
11 decrementWorkerCount();
12 tryTerminate();
13 } finally {
14 //释放锁
15 mainLock.unlock();
16 }
17 }
复制代码
(1)工做线程Worker类源码分析
上面查看addWorker方法在CAS更新线程数成功以后,下面就是建立新的Worker线程执行任务,因此咱们这里先查看Worker类,下面是Worker类的源码,咱们能够看出,Worker类继承了AQS并实现了Runnable接口,因此他既是一个自定义的同步组件,也是一个执行任务的线程类。下面咱们分析Worker类的执行
1 private final class Worker
2 extends AbstractQueuedSynchronizer
3 implements Runnable
4 {
5
6 /** 使用线程工厂建立的线程,执行任务 */
7 final Thread thread;
8 /** 初始化执行任务 */
9 Runnable firstTask;
10 /** 计数 */
11 volatile long completedTasks;
12
13 /**
14 * 给出初始firstTask,线程建立工厂建立新的线程
15 */
16 Worker(Runnable firstTask) {
17 setState(-1); // 防止在调用runWorker以前被中断
18 this.firstTask = firstTask;
19 this.thread = getThreadFactory().newThread(this); //使用threadFactory建立线程
20 }
21
22 /** run方法实际上执行的是runWorker方法 */
23 public void run() {
24 runWorker(this);
25 }
26
27 // 关于同步状态(锁)
28 //
29 // 同步状态state=0表示锁未被获取
30 // 同步状态state=1表示锁被获取
31
32 protected boolean isHeldExclusively() {
33 return getState() != 0;
34 }
35
36 //下面都是重写AQS的方法,Worker为自定义的同步组件
37 protected boolean tryAcquire(int unused) {
38 if (compareAndSetState(0, 1)) {
39 setExclusiveOwnerThread(Thread.currentThread());
40 return true;
41 }
42 return false;
43 }
44
45 protected boolean tryRelease(int unused) {
46 setExclusiveOwnerThread(null);
47 setState(0);
48 return true;
49 }
50
51 public void lock() { acquire(1); }
52 public boolean tryLock() { return tryAcquire(1); }
53 public void unlock() { release(1); }
54 public boolean isLocked() { return isHeldExclusively(); }
55
56 void interruptIfStarted() {
57 Thread t;
58 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
59 try {
60 t.interrupt();
61 } catch (SecurityException ignore) {
62 }
63 }
64 }
65 }
复制代码
在构造函数中咱们能够看出,首先将同步状态state置为-1,而Worker这个同步组件的state有三个值,其中state=-1表示Work建立时候的默认状态,建立时候设置state=-1是为了防止runWorker方法运行前被中断 前面说到过这个结论,这里置为-1是为了不当前Worker在调用runWorker方法以前被中断(当其余线程调用线程池的shutDownNow时候,若是Worker的state>=0则会中断线程),设置为-1就不会被中断了。而Worker实现Runnable接口,那么须要重写run方法,在run方法中,咱们能够看到,实际上执行的是runWorker方法,在runWorker方法中,会首先调用unlock方法,该方法会将state置为0,因此这个时候调用shutDownNow方法就会中断当前线程,而这个时候已经进入了runWork方法了,就不会在尚未执行runWorker方法的时候就中断线程。
(2)runWorker方法的源码分析
1 final void runWorker(Worker w) {
2 Thread wt = Thread.currentThread();
3 Runnable task = w.firstTask;
4 w.firstTask = null;
5 w.unlock(); // 这个时候调用unlock方法,将state置为0,就能够被中断了
6 boolean completedAbruptly = true;
7 try {
8 //(10)若是当前任务为null,或者从任务队列中获取到的任务为null,就跳转到(11)处执行清理工做
9 while (task != null || (task = getTask()) != null) {
10 //task不为null,就须要线程执行任务,这个时候,须要获取工做线程内部持有的独占锁
11 w.lock();
12 /**若是线程池已被中止(STOP)(至少大于STOP状态),要确保线程都被中断
13 * 若是状态不对,检查当前线程是否中断并清除中断状态,而且再次检查线程池状态是否大于STOP
14 * 若是上述知足,检查该对象是否处于中断状态,不清除中断标记
15 */
16 if ((runStateAtLeast(ctl.get(), STOP) ||
17 (Thread.interrupted() &&
18 runStateAtLeast(ctl.get(), STOP))) &&
19 !wt.isInterrupted())
20 //中断该对象
21 wt.interrupt();
22 try {
23 //执行任务以前要作的事情
24 beforeExecute(wt, task);
25 Throwable thrown = null;
26 try {
27 task.run(); //执行任务
28 } catch (RuntimeException x) {
29 thrown = x; throw x;
30 } catch (Error x) {
31 thrown = x; throw x;
32 } catch (Throwable x) {
33 thrown = x; throw new Error(x);
34 } finally {
35 //执行任务以后的方法
36 afterExecute(task, thrown);
37 }
38 } finally {
39 task = null;
40 //更新当前已完成任务数量
41 w.completedTasks++;
42 //释放锁
43 w.unlock();
44 }
45 }
46 completedAbruptly = false;
47 } finally {
48 //执行清理工做:处理并退出当前worker
49 processWorkerExit(w, completedAbruptly);
50 }
51 }
复制代码
咱们梳理一下runWorker方法的执行流程
①首先先执行unlock方法,将Worker的state置为0,这样工做线程就能够被中断了(后续的操做若是线程池关闭就须要线程被中断)
②首先判断判断当前的任务(当前工做线程中的task,或者从任务队列中取出的task)是否为null,若是不为null就往下执行,为null就执行processWorkerExit方法。
③获取工做线程内部持有的独占锁(避免在执行任务期间,其余线程调用shutdown后正在执行的任务被中断,shutdown只会中断当前被阻塞挂起的没有执行任务的线程)
④而后执行beforeExecute()方法,该方法为扩展接口代码,表示在具体执行任务以前所作的一些事情,而后执行task.run()方法执行具体任务,执行完以后会调用afterExecute()方法,用以处理任务执行完毕以后的工做,也是一个扩展接口代码。
⑤更新当前线程池完成的任务数,并释放锁
(3)执行清理工做的方法processWorkerExit
下面是方法processWorkerExit的源码,在下面的代码中
①首先(1-1)处统计线程池完成的任务个数,而且在此以前获取全局锁,而后更新当前的全局计数器,而后从工做线程集合中移除当前工做线程,完成清理工做。
②代码(1-2)调用了tryTerminate 方法,在该方法中,判断了当前线程池状态是SHUTDOWN而且队列不为空或者当前线程池状态为STOP而且当前线程池中没有活动线程,则置线程池状态为TERMINATED。若是设置称为了TERMINATED状态,还须要调用全局条件变量termination的signalAll方法唤醒全部由于调用线程池的awaitTermination方法而被阻塞住的线程,使得线程池中的全部线程都中止,从而使得线程池为TERMINATED状态。
③代码(1-3)处判断当前线程池中的线程个数是否小于核心线程数,若是是,须要新增一个线程保证有足够的线程能够执行任务队列中的任务或者提交的任务。
1 private void processWorkerExit(Worker w, boolean completedAbruptly) {
2 /*
3 *completedAbruptly:是由runWorker传过来的参数,表示是否忽然完成的意思
4 *当在就是在执行任务过程中出现异常,就会忽然完成,传true
5 *
6 *若是是忽然完成,须要经过CAS操做,更新workerCount(-1操做)
7 *不是忽然完成,则不须要-1,由于getTask方法当中已经-1(getTask方法中执行了decrementWorkerCount()方法)
8 */
9 if (completedAbruptly)
10 decrementWorkerCount();
11 //(1-1)在统计完成任务个数以前加上全局锁,而后统计线程池中完成的任务个数并更新全局计数器,并从工做集中删除当前worker
12 final ReentrantLock mainLock = this.mainLock;
13 mainLock.lock(); //得到全局锁
14 try {
15 completedTaskCount += w.completedTasks; //更新已完成的任务数量
16 workers.remove(w); //将完成该任务的线程worker从工做线程集合中移除
17 } finally {
18 mainLock.unlock(); //释放锁
19 }
20 /**(1-2)
21 * 这一个方法调用完成了下面的事情:
22 * 判断若是当前线程池状态是SHUTDOWN而且工做队列为空,
23 * 或者当前线程池状态是STOP而且当前线程池里面没有活动线程,
24 * 则设置当前线程池状态为TERMINATED,若是设置成了TERMINATED状态,
25 * 还须要调用条件变量termination的signAll方法激活全部由于调用线程池的awaitTermination方法而被阻塞的线程
26 */
27 tryTerminate();
28
29 //(1-3)若是当前线程池中线程数小于核心线程,则增长核心线程数
30 int c = ctl.get();
31 //判断当前线程池的状态是否小于STOP(RUNNING或者SHUTDOWN)
32 if (runStateLessThan(c, STOP)) {
33 //若是任务突然完成,执行后续的代码
34 if (!completedAbruptly) {
35 //allowCoreThreadTimeOut表示是否容许核心线程超时,默认为false
36 //min这里当默认为allowCoreThreadTimeOut默认为false的时候,min置为coorPoolSize
37 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
38 //这里说明:若是容许核心线程超时,那么allowCoreThreadTimeOut可为true,那么min值为0,不须要维护核心线程了
39 //若是min为0而且任务队列不为空
40 if (min == 0 && ! workQueue.isEmpty())
41 min = 1; //这里表示若是min为0,且队列不为空,那么至少须要一个核心线程存活来保证任务的执行
42 //若是工做线程数大于min,表示当前线程数知足,直接返回
43 if (workerCountOf(c) >= min)
44 return; // replacement not needed
45 }
46 addWorker(null, false);
47 }
48 }
复制代码
在tryTerminate 方法中,咱们简单说明了该方法的做用,下面是该方法的源码,能够看出源码实现上和上面所总结的功能是差很少的
1 final void tryTerminate() {
2 for (;;) {
3 //获取线程池状态
4 int c = ctl.get();
5 //若是线程池状态为RUNNING
6 //或者状态大于TIDYING
7 //或者状态==SHUTDOWN并未任务队列不为空
8 //直接返回,不能调用terminated方法
9 if (isRunning(c) ||
10 runStateAtLeast(c, TIDYING) ||
11 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
12 return;
13 //若是线程池中工做线程数不为0,须要中断线程
14 if (workerCountOf(c) != 0) { // Eligible to terminate
15 interruptIdleWorkers(ONLY_ONE);
16 return;
17 }
18 //得到线程池的全局锁
19 final ReentrantLock mainLock = this.mainLock;
20 mainLock.lock();
21 try {
22 //经过CAS操做,将线程池状态设置为TIDYING
23 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //private static int ctlOf(int rs, int wc) { return rs | wc; }
24 try {
25 //调用terminated方法
26 terminated();
27 } finally {
28 //最终将线程状态设置为TERMINATED
29 ctl.set(ctlOf(TERMINATED, 0));
30 //调用条件变量termination的signaAll方法唤醒全部由于
31 //调用线程池的awaitTermination方法而被阻塞的线程
32 //private final Condition termination = mainLock.newCondition();
33 termination.signalAll();
34 }
35 return;
36 }
37 } finally {
38 mainLock.unlock();
39 }
40 // else retry on failed CAS
41 }
42 }
复制代码
(1)shutdown操做
咱们在使用线程池的时候知道,调用shutdown方法以后线程池就不会再接受新的任务了,可是任务队列中的任务仍是须要执行完的。调用该方法会马上返回,并非等到线程池的任务队列中的全部任务执行完毕在返回的。
1 public void shutdown() {
2 //得到线程池的全局锁
3 final ReentrantLock mainLock = this.mainLock;
4 mainLock.lock();
5 try {
6 //进行权限检查
7 checkShutdownAccess();
8
9 //设置当前线程池的状态的SHUTDOWN,若是线程池状态已是该状态就会直接返回,下面咱们会分析这个方法的源码
10 advanceRunState(SHUTDOWN);
11
12 //设置中断 标志
13 interruptIdleWorkers();
14 onShutdown(); // hook for ScheduledThreadPoolExecutor
15 } finally {
16 mainLock.unlock();
17 }
18 //尝试将状态变为TERMINATED,上面已经分析过该方法的源码
19 tryTerminate();
20 }
复制代码
该方法的源码比较简短,首先检查了安全管理器,是查看当前调用shutdown命令的线程是否有关闭线程的权限,若是有权限还须要看调用线程是否有中断工做线程的权限,若是没有权限将会抛出SecurityException异常或者空指针异常。下面咱们查看一下advanceRunState 方法的源码。
1 private void advanceRunState(int targetState) {
2 for (;;) {
3 //下面的方法执行的就是:
4 //首先获取线程的ctl值,而后判断当前线程池的状态若是已是SHUTDOWN,那么if条件第一个为真就直接返回
5 //若是不是SHUTDOWN状态,就须要CAS的设置当前状态为SHUTDOWN
6 int c = ctl.get();
7 if (runStateAtLeast(c, targetState) ||
8 //private static int ctlOf(int rs, int wc) { return rs | wc; }
9 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
10 break;
11 }
12 }
复制代码
咱们能够看出advanceRunState 方法实际上就是判断当前线程池的状态是否为SHUTDWON,若是是那么就返回,不然就须要设置当前状态为SHUTDOWN。
咱们再来看看shutdown方法中调用线程中断的方法interruptIdleWorkers源码
1 private void interruptIdleWorkers() {
2 interruptIdleWorkers(false);
3 }
4 private void interruptIdleWorkers(boolean onlyOne) {
5 final ReentrantLock mainLock = this.mainLock;
6 mainLock.lock();
7 try {
8 for (Worker w : workers) {
9 Thread t = w.thread;
10 //若是工做线程没有被中断,而且没有正在运行设置中断标志
11 if (!t.isInterrupted() && w.tryLock()) {
12 try {
13 //须要中断当前线程
14 t.interrupt();
15 } catch (SecurityException ignore) {
16 } finally {
17 w.unlock();
18 }
19 }
20 if (onlyOne)
21 break;
22 }
23 } finally {
24 mainLock.unlock();
25 }
26 }
复制代码
上面的代码中,须要设置全部空闲线程的中断标志。首先获取线程池的全局锁,同时只有一个线程能够调用shutdown方法设置中断标志。而后尝试获取工做线程Worker本身的锁,获取成功则能够设置中断标志(这是因为正在执行任务的线程须要获取本身的锁,而且不可重入,因此正在执行的任务没有被中断),这里要中断的那些线程是阻塞到getTask()方法并尝试从任务队列中获取任务的线程即空闲线程。
(2)shutdownNow操做
在使用线程池的时候,若是咱们调用了shutdownNow方法,线程池不只不会再接受新的任务,还会将任务队列中的任务丢弃,正在执行的任务也会被中断,而后马上返回该方法,不会等待激活的任务完成,返回值为当前任务队列中被丢弃的任务列表
1 public List<Runnable> shutdownNow() {
2 List<Runnable> tasks;
3 final ReentrantLock mainLock = this.mainLock;
4 mainLock.lock();
5 try {
6 checkShutdownAccess(); //仍是进行权限检查
7 advanceRunState(STOP); //设置线程池状态台STOP
8 interruptWorkers(); //中断全部线程
9 tasks = drainQueue(); //将任务队列中的任务移动到task中
10 } finally {
11 mainLock.unlock();
12 }
13 tryTerminate();
14 return tasks; //返回tasks
15 }
复制代码
从上面的代码中,咱们能够能够发现,shutdownNow方法也是首先须要检查调用该方法的线程的权限,以后不一样于shutdown方法之处在于须要即刻设置当前线程池状态为STOP,而后中断全部线程(空闲线程+正在执行任务的线程),移除任务队列中的任务
1 private void interruptWorkers() {
2 final ReentrantLock mainLock = this.mainLock;
3 mainLock.lock();
4 try {
5 for (Worker w : workers) //不须要判断当前线程是否在执行任务(即不须要调用w.tryLock方法),中断全部线程
6 w.interruptIfStarted();
7 } finally {
8 mainLock.unlock();
9 }
10 }
复制代码
(3)awaitTermination操做
当线程调用该方法以后,会阻塞调用者线程,直到线程池状态为TERMINATED状态才会返回,或者等到超时时间到以后会返回,下面是该方法的源码。
1 //调用该方法以后,会阻塞调用者线程,直到线程池状态为TERMINATED状态才会返回,或者等到超时时间到以后会返回
2 public boolean awaitTermination(long timeout, TimeUnit unit)
3 throws InterruptedException {
4 long nanos = unit.toNanos(timeout);
5 final ReentrantLock mainLock = this.mainLock;
6 mainLock.lock();
7 try {
8 //阻塞当前线程,(获取了Worker本身的锁),那么当前线程就不会再执行任务(由于获取不到锁)
9 for (;;) {
10 //当前线程池状态为TERMINATED状态,会返回true
11 if (runStateAtLeast(ctl.get(), TERMINATED))
12 return true;
13 //超时时间到返回false
14 if (nanos <= 0)
15 return false;
16 nanos = termination.awaitNanos(nanos);
17 }
18 } finally {
19 mainLock.unlock();
20 }
21 }
复制代码
在上面的代码中,调用者线程须要首先获取线程Worker 本身的独占锁,而后在循环判断当前线程池是否已是TERMINATED状态,若是是则直接返回,不然说明当前线程池中还有线程正在执行任务,这时候须要查看当前设置的超时时间是否小于0,小于0说明不须要等待就直接返回,若是大于0就须要调用条件变量termination的awaitNanos方法等待设置的时间,并在这段时间以内等待线程池的状态变为TERMINATED。
咱们在前面说到清理线程池的方法processWorkerExit的时候,须要调用tryTerminated方法,在该方法中会查看当前线程池状态是否为TERMINATED,若是是该状态也会调用termination.signalAll()方法唤醒全部线程池中因调用awaitTermination而被阻塞住的线程。
若是是设置了超时时间,那么termination的awaitNanos方法也会返回,这时候须要从新检查线程池状态是否为TERMINATED,若是是则返回,不是就继续阻塞本身。
欢迎工做一到五年的Java工程师朋友们加入Java程序员开发: 721575865
群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用本身每一分每一秒的时间来学习提高本身,不要再用"没有时间“来掩饰本身思想上的懒惰!趁年轻,使劲拼,给将来的本身一个交代!