Thread Pool Source Code Analysis: ThreadPoolExecutor

Preface

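As we know, a thread pool manages and reuses threads for us, avoiding the overhead of creating large numbers of threads.
Using a thread pool properly brings three clear benefits:
1. Lower resource consumption: reusing threads that already exist reduces the cost of creating and destroying threads.
2. Faster response: when a task arrives, it can run immediately without waiting for a thread to be created.
3. Better manageability: the pool lets threads be allocated, tuned, and monitored in one place.
Java's built-in thread pool is implemented by the ThreadPoolExecutor class, which is the focus of this article.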
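As a rough illustration of benefit 1, the sketch below submits several tasks to a pool limited to one thread and counts how many distinct threads ran them. The `ThreadReuseDemo` class and its `distinctThreadsUsed` helper are illustrative names, not JDK API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadReuseDemo {
    // Runs `tasks` trivial tasks on a pool with exactly one worker thread and
    // returns how many distinct threads actually executed them.
    static int distinctThreadsUsed(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // already-queued tasks still run after shutdown()
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctThreadsUsed(5)); // 1: one thread served all five tasks
    }
}
```

With `corePoolSize = maximumPoolSize = 1`, the five tasks queue up behind the single worker instead of each paying for a new `Thread`.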

ThreadPoolExecutor Constructor

The JDK creates thread pools with the ThreadPoolExecutor class. Let's look at its full (seven-argument) constructor:

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • int corePoolSize, // number of core threads
  • int maximumPoolSize, // maximum number of threads
  • long keepAliveTime, // how long threads beyond the core count may stay idle before they are terminated
  • TimeUnit unit, // time unit of keepAliveTime, one of:

    TimeUnit.DAYS;               // days
    TimeUnit.HOURS;             // hours
    TimeUnit.MINUTES;           // minutes
    TimeUnit.SECONDS;           // seconds
    TimeUnit.MILLISECONDS;      // milliseconds
    TimeUnit.MICROSECONDS;      // microseconds
    TimeUnit.NANOSECONDS;       // nanoseconds
  • BlockingQueue<Runnable> workQueue, // queue holding tasks waiting to run; common choices include:

    ArrayBlockingQueue;
    LinkedBlockingQueue;
    SynchronousQueue;
    PriorityBlockingQueue
  • ThreadFactory threadFactory, // factory used to create new threads
  • RejectedExecutionHandler handler // handler invoked when a task cannot be executed (the rejection policy)
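A minimal sketch of calling the seven-argument constructor with every parameter labelled, plus the argument validation from the source above. `ConstructorDemo` is a hypothetical class name, and the sizes are arbitrary example values rather than recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ConstructorDemo {
    // Every argument of the 7-parameter constructor, spelled out.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                    // corePoolSize
                4,                                    // maximumPoolSize
                30L,                                  // keepAliveTime
                TimeUnit.SECONDS,                     // unit
                new ArrayBlockingQueue<>(100),        // workQueue: bounded FIFO queue
                Executors.defaultThreadFactory(),     // threadFactory
                new ThreadPoolExecutor.AbortPolicy()  // handler: throws RejectedExecutionException
        );
    }

    // The constructor rejects maximumPoolSize < corePoolSize with IllegalArgumentException.
    static boolean rejectsMaxBelowCore() {
        try {
            new ThreadPoolExecutor(4, 2, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println(pool.getCorePoolSize() + " " + pool.getMaximumPoolSize()); // 2 4
        System.out.println(pool.getPoolSize()); // 0: threads are created only as tasks arrive
        pool.shutdown();
        System.out.println(rejectsMaxBelowCore()); // true
    }
}
```

Note that `getPoolSize()` is 0 right after construction: worker threads are created lazily when tasks are submitted (or through the prestart methods).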

Core Fields

The ctl field

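ThreadPoolExecutor has a control-state field named ctl. It is an AtomicInteger whose single int value packs two pieces of information:

  • workerCount: the number of effective threads currently in the pool, obtained with workerCountOf. Its upper bound is (2^29)-1, and it is stored in the low 29 bits of ctl.
  • runState: the current state of the pool, obtained with runStateOf and stored in the high 3 bits of ctl. These states make up the pool's whole life cycle; the possible values and their meanings are:

    1. RUNNING: accepts new tasks and processes the tasks in the queue. RUNNING is the pool's initial state; in other words, as soon as a pool is created it is RUNNING.
    2. SHUTDOWN: accepts no new tasks, but still processes the tasks in the queue. Calling shutdown() moves the pool from RUNNING -> SHUTDOWN.
    3. STOP: accepts no new tasks, does not process queued tasks, and interrupts tasks in progress. Calling shutdownNow() moves the pool from (RUNNING or SHUTDOWN) -> STOP.
    4. TIDYING: when all tasks have terminated, the workerCount recorded in ctl is 0 and the blocking queue is empty, the pool moves to TIDYING. On entering TIDYING it runs the hook method terminated(). terminated() is empty in ThreadPoolExecutor; to react when the pool reaches TIDYING, override terminated() in a subclass.
    5. TERMINATED: the pool has fully terminated. A pool in TIDYING moves TIDYING -> TERMINATED once terminated() has finished.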
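The life cycle above can be observed from outside the pool. This sketch assumes a hypothetical subclass, `HookedPool`, that overrides the `terminated()` hook and records that it ran; `shutdown()`, `isShutdown()`, `awaitTermination()` and `isTerminated()` are standard `ThreadPoolExecutor` methods. `LifecycleDemo` is an illustrative name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {
    // Overrides the terminated() hook, which the pool runs while in TIDYING,
    // just before it becomes TERMINATED.
    static class HookedPool extends ThreadPoolExecutor {
        final CountDownLatch terminatedCalled = new CountDownLatch(1);

        HookedPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void terminated() {
            super.terminated();
            terminatedCalled.countDown();
        }
    }

    // Returns {isShutdown() right after shutdown(), awaitTermination() result,
    //          isTerminated(), whether terminated() ran}.
    static boolean[] run() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> { });
        pool.shutdown();                          // RUNNING -> SHUTDOWN
        boolean shutdown = pool.isShutdown();
        try {
            // SHUTDOWN -> TIDYING (terminated() runs) -> TERMINATED
            boolean done = pool.awaitTermination(5, TimeUnit.SECONDS);
            return new boolean[] {
                    shutdown, done, pool.isTerminated(), pool.terminatedCalled.getCount() == 0 };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [true, true, true, true]
    }
}
```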
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

COUNT_BITS = 32 (Integer.SIZE) - 3 = 29, so the five states, shifted left by 29 bits, are:

  • RUNNING: 11100000000000000000000000000000
  • SHUTDOWN: 00000000000000000000000000000000
  • STOP: 00100000000000000000000000000000
  • TIDYING: 01000000000000000000000000000000
  • TERMINATED:01100000000000000000000000000000

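ThreadPoolExecutor obtains these two values with runStateOf and workerCountOf.

How do runStateOf and workerCountOf separate the two values stored in ctl? The key to storing two values in one field is CAPACITY.

The CAPACITY constant

CAPACITY = (1 << COUNT_BITS) - 1, which in binary is 000 11111111111111111111111111111. It is the theoretical maximum number of threads the pool allows.
The key point is that its high 3 bits are 0 and its low 29 bits are 1.
So in workerCountOf, ANDing ctl with CAPACITY yields a value whose high 3 bits are 0 and whose low 29 bits equal those of ctl: that value is workerCount.
Likewise, runStateOf ANDs ctl with ~CAPACITY, yielding a value whose high 3 bits equal those of ctl and whose low 29 bits are 0: that value is runState.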
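To experiment with the packing, the constants and the three helpers can be copied into a standalone class. This is a sketch mirroring the JDK 8 source shown above, not the real ThreadPoolExecutor; `CtlPacking` and the `bits` helper are made-up names.

```java
class CtlPacking {
    // Same constants as the ThreadPoolExecutor source above.
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // low 29 bits set

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // combine both parts

    // Zero-padded 32-bit binary string, for printing layouts like the list above.
    static String bits(int x) {
        return String.format("%32s", Integer.toBinaryString(x)).replace(' ', '0');
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                      // a RUNNING pool with 5 workers
        System.out.println(bits(c));                    // 111, then 26 zeros, then 101
        System.out.println(runStateOf(c) == RUNNING);   // true
        System.out.println(workerCountOf(c));           // 5
    }
}
```

Because RUNNING is the only negative value, the states compare in life-cycle order (RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED), which is what checks such as `rs >= SHUTDOWN` rely on.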

workQueue

/**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue<Runnable> workQueue;

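A BlockingQueue<Runnable>; its own implementation guarantees thread-safe access (not covered here). This is the waiting queue: once the number of threads in the pool reaches corePoolSize, further submitted tasks wait in this queue to be executed.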
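A small sketch, assuming a single-thread pool with an unbounded `LinkedBlockingQueue` (`QueueingDemo` and `awaitQuietly` are illustrative names): while the only core thread is blocked, later tasks cannot get a thread and pile up in `workQueue`, which `getQueue()` exposes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueingDemo {
    // Returns {poolSize, queueSize} observed while the single core thread is blocked.
    static int[] observe(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {          // occupies the only core thread
                started.countDown();
                awaitQuietly(release);
            });
            started.await();              // make sure the worker is busy
            for (int i = 1; i < tasks; i++) {
                pool.execute(() -> { });  // no free thread: these wait in workQueue
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();          // unblock the worker so the queue drains
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(observe(4))); // [1, 3]
    }
}
```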

workers

/**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

A HashSet<Worker>. Every worker thread in the pool is held in this set; this is the "pool" of threads in the intuitive sense.

mainLock

private final ReentrantLock mainLock = new ReentrantLock();

mainLock is the pool's main lock, a reentrant lock. It must be acquired before accessing workers (the HashSet holding the threads), and also before updating statistics such as largestPoolSize and completedTaskCount.

Other important fields

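private int largestPoolSize;   // the largest number of threads the pool has ever held

private long completedTaskCount;   // number of completed tasks (updated when a worker exits)

private volatile boolean allowCoreThreadTimeOut;   // whether core threads may also time out when idle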
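These fields are exposed through public methods: `getLargestPoolSize()`, `getCompletedTaskCount()` and `getPoolSize()` read them (in the JDK source they acquire `mainLock` to do so), and `allowCoreThreadTimeOut(boolean)` sets the flag. A sketch with an illustrative `StatsDemo` class; sizes and timeouts are arbitrary, and the loop polls against a deadline rather than assuming exact timings.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class StatsDemo {
    // Returns {largestPoolSize, completedTaskCount, poolSize once idle threads have timed out}.
    static long[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // core threads may now time out too (needs keepAliveTime > 0)
        try {
            pool.prestartAllCoreThreads(); // start both core threads up front
            for (int i = 0; i < 10; i++) {
                pool.execute(() -> { });
            }
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            // Wait until every task is done and every idle worker has timed out and exited.
            while ((pool.getCompletedTaskCount() < 10 || pool.getPoolSize() > 0)
                    && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return new long[] {
                    pool.getLargestPoolSize(), pool.getCompletedTaskCount(), pool.getPoolSize() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [2, 10, 0]
    }
}
```

With `allowCoreThreadTimeOut(false)` (the default), the final pool size would instead stay at corePoolSize.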

Core Inner Class

Worker

The Worker class represents one thread in the pool and is the core of the pool. Here is its source:

/**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            // Sets the AQS sync state (private volatile int state); a value > 0 means the lock is held.
            // Interrupts are inhibited until runWorker() runs: interruptIfStarted() checks getState() >= 0.
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this); // create a thread whose Runnable is this worker
            // The worker itself is the Runnable: the thread is not built from firstTask; instead, worker.run() ends up calling firstTask.run().
            // Later, addWorker() starts the Thread held by the worker, and the execution logic, runWorker(), is invoked from worker.run().
            // Thread.run() ends up calling worker.run() because the worker (this) is passed into the thread right here.
        }
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        } // runWorker() is a method of ThreadPoolExecutor

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        /**
         * Attempts to acquire the lock.
         * Overrides AQS's tryAcquire(), which AQS leaves for subclasses to implement.
         */
        protected boolean tryAcquire(int unused) {
            // Try once to CAS state from 0 to 1 (locked). Because it always goes 0 -> 1 rather than +1, the lock is not reentrant,
            // and it cannot be acquired while state == -1 either.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        /**
         * Attempts to release the lock.
         * Sets state to 0 instead of decrementing it.
         */
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
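        /**
         * Interrupts the thread if it has started.
         * shutdownNow() calls this on every worker in a loop;
         * it does not need the worker lock, so a worker can be interrupted even while it is running a task.
         */
        void interruptIfStarted() {
            Thread t;
            // Interrupt only if state >= 0, t != null, and t is not already interrupted.
            // new Worker() sets state == -1, so a worker that has not started yet cannot be interrupted.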
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

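Looking at the Worker class, the most important parts are its Thread thread and Runnable firstTask fields. This may seem odd at first: all we want is a thread that can run tasks, so why keep both a Thread and a Runnable?
The reason Worker implements Runnable itself, creates its own Thread, and wraps firstTask is that Worker is responsible for interrupt control, while firstTask only carries the business logic. Worker's run() calls runWorker(), and that is where firstTask's run() is executed. We will look at runWorker() closely later.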
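To see the lock semantics in isolation, here is a stripped-down copy of just the locking part of Worker as a standalone AQS subclass. `WorkerLockSketch` and its `state()` accessor are hypothetical names; there is no thread or firstTask here, so this is a sketch of the idea, not the JDK class.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class WorkerLockSketch extends AbstractQueuedSynchronizer {
    WorkerLockSketch() {
        setState(-1); // like Worker: not acquirable until runWorker() calls unlock()
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // Only 0 -> 1 succeeds: fails while state is -1, and fails on re-entry (state is 1).
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0); // reset to 0 rather than decrement
        return true;
    }

    void lock()        { acquire(1); }
    boolean tryLock()  { return tryAcquire(1); }
    void unlock()      { release(1); }
    boolean isLocked() { return isHeldExclusively(); }
    int state()        { return getState(); }

    public static void main(String[] args) {
        WorkerLockSketch w = new WorkerLockSketch();
        System.out.println(w.tryLock());  // false: state is -1 before "runWorker"
        w.unlock();                       // what runWorker() does first
        System.out.println(w.tryLock());  // true: 0 -> 1
        System.out.println(w.tryLock());  // false: not reentrant, even for the owner
        w.unlock();
        System.out.println(w.isLocked()); // false
    }
}
```

The non-reentrancy is deliberate: per the Worker Javadoc above, a task that calls a pool control method such as setCorePoolSize must not be able to reacquire its own worker's lock.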

Core Methods

Now that we have briefly introduced the main players of the pool (ctl, workQueue, workers, and Worker), let's see how the pool actually works.

Running Tasks

The execute method

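This is the core method the pool exposes for submitting a task (command). For users who don't need to know the pool's internals, it simply hands a task to the pool, and under normal circumstances the task will run at some point in the future. Its implementation and comments are as follows: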

/**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         * (Note: step 3 grows the pool toward maxPoolSize.)
         */
        int c = ctl.get();
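        // 1. If fewer than corePoolSize threads are running. (There is no state check here, probably because addWorker() already checks the pool state; a check is done before queuing instead.)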
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;

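            /**
             * addWorker() failed, so re-read c (whenever ctl is needed again for a decision, ctl.get() is called again).
             * Possible reasons for the failure:
             * 1. The pool has been shut down; a shut-down pool accepts no new tasks.
             * 2. After the workerCountOf(c) < corePoolSize check, another thread concurrently created a worker, so workerCount >= corePoolSize.
             */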
            c = ctl.get();
        }
        /**
         * 2. If the pool is RUNNING and the task was queued successfully
         */
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();

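            /**
             * Re-check whether the queued task can still be executed:
             * 1. If the pool is no longer running, it should reject new tasks, so remove the task from workQueue.
             * 2. If the pool is running, or removing the task from workQueue failed (a thread just finished and consumed it),
             *    make sure there is still a thread to execute tasks (one is enough).
             */
            // If, on re-check, the pool is not RUNNING and remove(command) (i.e. workQueue.remove()) succeeds, reject this command.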
            if (! isRunning(recheck) && remove(command))
                reject(command);

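            // If the worker count is 0, create a thread with no task via addWorker(null, false).
            // Why only check whether the worker count is 0, instead of comparing it with corePoolSize?
            // Because as long as at least one live worker exists, it can consume the tasks in workQueue;
            // one worker able to take tasks from the queue is enough.
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false); // null first argument: just start a worker thread with no firstTask
                                        // second argument: true bounds by corePoolSize, false by maxPoolSize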
        }
        /**
         * 3. If the pool is not RUNNING, or the task could not be queued:
         *    try to start a new thread, growing toward maxPoolSize; if addWorker(command, false) fails, reject the command
         */
        else if (!addWorker(command, false))
            reject(command);
    }

We can summarize it as follows (the figure's source is credited in its watermark; many thanks to its author):
[Figure: flowchart of execute()]
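The three steps can be reproduced with a deliberately tiny pool. In this sketch (illustrative class `ExecuteFlowDemo`; `corePoolSize = 1`, `maximumPoolSize = 2`, a queue of capacity 1, and the default `AbortPolicy`), every task blocks on a latch so the workers stay busy, and four submissions hit step 1, step 2, step 3, and the rejection branch in turn.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {
    // Returns {poolSize, queueSize, rejectedCount} after four blocking submissions.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); // default handler: AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            pool.execute(blocker);     // step 1: 0 < corePoolSize -> addWorker(command, true)
            pool.execute(blocker);     // step 2: core is full -> workQueue.offer() succeeds
            pool.execute(blocker);     // step 3: queue full -> addWorker(command, false), 2 <= max
            try {
                pool.execute(blocker); // step 3 fails: queue full and max reached -> reject
            } catch (RejectedExecutionException e) {
                rejected++;
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size(), rejected };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [2, 1, 1]
    }
}
```

Note the order this implies: the queue fills before the pool grows beyond corePoolSize, so with an unbounded queue maximumPoolSize would never be reached.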

addWorker

In execute(), the core logic is carried out by addWorker(): how the pool handles a submitted task is largely governed by this method:

[Figure: overview of addWorker()]

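The method has two parameters:

  1. firstTask: the worker thread's initial task; may be null
  2. core: true uses corePoolSize as the bound, false uses maximumPoolSize

Combining them, addWorker can be called in four ways:

1. addWorker(command, true)
2. addWorker(command, false)
3. addWorker(null, false)
4. addWorker(null, true)

execute() uses the first three. Analyzing them together with the method body:

First: when the thread count is below corePoolSize, add a worker carrying the task to the workers set. If the worker count has already reached corePoolSize, return false.
Second: when the queue is full, try to add the new task directly as a worker in the workers set; the bound is now maximumPoolSize. If the pool is also full, return false.
Third: add a worker with a null task to the workers set, bounded by maximumPoolSize. A worker with no task takes tasks from the queue when it runs, so this simply creates a new thread without assigning it a task right away.
Fourth: add a worker with a null task, bounded by corePoolSize; if the set already holds corePoolSize workers, return false and do nothing. In practice this is used by prestartAllCoreThreads(), which pre-starts corePoolSize workers that wait for tasks from workQueue.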
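The fourth form is what the public `prestartAllCoreThreads()` and `prestartCoreThread()` methods rely on (in the JDK 8 source both call `addWorker(null, true)`). A minimal sketch, with `PrestartDemo` as an illustrative name:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrestartDemo {
    // Returns {threads started by prestartAllCoreThreads(), getPoolSize(),
    //          1 if a further prestartCoreThread() still started a thread, else 0}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 5, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            // Starts idle core workers (no firstTask) until corePoolSize is reached.
            int started = pool.prestartAllCoreThreads();
            // Returns false once all core threads already exist.
            boolean another = pool.prestartCoreThread();
            return new int[] { started, pool.getPoolSize(), another ? 1 : 0 };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [3, 3, 0]
    }
}
```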
/**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // Outer loop: checks the pool state
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            /**
             * The smaller the state, the more "running" the pool: RUNNING=-1, SHUTDOWN=0, STOP=1, TIDYING=2, TERMINATED=3 (before the shift).
             * For this if to be true, the pool must be at least SHUTDOWN.
             * Then, if any of the following 3 conditions is false, we enter the if and addWorker() fails:
             *  1. rs == SHUTDOWN         (given rs >= SHUTDOWN) false when: the state is beyond SHUTDOWN,
             *                              i.e. STOP, TIDYING or TERMINATED; the pool has stopped
             *  2. firstTask == null      (given rs == SHUTDOWN) false when: firstTask is not null; with rs == SHUTDOWN and a
             *                              non-null firstTask, a new task is being added after shutdown, so reject it
             *  3. ! workQueue.isEmpty()  (given rs == SHUTDOWN and firstTask == null) false when: workQueue is empty;
             *                              a null firstTask means creating a worker with no task that will take tasks from workQueue,
             *                              so if workQueue is already empty there is no need to add a worker
             * In all these cases, return false.
             */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            // Inner loop: increments the worker count
            for (;;) {
                int wc = workerCountOf(c);
                // The core argument takes effect here: it decides whether the new worker is bounded by corePoolSize or maximumPoolSize.
                // If the worker count >= CAPACITY (the largest value the low 29 bits can hold),
                // or >= corePoolSize / maximumPoolSize, the bound is reached, so no worker is added.
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // Try to increment the worker count with CAS; on success, break out of both loops.
                // On failure another thread changed ctl concurrently; decide below how to retry.
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // CAS failed: re-read ctl and check again
                c = ctl.get();  // Re-read ctl
                // If the run state has changed, leave the inner loop and redo the outer state check
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // The worker count was incremented;
        // now add the worker to the workers set and start its thread
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // Create the worker. The constructor does three things:
            // 1. sets the worker's AQS sync state to -1
            // 2. stores firstTask in the worker's firstTask field
            // 3. asks the ThreadFactory for a thread that runs the worker itself (a Runnable) and stores it in the worker's thread field
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // acquire the main (reentrant) lock
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    // Proceed if the pool is running (RUNNING < SHUTDOWN), or
                    // the pool is SHUTDOWN and firstTask == null (workQueue may still hold unfinished tasks,
                    // so a worker with no initial task is created to run them).
                    // Otherwise the worker count is decremented again in addWorkerFailed().
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable: an already-started thread is an illegal state
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        // update largestPoolSize and mark the worker as added
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) { // the worker was added to the HashSet, so start its thread
                    // t.start() is where the thread really starts running
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // If the thread was not started, roll back (decrement the count and remove the worker)
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Likewise, we can summarize it:
[Figure: flowchart of addWorker()]
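The first half of addWorker() (check the state, check the bound, then CAS the count inside a labelled `retry` loop) can be modelled on its own. This is a simplified, hypothetical model: `WorkerCountReserver`, `tryReserve` and `shutdown` are made-up names; it reuses the ctl layout but deliberately ignores the SHUTDOWN plus null-firstTask case and never creates threads.

```java
import java.util.concurrent.atomic.AtomicInteger;

class WorkerCountReserver {
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;

    final AtomicInteger ctl = new AtomicInteger(RUNNING); // RUNNING with 0 workers
    final int bound;                                      // plays the role of core/max pool size

    WorkerCountReserver(int bound) { this.bound = bound; }

    static int runStateOf(int c)    { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }

    // Returns true if a worker slot was reserved (worker count incremented).
    boolean tryReserve() {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            if (rs >= SHUTDOWN)                  // simplified state check
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY || wc >= bound)
                    return false;
                if (ctl.compareAndSet(c, c + 1)) // like compareAndIncrementWorkerCount(c)
                    break retry;
                c = ctl.get();                   // lost a race: re-read
                if (runStateOf(c) != rs)
                    continue retry;              // state changed: redo the outer check
                // otherwise only the count changed: retry the inner loop
            }
        }
        return true;
    }

    // Moves the state to SHUTDOWN while keeping the worker count.
    void shutdown() {
        for (;;) {
            int c = ctl.get();
            if (ctl.compareAndSet(c, SHUTDOWN | workerCountOf(c)))
                return;
        }
    }

    public static void main(String[] args) {
        WorkerCountReserver r = new WorkerCountReserver(2);
        System.out.println(r.tryReserve() + " " + r.tryReserve() + " " + r.tryReserve()); // true true false
        WorkerCountReserver s = new WorkerCountReserver(2);
        s.shutdown();
        System.out.println(s.tryReserve()); // false: the state check fails first
    }
}
```

Splitting the retry into two loops means a CAS lost only to a count change does not repeat the state check, while a state change does.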

The runWorker method

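In addWorker(), we started the thread held by the newly added worker. Recall that the Worker constructor passed the worker itself to that thread as its target, so once the thread starts it ends up calling worker.run(), and there runWorker() defines how the thread executes tasks: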

/**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     * (beforeExecute()'s exception is not caught here, so it propagates and breaks out of the loop.)
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
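        // whether the worker terminated abruptly (because of an exception)
        boolean completedAbruptly = true;
        try {
            // task is non-null only for the worker's initial task; when it is null, fetch a task from the queue via getTask().
            // As soon as getTask() returns null, the worker leaves the loop and is destroyed; the logic that decides whether a thread should be destroyed lives in getTask().
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If the pool is stopping, make sure the thread is interrupted; otherwise make sure it is not
                // (Thread.interrupted() clears the flag, then the state is re-checked to handle a racing shutdownNow()).
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // hook called before the task runs; empty in this class (if it throws, the task is skipped and the worker exits)
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run(); // the essential purpose of runWorker(): call the task's run()
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // hook called after the task runs, receiving any exception it threw; empty in this class
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null; // clear the finished task
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // The worker did not terminate abruptly: it left because the while condition failed (getTask() returned null)
            completedAbruptly = false;
        } finally {
            // handle the worker's exit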
            processWorkerExit(w, completedAbruptly);
        }
    }

To summarize:
[Figure: flowchart of runWorker()]
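beforeExecute() and afterExecute() are protected hooks that do nothing in ThreadPoolExecutor itself. This sketch uses a hypothetical subclass, `RecordingPool` (inside an illustrative `AfterExecuteDemo` class), that overrides afterExecute() to show that the exception caught in runWorker() is handed to the hook. The custom ThreadFactory only silences the uncaught-exception printout, since runWorker() rethrows the exception and the worker thread dies.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AfterExecuteDemo {
    // Records the Throwable that runWorker() passes to afterExecute().
    static class RecordingPool extends ThreadPoolExecutor {
        final AtomicReference<Throwable> lastThrown = new AtomicReference<>();
        final CountDownLatch done = new CountDownLatch(1);

        RecordingPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
                    r -> {
                        Thread t = new Thread(r);
                        // runWorker() rethrows the task's exception and the thread dies;
                        // swallow it here so the demo prints no stack trace.
                        t.setUncaughtExceptionHandler((thread, ex) -> { });
                        return t;
                    });
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            lastThrown.set(t);
            done.countDown();
        }
    }

    // Runs one failing task via execute() and returns the message afterExecute() saw.
    static String run() {
        RecordingPool pool = new RecordingPool();
        try {
            pool.execute(() -> { throw new IllegalStateException("boom"); });
            if (!pool.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("afterExecute() was not called");
            }
            Throwable t = pool.lastThrown.get();
            return t == null ? null : t.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // boom
    }
}
```

If the same task were passed to submit() instead of execute(), the FutureTask wrapper would catch the exception itself, so afterExecute() would receive t == null.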

The getTask method

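getTask(), called from runWorker(), is how a thread fetches a new task from the queue after finishing one; it is also where the pool decides whether a thread should be destroyed: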

/**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        // timedOut records whether the previous poll() timed out
        boolean timedOut = false; // Did the last poll() time out?

        /**
         * Loop: check the pool state, then try to get a task
         */
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
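            /**
             * Check the pool state. In two cases, decrement workerCount and return null:
             * 1. The state is SHUTDOWN and workQueue is empty (a SHUTDOWN pool still runs the tasks left in workQueue).
             * 2. The state is >= STOP (only TIDYING and TERMINATED are greater than STOP; shutdownNow() moves the pool to STOP); workQueue no longer matters.
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount(); // CAS-decrement the worker count in a loop until it succeeds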
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?

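            // allowCoreThreadTimeOut: whether core threads may be destroyed after idling longer than keepAliveTime; false by default.
            // As noted above, once getTask() returns null, the worker can only be destroyed.
            // So timed has three cases:
            // (1) The thread count does not exceed corePoolSize and allowCoreThreadTimeOut is false (the default):
            //     timed is false. In the if below, only wc > maximumPoolSize could then be true.
            //     "Count does not exceed corePoolSize" and "count exceeds maximumPoolSize" are mutually exclusive, so in case (1)
            //     the if (which returns null and destroys the thread) can never be entered.
            //     In other words, while the thread count does not exceed corePoolSize, threads are not destroyed.
            // (2) The thread count exceeds corePoolSize and allowCoreThreadTimeOut is false (the default): timed is true.
            // (3) If allowCoreThreadTimeOut is true, timed is always true.
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // If wc > maximumPoolSize, the worker is always destroyed, because then wc > 1 is certainly true as well.
            // If wc <= maximumPoolSize and (timed && timedOut) is true, the worker is usually destroyed too: a timeout is usually
            // caused by an empty queue, so workQueue.isEmpty() is very likely true and the if is entered.

            // That is the usual case; what about the unusual one? If the queue is not empty but the poll still timed out,
            // the outcome depends on wc > 1: destroy the worker if it is true, keep it if it is false.
            // That is, if the queue still has tasks but wc == 1 (this is the last thread in the pool), it must not be destroyed;
            // the if fails and the code continues below.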
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // If timed is true, get a task with poll(keepAliveTime); otherwise use take().
                // workQueue.poll(timeout): returns null if no task arrives within keepAliveTime.
                // workQueue.take(): blocks while the queue is empty and returns a task once one is added.
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // Got a task: return it.
                if (r != null)
                    return r;
                // Otherwise the poll timed out: record it and loop again.
                timedOut = true;
            } catch (InterruptedException retry) {
                // Blocking on workQueue can be interrupted (for example by shutdown()). Reset timedOut to false and loop again:
                // the state checks at the top decide whether to keep fetching tasks or return null, in which case the worker exits.
                timedOut = false;
            }
        }
    }

To summarize:

[Figure: flowchart of getTask()]
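The timed branch can be observed through getPoolSize(). In this sketch (illustrative class `KeepAliveDemo`; the sizes and the 100 ms keepAliveTime are arbitrary), a SynchronousQueue forces the pool to grow to maximumPoolSize while tasks block. Once they finish, the two workers above corePoolSize time out in poll() and exit, while the last one, now within corePoolSize, waits in take().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns {poolSize while 3 tasks block, poolSize after the extra workers idled out}.
    static int[] run() {
        // A SynchronousQueue holds no tasks, so a busy pool must add non-core workers.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            int busy = pool.getPoolSize(); // 1 core + 2 non-core workers
            release.countDown();           // let the three tasks finish
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            // While wc > corePoolSize, getTask() uses poll(keepAliveTime); on timeout it
            // returns null and the worker exits, until only corePoolSize workers remain.
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return new int[] { busy, pool.getPoolSize() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [3, 1]
    }
}
```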

The processWorkerExit method

In runWorker(), once the while condition no longer holds, the pool runs the worker-exit logic, which is encapsulated in processWorkerExit().

/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit.  This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
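    // Parameters:
    //   w:                 the worker that is exiting
    //   completedAbruptly: whether the worker finished abruptly (i.e. exited because of an exception)

    /**
     * 1. Decrement the worker count if needed.
     * If the worker terminated abruptly, a task threw an exception in run(), so the count of working threads must be decremented here.
     * If not, the worker simply had no more tasks to run. No decrement is needed here because getTask() has already done it.
     */
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted (getTask() skipped the decrement on this path, so it happens here)
        decrementWorkerCount();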
 
    /**
     * 2. Remove the worker from the workers set
     */
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks; // add the worker's completed tasks to the pool's total
        workers.remove(w); // remove it from the HashSet<Worker>
    } finally {
        mainLock.unlock();
    }
 
    /**
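     * 3. After any operation that might make termination possible, the pool must "try to terminate".
     * This mainly checks whether the pool is in a state that allows termination.
     * If it is, but threads remain, an idle worker is interrupted so that it can enter its exit path.
     * If no threads remain, the state is advanced TIDYING -> TERMINATED.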
     */
    tryTerminate();
 
    /**
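     * 4. Decide whether a replacement worker is needed.
     * If the pool is RUNNING or SHUTDOWN:
     * if this worker terminated abruptly, addWorker();
     * if it did not, but the current thread count is below the number to be maintained, addWorker().
     * Consequently, after shutdown() the pool keeps replacing exiting workers (up to corePoolSize) until workQueue is empty, and only then lets them exit one by one.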
     */
    int c = ctl.get();
    // If the state is RUNNING or SHUTDOWN (i.e. tryTerminate() did not terminate the pool), consider adding a worker
    if (runStateLessThan(c, STOP)) {
        // Not abrupt (the worker exited because there was no task to fetch): compute min and compare it with the current worker count to decide whether to addWorker()
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // allowCoreThreadTimeOut defaults to false, so min defaults to corePoolSize
             
            // If min is 0 (core threads need not be kept) but workQueue is not empty, keep at least one thread
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
             
            // If the worker count is at least min, return; otherwise fall through and add (at least) one worker
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
         
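        // Add a worker with no firstTask.
        // This happens whenever the worker exited abruptly or the thread count is below the number to maintain, even in SHUTDOWN state (addWorker itself still refuses in SHUTDOWN once the queue is empty).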
        addWorker(null, false);
    }
}

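In short: as long as the pool has not fully terminated, it still needs to keep a certain number of threads.

When the pool is RUNNING or SHUTDOWN:

A. if the current thread terminated abruptly, addWorker();
B. if it did not, but the thread count is below the number to be maintained, addWorker().
Consequently, after shutdown() the pool keeps replacing exiting workers (up to corePoolSize) until workQueue is empty, and only then lets them exit one by one.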
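Case A above can be observed directly. In this sketch (the names are hypothetical, and JDK 8 semantics are assumed), a single-thread pool runs a task through execute() that throws. The exception escapes runWorker, and processWorkerExit sees completedAbruptly == true and calls addWorker(null, false). As a result, the counting ThreadFactory ends up creating a second thread for the next task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ReplacementDemo {

    // Returns how many threads the pool created while running one failing and one normal task.
    static int threadsCreated() {
        AtomicInteger created = new AtomicInteger();
        ThreadFactory factory = r -> {
            int n = created.incrementAndGet();
            Thread t = new Thread(r, "worker-" + n);
            t.setUncaughtExceptionHandler((th, ex) -> { }); // keep the demo output quiet
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
        CountDownLatch secondDone = new CountDownLatch(1);
        try {
            // execute() (not submit) lets the exception escape runWorker, so the worker exits
            // with completedAbruptly == true and processWorkerExit adds a replacement
            pool.execute(() -> { throw new RuntimeException("boom"); });
            pool.execute(secondDone::countDown);
            secondDone.await();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println("threads created: " + threadsCreated());
    }
}
```

If submit() were used instead of execute(), the FutureTask would capture the exception and the worker would survive. That is one of the FutureTask-driven differences discussed in the submit section.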

submit method

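We covered execute earlier: it hands a task to the pool so that it runs at some point in the future.
submit does the same job. It hands a task to the pool and lets the pool schedule a thread to run it.
So how does it differ from execute? Let's look at the source.
submit is implemented in AbstractExecutorService, the superclass of ThreadPoolExecutor, and has three overloads: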

/**
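     * Submits a Runnable task for execution and returns a Future representing that task. The Future's get method will return null upon successful completion.
     * Parameters: task - the task to submit. Returns: a Future representing pending completion of the task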
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
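     * Submits a Runnable task for execution and returns a Future representing that task. The Future's get method will return the given result upon successful completion.
     * Parameters: task - the task to submit; result - the result to return when the task completes
     * Returns: a Future representing pending completion of the task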
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
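     * Submits a value-returning (Callable) task for execution and returns a Future representing the pending result of the task. The Future's get method will return the task's result upon successful completion.
     * If you would like to block immediately waiting for the task, you can use constructions of the form result = exec.submit(aCallable).get();
     * Parameters: task - the task to submit. Returns: a Future representing pending completion of the task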
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

The source is simple. submit wraps task in a FutureTask (newTaskFor just creates a new FutureTask) and then calls execute. So every difference between submit and execute comes from the behavior that FutureTask adds.
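Since submit is just newTaskFor plus execute, we can reproduce it by hand. The sketch below uses submitByHand, a hypothetical helper and not a JDK method. It wraps a Runnable in a FutureTask and passes it to execute(), and get() then returns the supplied result, as submit(Runnable, T) would. The second method shows the main practical difference: FutureTask catches the task's exception and stores it, and get() rethrows it wrapped in an ExecutionException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class SubmitSketch {

    // Hypothetical helper mirroring submit(Runnable, T): wrap in a FutureTask, then execute() it.
    static <T> FutureTask<T> submitByHand(ExecutorService pool, Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        FutureTask<T> ftask = new FutureTask<>(task, result);
        pool.execute(ftask);
        return ftask;
    }

    // The hand-made FutureTask behaves like submit(): get() returns the given result.
    static String resultViaHandMadeFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return submitByHand(pool, () -> { }, "done").get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // FutureTask stores the task's exception; get() rethrows it wrapped in ExecutionException
    // instead of letting it escape and kill the worker thread.
    static String failureSeenThroughGet() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<Integer> failing = () -> { throw new IllegalArgumentException("boom"); };
        try {
            Future<Integer> f = pool.submit(failing);
            f.get(5, TimeUnit.SECONDS);
            return "no exception";
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(resultViaHandMadeFuture());
        System.out.println(failureSeenThroughGet());
    }
}
```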

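In short, submit wraps the task in a FutureTask (the Future pattern) and hands it to the pool. It then returns that FutureTask so the calling thread can obtain the result once the pool has run the task.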

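Note that FutureTask.get(), the method that retrieves the result, blocks the thread that calls it. This matters most when a task is dropped by DiscardPolicy or DiscardOldestPolicy. The discarded FutureTask is never run, so get() blocks forever. It is better to use the get overload that takes a timeout.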
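The DiscardPolicy pitfall is easy to reproduce. The sketch assumes one worker, a queue of capacity 1, and DiscardPolicy; the class name is illustrative. The third submission is rejected and silently dropped, so its Future never completes. Only the timed get() lets the caller move on.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DiscardedFutureDemo {

    // Returns true if get() on a silently discarded task times out instead of returning.
    static boolean discardedGetTimesOut() {
        // One worker, room for one queued task, DiscardPolicy for anything beyond that
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.submit(() -> { release.await(); return null; }); // occupies the only worker
            pool.submit(() -> { });                                // fills the queue
            Future<?> discarded = pool.submit(() -> { });          // rejected and dropped
            try {
                discarded.get(200, TimeUnit.MILLISECONDS);         // never completes on its own
                return false;
            } catch (TimeoutException expected) {
                return true;
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("discarded get() timed out: " + discardedGetTimesOut());
    }
}
```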

Shutting down the pool

shutdown method

Having covered how the pool runs, we close the methods section with the methods that handle the end of the pool's lifecycle, starting with shutdown.

/**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Check that the caller is allowed to shut down the pool's threads.
            checkShutdownAccess();
            // advanceRunState(targetState) loops until the run state is at least the target, setting it with CAS.
            // Set the pool state to SHUTDOWN; from here on no new tasks are accepted
            advanceRunState(SHUTDOWN);
            // Interrupt all idle workers
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
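        // Try to terminate. Setting the state to SHUTDOWN above has already effectively "stopped" the pool from the outside: it accepts no new tasks. This call only tries to push the state all the way to TERMINATED and is not guaranteed to succeed. We come back to this method later.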
        tryTerminate();
    }

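As we can see, shutdown only interrupts (wakes up) the idle, blocked workers and sets the state to SHUTDOWN. As the Javadoc says, it does not wait for running tasks to finish. Setting the state to SHUTDOWN is already enough to stop the pool from accepting new work.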

What matters most in this method is how the pool interrupts threads, so let's look at interruptIdleWorkers:

private void interruptIdleWorkers(boolean onlyOne) {// onlyOne: whether to stop after interrupting a single worker; shutdown passes false.
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Walk the workers set and interrupt every eligible worker.
            for (Worker w : workers) {
                Thread t = w.thread;
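                // w.tryLock() tries to acquire the Worker's lock. A worker that is running a task already holds it (see runWorker, the w.lock() call),
                // so a Worker busy with a task is never interrupted here. Only idle workers blocked in getTask pass this check. Being interrupted does not stop the thread immediately, though: it keeps working until the queue is empty and only then exits.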
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

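So the interrupt path calls the worker's tryLock to try to acquire its lock. This is why we say the Worker wrapper exists to control interruption. A thread running a task already holds the lock and cannot be interrupted. Only threads fetching tasks from the blocking queue (we call them idle threads) can be interrupted.
We saw getTask earlier; there the worker does not hold its lock, so it can be interrupted. Why, then, do we say "being interrupted does not stop the thread immediately; it keeps working until the queue is empty and only then exits"? Let's look at the getTask source again, with comments that simulate what happens when the interrupt arrives: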
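The effect of tryLock can be checked from the outside. This sketch (hypothetical names, JDK 8 or later assumed) starts a sleeping task, waits until it is definitely running, and then stops the pool. shutdown() goes through interruptIdleWorkers, whose tryLock fails on the busy worker, so the task is not interrupted. shutdownNow() goes through interruptWorkers and does interrupt it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownInterruptDemo {

    // Stops the pool while its only task is running; returns whether that task was interrupted.
    static boolean runningTaskInterrupted(boolean useShutdownNow) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        pool.execute(() -> {
            started.countDown();          // runWorker already holds w.lock() at this point
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        try {
            started.await();              // the worker is now busy, not idle
            if (useShutdownNow) {
                pool.shutdownNow();       // interruptWorkers(): interrupts every started worker
            } else {
                pool.shutdown();          // interruptIdleWorkers(): tryLock() fails, no interrupt
            }
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupted by shutdown():    " + runningTaskInterrupted(false));
        System.out.println("interrupted by shutdownNow(): " + runningTaskInterrupted(true));
    }
}
```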

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        /**
         * When an InterruptedException is thrown during execution, it is caught and control returns to this for loop.
         * The catch block is at the end of getTask.
         */
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            /**
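             * We got here after an interrupt, which means shutdown() has already set the state to SHUTDOWN: rs >= SHUTDOWN is true and rs >= STOP is false (rs is SHUTDOWN; only TIDYING and TERMINATED are greater than STOP).
             * In that case, if workQueue is empty, the condition holds and the thread is retired.
             * Otherwise (workQueue is not empty) the condition fails and the thread does not enter the exit logic.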
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();// CAS loop that decrements the worker count until it succeeds
                return null;
            }

            int wc = workerCountOf(c);

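            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // timedOut was reset to false in the catch block,
            // so unless the extreme case of more threads than maximumPoolSize occurs, (wc > maximumPoolSize || (timed && timedOut)) is false and the thread is still not retired.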
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // Keep taking tasks from the queue as usual until it is completely empty; then the first if above matches and the thread is retired for good.
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // On a normal return, hand back the task that was fetched.
                if (r != null)
                    return r;
                // Otherwise poll() timed out: record that and run the loop again.
                timedOut = true;
            } catch (InterruptedException retry) {
                // Catch the interrupt
                timedOut = false;
            }
        }
    }
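The checks above rely on the ordering of the run states packed into ctl. The sketch below mirrors the private constants and helpers of JDK 8's ThreadPoolExecutor; they are copied here only for illustration and are not public API. It also adds getTaskShouldExit, a hypothetical helper that restates the first if in getTask. The high 3 bits hold the run state and the low 29 bits hold the worker count, so RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED holds as plain int comparisons.

```java
public class CtlSketch {
    // Mirrors JDK 8: high 3 bits = run state, low 29 bits = worker count
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    static final int RUNNING    = -1 << COUNT_BITS;   // negative, so smallest
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // Hypothetical restatement of the shutdown check at the top of getTask():
    // true means "decrement the count and return null, this worker exits".
    static boolean getTaskShouldExit(int c, boolean queueEmpty) {
        int rs = runStateOf(c);
        return rs >= SHUTDOWN && (rs >= STOP || queueEmpty);
    }

    public static void main(String[] args) {
        int c = ctlOf(SHUTDOWN, 3);
        System.out.println(workerCountOf(c));                          // 3
        System.out.println(runStateOf(c) == SHUTDOWN);                 // true
        System.out.println(getTaskShouldExit(c, false));               // false: queue still has work
        System.out.println(getTaskShouldExit(c, true));                // true
        System.out.println(getTaskShouldExit(ctlOf(STOP, 3), false));  // true: STOP ignores the queue
    }
}
```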

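Summary: a worker blocked in getTask() that gets interrupted throws InterruptedException and stops waiting for a task. After the exception is caught, the loop returns to the state check at the top of getTask(). When the pool is SHUTDOWN and workQueue.isEmpty() is true, getTask() returns null and the worker goes through the exit logic.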

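This is why we say shutdown does not stop the pool immediately. It blocks new tasks (the logic is in the first if of addWorker, which is worth revisiting). It then lets the remaining tasks finish and calls tryTerminate to attempt termination.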
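A quick way to confirm this behavior is the sketch below; the names are illustrative and the default AbortPolicy is assumed. The only worker is held busy while three tasks wait in the queue, and then shutdown() is called. A new submission is rejected, yet all three queued tasks still run before the pool terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ShutdownDrainDemo {

    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger queuedRan = new AtomicInteger();
        pool.execute(() -> {                           // keeps the only worker busy
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        for (int i = 0; i < 3; i++) {
            pool.execute(queuedRan::incrementAndGet);  // these wait in workQueue
        }
        pool.shutdown();                               // SHUTDOWN: no new tasks, queue still drained
        boolean rejected = false;
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true;                           // default AbortPolicy
        }
        release.countDown();
        boolean terminated = false;
        try {
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "queuedRan=" + queuedRan.get() + " rejected=" + rejected + " terminated=" + terminated;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```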

tryTerminate method

/**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty).  If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
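     * In other words, the pool moves to the TERMINATED state when either:
     * it is SHUTDOWN and both the running workers and workQueue are empty, or
     * it is STOP and there are no running workers.
     *
     * This method must be called after any action that might make termination possible, such as:
     * reducing the worker count
     * removing tasks from the queue during shutdown
     *
     * The method is not private, so the subclass ScheduledThreadPoolExecutor can call it.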
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /**
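             * Should the pool terminate?
             * Return without terminating if any of the following 3 conditions holds:
             * 1. the pool is still RUNNING
             * 2. the state is TIDYING or TERMINATED: termination has already happened
             * 3. the state is SHUTDOWN and workQueue is not empty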
             */
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
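            /**
             * Only SHUTDOWN with an empty workQueue, or STOP, gets this far.
             * If the pool still has threads (running tasks or waiting for tasks; either way the count is not 0),
             * interrupt one idle worker that is waiting for a task
             * (i.e. make a worker blocked on the queue throw, so it re-checks the state; see getTask).
             * Once woken, the thread re-checks the state, returns null, and enters processWorkerExit() (see runWorker).
             */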
            if (workerCountOf(c) != 0) { // Eligible to terminate
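                interruptIdleWorkers(ONLY_ONE);// Interrupt idle workers in the workers set; the argument is true, so only one is interrupted. (The point is to tell a thread blocked on the queue: stop waiting, this pool is shutting down, pack up and get ready to exit.)
                // This termination attempt failed, so return. You may wonder: shutdown calls tryTerminate only once, so if that attempt fails, might shutdown never terminate the pool?
                // The Javadoc answers this: the pool calls this method after every operation that might make termination possible. The idle worker interrupted above will exit, and while exiting it calls tryTerminate too (which interrupts another blocked worker), and so on, until the last thread runs tryTerminate and the logic can finally proceed past this point.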
                return;
            }
            /**
             * The state is SHUTDOWN with an empty workQueue (or STOP), and no workers are left: start terminating
             */
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // CAS ctl to TIDYING (all tasks have ended and workerCount is 0; terminated() is called in this state). If ctl changed in the meantime, the CAS fails and the for loop retries.
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // Empty by default; meant to be overridden by subclasses
                        terminated();
                    } finally {
                        // Set the state to TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        // Finally call termination.signalAll() to wake every thread waiting on the termination Condition, i.e. threads that called awaitTermination, which blocks its caller until this pool has actually terminated.
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

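To sum up, tryTerminate is mainly called:
1. in shutdown (shutdownNow, covered below, calls it as well)
2. in processWorkerExit, when a thread is retired
3. in addWorkerFailed, when adding or starting a thread fails
4. in remove, when a task is removed from the blocking queue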
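terminated() is an empty hook that tryTerminate calls while the pool is in TIDYING, so a subclass can use it to run cleanup exactly once. Here is a minimal sketch, assuming JDK 8 or later; CountingPool and its counter are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TerminatedHookDemo {

    // Overrides the empty terminated() hook; tryTerminate() calls it once, in TIDYING state,
    // before setting TERMINATED and signalling awaitTermination() callers.
    static class CountingPool extends ThreadPoolExecutor {
        final AtomicInteger terminatedCalls = new AtomicInteger();

        CountingPool() {
            super(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void terminated() {
            terminatedCalls.incrementAndGet();
            super.terminated();
        }
    }

    // Returns how many times terminated() ran, or -1 if the pool did not terminate in time.
    static int terminatedCallsAfterShutdown() {
        CountingPool pool = new CountingPool();
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown(); // no further effect: tryTerminate() returns early once TIDYING is reached
        return pool.isTerminated() ? pool.terminatedCalls.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println("terminated() calls: " + terminatedCallsAfterShutdown());
    }
}
```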

shutdownNow method

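We know that after shutdown the pool enters the SHUTDOWN state. It accepts no new tasks but still finishes both the running tasks and those waiting in the blocking queue.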

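shutdownNow, which we turn to now, moves the pool to the STOP state. It accepts no new tasks and no longer processes tasks waiting in the queue. It also tries to interrupt workers that are running tasks.
The code: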

/**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Check that the caller has permission to shut down the pool
            checkShutdownAccess();
            // CAS in a loop to set the pool state to STOP
            advanceRunState(STOP);
            // Interrupt all threads, including those running tasks
            interruptWorkers();
            // Move the elements of workQueue into a List to be returned
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // Try to terminate the pool
        tryTerminate();
        // Return the tasks from workQueue that never ran
        return tasks;
    }

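interruptWorkers is simple. It loops over all workers and calls interruptIfStarted on each one. That method checks whether the worker's AQS state is >= 0 (a new Worker starts with state -1 until runWorker begins, i.e. until the worker has actually started) and then calls Thread.interrupt.
Note that calling Thread.interrupt on a running thread does not guarantee that the thread terminates. task.run may catch InterruptedException without propagating it, so the thread may never finish.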
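Both points can be seen together in one sketch (hypothetical names; JDK 8 or later assumed). The running task deliberately swallows InterruptedException, two more tasks wait in the queue, and then shutdownNow() is called. The queued tasks come back in the returned list, but the pool cannot terminate until the misbehaving task stops on its own.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownNowDemo {

    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean stop = new AtomicBoolean(false);
        // A badly behaved task: it swallows InterruptedException and keeps looping
        pool.execute(() -> {
            started.countDown();
            while (!stop.get()) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException ignored) {
                    // interrupt swallowed: the flag is cleared and nothing is propagated
                }
            }
        });
        pool.execute(() -> { });   // these two wait in the queue
        pool.execute(() -> { });
        try {
            started.await();
            List<Runnable> drained = pool.shutdownNow();  // STOP + interruptWorkers + drainQueue
            boolean endedDespiteIgnoring = pool.awaitTermination(200, TimeUnit.MILLISECONDS);
            stop.set(true);                               // let the task finish on its own
            boolean endedAfterStop = pool.awaitTermination(5, TimeUnit.SECONDS);
            return "drained=" + drained.size()
                    + " endedDespiteIgnoring=" + endedDespiteIgnoring
                    + " endedAfterStop=" + endedAfterStop;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```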

awaitTermination method

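This method waits for the pool to terminate. Its parameters are timeout, the maximum time to wait, and unit, the unit of timeout. It returns true if the pool terminated and false if the timeout elapsed first.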

public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                // Has the pool reached TERMINATED?
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                // Has the timeout already elapsed?
                if (nanos <= 0)
                    return false;
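                // Core logic: per its Javadoc, this makes the calling thread wait until it is signalled (in ThreadPoolExecutor, only termination.signalAll() in tryTerminate, discussed earlier, signals this condition), it is interrupted, or the given nanos have elapsed.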
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

termination.awaitNanos() blocks by calling LockSupport.parkNanos(this, nanosTimeout).

The wait ends in the following situations (they correspond to the three cases listed above):

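1. If termination.signalAll() is called (internally it relies on LockSupport.unpark()), the waiting thread wakes up. ThreadPoolExecutor calls signalAll() only after tryTerminate() has succeeded and set the state to TERMINATED. awaitTermination() therefore re-checks the state and returns true.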

2. If the timeout is reached, termination.awaitNanos() also returns, now with a value of nanos <= 0. The next loop iteration returns false: the pool did not terminate in time.

3. If the current thread is interrupted via Thread.interrupt(), termination.awaitNanos() throws InterruptedException. awaitTermination() propagates it to the caller, so the wait ends with an exception.
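The three exit paths can be triggered one after another. The sketch uses illustrative names and assumes the JDK 8 or later implementation shown above. A task blocked on a latch keeps the pool from terminating. An interrupted caller gets InterruptedException, a short timeout returns false, and after the task is released awaitTermination returns true.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AwaitTerminationDemo {

    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                release.await();                     // keeps the pool from terminating
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.shutdown();

        // Case 3: an already-interrupted caller gets InterruptedException from awaitNanos()
        boolean interruptThrown = false;
        Thread.currentThread().interrupt();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            interruptThrown = true;                  // throwing also cleared the interrupt flag
        }

        try {
            // Case 2: the timeout elapses while the task is still blocked -> false
            boolean beforeRelease = pool.awaitTermination(50, TimeUnit.MILLISECONDS);
            release.countDown();
            // Case 1: tryTerminate() reaches TERMINATED and calls termination.signalAll() -> true
            boolean afterRelease = pool.awaitTermination(5, TimeUnit.SECONDS);
            return "interruptThrown=" + interruptThrown
                    + " beforeRelease=" + beforeRelease
                    + " afterRelease=" + afterRelease;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            release.countDown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```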

In conclusion, to shut down a pool gracefully we should do the following:

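executorService.shutdown();
try {
    while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
        LOGGER.debug("Waiting for terminate");
    }
} catch (InterruptedException e) {
    // Interrupted while waiting: restore the interrupt status so the caller can still see it
    Thread.currentThread().interrupt();
}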
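The loop above waits indefinitely if a task never finishes. A common refinement, modelled on the usage example in the ExecutorService Javadoc, bounds the wait and escalates to shutdownNow(). The helper name, the demo task, and the timeout values below are our own choices.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolCloser {

    // Two-phase shutdown. Returns true if the pool terminated within the (at most two) waits.
    static boolean shutdownAndAwaitTermination(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();                              // phase 1: stop accepting, drain the queue
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow();                       // phase 2: interrupt workers, drop queued tasks
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();                       // we were interrupted: cancel as well
            Thread.currentThread().interrupt();       // and preserve the caller's interrupt status
            return false;
        }
    }

    // Demo: a long but interrupt-aware task forces the escalation to shutdownNow().
    static boolean demo() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.execute(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // responds to shutdownNow() by exiting
            }
        });
        return shutdownAndAwaitTermination(pool, 200, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + demo());
    }
}
```

The first phase alone matches the loop above. The second phase only helps if tasks respond to interruption, as discussed in the shutdownNow section.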