线程池运行模型源码全解析

在上一篇文章《从0到1玩转线程池》中,咱们了解了线程池的使用方法,以及向线程池中提交任务的完整流程和ThreadPoolExecutor.execute方法的源代码。在这篇文章中,咱们将会从头阅读线程池ThreadPoolExecutor类的源代码,深刻剖析线程池从提交任务到执行任务的完整流程,从而创建起完整的线程池运行模型。java

查看JDK源码的方式

在IDE中,例如IDEA里,咱们能够点击咱们样例代码里的ThreadPoolExecutor类跳转到JDK中ThreadPoolExecutor类的源代码。在源代码中咱们能够看到不少java.util.concurrent包的缔造者大牛“Doug Lea”所留下的各类注释,下面的图片就是该类源代码的一个截图。数组

这些注释的内容很是有参考价值,建议有能力的读者朋友能够本身阅读一遍。下面,咱们就开始阅读ThreadPoolExecutor的源代码吧。安全

控制变量与线程池生命周期

ThreadPoolExecutor类定义的开头,咱们能够看到以下的几行代码:bash

// 控制变量,前3位表示状态,剩下的数据位表示有效的线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer的位数减去3位状态位就是线程数的位数
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY就是线程数的上限(含),即2^COUNT_BITS - 1个
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
复制代码

第一行是一个用来做为控制变量的整型值,即一个Integer。之因此要用AtomicInteger类是由于要保证多线程安全,在本系列以后的文章中会对AtomicInteger进行具体介绍。一个整型通常是32位,可是这里的代码为了保险起见,仍是使用了Integer.SIZE来表示整型的总位数。这里的“位”指的是数据位(bit),在计算机中,8bit = 1字节,1024字节 = 1KB,1024KB = 1MB。每一位都是一个0或1的数字,咱们若是把整型想象成一个二进制(0或1)的数组,那么一个Integer就是32个数字的数组。其中,前三个被用来表示状态,那么咱们就能够表示2^3 = 8个不一样的状态了。剩下的29位二进制数字都会被用于表示当前线程池中有效线程的数量,上限就是(2^29 - 1)个,即常量CAPACITY数据结构

以后的部分列出了线程池的全部状态:多线程

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
复制代码

在这里能够忽略数字后面的<< COUNT_BITS,能够把状态简单地理解为前面的数字部分,这样的简化基本不影响结论。ui

各个状态的解释以下:this

  • RUNNING,正常运行状态,能够接受新的任务和处理队列中的任务
  • SHUTDOWN,关闭中状态,不能接受新任务,可是能够处理队列中的任务
  • STOP,中止中状态,不能接受新任务,也不处理队列中的任务,会中断进行中的任务
  • TIDYING,待结束状态,全部任务已经结束,线程数归0,进入TIDYING状态后将会运行terminated()方法
  • TERMINATED,结束状态,terminated()方法调用完成后进入

这几个状态所对应的数字值是按照顺序排列的,也就是说线程池的状态只能从小到大变化,这也方便了经过数字比较来判断状态所在的阶段,这种经过数字大小来比较状态值的方法在ThreadPoolExecutor的源码中会有大量的使用。spa

下图是这五个状态之间的变化过程: 线程

  1. 当线程池被建立时会处于RUNNING状态,正常接受和处理任务;
  2. shutdown()方法被直接调用,或者在线程池对象被GC回收时经过finalize()方法隐式调用了shutdown()方法时,线程池会进入SHUTDOWN状态。该状态下线程池仍然会继续执行完阻塞队列中的任务,只是再也不接受新的任务了。当队列中的任务被执行完后,线程池中的线程也会被回收。当队列和线程都被清空后,线程池将进入TIDYING状态;
  3. 在线程池处于RUNNING或者SHUTDOWN状态时,若是有代码调用了shutdownNow()方法,则线程池会进入STOP状态。在STOP状态下,线程池会直接清空阻塞队列中待执行的任务,而后中断全部正在进行中的任务并回收线程。当线程都被清空之后,线程池就会进入TIDYING状态;
  4. 当线程池进入TIDYING状态时,将会运行terminated()方法,该方法执行完后,线程池就会进入最终的TERMINATED状态,完全结束。

到这里咱们就已经清楚地了解了线程从刚被建立时的RUNNING状态一直到最终的TERMINATED状态的整个生命周期了。那么当咱们要向一个RUNNING状态的线程池提交任务时会发生些什么呢?

execute方法的实现

咱们通常会使用execute方法提交咱们的任务,那么线程池在这个过程当中作了什么呢?在ThreadPoolExecutor类的execute()方法的源代码中,咱们主要作了四件事:

  1. 若是当前线程池中的线程数小于核心线程数corePoolSize,则经过threadFactory建立一个新的线程,并把入参中的任务做为第一个任务传入该线程;
  2. 若是当前线程池中的线程数已经达到了核心线程数corePoolSize,那么就会经过阻塞队列workerQueueoffer方法来将任务添加到队列中保存,并等待线程空闲后进行执行;
  3. 若是线程数已经达到了corePoolSize且阻塞队列中没法插入该任务(好比已满),那么线程池就会再增长一个线程来执行该任务,除非线程数已经达到了最大线程数maximumPoolSize
  4. 若是确实已经达到了最大线程数,那么就会经过拒绝策略对象handler拒绝这个任务。

整体上的执行流程以下,下方的黑色同心圆表明流程结束:

这里解释一下阻塞队列的定义,方便你们阅读:

线程池中的阻塞队列专门用于存放须要等待线程空闲的待执行任务,而阻塞队列是这样的一种数据结构,它是一个队列(相似于一个List),能够存放0到N个元素。咱们能够对这个队列进行插入和弹出元素的操做,弹出操做能够理解为是一个获取并从队列中删除一个元素的操做。当队列中没有元素时,对这个队列的获取操做将会被阻塞,直到有元素被插入时才会被唤醒;当队列已满时,对这个队列的插入操做将会被阻塞,直到有元素被弹出后才会被唤醒。

这样的一种数据结构很是适合于线程池的场景,当一个工做线程没有任务可处理时就会进入阻塞状态,直到有新任务提交后才被唤醒。

线程池中经常使用的阻塞队列通常有三种类型:直连队列、无界队列、有界队列。不一样的阻塞队列类型会被线程池的行为产生不一样的影响,有兴趣的读者能够在上一篇文章《从0到1玩转线程池》中找到不一样类型阻塞队列的具体解释。

下面是带有注释的源代码,你们能够和上面的流程对照起来参考一下:

public void execute(Runnable command) {
    // 检查提交的任务是否为空
    if (command == null)
        throw new NullPointerException();
    
    // 获取控制变量值
    int c = ctl.get();
    // 检查当前线程数是否达到了核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 未达到核心线程数,则建立新线程
        // 并将传入的任务做为该线程的第一个任务
        if (addWorker(command, true))
            // 添加线程成功则直接返回,不然继续执行
            return;

        // 由于前面调用了耗时操做addWorker方法
        // 因此线程池状态有可能发生了改变,从新获取状态值
        c = ctl.get();
    }

    // 判断线程池当前状态是不是运行中
    // 若是是则调用workQueue.offer方法将任务放入阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {
        // 由于执行了耗时操做“放入阻塞队列”,因此从新获取状态值
        int recheck = ctl.get();
        // 若是当前状态不是运行中,则将刚才放入阻塞队列的任务拿出,若是拿出成功,则直接拒绝这个任务
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // 若是线程池中没有线程了,那就建立一个
            addWorker(null, false);
    }
    // 若是放入阻塞队列失败(如队列已满),则添加一个线程
    else if (!addWorker(command, false))
        // 若是添加线程失败(如已经达到了最大线程数),则拒绝任务
        reject(command);
}
复制代码

从上面的源码中咱们能够知道,当一个任务被经过ThreadPoolExecutorexecute方法提交到线程池中执行时,这个任务有可能以两种方式被执行:

  1. 直接在建立一个新的Worker时被做为第一个任务传入,由这个新建立的线程来执行;
  2. 把任务放入一个阻塞队列,等待线程池中的工做线程Worker捞取任务进行执行。

这里的这个Worker指的就是ThreadPoolExecutor.Worker类,这是一个ThreadPoolExecutor的内部类,用于对基础线程类Thread进行包装和对线程进行管理。那么线程池究竟是怎么利用Worker类来实现持续不断地接收提交的任务并执行的呢?接下来,咱们经过ThreadPoolExecutor的源代码来一步一步抽丝剥茧,揭开线程池运行模型的神秘面纱。

addWorker方法

在上文中的execute方法的代码中咱们能够看到线程池是经过addWorker方法来向线程池中添加新线程的,那么新的线程又是如何运行起来的呢?

这里咱们暂时跳过addWorker方法的详细源代码,由于虽然这个方法的代码行数较多,可是功能相对比较直接,只是经过new Worker(firstTask)建立了一个表明线程的Worker对象,而后调用了这个对象所包含的Thread对象的start()方法。

咱们知道一旦调用了Thread类的start()方法,则这个线程就会开始执行建立线程时传入的Runnable对象。从下面的Worker类构造器源代码能够看出,Worker类正是把本身(this引用)传入了线程的构造器当中,因此这个线程启动后就会执行Worker类的run()方法了,而在Workerrun()方法中只执行了一行很简单的代码runWorker(this)

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

public void run() {
    runWorker(this);
}
复制代码

runWorker方法的实现

咱们看到线程池中的线程在启动时会调用对应的Worker类的runWorker方法,而这里就是整个线程池任务执行的核心所在了。runWorker方法中包含有一个相似无限循环的while语句,让worker对象能够一直持续不断地执行提交到线程池中的新任务或者等待下一个新任务的提交。

你们能够配合代码上带有的注释来理解该方法的具体实现:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 将worker的状态重置为正常状态,由于state状态值在构造器中被初始化为-1
    w.unlock();
    // 经过completedAbruptly变量的值判断任务是否正常执行完成
    boolean completedAbruptly = true;
    try {
        // 若是task为null就经过getTask方法获取阻塞队列中的下一个任务
        // getTask方法通常不会返回null,因此这个while相似于一个无限循环
        // worker对象就经过这个方法的持续运行来不断处理新的任务
        while (task != null || (task = getTask()) != null) {
            // 每一次任务的执行都必须获取锁来保证下方临界区代码的线程安全
            w.lock();
            
            // 若是状态值大于等于STOP(状态值是有序的,即STOP、TIDYING、TERMINATED)
            // 且当前线程尚未被中断,则主动中断线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();

            // 开始
            try {
                // 执行任务前处理操做,默认是一个空实现
                // 在子类中能够经过重写来改变任务执行前的处理行为
                beforeExecute(wt, task);

                // 经过thrown变量保存任务执行过程当中抛出的异常
                // 提供给下面finally块中的afterExecute方法使用
                Throwable thrown = null;
                try {
                    // *** 重要:实际执行任务的代码
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    // 由于Runnable接口的run方法中不能抛出Throwable对象
                    // 因此要包装成Error对象抛出
                    thrown = x; throw new Error(x);
                } finally {
                    // 执行任务后处理操做,默认是一个空实现
                    // 在子类中能够经过重写来改变任务执行后的处理行为
                    afterExecute(task, thrown);
                }
            } finally {
                // 将循环变量task设置为null,表示已处理完成
                task = null;
                // 累加当前worker已经完成的任务数
                w.completedTasks++;
                // 释放while体中第一行获取的锁
                w.unlock();
            }
        }

        // 将completedAbruptly变量设置为false,表示任务正常处理完成
        completedAbruptly = false;
    } finally {
        // 销毁当前的worker对象,并完成一些诸如完成任务数量统计之类的辅助性工做
        // 在线程池当前状态小于STOP的状况下会建立一个新的worker来替换被销毁的worker
        processWorkerExit(w, completedAbruptly);
    }
}
复制代码

runWorker方法的源代码中有两个比较重要的方法调用,一个是while条件中对getTask方法的调用,一个是在方法的最后对processWorkerExit方法的调用。下面是对这两个方法更详细的解释。

getTask方法在阻塞队列中有待执行的任务时会从队列中弹出一个任务并返回,若是阻塞队列为空,那么就会阻塞等待新的任务提交到队列中直到超时(在一些配置下会一直等待而不超时),若是在超时以前获取到了新的任务,那么就会将这个任务做为返回值返回。因此通常getTask方法是不会返回null的,只会阻塞等待下一个任务并在以后将这个新任务做为返回值返回。

getTask方法返回null时会致使当前Worker退出,当前线程被销毁。在如下状况下getTask方法才会返回null:

  1. 当前线程池中的线程数超过了最大线程数。这是由于运行时经过调用setMaximumPoolSize修改了最大线程数而致使的结果;
  2. 线程池处于STOP状态。这种状况下全部线程都应该被当即回收销毁;
  3. 线程池处于SHUTDOWN状态,且阻塞队列为空。这种状况下已经不会有新的任务被提交到阻塞队列中了,因此线程应该被销毁;
  4. 线程能够被超时回收的状况下等待新任务超时。线程被超时回收通常有如下两种状况:
    • 超出核心线程数部分的线程等待任务超时
    • 容许核心线程超时(线程池配置)的状况下线程等待任务超时

processWorkerExit方法会销毁当前线程对应的Worker对象,并执行一些累加总处理任务数等辅助操做,但在线程池当前状态小于STOP的状况下会建立一个新的Worker来替换被销毁的Worker。

getTaskprocessWorkerExit方法源代码感兴趣的读者能够阅读下一节来具体了解一下,不过跳过这一节也是彻底能够的。

getTask与processWorkerExit方法源代码

如下是getTaskprocessWorkerExit两个方法的带有中文解释的源代码:

private Runnable getTask() {
    // 经过timeOut变量表示线程是否空闲时间超时了
    boolean timedOut = false;

    // 无限循环
    for (;;) {
        // 获取线程池状态
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 若是 线程池状态>=STOP
        //    或者 (线程池状态==SHUTDOWN && 阻塞队列为空)
        // 则直接减小一个worker计数并返回null(返回null会致使当前worker被销毁)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // 获取线程池中的worker计数
        int wc = workerCountOf(c);

        // 判断当前线程是否会被超时销毁
        // 会被超时销毁的状况:线程池容许核心线程超时 或 当前线程数大于核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 若是 (当前线程数大于最大线程数 或 (容许超时销毁 且 当前发生了空闲时间超时))
        //   且 (当前线程数大于1 或 阻塞队列为空) —— 该条件在阻塞队列不为空的状况下保证至少会保留一个线程继续处理任务
        // 则 减小worker计数并返回null(返回null会致使当前worker被销毁)
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 从阻塞队列中取出一个任务(若是队列为空会进入阻塞等待状态)
            // 若是容许空闲超时销毁线程的话则带有一个等待的超时时间
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // 若是获取到了任务就直接返回该任务,返回后会开始执行该任务
            if (r != null)
                return r;
            // 若是任务为null,则说明发生了等待超时,将空闲时间超时标志设置为true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 若是等待被中断了,那说明空闲时间(等待任务的时间)尚未超时
            timedOut = false;
        }
    }
}
复制代码

processWorkerExit方法的源代码:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 若是completedAbruptly为true则表示任务执行过程当中抛出了未处理的异常
    // 因此尚未正确地减小worker计数,这里须要减小一次worker计数
    if (completedAbruptly)
        decrementWorkerCount();

    // 获取线程池的主锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 把将被销毁的线程已完成的任务数累计到线程池的完成任务总数上
        completedTaskCount += w.completedTasks;
        // 从worker集合中去掉将会销毁的worker
        workers.remove(w);
    } finally {
        // 释放线程池主锁
        mainLock.unlock();
    }

    // 尝试结束线程池
    // 这里是为了在关闭线程池时等到全部worker都被回收后再结束线程池
    tryTerminate();

    int c = ctl.get();
    // 若是线程池状态 < STOP,即RUNNING或SHUTDOWN
    // 则须要考虑建立新线程来代替被销毁的线程
    if (runStateLessThan(c, STOP)) {
        // 若是worker是正常执行完的,则要判断一下是否已经知足了最小线程数要求
        // 不然直接建立替代线程
        if (!completedAbruptly) {
            // 若是容许核心线程超时则最小线程数是0,不然最小线程数等于核心线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 若是阻塞队列非空,则至少要有一个线程继续执行剩下的任务
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 若是当前线程数已经知足最小线程数要求
            // 那么就不建立替代线程了
            if (workerCountOf(c) >= min)
                return;
        }

        // 从新建立一个worker来代替被销毁的线程
        addWorker(null, false);
    }
}
复制代码

总结

到这里咱们的线程池源代码之旅就结束了,在这篇文章中咱们首先了解了线程池中的控制变量与状态变换流程,以后咱们经过线程池的源代码深刻解析了从提交任务到执行任务的全过程,相信经过这些知识咱们已经能够在脑海中创建起一套完整的线程池运行模型了。若是你们有一些细节感受还不是特别清晰的话,建议不妨再返回到文章的开头多读几遍,相信第二遍的阅读能给你们带来不同的体验,由于我本身也是在第三次读ThreadPoolExecutor类的源代码时才真正打通了其中的一些重要关节的。

引子

在浏览ThreadPoolExexutor源码的过程当中,有几个点咱们其实并无彻底说清楚,好比对锁的加锁操做、对控制变量的屡次获取、控制变量的AtomicInteger类型。在下一篇文章中,我将会介绍这些以锁、volatile变量、CAS操做、AQS抽象类为表明的一系列线程同步方法,欢迎感兴趣的读者继续关注我后续发布的文章~

相关文章
相关标签/搜索