线程池看懂了也很简单

理论知识

周末上海下起了雨也降温了,无事打开电脑看看源码,就想到了线程池。线程池的技术网络上已经有不少文章都已经写过了,并且理论都是同样的。java

可是理论归理论,面试的时候也许你恰好看了一篇能应付过去,可是若是深究细节可能就会懵逼。因此我很建议任何理论咱们都须要本身去探究一下才好,本身实践过的才有本身的理解而不是死记硬背,这样才会经久不忘。程序员

线程池属于开发中常见的一种池化技术,这类的池化技术的目的都是为了提升资源的利用率和提升效率,相似的HttpClient链接池,数据库链接池等。面试

在没有线程池的时候,咱们要建立多线程的并发,通常都是经过继承 Thread 类或实现 Runnable 接口或者实现 Callable 接口,咱们知道线程资源是很宝贵的,并且线程之间切换执行时须要记住上下文信息,因此过多的建立线程去执行任务会形成资源的浪费并且对CPU影响较大。数据库

为了方便, JDK 1.5 以后为咱们提供了几种建立线程池的方法:缓存

  • Executors.newFixedThreadPool(nThreads):建立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • Executors.newCachedThreadPool():建立一个可缓存线程池,若是线程池长度超过处理须要,可灵活回收空闲线程,若无可回收,则新建线程。
  • Executors.newSingleThreadExecutor():建立一个单线程化的线程池,它只会用惟一的工做线程来执行任务, 保证全部任务按照指定顺序(FIFO, LIFO, 优先级)执行。
  • Executors.newScheduledThreadPool(nThreads):建立一个定长线程池,支持定时及周期性任务执行。

虽然这些都是 JDK 默认提供的,可是仍是要说它们的定制性太差了并且有点鸡肋,不少时候不能知足咱们的需求。例如经过 newFixedThreadPool 方式建立的固定线程池,它内部使用的队列是 LinkedBlockingQueue,可是它的队列大小默认是 Integer.MAX_VALUE,这会有什么问题?安全

当核心线程满了的时候,任务会进入队列中等待,直到队列满了为止。可是也许任务还未达到 Integer.MAX_VALUE 这个值的时候,内存就已经 OOM 了,由于内存放不下这么多的任务,毕竟内存大小有限。markdown

因此更多的时候咱们都是自定义线程池,也就是使用 new ThreadPoolExecutor 的方式,其实你看源码你能够发现以上的4个线程池技术底层都是经过 ThreadPoolExecutor 来建立的,只不过它们本身为咱们填充了这些参数的固定值而已。网络

ThreadPoolExecutor 的构造函数以下所示:多线程

ThreadPoolExecutor(int corePoolSize,
                   int maximumPoolSize,
                   long keepAliveTime,
                   TimeUnit unit,
                   BlockingQueue<Runnable> workQueue,
                   ThreadFactory threadFactory,
                   RejectedExecutionHandler handler);
复制代码

咱们来看下这几个核心参数的涵义和做用:并发

  • corePoolSize: 为线程池的核心线程基本大小。
  • maximumPoolSize: 为线程池最大线程大小。
  • keepAliveTimeunit 则是线程空闲后的存活时间。
  • workQueue: 用于存听任务的阻塞队列。
  • handler: 当队列和最大线程池都满了以后的饱和策略。

经过这些参数的配置使得整个线程池的工做流程以下:

前几年通常普通的技术面试了解了以上的知识内容也差很少就够了,可是目前的大环境的影响或者面试更高级的开发上面的知识点是经不起深度考问的。例如如下几个问题你是否了解:线程池的内部有哪些状态?是如何判断核心线程数是否已满的?最大线程数是否包含核心线程数?当线程池中的线程数恰好达到 maximumPoolSize 这个值的时候,这个任务可否正常被执行?......,想要了解这些问题的答案咱们只能在线程池的源码中寻找了。

实战模拟测试

咱们自定义一个线程池,而后经过 for 循环连续建立10个任务并打印线程执行信息,总体代码以下所示:

public static void main(String[] args) {

    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 6, 5L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(4));
    
    for (int i = 0; i < 10; i++) {
        threadPoolExecutor.execute(() -> {
             System.out.println("测试线程池:" + Thread.currentThread().getName() + "," + threadPoolExecutor.toString());
        });
    }
}
复制代码

当 corePoolSize = 3,maximumPoolSize = 6,workQueue 大小为4的时候,咱们的打印信息为:

能够发现总的建立了6个线程来执行完成了10个任务,其实很好理解,c=3个核心线程执行了3个任务,而后4个任务在队列中等待核心线程执行,最后额外建立了e=3个线程执行了剩下的3个任务,总建立的线程数就是 c + e = 6 <= 6(最大线程数)。

若是咱们调整对象建立的时候的构造函数参数,例如

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5, 5L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(2));
复制代码

咱们再次执行上述的代码,则会报错,抛出以下 RejectedExecutionException 异常信息,能够看到是由于拒绝策略拦截的异常信息。

仍是按照上面的逻辑分析,这时核心线程数是 c = 3,而阻塞队列的大小是 2,所以核心线程会处理掉其中5个任务,而剩下的5个任务会额外建立 e=5个线程去执行,那么总线程数就是 c + e = 8,可是这时的最大线程数 maximumPoolSize = 5,所以超过了最大线程数的限制,这时就执行了默认的拒绝策略抛出异常。其实它在准备建立第6个线程的时候就已经报错了,从这里也能够得知只要建立的总线程数 >= maximumPoolSize 的时候,线程池就不会继续执行任务了而会去执行拒绝策略的逻辑

技术来源于生活

人们经常在生活中遇到一些困难的时候会进行头脑风暴从而产生一些意想不到的解决方案,这些都是思想和智慧的结晶。咱们不少技术的解决方案也都来源于生活。

我常常想若是之后不作程序员应该作什么?餐饮彷佛是最大众的了,毕竟民以食为天。

开餐馆前期确定不能作太大,一是本金的问题,还有就是须要市场试水。在市场需求不明确的状况下租个小店面仍是靠谱的,就算亏也不会太多。

店面租个几十平的,就作香辣烤鱼,餐桌大概15桌的样子。而后就是员工了,除了厨师主要是服务员了,可是我不能招15个服务员啊,每桌分配一个太浪费了,须要提升资源利用率控制成本,因此员工不能招太多,我只须要招5个固定服务员负责在大厅招呼顾客和传菜就能够了,每一个人负责3个餐桌。

可是我没想到咱们餐馆作的烤鱼很合大众口味,很受欢迎又加上营销效果好,成了一家网红餐馆。生意更是蒸蒸日上,天天座无虚席。可是空间有限啊,因此咱们只能让后来无座的顾客稍微等候了,因而咱们安排了一个取号排队等候区,顾客等待叫号有序就餐。

这时候餐馆的人员不变,仍然是5个服务员负责处理大厅的主要服务工做,同时排队等候区面积也不能过大,有个范围限制,不能影响咱们的正常人员活动,同时也不能超过餐馆的范围排到餐馆外,若是顾客排队站到门外马路上了,这是就很危险的。随着口碑的发酵,一传十,十传百,咱们的顾客络绎不绝,同时咱们为了提升消费率又作起了外卖的服务,能够打包外带。

为了不发生上述这种危险的状况和提升订单处理率,咱们只能额外请一些临时工了,让他们来帮忙处理咱们的外卖订单从而提升业务处理能力。

可是也不是请的越多越好,咱们有成本控制,由于请的临时工咱们也须要付工资。那怎么办呢?最终只能忍痛了啊,对于超出咱们处理能力的订单,咱们就采起必定的拒绝策略,例如告知顾客当天的份额已经售罄,请改天再来。

以上就是咱们线程池运行的一个现实生活中的例子,核心线程就是咱们的5个固定服务员,而排队等候区就是咱们的等待队列,队列不能设为无限大,由于会形成OOM,若是队列满了线程池会另起额外线程去处理任务,也就是上述例子中的临时工,餐馆有经营成本控制因此有员工上限,不能请过多的临时工,这就是最大线程数。若是临时工达到最大数且队列也满了,那么咱们只能经过拒绝策略暂时不接受额外的服务要求了。

一块儿看源码

口说无凭,理论都是这样说的,那实际上源码是否是真是这样写的呢?咱们一块儿来看下线程池的源码。经过 threadPoolExecutor.execute(...)的入口进入源码,删除了注释信息以后的源码内容以下,因为封装的好,因此只有短短几行。

public void execute(Runnable command) {
    // #1 任务非空校验
    if (command == null)
        throw new NullPointerException();

    // #2 添加核心线程执行任务
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // #3 任务入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //二次校验
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    
    // #4 添加普通线程执行任务,若是失败则执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}
复制代码

若是不关注细节只关注总体,从以上源码中咱们能够发现其中主要分为了四个步骤来处理逻辑。排除第一步的非空校验代码,咱们能够看出剩下的三步其实就是咱们线程池的运行逻辑,也就是上面的运行流程图的逻辑内容。

  • (1) 任务的非空校验。
  • (2) 获取当前RUNNING的线程数,若是小于核心线程数,则建立核心线程去执行任务,不然走#3。
  • (3) 若是当前线程池处于RUNNING状态,那么就将任务放入队列中。这时还会再作个双重校验,由于可能存在有些线程在咱们上次检查后死了,或者从咱们进入这个方法后pool被关闭了,因此咱们须要再次检查state。若是线程池中止了就须要回滚刚才的添加任务到队列中的操做并经过拒绝策略拒绝该任务,或者若是池中没有线程了,则新开启一个线程执行任务。
  • (4) 若是队列满了以后没法在将任务加入队列,则建立新的线程去执行任务,若是也失败了,那么就多是线程池关闭了或者线程池饱和了,这时执行拒绝策略再也不接受任务。

双重校验中有如下两个点须要注意:

1. 为何须要 double check 线程池的状态?

在多线程环境下,线程池的状态时刻在变化,而 ctl.get() 是非原子操做,颇有可能刚获取了线程池状态后线程池状态就改变了。判断是否将 command 加入 workque 是线程池以前的状态。假若没有 double check,万一线程池处于非 running 状态(在多线程环境下颇有可能发生),那么 command 永远不会执行。

二、为何 addWorker(null, false) 的任务为null?

addWorker(null, false),这个方法执行时只是建立了一个新的线程,可是没有传入任务,这是由于前面已经将任务添加到队列中了,这样能够防止线程池处于 running 状态,可是没有线程去处理这个任务。

而根据以上代码的具体步骤咱们能够画出详细的执行流程,以下图所示

以上的源码其实只有10几行,看起来很简单,主要是它的封装性比较好,其中主要有两个点须要重点解释,分别是:线程池的状态addWorker()添加工做的方法,这两个点弄明白了这段线程池的源码差很少也就理解了。

线程池运行状态-runState

线程有状态,线程池也有它的运行状态,这些状态提供了主生命周期控制,伴随着线程池的运行,由内部来维护,从源码中咱们能够发现线程池共有5个状态:RUNNINGSHUTDOWNSTOPTIDYINGTERMINATED

各状态值所表明的的含义和该状态值下可执行的操做,具体信息以下:

运行状态 状态描述
RUNNING 接收新任务,而且也能处理阻塞队列中的任务。
SHUTDOWN 不接收新任务,可是却能够继续处理阻塞队列中的任务。
STOP 不接收新任务,同时也不处理队列任务,而且中断正在进行的任务。
TIDYING 全部任务都已终止,workercount(有效线程数)为0,线程转向 TIDYING 状态将会运行 terminated() 钩子方法。
TERMINATED terminated() 方法调用完成后变成此状态。

生命周期状态流转以下图所示:

不少时候咱们表示状态都是经过简单的 int 值来表示,例如数据库数据的删除标志 delete_flag 其中0表示有效,1表示删除。而在线程池的源码里咱们能够看到它是经过以下方式来进行表示的,

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
复制代码

线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)何作到的呢?将十进制 int 值转换为二进制的值,共32位,其中高3位表明运行状态(runState ),而低29位表明工做线程数(workerCount)。

关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如如下代码所示:

//获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// Packing and unpacking ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码

经过巧妙的位运算能够分别获取高3位的运行状态值低29位的线程数量值,若是感兴趣的能够去看下具体的实现代码,这里就再也不赘述了。

添加工做线程-addWorker

添加线程是经过 addWorker() 方法来实现的,这个方法有两个入参,Runnable firstTaskboolean core

private boolean addWorker(Runnable firstTask, boolean core){...}
复制代码
  • Runnable firstTask 便是当前添加的线程须要执行的首个任务.
  • boolean core 用来标记当前执行的线程是不是核心线程仍是普通线程.

返回前面的线程池的 execute() 方法的代码中,能够发现这个addWorker() 有三个地方在调用,分别在 #2,#3和#4。

  • #2:当工做线程数 < 核心线程数的时候,经过addWorker(command, true)添加核心线程执行command任务。
  • #3:double check的时候,若是发现线程池处于正常运行状态可是里面没有工做线程,则添加个空任务和一个普通线程,这样一个 task 为空的 worker 在线程执行的时候会去阻塞任务队列里拿任务,这样就至关于建立了一个新的线程,只是没有立刻分配任务。
  • #4:队列已满的状况下,经过添加普通线程(非核心线程)去执行当前任务,若是失败了则执行拒绝策略。

addWorker() 方法调用的地方咱们看完了,接下来咱们一块儿来看下它里面究竟作了些什么,源码以下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
复制代码

这个方法稍微有点长,咱们分段来看下,将上面的代码咱们拆分红两个部分来看,首先看第一部分:

retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);//获取线程池的状态

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;

    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
	    // 尝试经过CAS方式增长workerCount
        if (compareAndIncrementWorkerCount(c))
            break retry;
        c = ctl.get();  // Re-read ctl
        // 若是线程池状态发生变化,从新从最外层循环
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}
复制代码

这部分代码有两层嵌套的 for 死循环,在第一行有个retry:代码,这个也许有些同窗没怎么见过,这个是至关因而一个位置标记,retry后面跟循环,标记这个循环的位置。

咱们平时写 for 循环的时候,是经过continue;break;来跳出当前循环,可是若是咱们有多重嵌套的 for 循环,若是咱们想在里层的某个循环体中当达到某个条件的时候直接跳出全部循环或跳出到某个指定的位置,则使用retry:来标记这个位置就能够了。

代码中共有4个位置有改变循环体继续执行下去,分别是两个return false;,一个break retry;和一个continue retry;

首先咱们来看下第一个return false;,这个 return 在最外层的一个 for 循环,

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
   return false;
复制代码

这是一个判断线程池状态和线程队列状况的代码,这个逻辑判断有点绕能够改为

rs >= shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty())
复制代码

这样就好理解了,逻辑判断成立能够分为如下几种状况直接返回 false,表示添加工做线程失败。

  • rs > shutdown:线程池状态处于 STOPTIDYINGTERMINATED时,添加工做线程失败,不接受新任务。
  • rs >= shutdown && firstTask != null:线程池状态处于 SHUTDOWNSTOPTIDYINGTERMINATED状态且worker的首个任务不为空时,添加工做线程失败,不接受新任务。
  • rs >= shutdown && workQueue.isEmppty:线程池状态处于 SHUTDOWNSTOPTIDYINGTERMINATED状态且阻塞队列为空时,添加工做线程失败,不接受新任务。

这样看来,最外层的 for 循环是不断的校验当前的线程池状态是否能接受新任务,若是校验经过了以后才能继续往下运行。

而后接下来看第二个return false;,这个 return 是在内层的第二个 for 循环中,是判断线程池中当前的工做线程数量的,不知足条件的话直接返回 false,表示添加工做线程失败。

  • 工做线程数量是否超过可表示的最大容量(CAPACITY).
  • 若是添加核心工做线程,是否超过最大核心线程容量(corePoolSize).
  • 若是添加普通工做线程,是否超过线程池最大线程容量(maximumPoolSize).

后面的break retry; ,表示若是尝试经过CAS方式增长工做线程数workerCount成功,则跳出这个双循环,往下执行后面第二部分的代码,而continue retry;是再次校验下线程池状态是否发生变化,若是发生了变化则从新从最外层 for 开始继续循环执行。

经过第一部分代码的解析,咱们发现只有break retry;的时候才能执行到后面第二部分的代码,然后面第二部分代码作了些什么呢?

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    //建立Worker对象实例
    w = new Worker(firstTask);
    //获取Worker对象里的线程
    final Thread t = w.thread;
    if (t != null) {
        //开启可重入锁,独占
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            //获取线程池运行状态
            int rs = runStateOf(ctl.get());

            //知足 rs < SHUTDOWN 判断线程池是不是RUNNING,或者
            //rs == SHUTDOWN && firstTask == null 线程池若是是SHUTDOWN,
            //且首个任务firstTask为空,
            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                //将Worker实例加入线程池workers
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                //线程添加成功标志位 -> true
                workerAdded = true;
            }
        } finally {
            //释放锁
            mainLock.unlock();
        }
        //若是worker实例加入线程池成功,则启动线程,同时修改线程启动成功标志位 -> true
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        //添加线程失败
        addWorkerFailed(w);
}
return workerStarted;
复制代码

这部分代码主要的目的其实就是启动一个线程,前面是一堆的条件判断,看是否可以启动一个工做线程。它由两个try...catch...finally内容组成,能够将他们拆开来看,这样就很容易看懂。

咱们先看里面一层的try...catch...finally,当Worker实例中的 Thread 线程不为空的时候,开启一个独占锁ReentrantLock mainLock,防止其余线程也来修改操做。

try {
   //获取线程池运行状态
   int rs = runStateOf(ctl.get());

   if (rs < SHUTDOWN ||
       (rs == SHUTDOWN && firstTask == null)) {
       if (t.isAlive()) // precheck that t is startable
           throw new IllegalThreadStateException();
       workers.add(w);
       int s = workers.size();
       if (s > largestPoolSize)
           largestPoolSize = s;
       workerAdded = true;
   }
} finally {
   mainLock.unlock();
}
复制代码
  • 首先检查线程池的状态,当线程池处于 RUNNING 状态或者线程池处于 SHUTDOWN 状态可是当前线程的 firstTask 为空,知足以上条件时才能将 worker 实例添加进线程池,即workers.add(w);
  • 同时修改 largestPoolSize,largestPoolSize变量用于记录出现过的最大线程数。
  • 将标志位 workerAdded 设置为 true,表示添加工做线程成功。
  • 不管成功与否,在 finally 中都必须执行 mainLock.unlock()来释放锁。

外面一层的try...catch...finally主要是为了判断工做线程是否启动成功,若是内层try...catch...finally代码执行成功,即 worker 添加进线程池成功,workerAdded 标志位置为true,则启动 worker 中的线程 t.start(),同时将标志位 workerStarted 置为 true,表示线程启动成功。

if (workerAdded) {
    t.start();
    workerStarted = true;
}
复制代码

若是失败了,即 workerStarted == false,则在 finally 里面必须执行addWorkerFailed(w)方法,这个方法至关因而用来回滚操做的,前面增的这里移除,前面加的这里减去。

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            //从线程池中移除worker实例
            workers.remove(w);
        //经过CAS,将工做线程数量workerCount减1
        decrementWorkerCount();
        //
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}
复制代码

Worker类

上面咱们分析了addWorker 方法的源码,而且看到了 Thread t = w.threadworkers.add(w)t.start()等代码,知道了线程池的运行状态和添加工做线程的流程,那么咱们还有一些疑问:

  • 这里的 Worker 是什么?和 Thread 有什么区别?
  • 线程启动后是如何拿任务?在哪拿任务去执行的?
  • 阻塞队列满后,额外新建立的线程是去队列里拿任务的吗?若是不是那它是去哪拿的?
  • 核心线程会一直存在于线程池中吗?额外建立的普通线程执行完任务后会销毁吗?

Worker 是 ThreadPoolExecutor的一个内部类,主要是用来维护线程执行任务的中断控制状态,它实现了Runnable 接口同时继承了AQS,实现 Runnable 接口意味着 Worker 就是一个线程,继承 AQS 是为了实现独占锁这个功能。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        /** Initial task to run. Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        
        //构造函数,初始化AQS的state值为-1
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
}
复制代码

至于为何没有使用可重入锁 ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程如今的执行状态。

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
  2. 若是正在执行任务,则不该该中断线程。
  3. 若是该线程如今不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时能够对该线程进行中断。
  4. 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是不是空闲状态;若是线程是空闲状态则能够安全回收。

Worker 类有一个构造方法,构造参数为给定的首个任务 firstTask,并持有一个线程thread。thread是在调用构造方法时经过 ThreadFactory 来建立的线程,能够用来执行任务;

firstTask用它来初始化时传入的第一个任务,这个任务能够有也能够为null。若是这个值是非空的,那么线程就会在启动初期当即执行这个任务;若是这个值是null,那么就须要建立一个线程去执行阻塞队列中的任务,也就是非核心线程的建立。

任务运行-runWorker

上面咱们一块儿看过线程的启动t.start(),具体运行是在 Worker 的 run() 方法中

public void run() {
    runWorker(this);
}
复制代码

run() 方法中又调用了runWorker() 方法,全部的实现都在这里

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
复制代码

不少人看到这样的代码就感受头痛,其实你细看,这里面咱们能够看关键点,里面有三块try...catch...finally代码,咱们将这三块分别单独拎出来看而且将抛异常的地方暂时删掉或注释掉,这样它看起来就清爽了不少

Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//因为Worker初始化时AQS中state设置为-1,这里要先作一次解锁把state更新为0,容许线程中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
    // 循环的判断任务(firstTask或从队列中获取的task)是否为空
    while (task != null || (task = getTask()) != null) {
        // Worker加锁,本质是AQS获取资源而且尝试CAS更新state由0更变为1
        w.lock();
        // 若是线程池运行状态是stopping, 确保线程是中断状态;
        // 若是不是stopping, 确保线程是非中断状态. 
        if ((runStateAtLeast(ctl.get(), STOP) ||
             (Thread.interrupted() &&
              runStateAtLeast(ctl.get(), STOP))) &&
            !wt.isInterrupted())
            wt.interrupt();
            
            //此处省略了第二个try...catch...finally
    }
    // 走到这里说明某一次getTask()返回为null,线程正常退出
    completedAbruptly = false;
} finally {
    //处理线程退出
    processWorkerExit(w, completedAbruptly);
}
复制代码

第二个try...catch...finally

try {
   beforeExecute(wt, task);
   Throwable thrown = null;
    
    //此处省略了第三个try...catch...finally
    
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}
复制代码

第三个try...catch...finally

try {
    // 运行任务
    task.run();
} catch (RuntimeException x) {
    thrown = x; throw x;
} catch (Error x) {
    thrown = x; throw x;
} catch (Throwable x) {
    thrown = x; throw new Error(x);
} finally {
    afterExecute(task, thrown);
}
复制代码

上面的代码中能够看到有beforeExecuteafterExecuteterminaerd三个函数,它们都是钩子函数,能够分别在子类中重写它们用来扩展ThreadPoolExecutor,例如添加日志、计时、监视或者统计信息收集的功能。

  • beforeExecute():线程执行以前调用
  • afterExecute():线程执行以后调用
  • terminaerd():线程池退出时候调用

这样拆分完以后发现,其实主要注意两个点就好了,分别是getTask()task.run()task.run()就是运行任务,那咱们继续来看下getTask()是如何获取任务的。

获取任务-getTask

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //1.线程池状态是STOP,TIDYING,TERMINATED
        //2.线程池shutdown而且队列是空的.
        //知足以上两个条件之一则工做线程数wc减去1,而后直接返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        //容许核心工做线程对象销毁淘汰或者工做线程数 > 最大核心线程数corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //1.工做线程数 > 最大线程数maximumPoolSize 或者timed == true && timedOut == true
        //2.工做线程数 > 1 或者队列为空 
        //同时知足以上两个条件则经过CAS把线程数减去1,同时返回null。CAS把线程数减去1失败会进入下一轮循环作重试
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /// 若是timed为true,经过poll()方法作超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null
            // 若是timed为false,经过take()作阻塞拉取,会阻塞到有下一个有效的任务时候再返回(通常不会是null)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
复制代码

里面有个关键字allowCoreThreadTimeOut,它的默认值为false,在Java1.6开始你能够经过threadPoolExecutor.allowCoreThreadTimeOut(true)方式来设置为true,经过字面意思就能够明白这个字段的做用是什么了,便是否容许核心线程超时销毁。

默认的状况下核心线程数量会一直保持,即便这些线程是空闲的它也是会一直存在的,而当设置为 true 时,线程池中 corePoolSize 线程空闲时间达到 keepAliveTime 也将销毁关闭。

结尾

经过整片分析下来,线程池里面有不少细节处须要注意,阅读完源码以后也理解了更多,解开了不少困惑,获取到了更多的知识点,因此源码的阅读是很重要的。

相关文章
相关标签/搜索