ThreadPoolExcutor 原理探究

概论

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和总体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短期任务时建立与销毁线程的代价。线程池不只可以保证内核的充分利用,还能防止过度调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。 例如,线程数通常取 cpu 数量 +2 比较合适,线程数过多会致使额外的线程切换开销。html

Java 中的线程池是用 ThreadPoolExecutor 类来实现的. 本文就对该类的源码来分析一下这个类内部对于线程的建立, 管理以及后台任务的调度等方面的执行原理。java

先看一下线程池的类图:缓存

线程池的类图

上图的目的主要是为了让你们知道线程池相关类之间的关系,至少赚个眼熟,之后看到不会有惧怕的感受。安全


 

Executor 框架接口

Executor 框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。网络

下面是 ThreadPoolExeCutor 类图。Executors 实际上是一个工具类,里面提供了好多静态方法,这些方法根据用户选择返回不一样的线程实例。多线程

从上图也能够看出来,ThreadPoolExeCutor 是线程池的核心。并发

J.U.C 中有三个 Executor 接口:框架

  • Executor:一个运行新任务的简单接口;异步

  • ExecutorService:扩展了 Executor 接口。添加了一些用来管理执行器生命周期和任务生命周期的方法;socket

  • ScheduledExecutorService:扩展了 ExecutorService。支持 Future 和按期执行任务。

其实经过这些接口就能够看到一些设计思想,每一个接口的名字和其任务是彻底匹配的。不会由于 Executor 中只有一个方法,就将其放到其余接口中。这也是很重要的单一原则。


 

ThreadPoolExeCutor 分析

在去具体分析 ThreadPoolExeCutor 运行逻辑前,先看下面的流程图:

该图是 ThreadPoolExeCutor 整个运行过程的一个归纳,整个源码的核心逻辑总结起来就是:

  1. 建立线程:要知道如何去建立线程,控制线程数量,线程的存活与销毁;

  2. 添加任务:任务添加后如何处理,是马上执行,仍是先保存;

  3. 执行任务:如何获取任务,任务执行失败后如何处理?

下面将进入源码分析,来深刻理解 ThreadPoolExeCutor 的设计思想。


 

构造函数

先来看构造函数:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
     // 注意 workQueue, threadFactory, handler 是不能够为null 的,为空会直接抛出错误
if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
  1. corePoolSize 核心线程数表示核心线程池的大小。当提交一个任务时,若是当前核心线程池的线程个数没有达到 corePoolSize,则会建立新的线程来执行所提交的任务,即便当前核心线程池有空闲的线程。若是当前核心线程池的线程个数已经达到了corePoolSize,则再也不从新建立线程。若是调用了 prestartCoreThread() 或者 prestartAllCoreThreads(),线程池建立的时候全部的核心线程都会被建立而且启动。若 corePoolSize == 0,则任务执行完以后,没有任何请求进入时,销毁线程池的线程。若 corePoolSize > 0,即便本地任务执行完毕,核心线程也不会被销毁。corePoolSize 其实能够理解为可保留的空闲线程数。

  2. maximumPoolSize: 表示线程池可以容纳同时执行的最大线程数。若是当阻塞队列已满时,而且当前线程池线程个数没有超过 maximumPoolSize 的话,就会建立新的线程来执行任务。注意 maximumPoolSize >= 1 必须大于等于 1。maximumPoolSize == corePoolSize ,便是固定大小线程池。实际上最大容量是由 CAPACITY 控制

  3. keepAliveTime: 线程空闲时间。当空闲时间达到 keepAliveTime值时,线程会被销毁,直到只剩下 corePoolSize 个线程为止,避免浪费内存和句柄资源。默认状况,当线程池的线程数 > corePoolSize 时,keepAliveTime 才会起做用。但当 ThreadPoolExecutor 的 allowCoreThreadTimeOut 变量设置为 true 时,核心线程超时后会被回收。

  4. unit时间单位。为 keepAliveTime 指定时间单位。

  5. workQueue 缓存队列。当请求的线程数 > maximumPoolSize时,线程进入 BlockingQueue 阻塞队列。可使用 ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue。

  6. threadFactory 建立线程的工程类。能够经过指定线程工厂为每一个建立出来的线程设置更有意义的名字,若是出现并发问题,也方便查找问题缘由。

  7. handler 执行拒绝策略的对象。当线程池的阻塞队列已满和指定的线程都已经开启,说明当前线程池已经处于饱和状态了,那么就须要采用一种策略来处理这种状况。采用的策略有这几种:
    • AbortPolicy: 直接拒绝所提交的任务,并抛出 RejectedExecutionException 异常;

    • CallerRunsPolicy:只用调用者所在的线程来执行任务;

    • DiscardPolicy:不处理直接丢弃掉任务;

    • DiscardOldestPolicy:丢弃掉阻塞队列中存放时间最久的任务,执行当前任务


属性定义

看完构造函数以后,再来看下该类里面的变量,有助于进一步理解整个代码运行逻辑,下面是一些比较重要的变量:

// 用来标记线程池状态(高3位),线程个数(低29位)
// 默认是 RUNNING 状态,线程个数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 线程个数掩码位数,整型最大位数-3,能够适用于不一样平台
private static final int COUNT_BITS = Integer.SIZE - 3;

//线程最大个数(低29位)00011111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

//(高3位):11100000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;

//(高3位):00000000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

//(高3位):00100000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;

//(高3位):01000000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;

//(高3位):01100000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

// 获取高三位 运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//获取低29位 线程个数
private static int workerCountOf(int c)  { return c & CAPACITY; }

//计算ctl新值,线程状态 与 线程个数
private static int ctlOf(int rs, int wc) { return rs | wc; }

这里须要对一些操做作些解释。 

  • Integer.SIZE:对于不一样平台,其位数不同,目前常见的是 32 位;

  • (1 << COUNT_BITS) - 1:首先是将 1 左移 COUNT_BITS 位,也就是第 COUNT_BITS + 1 位是1,其他都是 0;-1 操做则是将后面前面的 COUNT_BITS 位都变成 1。

  • -1 << COUNT_BITS:-1 的原码是 10000000 00000000 00000000 00000001 ,反码是 111111111 11111111 11111111 11111110 ,补码 +1,而后左移 29 位是 11100000 00000000 00000000 00000000;这里转为十进制是负数。

  • ~CAPACITY取反,最高三位是1;

总结:这里巧妙利用 bit 操做来将线程数量和运行状态联系在一块儿,减小了变量的存在和内存的占用。其中五种状态的十进制排序:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED


 

线程池状态

线程池状态含义:

  • RUNNING:接受新任务而且处理阻塞队列里的任务;

  • SHUTDOWN:拒绝新任务可是处理阻塞队列里的任务;

  • STOP:拒绝新任务而且抛弃阻塞队列里的任务同时会中断正在处理的任务;

  • TIDYING:全部任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为 0,将要调用 terminated 方法

  • TERMINATED:终止状态。terminated 方法调用完成之后的状态;

线程池状态转换:

  • RUNNING -> SHUTDOWN:显式调用 shutdown() 方法,或者隐式调用了 finalize(),它里面调用了shutdown()方法。

  • RUNNING or SHUTDOWN)-> STOP:显式 shutdownNow() 方法;

  • SHUTDOWN -> TIDYING:当线程池和任务队列都为空的时候;

  • STOP -> TIDYING:当线程池为空的时候;

  • TIDYING -> TERMINATED:当 terminated() hook 方法执行完成时候;


 原码,反码,补码知识小剧场:

1. 原码:原码就是符号位加上真值的绝对值, 即用第一位表示符号,其他位表示值. 好比若是是 8 位二进制:

[+1] = 0000 0001

[-1] = 1000 0001

负数原码第一位是符号位. 

 

2. 反码:反码的表示方法是,正数的反码是其自己,负数的反码是在其原码的基础上, 符号位不变,其他各个位取反.

[+1] = [0000 0001] = [0000 0001]

[-1] = [1000 0001] = [1111 1110]

 

3. 补码:补码的表示方法是,正数的补码就是其自己,负数的补码是在其原码的基础上, 符号位不变, 其他各位取反, 最后 +1. (即在反码的基础上 +1)

[+1] = [0000 0001] = [0000 0001] = [0000 0001]

[-1] = [1000 0001] = [1111 1110] = [1111 1111]

4. 总结
在知道一个数原码的状况下:
正数:反码,补码 就是自己本身
负数:反码是高位符号位不变,其他位取反。补码:反码+1

 

 5. 左移:当数值左、右移时,先将数值转化为其补码形式,移完后,再转换成对应的原码

     左移:高位丢弃,低位补零

     [+1]  = [00000001]

     [0000 0001] << 1 = [0000 0010] = [0000 0010] = [+2]

     [-1]  = [1000 0001] = [1111 1111]

     [1111 1111] << 1 = [1111 1110] = [1000 0010] = [-2]

其中,再次提醒,负数的补码是反码+1;负数的反码是补码-1;

 

 6. 右移:高位保持不变,低位丢弃

     [+127] = [0111 1111] = [0111 1111]

     [0111 1111]补 >> 1 = [0011 1111] = [0011 1111] = [+63]

     [-127] = [1111 1111] = [1000 0001]

     [1000 0001] >> 1 = [1100 0000] = [1100 0000]原 = [-64]


execute 方法分析

经过 ThreadPoolExecutor 建立线程池后,提交任务后执行过程是怎样的,下面来经过源码来看一看。execute 方法源码以下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    // 返回包含线程数及线程池状态(头3位)
    int c = ctl.get();
    
    // 若是工做线程数小于核心线程数,则建立线程任务执行
    if (workerCountOf(c) < corePoolSize) {
        
        if (addWorker(command, true))
            return;
            
        // 若是建立失败,防止外部已经在线程池中加入新任务,从新获取
        c = ctl.get();
    }
    
    // 只有线程池处于 RUNNING 状态,且 入队列成功
    if (isRunning(c) && workQueue.offer(command)) {
   // 后面的操做属于double-check int recheck = ctl.get();
        
        // 若是线程池不是 RUNNING 状态,则将刚加入队列的任务移除
        if (! isRunning(recheck) && remove(command))
            reject(command);
            
        // 若是以前的线程已被消费完,新建一个线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 核心池和队列都满了,尝试建立一个新线程
    else if (!addWorker(command, false))
        // 若是 addWorker 返回是 false,即建立失败,则唤醒拒绝策略
        reject(command);
} 
execute 方法执行逻辑有这样几种状况:
  1. 若是当前运行的线程少于 corePoolSize,则会建立新的线程来执行新的任务;

  2. 若是运行的线程个数等于或者大于 corePoolSize,则会将提交的任务存放到阻塞队列 workQueue 中;

  3. 若是当前 workQueue 队列已满的话,则会建立新的线程来执行任务;

  4. 若是线程个数已经超过了 maximumPoolSize,则会使用饱和策略 RejectedExecutionHandler 来进行处理。

这里要注意一下 addWorker(null, false) 也就是建立一个线程,但并无传入任务,由于任务已经被添加到 workQueue 中了,因此 worker 在执行的时候,会直接从 workQueue 中获取任务。因此,在 workerCountOf(recheck) == 0 时执行 addWorker(null, false) 也是为了保证线程池在 RUNNING 状态下必需要有一个线程来执行任务。

须要注意的是,线程池的设计思想就是使用了核心线程池 corePoolSize,阻塞队列 workQueue 和线程池 maximumPoolSize,这样的缓存策略来处理任务,实际上这样的设计思想在须要框架中都会使用。

须要注意线程和任务之间的区别,任务是保存在 workQueue 中的,线程是从线程池里面取的,由 CAPACITY 控制容量。


addWorker 方法分析

addWorker 方法的主要工做是在线程池中建立一个新的线程并执行,firstTask 参数用于指定新增的线程执行的第一个任务,core 参数为 true 表示在新增线程时会判断当前活动线程数是否少于 corePoolSize,false 表示新增线程前须要判断当前活动线程数是否少于 maximumPoolSize,代码以下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 获取运行状态
        int rs = runStateOf(c);
        
        /*
         * 这个if判断
         * 若是rs >= SHUTDOWN,则表示此时再也不接收新任务;
         * 接着判断如下3个条件,只要有1个不知足,则返回false:
         * 1. rs == SHUTDOWN,这时表示关闭状态,再也不接受新提交的任务,但却能够继续处理阻塞队列中已保存的任务
         * 2. firsTask为空
         * 3. 阻塞队列不为空
         * 
         * 首先考虑rs == SHUTDOWN的状况
         * 这种状况下不会接受新提交的任务,因此在firstTask不为空的时候会返回false;
         * 而后,若是firstTask为空,而且workQueue也为空,则返回false,
         * 由于队列中已经没有任务了,不须要再添加线程了
         */
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 获取线程数
            int wc = workerCountOf(c);
            // 若是wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
            // 这里的core是addWorker方法的第二个参数,若是为true表示根据corePoolSize来比较,
            // 若是为false则根据maximumPoolSize来比较。
            // 
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 尝试增长workerCount,若是成功,则跳出第一个for循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 若是增长workerCount失败,则从新获取ctl的值
            c = ctl.get();  // Re-read ctl
            // 若是当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 根据firstTask来建立Worker对象
        w = new Worker(firstTask);
        // 每个Worker对象都会建立一个线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN表示是RUNNING状态;
                // 若是rs是RUNNING状态或者rs是SHUTDOWN状态而且firstTask为null,向线程池中添加线程。
                // 由于在SHUTDOWN时不会在添加新的任务,但仍是会执行workQueue中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一个HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize记录着线程池中出现过的最大线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

这里须要注意有如下几点:

  1. 在获取锁后从新检查线程池的状态,这是由于其余线程可可能在本方法获取锁前改变了线程池的状态,好比调用了shutdown方法。添加成功则启动任务执行。

  2.  t.start()会调用 Worker 类中的 run 方法,Worker 自己实现了 Runnable 接口。缘由在建立线程得时候,将 Worker 实例传入了 t 当中,可参见 Worker 类的构造函数。

  3. wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) 每次调用 addWorker 来添加线程会先判断当前线程数是否超过了CAPACITY,而后再去判断是否超 corePoolSize 或 maximumPoolSize,说明线程数其实是由 CAPACITY 来控制的。


内部类 Worker 分析

上面分析过程当中,提到了一个 Worker 类,对于某些对源码不是很熟悉得同窗可能有点不清楚,下面就来看看 Worker 的源码:

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
       // 注意此处传入的是this
this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker. */
     // 这里其实会调用外部的 runWorker 方法来执行本身。
public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) {
       // 若是已经设置过1了,这时候在设置1就会返回false,也就是不可重入
if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); }      // 提供安全中断线程得方法 void interruptIfStarted() { Thread t;
       // 一开始 setstate(-1) 避免了还没开始运行就被中断可能
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }

首先看到的是 Worker 继承了(AbstractQueuedSynchronizer) AQS,并实现了 Runnable 接口,说明 Worker 自己也是线程。而后看其构造函数能够发现,内部有两个属性变量分别是 Runnable 和 Thread 实例,该类其实就是对传进来得属性作了一个封装,并加入了获取锁的逻辑(继承了 AQS )。具体可参考文章:透过 ReentrantLock 分析 AQS 的实现原理

Worker 继承了 AQS,使用 AQS 来实现独占锁的功能。为何不使用 ReentrantLock 来实现呢?能够看到 tryAcquire 方法,它是不容许重入的,而 ReentrantLock 是容许重入的:

  1. lock 方法一旦获取了独占锁,表示当前线程正在执行任务中;

  2. 若是正在执行任务,则不该该中断线程;

  3. 若是该线程如今不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时能够对该线程进行中断;

  4. 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是不是空闲状态;

  5. 之因此设置为不可重入,是由于咱们不但愿任务在调用像 setCorePoolSize 这样的线程池控制方法时从新获取锁。若是使用 ReentrantLock,它是可重入的,这样若是在任务中调用了如 setCorePoolSize 这类线程池控制的方法,会中断正在运行的线程,由于 size 小了,须要中断一些线程 。

因此,Worker 继承自 AQS,用于判断线程是否空闲以及是否能够被中断。

此外,在构造方法中执行了 setState(-1);,把 state 变量设置为 -1,为何这么作呢?是由于 AQS 中默认的 state 是 0,若是刚建立了一个 Worker 对象,尚未执行任务时,这时就不该该被中断,看一下 tryAquire 方法: 

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

正由于如此,在 runWorker 方法中会先调用 Worker 对象的 unlock 方法将 state 设置为 0。tryAcquire 方法是根据 state 是不是 0 来判断的,因此,setState(-1);将 state 设置为 -1 是为了禁止在执行任务前对线程进行中断。


 runWorker 方法分析

前面提到了内部类 Worker 的 run 方法调用了外部类 runWorker,下面来看下 runWork 的具体逻辑。

final void runWorker(Worker w) {
       Thread wt = Thread.currentThread();
       Runnable task = w.firstTask;
       w.firstTask = null;
       w.unlock(); // status 设置为0,容许中断,也能够避免再次加锁失败
       boolean completedAbruptly = true;
       try {
           while (task != null || (task = getTask()) != null) {
 // 要派发task的时候,须要上锁                w.lock();
               // 若是线程池当前状态至少是stop,则设置中断标志;
               // 若是线程池当前状态是RUNNININ,则重置中断标志,重置后须要从新
               //检查下线程池状态,由于当重置中断标志时候,可能调用了线程池的shutdown方法
               //改变了线程池状态。
               if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                     runStateAtLeast(ctl.get(), STOP))) &&
                   !wt.isInterrupted())
                   wt.interrupt();

               try {
                   //任务执行前干一些事情
                   beforeExecute(wt, task);
                   Throwable thrown = null;
                   try {
                       task.run();//执行任务
                   } catch (RuntimeException x) {
                       thrown = x; throw x;
                   } catch (Error x) {
                       thrown = x; throw x;
                   } catch (Throwable x) {
                       thrown = x; throw new Error(x);
                   } finally {
                       //任务执行完毕后干一些事情
                       afterExecute(task, thrown);
                   }
               } finally {
                   task = null;
                   //统计当前worker完成了多少个任务
                   w.completedTasks++;
                   w.unlock();
               }
           }
           completedAbruptly = false;
       } finally {

           //执行清了工做
           processWorkerExit(w, completedAbruptly);
       }
   }

总结一下 runWorker 方法的执行过程:

  1. while 循环不断地经过 getTask() 方法从阻塞队列中取任务;

  2. 若是线程池正在中止,那么要保证当前线程是中断状态,不然要保证当前线程不是中断状态;

  3. 调用 task.run()执行任务;

  4. 若是 task 为 null 则跳出循环,执行 processWorkerExit 方法;

  5. runWorker 方法执行完毕,也表明着 Worker 中的 run 方法执行完毕,销毁线程。

这里的 beforeExecute 方法和 afterExecute 方法在 ThreadPoolExecutor 类中是空的,留给子类来实现。

completedAbruptly 变量来表示在执行任务过程当中是否出现了异常,在 processWorkerExit 方法中会对该变量的值进行判断。


 

getTask 方法分析

getTask 方法是从阻塞队列里面获取任务,具体代码逻辑以下:

private Runnable getTask() {
    // timeOut变量的值表示上次从阻塞队列中取任务时是否超时
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        /*
         * 若是线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行如下判断:
         * 1. rs >= STOP,线程池是否正在stop;
         * 2. 阻塞队列是否为空。
         * 若是以上条件知足,则将workerCount减1并返回null。
         * 由于若是当前线程池状态的值是SHUTDOWN或以上时,不容许再向阻塞队列中添加任务。
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        // timed变量用于判断是否须要进行超时控制。
        // allowCoreThreadTimeOut默认是false,也就是核心线程不容许进行超时;
        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
        // 对于超过核心线程数量的这些线程,须要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        /*
         * wc > maximumPoolSize的状况是由于可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
         * timed && timedOut 若是为true,表示当前操做须要进行超时控制,而且上次从阻塞队列中获取任务发生了超时
         * 接下来判断,若是有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
         * 若是减1失败,则返回重试。
         * 若是wc == 1时,也就说明当前线程是线程池中惟一的一个线程了。
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            /*
             * 根据timed来判断,若是为true,则经过阻塞队列的poll方法进行超时控制,若是在keepAliveTime时间内没有获取到任务,则返回null;
             * 不然经过take方法,若是这时队列为空,则take方法会阻塞直到队列不为空。
             * 
             */
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 若是 r == null,说明已经超时,timedOut设置为true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 若是获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
            timedOut = false;
        }
    }
}

其实到这里后,你会发如今 ThreadPoolExcute 内部有几个重要的检验:

  • 判断当前的运行状态,根据运行状态来作处理,若是当前都中止运行了,那不少操做也就不必了;

  • 判断当前线程池的数量,而后将该数据和 corePoolSize 以及 maximumPoolSize 进行比较,而后再去决定下一步该作啥;

首先是第一个 if 判断,当运行状态处于非 RUNNING 状态,此外 rs >= STOP(线程池是否正在 stop)或阻塞队列是否为空。则将 workerCount 减 1 并返回 null。为何要减 1 呢,由于此处实际上是去获取一个 task,可是发现处于中止状态了,也就是不必再去获取运行任务了,那这个线程就没有存在的意义了。后续也会在 processWorkerExit 将该线程移除。

第二个 if 条件目的是控制线程池的有效线程数量。由上文中的分析能够知道,在执行 execute 方法时,若是当前线程池的线程数量超过了 corePoolSize 且小于 maximumPoolSize,而且 workQueue 已满时,则能够增长工做线程,但这时若是超时没有获取到任务,也就是 timedOut 为 true 的状况,说明 workQueue 已经为空了,也就说明了当前线程池中不须要那么多线程来执行任务了,能够把多于 corePoolSize 数量的线程销毁掉,保持线程数量在 corePoolSize 便可。

何时会销毁?固然是 runWorker 方法执行完以后,也就是 Worker 中的 run 方法执行完,由 JVM 自动回收。

getTask 方法返回 null 时,在 runWorker 方法中会跳出 while 循环,而后会执行 processWorkerExit 方法。


 

processWorkerExit 方法

下面在看 processWorkerExit 方法的具体逻辑:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 若是completedAbruptly值为true,则说明线程执行时出现了异常,须要将workerCount减1;
    // 若是线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操做,这里就没必要再减了。  
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //统计完成的任务数
        completedTaskCount += w.completedTasks;
        // 从workers中移除,也就表示着从线程池中移除了一个工做线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 根据线程池状态进行判断是否结束线程池
    tryTerminate();
    int c = ctl.get();
    /*
     * 当线程池是RUNNING或SHUTDOWN状态时,若是worker是异常结束,那么会直接addWorker;
     * 若是allowCoreThreadTimeOut=true,而且等待队列有任务,至少保留一个worker;
     * 若是allowCoreThreadTimeOut=false,workerCount很多于corePoolSize。
     */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

至此,processWorkerExit 执行完以后,工做线程被销毁,以上就是整个工做线程的生命周期。可是这有两点须要注意:

  1. 你们想一想何时才会调用这个方法,任务干完了才会调用。那么没事作了,就须要看下是否有必要结束线程池,这时候就会调用 tryTerminate。

  2. 若是此时线程处于 STOP 状态如下,那么就会判断核心线程数是否达到了规定的数量,没有的话,就会继续建立一个线程。


tryTerminate方法

tryTerminate 方法根据线程池状态进行判断是否结束线程池,代码以下:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        /*
         * 当前线程池的状态为如下几种状况时,直接返回:
         * 1. RUNNING,由于还在运行中,不能中止;
         * 2. TIDYING或TERMINATED,由于线程池中已经没有正在运行的线程了;
         * 3. SHUTDOWN而且等待队列非空,这时要执行完workQueue中的task;
         */
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 若是线程数量不为0,则中断一个空闲的工做线程,并返回
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 这里尝试设置状态为TIDYING,若是设置成功,则调用terminated方法
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // terminated方法默认什么都不作,留给子类实现
                    terminated();
                } finally {
                    // 设置状态为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

interruptIdleWorkers(boolean onlyOne) 若是 ONLY_ONE = true 那么就的最多让一个空闲线程发生中断,ONLY_ONE = false 时是全部空闲线程都会发生中断。那线程何时会处于空闲状态呢?

一是线程数量不少,任务都完成了;二是线程在 getTask 方法中执行 workQueue.take() 时,若是不执行中断会一直阻塞。

因此每次在工做线程结束时调用 tryTerminate 方法来尝试中断一个空闲工做线程,避免在队列为空时取任务一直阻塞的状况。


 

shutdown方法

shutdown 方法要将线程池切换到 SHUTDOWN 状态,并调用 interruptIdleWorkers 方法请求中断全部空闲的 worker,最后调用 tryTerminate 尝试结束线程池。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 安全策略判断
        checkShutdownAccess();
        // 切换状态为SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中断空闲线程
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 尝试结束线程池
    tryTerminate();
}

这里思考一个问题:在 runWorker 方法中,执行任务时对 Worker 对象 w 进行了 lock 操做,为何要在执行任务的时候对每一个工做线程都加锁呢?

下面仔细分析一下:

  • 在 getTask 方法中,若是这时线程池的状态是 SHUTDOWN 而且 workQueue 为空,那么就应该返回 null 来结束这个工做线程,而使线程池进入 SHUTDOWN 状态须要调用shutdown 方法;

  • shutdown 方法会调用 interruptIdleWorkers 来中断空闲的线程,interruptIdleWorkers 持有 mainLock,会遍历 workers 来逐个判断工做线程是否空闲。但 getTask 方法中没有mainLock;

  • 在 getTask 中,若是判断当前线程池状态是 RUNNING,而且阻塞队列为空,那么会调用 workQueue.take() 进行阻塞;

  • 若是在判断当前线程池状态是 RUNNING 后,这时调用了 shutdown 方法把状态改成了 SHUTDOWN,这时若是不进行中断,那么当前的工做线程在调用了 workQueue.take() 后会一直阻塞而不会被销毁,由于在 SHUTDOWN 状态下不容许再有新的任务添加到 workQueue 中,这样一来线程池永远都关闭不了了;

  • 由上可知,shutdown 方法与 getTask 方法(从队列中获取任务时)存在竞态条件;

  • 解决这一问题就须要用到线程的中断,也就是为何要用 interruptIdleWorkers 方法。在调用 workQueue.take() 时,若是发现当前线程在执行以前或者执行期间是中断状态,则会抛出 InterruptedException,解除阻塞的状态;

  • 可是要中断工做线程,还要判断工做线程是不是空闲的,若是工做线程正在处理任务,就不该该发生中断;

  • 因此 Worker 继承自 AQS,在工做线程处理任务时会进行 lock,interruptIdleWorkers 在进行中断时会使用 tryLock 来判断该工做线程是否正在处理任务,若是 tryLock 返回 true,说明该工做线程当前未执行任务,这时才能够被中断。

下面就来分析一下 interruptIdleWorkers 方法。

interruptIdleWorkers方法

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

interruptIdleWorkers 遍历 workers 中全部的工做线程,若线程没有被中断 tryLock 成功,就中断该线程。

为何须要持有 mainLock ?由于 workers 是 HashSet 类型的,不能保证线程安全。


 

shutdownNow方法

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        // 中断全部工做线程,不管是否空闲
        interruptWorkers();
        // 取出队列中没有被执行的任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

shutdownNow 方法与 shutdown 方法相似,不一样的地方在于:

  1. 设置状态为 STOP;

  2. 中断全部工做线程,不管是不是空闲的;

  3. 取出阻塞队列中没有被执行的任务并返回。

shutdownNow 方法执行完以后调用 tryTerminate 方法,该方法在上文已经分析过了,目的就是使线程池的状态设置为 TERMINATED。


 

线程池的监控

经过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可使用

  • getTaskCount:线程池已经执行的和未执行的任务总数;

  • getCompletedTaskCount:线程池已完成的任务数量,该值小于等于 taskCount;

  • getLargestPoolSize:线程池曾经建立过的最大线程数量。经过这个数据能够知道线程池是否满过,也就是达到了maximumPoolSize;

  • getPoolSize:线程池当前的线程数量;

  • getActiveCount:当前线程池中正在执行任务的线程数量。

经过这些方法,能够对线程池进行监控,在 ThreadPoolExecutor 类中提供了几个空方法,如 beforeExecute 方法,afterExecute 方法和 terminated 方法,能够扩展这些方法在执行前或执行后增长一些新的操做,例如统计线程池的执行任务的时间等,能够继承自 ThreadPoolExecutor 来进行扩展。

到此,关于 ThreadPoolExecutor 的内容就讲完了。

  

 参考文献

Java中线程池ThreadPoolExecutor原理探究

【Java】 之ThreadPoolExcutor源码浅析

线程池ThreadPoolExecutor实现原理

深刻理解Java线程池:ThreadPoolExecutor

相关文章
相关标签/搜索