为了不频繁重复的建立和销毁线程,咱们可让这些线程进行复用,在线程池中,总会有活跃的线程在占用,可是线程池中也会存在没有占用的线程,这些线程处于空闲状态,当有任务的时候会从池子里面拿去一个线程来进行使用,当完成工做后,并无销毁线程,而是将线程放回到池子中去。java
线程池主要解决两个问题:编程
一是当执行大量异步任务时线程池可以提供很好的性能。数组
二是线程池提供了一种资源限制和管理的手段,好比能够限制现成的个数,动态新增线程等。缓存
-《Java并发编程之美》安全
上面内容出自《Java并发编程之美》这本书,第一个问题上面已经提到过,线程的频繁建立和销毁是很损耗性能的,可是线程池中的线程是能够复用的,能够较好的提高性能问题,线程池内部是采用了阻塞队列来维护Runnable对象。多线程
JDK为咱们封装了一套操做多线程的框架Executors,帮助咱们能够更好的控制线程池,Executors下提供了一些线程池的工厂方法:并发
SchemeExecutorService
对象,线程池大小为1,SchemeExecutorService
接口在ThreadPoolExecutor
类和 ExecutorService
接口之上的扩展,在给定时间执行某任务。SchemeExecutorService
对象,可指定线程池线程数量。对于核心的线程池来讲,它内部都是使用了ThreadPoolExecutor
对象来实现的,只不过内部参数信息不同,咱们先来看两个例子:nexFixedThreadPool
和newSingleThreadExecutor
以下所示:框架
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
复制代码
由上面的线程池的建立过程能够看到它们都是ThreadPoolExecutor
的封装,接下来咱们来看一下ThreadPoolExecutor
的参数说明:异步
参数名称 | 参数描述 |
---|---|
corePoolSize | 指定线程池线程的数量 |
maximumPoolSize | 指定线程池中线程的最大数量 |
keepAliveTime | 当线程池线程的数量超过corePoolSize的时候,多余的空闲线程存活的时间,若是超过了corePoolSize,在keepAliveTime的时间以后,销毁线程 |
unit | keepAliveTime的单位 |
workQueue | 工做队列,将被提交但还没有执行的任务缓存起来 |
threadFactory | 线程工厂,用于建立线程,不指定为默认线程工厂DefaultThreadFactory |
handler | 拒绝策略 |
其中workQueue表明的是提交但未执行的队列,它是BlockingQueue接口的对象,用于存放Runable对象,主要分为如下几种类型:ide
直接提交的队列:SynchronousQueue
队列,它是一个没有容量的队列,前面我有对其进行讲解,当线程池进行入队offer操做的时候,自己是无容量的,因此直接返回false,并无保存下来,而是直接提交给线程来进行执行,若是没有空余的线程则执行拒绝策略。
有界的任务队列:可使用ArrayBlockingQueue
队列,由于它内部是基于数组来进行实现的,初始化时必须指定容量参数,当使用有界任务队列时,当有任务进行提交时,线程池的线程数量小于corePoolSize则建立新的线程来执行任务,当线程池的线程数量大于corePoolSize的时候,则将提交的任务放入到队列中,当提交的任务塞满队列后,若是线程池的线程数量没有超过maximumPoolSize,则建立新的线程执行任务,若是超过了maximumPoolSize则执行拒绝策略。
无界的任务队列:可使用LinkedBlockingQueue
队列,它内部是基于链表的形式,默认队列的长度是Integer.MAX_VALUE
,也能够指定队列的长度,当队列满时进行阻塞操做,固然线程池中采用的是offer
方法并不会阻塞线程,当队列满时则返回false,入队成功则则返回true,当使用LinkedBlockingQueue
队列时,有任务提交到线程池时,若是线程池的数量小于corePoolSize,线程池会产生新的线程来执行任务,当线程池的线程数量大于corePoolSize时,则将提交的任务放入到队列中,等待执行任务的线程执行完以后进行消费队列中的任务,若后续仍有新的任务提交,而没有空闲的线程时,它会不断往队列中入队提交的任务,直到资源耗尽。
优先任务队列:t有限任务队列是带有执行优先级的队列,他可使用PriorityBlockingQueue
队列,能够控制任务的执行前后顺序,它是一个无界队列,该队列能够根据任务自身的优先级顺序前后执行,在确保性能的同时,也能有很好的质量保证。
上面讲解了关于线程池内部都是经过ThreadPoolExecutor
来进行实现的,那么下面我以一个例子来进行源码分析:
public class ThreadPoolDemo1 {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(5,
10,
60L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(5), new CustomThreadFactory());
for (int i = 0; i < 15; i++) {
executorService.execute(() -> {
try {
Thread.sleep(50000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("由线程:" + Thread.currentThread().getName() + "执行任务完成");
});
}
}
}
复制代码
上面定义了一个线程池,线程池初始化的corePoolSize为5,也就是线程池中线程的数量为5,最大线程maximumThreadPoolSize为10,空余的线程存活的时间是60s,使用ArrayBlockingQueue来做为阻塞队列,这里还发现我自定义了ThreadFactory
线程池工厂,这里我真是针对线程建立的时候输出线程池的名称,源码以下所示:
/** * 自定义的线程池构造工厂 */
public class CustomThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public CustomThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
@Override
public Thread newThread(Runnable r) {
String name = namePrefix + threadNumber.getAndIncrement();
Thread t = new Thread(group, r,
name,
0);
System.out.println("线程池建立,线程名称为:" + name);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
复制代码
代码和DefaultThreadFactory
同样,只是在newThread
新建线程的动做的时候输出了线程池的名称,方便查看线程建立的时机,上面main
方法中提交了15个任务,调用了execute
方法来进行提交任务,在分析execute
方法以前咱们先了解一下线程的状态:
//假设Integer类型是32位的二进制表示。
//高3位表明线程池的状态,低29位表明的是线程池的数量
//默认是RUNNING状态,线程池的数量为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程个数位数,表示的Integer中除去最高的3位以后剩下的位数表示线程池的个数
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池的线程的最大数量
//这里举例是32为机器,表示为00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//线程池的状态
// runState is stored in the high-order bits
//11100000000000000000000000000000
//接受新任务而且处理阻塞队列里面任务
private static final int RUNNING = -1 << COUNT_BITS;
//00000000000000000000000000000000
//拒绝新任务可是处理阻塞队列的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
//00100000000000000000000000000000
//拒接新任务而且抛弃阻塞队列里面的任务,同时会中断正在处理的任务
private static final int STOP = 1 << COUNT_BITS;
//01000000000000000000000000000000
//全部任务都执行完(包括阻塞队列中的任务)后当线程池活动线程数为0,将要调用terminated方法。
private static final int TIDYING = 2 << COUNT_BITS;
//01100000000000000000000000000000
//终止状态,terminated方法调用完成之后的状态
private static final int TERMINATED = 3 << COUNT_BITS;
复制代码
经过上面内容能够看到ctl其实存放的是线程池的状态和线程数量的变量,默认是RUNNING
,也就是11100000000000000000000000000000
,这里咱们来假设运行的机器上的Integer的是32位的,由于有些机器上可能Integer并非32位,下面COUNT_BITS来控制位数,也就是先获取Integer在该平台上的位数,好比说是32位,而后32位-3位=29位,也就是低29位表明的是现成的数量,高3位表明线程的状态,能够清晰看到下面的线程池的状态都是经过低位来进行向左位移的操做的,除了上面的变量,还提供了操做线程池状态的方法:
// 操做ctl变量,主要是进行分解或组合线程数量和线程池状态。
// 获取高3位,获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取低29位,获取线程池中线程的数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 组合ctl变量,rs=runStatue表明的是线程池的状态,wc=workCount表明的是线程池线程的数量
private static int ctlOf(int rs, int wc) { return rs | wc; }
/* * Bit field accessors that don't require unpacking ctl. * These depend on the bit layout and on workerCount being never negative. */
//指定的线程池状态c小于状态s
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
//指定的线程池状态c至少是状态s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 判断线程池是否运行状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/** * CAS增长线程池线程数量. */
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/** * CAS减小线程池线程数量 */
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/** * 将线程池的线程数量进行较少操做,若是竞争失败直到竞争成功为止。 */
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
复制代码
下来咱们看一下ThreadPoolExecutor
对象下的execute
方法:
public void execute(Runnable command) {
// 判断提交的任务是否是为空,若是为空则抛出NullPointException异常
if (command == null)
throw new NullPointerException();
// 获取线程池的状态和线程池的数量
int c = ctl.get();
// 若是线程池的数量小于corePoolSize,则进行添加线程执行任务
if (workerCountOf(c) < corePoolSize) {
//添加线程修改线程数量而且将command做为第一个任务进行处理
if (addWorker(command, true))
return;
// 获取最新的状态
c = ctl.get();
}
// 若是线程池的状态是RUNNING,将命令添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
//二次检查线程池状态和线程数量
int recheck = ctl.get();
//线程不是RUNNING状态,从队列中移除当前任务,而且执行拒绝策略。
//这里说明一点,只有RUNNING状态的线程池才会接受新的任务,其他状态所有拒绝。
if (! isRunning(recheck) && remove(command))
reject(command);
//若是线程池的线程数量为空时,表明线程池是空的,添加一个新的线程。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//若是队列是满的,或者是SynchronousQueue队列时,则直接添加新的线程执行任务,若是添加失败则进行拒绝
//可能线程池的线程数量大于maximumPoolSize则采起拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
复制代码
经过分析execute方法总结如下几点:
corePoolSize
时,直接添加线程到线程池而且将当前任务作为第一个任务执行。RUNNING
,则能够接受任务,将任务放入到阻塞队列中,内部进行二次检查,有可能在运行下面内容时线程池状态已经发生了变化,在这个时候若是线程池状态变成不是RUNNING
,则将当前任务从队列中移除,而且进行拒绝策略。SynchronousQueue
这种特殊队列无空间的时候,直接添加新的线程执行任务,当线程池的线程数量大于maximumPoolSize
时相应拒绝策略。offer
方法,该方法不会阻塞队列,若是队列已经满时或超时致使入队失败,返回false,若是入队成功返回true。针对上面例子源码咱们来作一下分析,咱们源码中阻塞队列采用的是ArrayBlockingQueue
队列,而且指定队列的长度是5,咱们看下面提交的线程池的任务是15个,并且corePoolSize设置的是5个核心线程,最大线程数(maximumPoolSzie)是10个(包括核心线程数),假设全部任务都同时提交到了线程池中,其中有5个任务会被提交到线程中做为第一个任务进行执行,会有5个任务被添加到阻塞队列中,还有5个任务提交到到线程池中的时候发现阻塞队列已经满了,这时候会直接提交任务,发现当前线程数是5小于最大线程数,能够进行新建线程来执行任务。
processWorkerExit
来处理线程的退出,接下来咱们来分析下
addWorker
都作了什么内容:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//获取线程池的状态和线程池线程的数量
int c = ctl.get();
//单独获取线程池的状态
int rs = runStateOf(c);
//检查队列是否只在必要时为空
if (rs >= SHUTDOWN && //线程池的状态是SHUTDOWN、STOP、TIDYING、TERMINATED
! (rs == SHUTDOWN && //能够看作是rs!=SHUTDOWN,线程池状态为STOP、TIDYING、TERMINATED
firstTask == null && //能够看作firstTask!=null,而且rs=SHUTDOWN
! workQueue.isEmpty())) //能够看作rs=SHUTDOWN,而且workQueue.isEmpty()队列为空
return false;
//循环CAS增长线程池中线程的个数
for (;;) {
//获取线程池中线程个数
int wc = workerCountOf(c);
//若是线程池线程数量超过最大线程池数量,则直接返回
if (wc >= CAPACITY ||
//若是指定使用corePoolSize做为限制则使用corePoolSize,反之使用maximumPoolSize,最为工做线程最大线程线程数量,若是工做线程大于相应的线程数量则直接返回。
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增长线程池中线程的数量
if (compareAndIncrementWorkerCount(c))
//跳出增长线程池数量。
break retry;
//若是修改失败,则从新获取线程池的状态和线程数量
c = ctl.get(); // Re-read ctl
//若是最新的线程池状态和原有县城出状态不同时,则跳转到外层retry中,不然在内层循环从新进行CAS
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//工做线程是否开始启动标志
boolean workerStarted = false;
//工做线程添加到线程池成功与否标志
boolean workerAdded = false;
Worker w = null;
try {
//建立一个Worker对象
w = new Worker(firstTask);
//获取worker中的线程,这里线程是经过ThreadFactory线程工厂建立出来的,详细看下面源码信息。
final Thread t = w.thread;
//判断线程是否为空
if (t != null) {
//添加独占锁,为添加worker进行同步操做,防止其余线程同时进行execute方法。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池的状态
int rs = runStateOf(ctl.get());
//若是线程池状态为RUNNING或者是线程池状态为SHUTDOWN而且第一个任务为空时,当线程池状态为SHUTDOWN时,是不容许添加新任务的,因此他会从队列中获取任务。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加worker到集合中
workers.add(w);
int s = workers.size();
//跟踪最大的线程池数量
if (s > largestPoolSize)
largestPoolSize = s;
//添加worker成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//若是添加worker成功就启动任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//若是没有启动,w不为空就已出worker,而且线程池数量进行减小。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
复制代码
经过上面addWorker
方法能够分为两个部分来进行讲解,第一部分是对线程池中线程数量的经过CAS的方式进行增长,其中第一部分中上面有个if语句,这个地方着重分析下:
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
复制代码
能够当作下面的样子,将!
放到括号里面,变成下面的样子:
if (rs >= SHUTDOWN &&
(rs != SHUTDOWN ||
firstTask != null ||
workQueue.isEmpty()))
return false;
复制代码
上半部分分为内外两个循环,外循环对线程池状态的判断,用于判断是否须要添加工做任务线程,经过上面讲的内容进行判断,后面内循环则是经过CAS操做增长线程数,若是指定了core
参数为true,表明线程池中线程的数量没有超过corePoolSize
,当指定为false时,表明线程池中线程数量达到了corePoolSize
,而且队列已经满了,或者是SynchronousQueue
这种无空间的队列,可是尚未达到最大的线程池maximumPoolSize
,因此它内部会根据指定的core
参数来判断是否已经超过了最大的限制,若是超过了就不能进行添加线程了,而且进行拒绝策略,若是没有超过就增长线程数量。
第二部分主要是把任务添加到worker中,并启动线程,这里咱们先来看一下Worker对象。
// 这里发现它是实现了AQS,是一个不可重入的独占锁模式
// 而且它还集成了Runable接口,实现了run方法。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
/** 执行任务的线程,经过ThreadFactory建立 */
final Thread thread;
/** 初始化第一个任务*/
Runnable firstTask;
/** 每一个线程完成任务的数量 */
volatile long completedTasks;
/** * 首先现将state值设置为-1,由于在AQS中state=0表明的是锁没有被占用,并且在线程池中shutdown方法会判断可否争抢到锁,若是能够得到锁则对线程进行中断操做,若是调用了shutdownNow它会判断state>=0会被中断。 * firstTask第一个任务,若是为空则会从队列中获取任务,后面runWorker中。 */
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 委托调用外部的runWorker方法 */
public void run() {
runWorker(this);
}
//是否独占锁
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//这里就是上面shutdownNow中调用的线程中断的方法,getState()>=0
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
复制代码
能够看到Worker是一个实现了AQS的锁,它是一个不可重入的独占锁,而且他也实现了Runnable
接口,实现了run
方法,在构造函数中将AQS的state
设置为-1
,为了不线程尚未进入runWorker
方法前,就调用了shutdown
或shutdownNow
方法,会被中断,设置为-1则不会被中断。后面咱们看到run
方法,它调用的是ThreadPoolExecutor
的runWorker
方法,咱们这里回想一下,在addWorker
方法中,添加worker
到HashSet<Worker>
中后,他会将workerAdded
设置为true,表明添加worker
成功,后面有调用了下面代码:
if (workerAdded) {
t.start();
workerStarted = true;
}
复制代码
这个t表明的就是在Worker构造函数中的使用ThreadFactory
建立的线程,而且将本身(Worker本身)传递了当前线程,建立的线程就是任务线程,任务线程启动的时候会调用Worker
下的run
方法,run
方法内部又委托给外部方法runWorker
来进行操做,它的参数传递的是调用者本身,Worker
中的run
方法以下所示:
public void run() {
runWorker(this); //this指Worker对象自己
}
复制代码
这里简单画一张图来表示下调用的逻辑。
总体的逻辑是先进行建立线程,线程将Worker
设置为执行程序,并将线程塞到Worker
中,而后再addWorker中将Worker中的线程取出来,进行启动操做,启动后他会调用Worker中的run方法,而后run方法中将调用ThreadPoolExecutor的runWorker,而后runWorker又会调用Worker中的任务firstTask,这个fistTask是要真正执行的任务,也是用户本身实现的代码逻辑。
接下来咱们就要看一下runWorker方法里面具体内容:
final void runWorker(Worker w) {
//调用者也就是Worker中的线程
Thread wt = Thread.currentThread();
//获取Worker中的第一个任务
Runnable task = w.firstTask;
//将Worker中的任务清除表明执行了第一个任务了,后面若是再有任务就从队列中获取。
w.firstTask = null;
//这里还记的咱们在new Worker的时候将AQS的state状态设置为-1,这里先进行解锁操做,将state设置为0
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//循环进行获取任务,若是第一个任务不为空,或者是若是第一个任务为空,从任务队列中获取任务,若是有任务则返回获取的任务信息,若是没有任务能够获取则进行阻塞,阻塞也分两种第一种是阻塞直到任务队列中有内容,第二种是阻塞队列必定时间以后仍是没有任务就直接返回null。
while (task != null || (task = getTask()) != null) {
//先获取worker的独占锁,防止其余线程调用了shutdown方法。
w.lock();
// 若是线程池正在中止,确保线程是被中断的,若是没有则确保线程不被中断操做。
if ((runStateAtLeast(ctl.get(), STOP) || //若是线程池状态为STOP、TIDYING、TERMINATED直接拒绝任务中断当前线程
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行任务以前作一些操做,可进行自定义
beforeExecute(wt, task);
Throwable thrown = null;
try {
//运行任务在这里喽。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务以后作一些操做,可进行自定义
afterExecute(task, thrown);
}
} finally {
//将任务清空为了下次任务获取
task = null;
//统计当前Worker完成了多少任务
w.completedTasks++;
//独占锁释放
w.unlock();
}
}
completedAbruptly = false;
} finally {
//处理Worker的退出操做,执行清理工做。
processWorkerExit(w, completedAbruptly);
}
}
复制代码
咱们看到若是Worker是第一次被启动,它会从Worker中获取firstTask任务来执行,而后执行成功后,它会getTask()来从队列中获取任务,这个地方比较有意思,它是分状况进行获取任务的,咱们都直到BlockingQueue中提供了几种从队列中获取的方法,这个getTask中使用了两种方式,第一种是使用poll进行获取队列中的信息,它采用的是过一点时间若是队列中仍没有任务时直接返回null,而后还有一个就是take方法,take方法是若是队列中没有任务则将当前线程进行阻塞,等待队列中有任务后,会通知等待的队列线程进行消费任务,让咱们看一下getTask方法:
private Runnable getTask() {
boolean timedOut = false; //poll获取超时
for (;;) {
//获取线程池的状态和线程数量
int c = ctl.get();
//获取线程池的状态
int rs = runStateOf(c);
//线程池状态大于等于SHUTDOWN
//1.线程池若是是大于STOP的话减小工做线程池数量
//2.若是线程池状态为SHUTDOW而且队列为空时,表明队列任务已经执行完,返回null,线程数量减小1
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取线程池数量。
int wc = workerCountOf(c);
//若是allowCoreThreadTimeOut为true,则空闲线程在必定时间未得到任务会清除
//或者若是线程数量大于corePoolSize的时候会进行清除空闲线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//1.若是线程池数量大于最大的线程池数量或者对(空余线程进行清除操做而且poll超时了,意思是队列中没有内容了,致使poll间隔一段时间后没有获取内容超时了。
//2.若是线程池的数量大于1或者是队列已是空的
//总之意思就是当线程池的线程池数量大于corePoolSize,或指定了allowCoreThreadTimeOut为true,当队列中没有数据或者线程池数量大于1的状况下,尝试对线程池的数量进行减小操做,而后返回null,用于上一个方法进行清除操做。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//若是timed表明的是清除空闲线程的意思
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //等待一段时间若是没有获取到返回null。
workQueue.take(); //阻塞当前线程
//若是队列中获取到内容则返回
if (r != null)
return r;
//若是没有获取到超时了则设置timeOut状态
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
复制代码
咱们还记得第一张图中有标记出来是core线程和普通线程,其实这样标记不是很准确,准确的意思是若是线程池的数量超过了corePoolSize而且没有特别指定allowCoreThreadTimeOut的状况下,它会清除掉大于corePoolSize而且小于等于maximumPoolSize的一些线程,标记出core线程的意思是有corePoolSize不会被清除,可是会清除大于corePoolSize的线程,也就是线程池中的线程对获取任务的时候进行判断,也就是getTask中进行判断,若是当前线程池的线程数量大于corePoolSize就使用poll方式获取队列中的任务,当过一段时间尚未任务就会返回null,返回null以后设置timeOut=true,而且获取getTask也会返回null,到此会跳到调用者runWorker方法中,一直在while (task != null || (task = getTask()) != null)
此时的getTask返回null跳出while循环语句,设置completedAbruptly = false,表示不是忽然完成的而是正常完成,退出后它会执行finally的processWorkerExit(w, completedAbruptly)
,执行清理工做。咱们来看下源码:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 若是忽然完成则调整线程数量
decrementWorkerCount(); // 减小线程数量1
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //获取锁,同时只有一个线程得到锁
try {
completedTaskCount += w.completedTasks; //统计整个线程池完成的数量
workers.remove(w); //将完成任务的worker从HashSet中移除
} finally {
mainLock.unlock(); //释放锁
}
//尝试设置线程池状态为TERMINATED
//1.若是线程池状态为SHUTDOWN而且线程池线程数量与工做队列为空时,修改状态。
//2.若是线程池状态为STOP而且线程池线程数量为空时,修改状态。
tryTerminate();
// 获取线程池的状态和线程池的数量
int c = ctl.get();
// 若是线程池的状态小于STOP,也就是SHUTDOWN或RUNNING状态
if (runStateLessThan(c, STOP)) {
//若是不是忽然完成,也就是正常结束
if (!completedAbruptly) {
//若是指定allowCoreThreadTimeOut=true(默认false)则表明线程池中有空余线程时须要进行清理操做,不然线程池中的线程应该保持corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//这里判断若是线程池中队列为空而且线程数量最小为0时,将最小值调整为1,由于队列中还有任务没有完成须要增长队列,因此这里增长了一个线程。
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//若是当前线程数效益核心个数,就增长一个Worker
addWorker(null, false);
}
复制代码
经过上面的源码能够得出,若是线程数超过核心线程数后,在runWorker
中就不会等待队列中的消息,而是会进行清除操做,上面的清除代码首先是先对线程池的数量进行较少操做,其次是统计整个线程池中完成任务的数量,而后就是尝试修改线程池的状态由SHUTDOWN->TIDYING->TERMINATED
或者是由STOP->TIDYING->TERMINATED
,修改线程池状态为TERMINATED
,须要有两个条件:
当线程池线程数量和工做队列为空,而且线程池的状态为SHUTDOWN
时,才会将状态进行修改,修改的过程是SHUTDOWN->TIDYING->TERMINATED
当线程池的状态为STOP
而且线程池数量为空时,才会尝试修改状态,修改过程是STOP->TIDYING->TERMINATED
若是设置为TERMINATED
状态,还须要调用条件变量termination
的signalAll()
方法来唤醒全部由于调用awaitTermination
方法而被阻塞的线程,换句话说当调用awaitTermination
后,只有线程池状态变成TERMINATED才会被唤醒。
接下来咱们就来分析一下这个tryTerminate
方法,看一下他到底符不符合咱们上述说的内容:
final void tryTerminate() {
for (;;) {
// 获取线程池的状态和线程池的数量组合状态
int c = ctl.get();
//这里单独下面进行分析,这里说明两个问题,须要反向来想这个问题。
//1.若是线程池状态STOP则不进入if语句
//2.若是线程池状态为SHUTDOWN而且工做队列为空时,不进入if语句
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//若是线程池数量不为空时,进行中断操做。
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//修改状态为TIDYING,而且将线程池的数量进行清空
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//执行一些逻辑,默认是空的
terminated();
} finally {
//修改状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//唤醒调用awaitTermination方法的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
复制代码
咱们单独将上面的if语句摘出来进行分析,将上面的第一个if判断进行修改以下,能够看到return在else里面,这时候内部if判断进行转换,转换成以下所示:
if (!isRunning(c) &&
!runStateAtLeast(c, TIDYING) && //只能是SHUTDOWN和STOP
(runStateOf(c) != SHUTDOWN || workQueue.isEmpty())){
//这里执行逻辑
}else {
return;
}
复制代码
逐一分析分析内容以下:
!isRunning(c)
表明不是RUNNING,则可能的是SHUTDOWN
,STOP
,TIDYING
,TERMINATED
这四种状态
中间的链接符是而且的意思,跟着runStateAtLeast(c, TIDYING)
这句话的意思是至少是TIDYING
,TERMINATED
这两个,反过来就是多是RUNNING
,SHUTDOWN
,STOP
,可是前面已经判断了不能是RUNINNG
状态,因此前面两个连在一块儿就是只能是状态为SHUTDOWN
,STOP
runStateOf(c) != SHUTDOWN || workQueue.isEmpty()
当前面的状态是SHUTDOWN
时,则会出发workQueue.isEmpty()
,连在一块儿就是状态是SHUTDOWN
并工做队列为空,当线程池状态为STOP
时,则会进入到runStateOf(c) != SHUTDOWN
,直接返回true,就表明线程池状态为STOP
后面还有一个语句一个if语句将其转换一下逻辑就是下面的内容:
if (workerCountOf(c) == 0) {
//执行下面的逻辑
}else{
interruptIdleWorkers(ONLY_ONE);
return;
}
复制代码
这里咱们也进行转换下,就能够看出来当线程池的数量为空时,才会进行下面的逻辑,下面的逻辑就是修改线程池状态为TERMINATED
,两个连在一块儿就是上面分析的修改状态为TERMINATED
的条件,这里画一张图来表示线程池状态的信息:
其实上面图中咱们介绍了关于从SHUTDOWN
或STOP
到TERMINATED
的变化,没有讲解关于如何从RUNNING
状态转变成SHUTDOWN
或STOP
状态,实际上是调用了shutdown()
或shutdownNow
方法对其进行状态的变换,下面来看一下shutdown
方法源码:
public void shutdown() {
//获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//权限检查
checkShutdownAccess();
//设置线程池状态为SHUTDOWN,若是状态已是大于等于SHUTDOWN则直接返回
advanceRunState(SHUTDOWN);
//若是线程没有设置中断标识而且线程没有运行则设置中断标识
interruptIdleWorkers();
//空的能够实现的内容
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试修改线程池状态为TERMINATED
tryTerminate();
}
复制代码
SecurityException
或NullPointException
异常。接下来咱们来看一下advanceRunState
内容以下所示:
private void advanceRunState(int targetState) {
for (;;) {
//获取线程池状态和线程池的线程数量
int c = ctl.get();
if (runStateAtLeast(c, targetState) || //若是线程池的状态>=SHUTDOWN
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) //设置线程池状态为SHUTDOWN
//返回
break;
}
}
复制代码
interruptIdleWorkers
代码以下所示:
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
复制代码
private void interruptIdleWorkers(boolean onlyOne) {
//获取全局锁,同时只能有一个线程可以调用shutdown方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历工做线程
for (Worker w : workers) {
Thread t = w.thread;
//若是当前线程没有设置中断标志而且能够获取Worker本身的锁
if (!t.isInterrupted() && w.tryLock()) {
try {
//设置中断标志
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//执行一次,清理空闲线程。
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
复制代码
咱们看到当咱们调用shutdown方法的时候,只是将空闲的线程给设置了中断标识,也就是活跃正在执行任务的线程并无设置中断标识,直到将任务所有执行完后才会逐步清理线程操做,咱们还记的在getTask中的方法里面有这样一段代码:
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
复制代码
判断是不是状态>=SHUTDOWN,而且队列为空时,将线程池数量进行减小操做,内部进行CAS操做,直到CAS操做成功为止,而且返回null,返回null后,会调用processWorkerExit(w, false);
清理Workers线程信息,而且尝试将线程设置为TERMINATED
状态,上面是对全部shutdown
方法的分析,下面来看一下shutdownNow
方法而且比较两个之间的区别:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//权限检查
checkShutdownAccess();
//设置线程池状态为STOP,若是状态已是大于等于STOP则直接返回
advanceRunState(STOP);
//这里是和SHUTDOWN区别的地方,这里是强制进行中断操做
interruptWorkers();
//将为完成任务复制到list集合中
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试修改线程池状态为TERMINATED
tryTerminate();
return tasks;
}
复制代码
shutdownNow
方法返回了未完成的任务信息列表tasks = drainQueue();
,其实该方法和shutdown
方法主要的区别在于一下几点内容:
shutdownNow
方法将线程池状态设置为STOP
,而shutdown
则将状态修改成SHUTDOWN
shutdownNow
方法将工做任务进行中断操做,也就是说若是工做线程在工做也会被中断,而shutdown
则是先尝试获取锁若是得到锁成功则进行中断标志设置,也就是中断操做,若是没有获取到锁则等待进行完成后自动退出。shutdownNow
方法返回未完成的任务列表。下面代码是shutDownNow
的interruptWorkers
方法:
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
//直接进行中断操做。
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
复制代码
内部调用了Worker
的interruptIfStarted
方法,方法内部是针对线程进行中断操做,可是中断的前提条件是AQS的state状态必须大于等于0,若是状态为-1的则不会被中断,可是若是任务运行起来的时候在runWorker
中则不会执行任务,由于线程池状态为STOP
,若是线程池状态为STOP则会中断线程,下面代码是Worker中的interruptIfStarted
:
void interruptIfStarted() {
Thread t;
//当前Worker锁状态大于等于0而且线程没有被中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
复制代码
JDK内置的拒绝策略以下:
首先先上一张图,针对这张图来进行总结:
addWorker
进行建立线程,并将线程放入到线程池中,这里咱们看到第二步是将线程添加到核心线程中,其实线程池内部不分核心线程和非核心线程,只是根据corePoolSize和maximumPoolSize设置的大小来进行区分,由于超过corePoolSize的线程会被回收,至于回收那些线程,是根据线程获取任务的时候进行判断,当前线程池数量大于corePoolSize,或者指定了allowCoreThreadTimeOut
为true,则他等待必定时间后会返回,不会一直等待