Java Concurrency (3): How Thread Pools Work

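Thread pools are the most widely used concurrency framework in Java: almost any program that needs to run tasks asynchronously or concurrently can use one. Used sensibly, a thread pool brings three benefits.

1. Lower resource consumption. Reusing threads that have already been created reduces the cost of creating and destroying threads;

2. Faster response. When a task arrives, it can run immediately without waiting for a thread to be created;

3. Better manageability. Threads are a scarce resource; creating them without limit consumes system resources and reduces system stability. A thread pool lets threads be allocated, tuned, and monitored in one place. To use a thread pool well, however, you need a thorough understanding of how it is implemented.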

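How the Thread Pool Works

How does a thread pool handle a task once it has been submitted? This section walks through the main processing flow, shown in the figure below:

As the figure shows, when a new task is submitted the pool proceeds as follows.

1. The pool checks whether the core pool is full, that is, whether the number of worker threads has reached corePoolSize. If not, it creates a new worker thread to run the task. If it has, the pool moves on to the next step.

2. The pool checks whether the work queue is full. If not, the newly submitted task is stored in the work queue. If the queue is full, the pool moves on to the next step.

3. The pool checks whether the number of threads has reached maximumPoolSize. If not, it creates a new worker thread to run the task. If it has, the task is handed to the saturation policy.

The following figure illustrates how ThreadPoolExecutor runs the execute() method: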

 

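ThreadPoolExecutor.execute() handles the following four cases.

1) If fewer than corePoolSize threads are running, create a new thread to run the task (note that this step requires acquiring the global lock).

2) If corePoolSize or more threads are running, add the task to the BlockingQueue.

3) If the task cannot be added to the BlockingQueue (the queue is full), create a new thread to handle the task (note that this step requires acquiring the global lock).

4) If creating a new thread would push the number of running threads above maximumPoolSize, the task is rejected and RejectedExecutionHandler.rejectedExecution() is called.

The point of this design is to avoid acquiring the global lock during execute() wherever possible, because that lock would be a serious scalability bottleneck. Once the ThreadPoolExecutor has warmed up (the number of running threads is at least corePoolSize), almost every call to execute() takes step 2, which does not need the global lock.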
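The four cases can be observed from outside the pool. The following is a minimal sketch rather than anything taken from the JDK source: the class name SubmitFlowDemo and the sizes (corePoolSize 1, maximumPoolSize 2, queue capacity 1) are assumptions chosen so that each case is hit exactly once. Every task blocks on a latch, so no worker becomes free while the four submissions are made.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the name and the pool sizes are illustrative assumptions.
final class SubmitFlowDemo {

    // Submits four blocking tasks and records "poolSize/queueSize" after each
    // successful submit, or "rejected" when the pool refuses the task.
    static String run() {
        final CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder trace = new StringBuilder();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                    trace.append(pool.getPoolSize()).append('/')
                         .append(pool.getQueue().size()).append(' ');
                } catch (RejectedExecutionException e) {
                    // The default handler (AbortPolicy) throws on rejection.
                    trace.append("rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        // Case 1: new core thread, case 2: queued, case 3: extra thread, case 4: rejected.
        System.out.println(run()); // prints "1/0 1/1 2/1 rejected"
    }
}
```

The first task starts a core thread, the second waits in the queue, the third finds the queue full and starts a second thread, and the fourth is rejected because both the queue and the pool are full.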

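Source code analysis: the flow above gives an intuitive picture of how the thread pool works. Now let's look at the source code to see how it is implemented.

1. Variables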

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * The main pool control state, ctl, is an atomic integer packing
     * two conceptual fields
     *   workerCount, indicating the effective number of threads
     *   runState,    indicating whether running, shutting down etc
     *
     * In order to pack them into one int, we limit workerCount to
     * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
     * billion) otherwise representable. If this is ever an issue in
     * the future, the variable can be changed to be an AtomicLong,
     * and the shift/mask constants below adjusted. But until the need
     * arises, this code is a bit faster and simpler using an int.
     *
     * The workerCount is the number of workers that have been
     * permitted to start and not permitted to stop.  The value may be
     * transiently different from the actual number of live threads,
     * for example when a ThreadFactory fails to create a thread when
     * asked, and when exiting threads are still performing
     * bookkeeping before terminating. The user-visible pool size is
     * reported as the current size of the workers set.
     *
     * The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
     */
    /**
     * ctl is an atomic variable that packs two conceptual fields:
     * workerCount, the effective number of threads
     * runState, the pool state: running, shutting down, etc.
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // Capacity: 2^29 - 1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // The five states of the thread pool
    // High 3 bits are 111: accept new tasks and process queued tasks
    private static final int RUNNING    = -1 << COUNT_BITS;
    // High 3 bits are 000: do not accept new tasks, but process queued tasks
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // High 3 bits are 001: do not accept new tasks, do not process queued tasks, and interrupt in-progress tasks
    private static final int STOP       =  1 << COUNT_BITS;
    // High 3 bits are 010: all tasks have terminated and workerCount is 0; the thread that moves the pool to TIDYING runs the terminated() hook
    private static final int TIDYING    =  2 << COUNT_BITS;
    // High 3 bits are 011: terminated() has completed
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl: helpers that extract or combine runState and workerCount
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    ... ...
}

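ctl is the field that controls both the run state of the pool and the number of effective threads in it. It carries two pieces of information: the run state of the pool (runState) and the number of effective threads (workerCount). Both are stored in a single int: the high 3 bits hold runState and the low 29 bits hold workerCount. COUNT_BITS is 29, and CAPACITY is 1 shifted left by 29 bits, minus 1 (29 one-bits); this constant is the upper bound of workerCount, roughly 500 million.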
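The packing is easy to check in isolation. The sketch below copies the constants and the three helper methods from the listing above into a standalone class; the class name CtlPackingDemo and the sample worker count of 5 are assumptions made for illustration.

```java
// Hypothetical standalone class mirroring the constants and helpers quoted above.
final class CtlPackingDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits set

    static final int RUNNING    = -1 << COUNT_BITS;      // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;      // high bits 000
    static final int STOP       =  1 << COUNT_BITS;      // high bits 001

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5); // a running pool with 5 workers
        System.out.println(Integer.toBinaryString(c)); // 111 followed by the 29-bit count
        System.out.println(workerCountOf(c));          // 5
        System.out.println(runStateOf(c) == RUNNING);  // true
        System.out.println(CAPACITY);                  // 536870911
    }
}
```

Because RUNNING is negative and the other states are non-negative and increasing, ordered comparisons such as `rs >= SHUTDOWN` work directly on the packed values.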

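Next, the run states of the pool. There are five of them:

State       Description
RUNNING     Accepts newly submitted tasks and also processes tasks in the blocking queue
SHUTDOWN    Shut down: no longer accepts newly submitted tasks, but continues to process tasks already stored in the blocking queue. Calling shutdown() while the pool is RUNNING moves it into this state. (finalize() also calls shutdown() and so enters this state)
STOP        Accepts no new tasks, does not process queued tasks, and interrupts threads that are processing tasks. Calling shutdownNow() while the pool is RUNNING or SHUTDOWN moves it into this state
TIDYING     Entered once all tasks have terminated and workerCount (the number of effective threads) is 0; the pool then calls terminated() and moves to TERMINATED
TERMINATED  Entered after terminated() has run; by default terminated() does nothing

The conditions for entering TERMINATED are as follows:

  • The pool is not in the RUNNING state;
  • The pool is not already in the TIDYING or TERMINATED state;
  • If the pool is in the SHUTDOWN state, workQueue is empty;
  • workerCount is 0;
  • Setting the state to TIDYING succeeds.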
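The state field itself is private, but the transitions can be watched through the public API. The sketch below is an illustration under assumptions: the class name LifecycleDemo, the single-thread pool, and the 5-second wait are all arbitrary choices. It overrides the protected terminated() hook to record that the pool passed through TIDYING on its way to TERMINATED.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; name, sizes and timeout are illustrative assumptions.
final class LifecycleDemo {

    // Returns {runningBeforeShutdown, isShutdownAfterShutdown, awaitResult,
    //          isTerminated, terminatedHookCalled}.
    static boolean[] run() {
        final AtomicBoolean hookCalled = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {
                // Runs in the thread that moved the pool to TIDYING.
                hookCalled.set(true);
                super.terminated();
            }
        };
        pool.execute(() -> { });                     // RUNNING: task accepted
        boolean runningBefore = !pool.isShutdown();
        pool.shutdown();                             // RUNNING -> SHUTDOWN
        boolean shutdownAfter = pool.isShutdown();
        boolean awaited;
        try {
            // Returns true once the state has reached TERMINATED.
            awaited = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] {
            runningBefore, shutdownAfter, awaited, pool.isTerminated(), hookCalled.get()
        };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```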

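The figure below shows the state transitions of the thread pool:

The helper methods that pack and unpack ctl:

Method         Description
runStateOf     Returns the run state
workerCountOf  Returns the number of active threads
ctlOf          Combines a run state and a worker count into a ctl value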

 

2. The execute Method

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    // Throw if the task is null
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    /*
     * Read the current control state of the pool.
     * ctl records both runState and workerCount.
     */
    int c = ctl.get();
    /*
     * workerCountOf extracts the low 29 bits, i.e. the current number of
     * worker threads. If that number is below corePoolSize, create a new
     * thread in the pool and give it this task as its first task.
     */
    if (workerCountOf(c) < corePoolSize) {
        /*
         * The second argument of addWorker selects the bound used when adding a thread:
         * true checks against corePoolSize;
         * false checks against maximumPoolSize.
         * If addWorker succeeds, we are done.
         */
        if (addWorker(command, true))
            return;
        // addWorker failed: re-read the current state
        c = ctl.get();
    }
    /*
     * If the pool is running and the task was added to the queue successfully
     */
    if (isRunning(c) && workQueue.offer(command)) {
        // Re-read the state
        int recheck = ctl.get();
        /*
         * Recheck the run state. If the pool is no longer running, the command
         * that was just added to workQueue has to be removed again.
         * If the removal succeeds, the task is handed to the rejection policy
         * through the handler and the method returns.
         */
        if (! isRunning(recheck) && remove(command))
            // Invoke the rejection policy
            reject(command);
        /*
         * Otherwise, if the pool has no worker threads, call addWorker:
         * 1. The first argument is null: a thread is created and started without
         *    an initial task; it takes its tasks from workQueue;
         * 2. The second argument is false: the thread count is bounded by maximumPoolSize.
         * If workerCount is greater than 0, simply return; the command added to
         * workQueue will be executed at some later time.
         */
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /*
     * Reaching this point means one of two things:
     * 1. The pool is no longer in the RUNNING state;
     * 2. The pool is RUNNING, but workerCount >= corePoolSize and workQueue is full.
     * Call addWorker again with false as the second argument, so that the bound
     * is maximumPoolSize; if that fails, reject the task.
     */
    else if (!addWorker(command, false))
        reject(command);
}

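In short, if the pool stays in the RUNNING state throughout, execute() proceeds as follows:

  1. If workerCount < corePoolSize, create and start a thread to run the newly submitted task;
  2. If workerCount >= corePoolSize and the pool's blocking queue is not full, add the task to the blocking queue;
  3. If workerCount >= corePoolSize && workerCount < maximumPoolSize and the blocking queue is full, create and start a thread to run the newly submitted task;
  4. If workerCount >= maximumPoolSize and the blocking queue is full, handle the task according to the rejection policy; the default policy throws a RejectedExecutionException.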
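Step 4 does not have to end in an exception, because the handler is pluggable. The sketch below is illustrative only: RejectionDemo and the counting handler are hypothetical names, and the pool is deliberately tiny (one thread, queue capacity 1). It counts rejections instead of throwing, and also shows that a task submitted after shutdown() goes to the same handler.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the handler and pool sizes are illustrative assumptions.
final class RejectionDemo {

    // Returns how many tasks were passed to the rejection handler.
    static int run() {
        final AtomicInteger rejected = new AtomicInteger();
        // Custom policy: count the rejection instead of throwing.
        RejectedExecutionHandler countingHandler =
                (task, executor) -> rejected.incrementAndGet();
        final CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), countingHandler);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // step 1: runs on the only worker
        pool.execute(blocker); // step 2: waits in the queue
        pool.execute(blocker); // step 4: queue and pool are full, handler is called
        release.countDown();
        pool.shutdown();
        pool.execute(blocker); // pool is no longer RUNNING, handler is called again
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 2
    }
}
```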

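Note the call addWorker(null, false): it creates a thread without passing it a task, because the task has already been added to workQueue, so the worker fetches its task directly from workQueue when it runs. Calling addWorker(null, false) when workerCountOf(recheck) == 0 therefore guarantees that a pool in the RUNNING state has at least one thread to run the queued task.

The execution flow of the execute method is as follows:

3. The addWorker Method

addWorker creates a new thread in the pool and starts it. The firstTask parameter specifies the first task the new thread runs. When core is true, the current number of worker threads is checked against corePoolSize before a thread is added; when it is false, the number is checked against maximumPoolSize. The code is as follows: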

/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked.  If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 */
/**
 * Checks whether a new worker can be added and, if so, creates and starts it
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // Outer loop
    for (;;) {
        // Read the run state
        int c = ctl.get();
        int rs = runStateOf(c);

        /*
         * rs >= SHUTDOWN means the pool no longer accepts new tasks.
         * In that case return false unless all three of the following hold:
         * 1. rs == SHUTDOWN: the pool refuses new submissions but still
         *    processes tasks already stored in the blocking queue
         * 2. firstTask is null
         * 3. the blocking queue is not empty
         *
         * Consider rs == SHUTDOWN first.
         * New tasks are not accepted in this state, so a non-null firstTask returns false;
         * and if firstTask is null and workQueue is empty as well, return false too,
         * because the queue has no tasks left and no more threads are needed.
         */
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        // Inner loop
        for (;;) {
            // Read the worker count
            int wc = workerCountOf(c);
            /*
             * Return false if wc has reached CAPACITY, the largest value the low
             * 29 bits of ctl can hold (29 one-bits in binary), or if it has reached
             * the selected bound. core is the second argument of addWorker:
             * true compares against corePoolSize, false against maximumPoolSize.
             */
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            /*
             * Try to increment workerCount with CAS; on success, break out of the outer loop
             */
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // The increment failed: re-read ctl
            c = ctl.get();  // Re-read ctl
            // If the run state differs from rs, the state has changed; restart the outer loop
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    /**
     * Create the new worker and start its thread
     */
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the Worker object from firstTask
        w = new Worker(firstTask);
        // Every Worker object creates one thread
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // Acquire the lock
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                /*
                 * rs < SHUTDOWN means the pool is RUNNING.
                 * Add the thread to the pool if the state is RUNNING, or if it is
                 * SHUTDOWN and firstTask is null. In the SHUTDOWN state no new
                 * tasks are added, but the tasks in workQueue are still executed.
                 */
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // If the thread is already alive, throw IllegalThreadStateException
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers is a HashSet holding all worker threads in the pool; accessed only while holding mainLock
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize records the largest number of threads the pool has ever held;
                    // update it if needed, then set workerAdded to true
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                // Release the lock
                mainLock.unlock();
            }
            // The worker was added: start its thread and set workerStarted to true
            if (workerAdded) {
                // Start the thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // The thread failed to start
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

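Note the t.start() statement here. Worker implements Runnable and is the Runnable that was handed to its thread, so starting the thread invokes the run method of the Worker class.

4. The Worker Class

Worker thread: when the pool creates a thread, it wraps the thread in a Worker. Let's look at the source: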

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

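The Worker class extends AQS and implements Runnable. Note its firstTask and thread fields: firstTask holds the task that was passed in, and thread is the thread created through the ThreadFactory when the constructor runs; it is the thread that processes tasks.

The task is passed to the constructor, which creates a new thread with getThreadFactory().newThread(this). The argument to newThread is this because Worker itself implements Runnable: the Worker is the Runnable the new thread executes, so starting the thread invokes the run method of the Worker class.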
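One visible consequence is that a custom ThreadFactory never sees the user's task, only the wrapper around it. The sketch below illustrates this under assumptions: WorkerRunnableDemo and recordingFactory are hypothetical names, and the pool configuration is arbitrary. It records the Runnable passed to newThread and compares it with the submitted task.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; the recording factory is an illustrative assumption.
final class WorkerRunnableDemo {

    // Returns {factoryWasCalled, factoryReceivedTheTaskItself}.
    static boolean[] run() {
        final Runnable task = () -> { };
        final AtomicReference<Runnable> seen = new AtomicReference<>();
        // Remember what the pool asks the factory to run.
        ThreadFactory recordingFactory = r -> {
            seen.set(r);
            return new Thread(r);
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), recordingFactory);
        pool.execute(task); // the worker and its thread are created in this call
        pool.shutdown();
        return new boolean[] { seen.get() != null, seen.get() == task };
    }

    public static void main(String[] args) {
        boolean[] result = run();
        // The factory is called, but with the pool's wrapper rather than the task.
        System.out.println(result[0] + " " + result[1]); // prints "true false"
    }
}
```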

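Worker extends AQS and uses it to implement an exclusive lock. Why not use ReentrantLock? Look at the tryAcquire method: it does not allow reentrant acquisition, whereas ReentrantLock does:

  1. Once the lock method has acquired the exclusive lock, the current thread is executing a task;
  2. A thread that is executing a task should not be interrupted;
  3. If the lock is not held, the thread is idle and is not processing a task, so it may be interrupted;
  4. When the pool runs shutdown or tryTerminate, it calls interruptIdleWorkers to interrupt idle threads, and interruptIdleWorkers uses tryLock to determine whether a thread in the pool is idle;
  5. The lock is non-reentrant because we do not want a task to reacquire the lock when it calls a pool control method such as setCorePoolSize. Such methods can call interruptIdleWorkers; with a reentrant lock, tryLock would succeed on the worker that is running the calling task, and that running thread would be interrupted.

So Worker extends AQS in order to tell whether a thread is idle and whether it may be interrupted.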
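The difference between the two kinds of lock can be seen in a few lines. The sketch below is not the JDK's Worker class: SimpleMutex is a hypothetical, stripped-down lock that reuses the same tryAcquire and tryRelease bodies shown above. A second tryLock from the thread that already holds the lock is compared against the same sequence on a ReentrantLock.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; SimpleMutex only imitates the locking part of Worker.
final class NonReentrantDemo {

    // Non-reentrant mutex: state 0 means unlocked, state 1 means locked.
    static final class SimpleMutex extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        @Override
        protected boolean tryAcquire(int unused) {
            // Succeeds only on a 0 -> 1 transition, even for the current owner.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean tryLock() { return tryAcquire(1); }
        void unlock()     { release(1); }
    }

    // Second tryLock by the owning thread on the AQS-based mutex.
    static boolean secondTryLockOnMutex() {
        SimpleMutex mutex = new SimpleMutex();
        mutex.tryLock();
        boolean second = mutex.tryLock(); // the owner cannot re-enter
        mutex.unlock();
        return second;
    }

    // Second tryLock by the owning thread on a ReentrantLock.
    static boolean secondTryLockOnReentrantLock() {
        ReentrantLock lock = new ReentrantLock();
        lock.tryLock();
        boolean second = lock.tryLock(); // the owner re-enters, hold count becomes 2
        if (second) {
            lock.unlock();
        }
        lock.unlock();
        return second;
    }

    public static void main(String[] args) {
        System.out.println(secondTryLockOnMutex());         // false
        System.out.println(secondTryLockOnReentrantLock()); // true
    }
}
```

With the non-reentrant version, a failed tryLock reliably means "this worker is busy", regardless of which thread asks.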

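In addition, the constructor calls setState(-1), which sets the state variable to -1. Why? The default state in AQS is 0, but a Worker that has just been created and has not yet started running tasks should not be interrupted. Look at the tryAcquire method:

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

tryAcquire succeeds only when state is 0, so setting state to -1 with setState(-1) prevents the thread from being interrupted before it starts executing tasks.

For the same reason, runWorker first calls the Worker's unlock method, which sets state to 0.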

5. The runWorker Method

The run method of the Worker class calls runWorker to execute tasks. The code of runWorker is as follows:

/**
 * Main worker run loop.  Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters.  Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and then we
 * ensure that unless pool is stopping, this thread does not have
 * its interrupt set.
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to afterExecute.
 * We separately handle RuntimeException, Error (both of which the
 * specs guarantee that we trap) and arbitrary Throwables.
 * Because we cannot rethrow Throwables within Runnable.run, we
 * wrap them within Errors on the way out (to the thread's
 * UncaughtExceptionHandler).  Any thrown exception also
 * conservatively causes thread to die.
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 * @param w the worker
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
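    // Take the first task
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Release the initial lock state so that the worker can be interrupted
    w.unlock(); // allow interrupts
    // Whether the loop was left because of an exception
    boolean completedAbruptly = true;
    try {
        // If task is null, fetch one through getTask(),
        // which keeps pulling tasks from the work queue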
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

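A note on the first if statement. Its purpose is:

  • If the pool is stopping, make sure the current thread is interrupted;
  • If not, make sure the current thread is not interrupted.

The check has to allow for shutdownNow being called while the if statement is executing. shutdownNow sets the state to STOP; recall what the STOP state means:

No new tasks are accepted, queued tasks are not processed, and threads that are processing tasks are interrupted. Calling shutdownNow() while the pool is RUNNING or SHUTDOWN moves it into this state.

The STOP state must interrupt every thread in the pool. Thread.interrupted() is used for the check here because it clears the interrupt status, which ensures that the thread is not left interrupted while the pool is RUNNING or SHUTDOWN.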
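The behavior this relies on is that Thread.interrupted() reads and clears the flag, while isInterrupted() only reads it. The following sketch shows just that difference; InterruptFlagDemo is a hypothetical name, and the code interrupts the calling thread itself purely for demonstration.

```java
// Hypothetical demo class showing the two ways to query the interrupt flag.
final class InterruptFlagDemo {

    // Returns {isInterrupted() result, interrupted() result, isInterrupted() afterwards}.
    static boolean[] run() {
        Thread current = Thread.currentThread();
        current.interrupt();                              // set the flag on ourselves
        boolean viaIsInterrupted = current.isInterrupted(); // reads the flag, leaves it set
        boolean viaInterrupted = Thread.interrupted();      // reads the flag and clears it
        boolean afterwards = current.isInterrupted();       // the flag is now clear
        return new boolean[] { viaIsInterrupted, viaInterrupted, afterwards };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "true true false"
    }
}
```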

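To summarize how runWorker executes:

  1. The while loop keeps fetching tasks through getTask();
  2. getTask() takes tasks from the blocking queue;
  3. If the pool is stopping, make sure the current thread is interrupted; otherwise make sure it is not;
  4. Call task.run() to execute the task;
  5. If task is null, leave the loop and call processWorkerExit();
  6. When runWorker returns, the Worker's run method returns as well and the thread ends.

The beforeExecute and afterExecute methods are empty in the ThreadPoolExecutor class and are left for subclasses to implement.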
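A subclass can use these hooks for logging, timing, or similar bookkeeping. The sketch below is a minimal illustration, with HookDemo and the event list as hypothetical names; it overrides both hooks in an anonymous subclass and records the order in which the hooks and the task run.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the recorded event names are illustrative assumptions.
final class HookDemo {

    // Runs one task and returns the order in which hooks and task executed.
    static List<String> run() {
        final List<String> events = Collections.synchronizedList(new ArrayList<String>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                super.beforeExecute(t, r);
                events.add("before");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable thrown) {
                super.afterExecute(r, thrown);
                events.add("after"); // thrown is null when the task completed normally
            }
        };
        pool.execute(() -> events.add("task"));
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [before, task, after]
    }
}
```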

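The completedAbruptly variable indicates whether an exception occurred while tasks were being executed; processWorkerExit checks its value.

6. The getTask Method

getTask fetches tasks from the blocking queue. The code is as follows: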

/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
    // timedOut indicates whether the last attempt to take a task from the blocking queue timed out
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /*
         * If rs >= SHUTDOWN, i.e. the pool is not RUNNING, check further:
         * 1. rs >= STOP: the pool is stopping; or
         * 2. the blocking queue is empty.
         * If either holds, decrement workerCount and return null,
         * because once the state is SHUTDOWN or beyond, no more tasks
         * may be added to the blocking queue.
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        /*
         * timed decides whether this worker is subject to the idle timeout.
         * allowCoreThreadTimeOut defaults to false, i.e. core threads do not time out;
         * wc > corePoolSize means the pool currently holds more threads than the core size;
         * threads beyond the core size are subject to the timeout.
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        /*
         * wc > maximumPoolSize can occur because setMaximumPoolSize may have been
         * called while this method was running;
         * timed && timedOut being true means this worker is subject to the timeout
         * and its last attempt to take a task from the blocking queue timed out.
         * Then, if the number of workers is greater than 1 or the blocking queue is
         * empty, try to decrement workerCount; if the decrement fails, loop and retry.
         * wc == 1 means the current thread is the only thread left in the pool.
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /*
             * If timed is true, use the queue's poll method with a timeout: if no
             * task arrives within keepAliveTime, it returns null;
             * otherwise use take, which blocks until the queue is non-empty.
             */
            Runnable r = timed ?
                    // Timed wait: block for at most keepAliveTime
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    // Block until a task is available
                workQueue.take();
            if (r != null)
                return r;
            // r == null means the wait timed out; set timedOut to true
            timedOut = true;
        } catch (InterruptedException retry) {
            // The thread was interrupted while waiting for a task: set timedOut to false and loop again
            timedOut = false;
        }
    }
}

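The key part is the second if statement, whose purpose is to control the number of effective threads in the pool. As analyzed above, when execute is called while the pool holds at least corePoolSize but fewer than maximumPoolSize threads and workQueue is full, extra worker threads can be added. If one of these workers then times out waiting for a task (timedOut is true), workQueue is empty and the pool no longer needs that many threads, so the threads beyond corePoolSize can be destroyed, keeping the thread count at corePoolSize.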
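The shrinking can be observed with a short keepAliveTime. The sketch below makes several assumptions for the sake of a compact demo: the class name KeepAliveDemo, a SynchronousQueue (which never stores tasks, so the second submission forces a second thread), a 50 ms keepAliveTime, and a polling loop with a 5-second deadline instead of a fixed sleep.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; queue type, sizes and timings are illustrative assumptions.
final class KeepAliveDemo {

    // Returns {pool size while both tasks run, pool size after the idle timeout}.
    static int[] run() {
        final CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 50L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // starts the core thread
        pool.execute(blocker); // cannot be queued, so a second thread is started
        int peak = pool.getPoolSize();
        release.countDown();   // both workers become idle

        // Wait until the surplus worker has timed out in getTask() and exited.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        int settled = pool.getPoolSize();
        pool.shutdown();
        return new int[] { peak, settled };
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println(sizes[0] + " -> " + sizes[1]);
    }
}
```

Only one of the two idle workers exits: once the count is back at corePoolSize, timed becomes false for the remaining worker and it blocks in take() instead of polling with a timeout.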

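When is a thread destroyed? After runWorker finishes, that is, after the Worker's run method returns; the thread then terminates and is reclaimed by the JVM.

When getTask returns null, runWorker leaves its while loop and then calls processWorkerExit.

The following figure illustrates how threads in a ThreadPoolExecutor execute tasks.

Threads in the pool execute tasks in two situations:

1) When execute() creates a thread, that thread runs the task that was just submitted.

2) After the thread finishes the task marked 1 in the figure above, it repeatedly fetches tasks from the BlockingQueue and runs them.

7. The processWorkerExit Method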

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // If completedAbruptly is true, the thread died from an exception and workerCount must be decremented here;
    // otherwise getTask() has already decremented workerCount and nothing more is needed.
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Accumulate the number of completed tasks
        completedTaskCount += w.completedTasks;
        // Remove the worker from workers, i.e. remove one worker thread from the pool
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // Decide from the pool state whether to terminate the pool
    tryTerminate();
    int c = ctl.get();
    /*
     * While the pool is RUNNING or SHUTDOWN: if the worker ended abruptly, add a replacement directly;
     * if allowCoreThreadTimeOut is true and the queue still holds tasks, keep at least one worker;
     * if allowCoreThreadTimeOut is false, keep workerCount at no less than corePoolSize.
     */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

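Once processWorkerExit has finished, the worker thread is destroyed. This is the whole life cycle of a worker thread: starting from the execute method, the Worker uses the ThreadFactory to create a new worker thread; runWorker fetches tasks through getTask and executes them; when getTask returns null, processWorkerExit is entered and the thread ends, as shown in the figure: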
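The replacement of a worker that died abruptly can be made visible by counting thread creations. The sketch below is illustrative: WorkerReplacementDemo and countingFactory are hypothetical names, and corePoolSize and maximumPoolSize are both set to 1 so that the count is the same however the threads interleave. The failing task is submitted with execute() on purpose; with submit() the exception would be captured in the returned Future and the worker would survive.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the counting factory is an illustrative assumption.
final class WorkerReplacementDemo {

    // Returns how many threads the pool created to run one failing and one normal task.
    static int run() {
        final AtomicInteger created = new AtomicInteger();
        ThreadFactory countingFactory = r -> {
            created.incrementAndGet();
            Thread t = new Thread(r);
            // Keep the demo quiet: swallow the deliberate failure instead of printing it.
            t.setUncaughtExceptionHandler((thread, error) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), countingFactory);
        final CountDownLatch secondRan = new CountDownLatch(1);

        // The exception escapes runWorker, so completedAbruptly stays true.
        pool.execute(() -> { throw new IllegalStateException("demo failure"); });
        // This task still runs, on a thread created after the first one died.
        pool.execute(secondRan::countDown);
        try {
            if (!secondRan.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("second task did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 2
    }
}
```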

8. The tryTerminate Method

tryTerminate decides from the pool state whether to terminate the pool. The code is as follows:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
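        /*
         * Return immediately if the pool is in one of the following states:
         * 1. RUNNING: the pool is still running and cannot be stopped;
         * 2. TIDYING or TERMINATED: no threads are running in the pool any more
         *    and termination is already under way or complete;
         * 3. SHUTDOWN with a non-empty queue: the tasks in workQueue must still be executed.
         */
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // If the worker count is not 0, interrupt one idle worker thread and return
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Try to set the state to TIDYING; on success, call terminated()
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // terminated() does nothing by default; it is left for subclasses to implement
                    terminated();
                } finally {
                    // Set the state to TERMINATED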
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

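interruptIdleWorkers(ONLY_ONE) is needed because a worker executing workQueue.take() in getTask blocks indefinitely unless it is interrupted. The shutdown method described below interrupts all idle worker threads; but if a worker was not idle when shutdown ran and then calls getTask when workQueue has no tasks left, its call to workQueue.take() would block forever. So every time a worker thread ends, tryTerminate is called to try to interrupt one idle worker, which keeps workers from blocking forever on an empty queue.

9. The shutdown Method

shutdown switches the pool to the SHUTDOWN state, calls interruptIdleWorkers to request interruption of all idle workers, and finally calls tryTerminate to try to terminate the pool.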

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Security check
        checkShutdownAccess();
        // Switch the state to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // Interrupt idle threads
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // Try to terminate the pool
    tryTerminate();
}

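Here is a question to consider: runWorker locks the Worker object w while executing a task. Why does every worker thread need to be locked while it runs a task?

Let's analyze this carefully:

  • In getTask, if the pool state is SHUTDOWN and workQueue is empty, the method should return null to end the worker thread; and the pool only enters the SHUTDOWN state when shutdown is called;
  • shutdown calls interruptIdleWorkers to interrupt idle threads. interruptIdleWorkers holds mainLock and iterates over workers, checking one by one whether each worker thread is idle. getTask, however, does not hold mainLock;
  • In getTask, if the pool state is found to be RUNNING and the blocking queue is empty, workQueue.take() is called and blocks;
  • If shutdown is called right after the state was found to be RUNNING and changes the state to SHUTDOWN, then without an interrupt the worker would block forever in workQueue.take() and never be destroyed, because no new tasks can be added to workQueue in the SHUTDOWN state; the pool could then never be shut down;
  • So there is a race condition between shutdown and getTask (while it is taking a task from the queue);
  • Solving it requires thread interruption, which is why interruptIdleWorkers exists. When workQueue.take() is called and the current thread is found to be interrupted, either before or during the call, it throws InterruptedException and the thread stops blocking;
  • But before interrupting a worker thread, we have to know whether it is idle; a worker that is processing a task should not be interrupted;
  • This is why Worker extends AQS: a worker thread takes the lock while it processes a task, and interruptIdleWorkers uses tryLock to determine whether the worker is processing a task. If tryLock returns true, the worker is not executing a task at the moment and may be interrupted.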
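The effect of that lock can be checked from the outside: shutdown() leaves a busy worker alone and still lets queued work run. The sketch below is an illustration under assumptions, with GracefulShutdownDemo as a hypothetical name and latches used to make sure the first task is running before shutdown() is called.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; latches and sizes are illustrative assumptions.
final class GracefulShutdownDemo {

    // Returns {runningTaskWasInterrupted, queuedTaskRan, poolTerminated}.
    static boolean[] run() {
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);
        final AtomicBoolean interrupted = new AtomicBoolean(false);
        final AtomicBoolean queuedRan = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        pool.execute(() -> {
            started.countDown();
            try {
                release.await(); // the worker holds its lock while this task runs
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        pool.execute(() -> queuedRan.set(true)); // waits in the queue

        try {
            started.await();
            pool.shutdown();     // tryLock fails on the busy worker, so no interrupt
            release.countDown(); // let the running task finish normally
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return new boolean[] { interrupted.get(), queuedRan.get(), terminated };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "false true true"
    }
}
```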

Next, let's look at the interruptIdleWorkers method.

10. The interruptIdleWorkers Method

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

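interruptIdleWorkers iterates over all worker threads in workers; if a thread has not been interrupted and tryLock succeeds, it interrupts that thread.

Why must mainLock be held? Because workers is a HashSet, which is not thread-safe.

11. The shutdownNow Method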

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        // Interrupt all worker threads, idle or not
        interruptWorkers();
        // Drain the tasks in the queue that have not been executed
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

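shutdownNow is similar to shutdown, with the following differences:

  1. It sets the state to STOP;
  2. It interrupts all worker threads, whether they are idle or not;
  3. It removes the tasks that have not been executed from the blocking queue and returns them.

After doing this, shutdownNow calls tryTerminate, which was analyzed above and whose purpose is to move the pool to the TERMINATED state.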
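The three differences show up directly in a small run. The sketch below is illustrative, with ShutdownNowDemo as a hypothetical name and a long sleep standing in for real work: one task is running and one is queued when shutdownNow() is called.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; the sleeping task stands in for real work.
final class ShutdownNowDemo {

    // Returns "<pending tasks returned>/<running task interrupted>/<pool terminated>".
    static String run() {
        final CountDownLatch started = new CountDownLatch(1);
        final AtomicBoolean interrupted = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // busy until interrupted
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        pool.execute(() -> { }); // stays in the queue behind the sleeping task

        try {
            started.await();
            // State becomes STOP, the busy worker is interrupted, the queue is drained.
            List<Runnable> pending = pool.shutdownNow();
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return pending.size() + "/" + interrupted.get() + "/" + terminated;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "1/true/true"
    }
}
```

Interruption is only a request: a task that ignores the interrupt keeps running, and the pool cannot reach TERMINATED until that task returns.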

 

Reference: In-Depth Understanding of the Java Thread Pool: ThreadPoolExecutor (深入理解Java线程池:ThreadPoolExecutor)
