Java线程池原理与源码详细解读,不再怕面试问线程池了!

线程池java

“线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,若是被无限制的建立,不只会消耗系统资源,还会下降系统的稳定性,所以Java中提供线程池对线程进行统一分配、调优和监控。程序员

v2-6557eb22c58091aeef5938a1701a2aa0_hd.png

线程池介绍web

在web开发中,服务器须要接受并处理请求,因此会为一个请求来分配一个线程来进行处理。若是每次请求都新建立一个线程的话实现起来很是简便,可是存在一个问题:面试

若是并发的请求数量很是多,但每一个线程执行的时间很短,这样就会频繁的建立和销毁线程,如此一来会大大下降系统的效率。可能出现服务器在为每一个请求建立新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。数组

那么有没有一种办法使执行完一个任务,并不被销毁,而是能够继续执行其余的任务呢?缓存

这就是线程池的目的了。线程池为线程生命周期的开销和资源不足问题提供了解决方案。经过对多个任务重用线程,线程建立的开销被分摊到了多个任务上。服务器

何时使用线程池?多线程

  • 单个任务处理时间比较短并发

  • 须要处理的任务数量很大app

线程池优点

  • 重用存在的线程,减小线程建立,消亡的开销,提升性能

  • 提升响应速度。当任务到达时,任务能够不须要的等到线程建立就能当即执行。

  • 提升线程的可管理性。线程是稀缺资源,若是无限制的建立,不只会消耗系统资源,还会下降系统的稳定性,使用线程池能够进行统一的分配,调优和监控。

线程的实现方式

Runnable,Thread,Callable

// 实现Runnable接口的类将被Thread执行,表示一个基本的任务

public interface Runnable {

// run方法就是它全部的内容,就是实际执行的任务

public abstract void run();

}

//Callable一样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容

public interface Callable<V> {

// 相对于run方法的带有返回值的call方法

V call() throws Exception;

}

Executor框架

Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。

下图为它的继承与实现

v2-d1163ae7cd6ad820c34df6bc5d7224e1_hd.jpg


从图中能够看出Executor下有一个重要子接口ExecutorService,其中定义了线程池的具体行为

1,execute(Runnable command):履行Ruannable类型的任务,

2,submit(task):可用来提交Callable或Runnable任务,并返回表明此任务的Future对象

3,shutdown():在完成已提交的任务后封闭办事,再也不接管新任务,

4,shutdownNow():中止全部正在履行的任务并封闭办事。

5,isTerminated():测试是否全部任务都履行完毕了。

6,isShutdown():测试是否该ExecutorService已被关闭。

v2-802cdeae9d6a56301c9c17b3e99717a8_hd.png

线程池重点属性

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int CAPACITY = (1 << COUNT_BITS) - 1;

ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里能够看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。


ctl相关方法

private static int runStateOf(int c) { return c & ~CAPACITY; }

private static int workerCountOf(int c) { return c & CAPACITY; }

private static int ctlOf(int rs, int wc) { return rs | wc; }
  • runStateOf:获取运行状态;

  • workerCountOf:获取活动线程数;

  • ctlOf:获取运行状态和活动线程数的值。

线程池存在5种状态

RUNNING = -1 << COUNT_BITS; //高3位为111

SHUTDOWN = 0 << COUNT_BITS; //高3位为000

STOP = 1 << COUNT_BITS; //高3位为001

TIDYING = 2 << COUNT_BITS; //高3位为010

TERMINATED = 3 << COUNT_BITS; //高3位为011

一、RUNNING

(1) 状态说明:线程池处在RUNNING状态时,可以接收新任务,以及对已添加的任务进行处理。

(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被建立,就处于RUNNING状态,而且线程池中的任务数为0!

二、 SHUTDOWN

(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。

(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

三、STOP

(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,而且会中断正在处理的任务。

(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

四、TIDYING

(1) 状态说明:当全部的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;能够经过重载terminated()函数来实现。

(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空而且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

五、 TERMINATED

(1) 状态说明:线程池完全终止,就变成TERMINATED状态。

(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()以后,就会由 TIDYING -> TERMINATED。

进入TERMINATED的条件以下:

  • 线程池不是RUNNING状态;

  • 线程池状态不是TIDYING状态或TERMINATED状态;

  • 若是线程池状态是SHUTDOWN而且workerQueue为空;

  • workerCount为0;

  • 设置TIDYING状态成功。

v2-abd400ab183777a343c0888d87cc7f5a_hd.jpg

线程池的具体实现

ThreadPoolExecutor 默认线程池

ScheduledThreadPoolExecutor 定时线程池

ThreadPoolExecutor线程池的建立

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

任务提交

一、public void execute() //提交任务无返回值
二、public Future<?> submit() //任务执行完成后有返回值


参数解释

corePoolSize

线程池中的核心线程数,当提交一个任务时,线程池建立一个新线程执行任务,直到当前线程数等于corePoolSize;若是当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;若是执行了线程池的prestartAllCoreThreads()方法,线程池会提早建立并启动全部核心线程。

maximumPoolSize

线程池中容许的最大线程数。若是当前阻塞队列满了,且继续提交任务,则建立新的线程执行任务,前提是当前线程数小于maximumPoolSize;

keepAliveTime

线程池维护线程所容许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,若是这时没有新的任务提交,核心线程外的线程不会当即销毁,而是会等待,直到等待的时间超过了keepAliveTime;

unit

keepAliveTime的单位;

workQueue

用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了以下阻塞队列:

  • 一、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;

  • 二、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量一般要高于ArrayBlockingQuene;

  • 三、SynchronousQuene:一个不存储元素的阻塞队列,每一个插入操做必须等到另外一个线程调用移除操做,不然插入操做一直处于阻塞状态,吞吐量一般要高于LinkedBlockingQuene;

  • 四、priorityBlockingQuene:具备优先级的***阻塞队列;

threadFactory

它是ThreadFactory类型的变量,用来建立新线程。默认使用Executors.defaultThreadFactory() 来建立线程。使用默认的ThreadFactory来建立线程时,会使新建立的线程具备相同的NORM_PRIORITY优先级而且是非守护线程,同时也设置了线程的名称。欢迎你们关注个人公种浩【程序员追风】,整理了2019年多家公司java面试题资料100多页pdf文档,文章都会在里面更新,整理的资料也会放在里面。

handler

线程池的饱和策略,当阻塞队列满了,且没有空闲的工做线程,若是继续提交任务,必须采起一种策略处理该任务,线程池提供了4种策略:

  • 一、AbortPolicy:直接抛出异常,默认策略;

  • 二、CallerRunsPolicy:用调用者所在的线程来执行任务;

  • 三、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

  • 四、DiscardPolicy:直接丢弃任务;

上面的4种策略都是ThreadPoolExecutor的内部类。

固然也能够根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

v2-930c3b4ee609b00a961ecfdb16774584_hd.png

线程池监控

public long getTaskCount() //线程池已执行与未执行的任务总数
public long getCompletedTaskCount() //已完成的任务数
public int getPoolSize() //线程池当前的线程数
public int getActiveCount() //线程池中正在执行任务的线程数量

线程池原理

v2-54e69cac13a1ba767ac25a14254c2d08_hd.jpg


源码分析

execute方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
/*
 * clt记录着runState和workerCount
 */
    int c = ctl.get();
/*
 * workerCountOf方法取出低29位的值,表示当前活动的线程数;
 * 若是当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;
 * 并把任务添加到该线程中。
 */
    if (workerCountOf(c) < corePoolSize) {
        /*
         * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断仍是maximumPoolSize来判断;
         * 若是为true,根据corePoolSize来判断;
         * 若是为false,则根据maximumPoolSize来判断
         */
        if (addWorker(command, true))
            return;
/*
 * 若是添加失败,则从新获取ctl值
 */
        c = ctl.get();
    }
/*
 * 若是当前线程池是运行状态而且任务添加到队列成功
 */
    if (isRunning(c) && workQueue.offer(command)) {
        // 从新获取ctl值
        int recheck = ctl.get();
         // 再次判断线程池的运行状态,若是不是运行状态,因为以前已经把command添加到workQueue中了,
        // 这时须要移除该command
        // 执行事后经过handler使用拒绝策略对该任务进行处理,整个方法返回
        if (! isRunning(recheck) && remove(command))
            reject(command);
        /*
         * 获取线程池中的有效线程数,若是数量是0,则执行addWorker方法
         * 这里传入的参数表示:
         * 1. 第一个参数为null,表示在线程池中建立一个线程,但不去启动;
         * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
         * 若是判断workerCount大于0,则直接返回,在workQueue中新增的command会在未来的某个时刻被执行。
         */
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
/*
 * 若是执行到这里,有两种状况:
 * 1. 线程池已经不是RUNNING状态;
 * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize而且workQueue已满。
 * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
 * 若是失败则拒绝该任务
 */
    else if (!addWorker(command, false))
        reject(command);
}

简单来讲,在执行execute()方法时若是状态一直是RUNNING时,的执行过程以下:

  1. 若是workerCount < corePoolSize,则建立并启动一个线程来执行新提交的任务;

  2. 若是workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;

  3. 若是workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则建立并启动一个线程来执行新提交的任务;

  4. 若是workerCount >= maximumPoolSize,而且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

这里要注意一下addWorker(null, false);,也就是建立一个线程,但并无传入任务,由于任务已经被添加到workQueue中了,因此worker在执行的时候,会直接从workQueue中获取任务。因此,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必需要有一个线程来执行任务。

execute方法执行流程以下:

v2-ef4ebf5bc96f7ac1eae55af84819b638_hd.jpg


addWorker方法

addWorker方法的主要工做是在线程池中建立一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前须要判断当前活动线程数是否少于maximumPoolSize,代码以下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
    // 获取运行状态
        int rs = runStateOf(c);
    /*
     * 这个if判断
     * 若是rs >= SHUTDOWN,则表示此时再也不接收新任务;
     * 接着判断如下3个条件,只要有1个不知足,则返回false:
     * 1. rs == SHUTDOWN,这时表示关闭状态,再也不接受新提交的任务,但却能够继续处理阻塞队列中已保存的任务
     * 2. firsTask为空
     * 3. 阻塞队列不为空
     * 
     * 首先考虑rs == SHUTDOWN的状况
     * 这种状况下不会接受新提交的任务,因此在firstTask不为空的时候会返回false;
     * 而后,若是firstTask为空,而且workQueue也为空,则返回false,
     * 由于队列中已经没有任务了,不须要再添加线程了
     */
     // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                        firstTask == null &&
                        ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 获取线程数
            int wc = workerCountOf(c);
            // 若是wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
            // 这里的core是addWorker方法的第二个参数,若是为true表示根据corePoolSize来比较,
            // 若是为false则根据maximumPoolSize来比较。
            // 
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 尝试增长workerCount,若是成功,则跳出第一个for循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 若是增长workerCount失败,则从新获取ctl的值
            c = ctl.get();  // Re-read ctl
            // 若是当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
     // 根据firstTask来建立Worker对象
        w = new Worker(firstTask);
     // 每个Worker对象都会建立一个线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN表示是RUNNING状态;
                // 若是rs是RUNNING状态或者rs是SHUTDOWN状态而且firstTask为null,向线程池中添加线程。
                // 由于在SHUTDOWN时不会在添加新的任务,但仍是会执行workQueue中的任务
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一个HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize记录着线程池中出现过的最大线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker类

线程池中的每个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,请参见JDK源码。

Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时经过ThreadFactory来建立的线程,是用来处理任务的线程。

在调用构造方法时,须要把任务传入,这里经过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,由于Worker自己继承了Runnable接口,也就是一个线程,因此一个Worker对象在启动的时候会调用Worker类中的run方法。

Worker继承了AQS,使用AQS来实现独占锁的功能。为何不使用ReentrantLock来实现呢?能够看到tryAcquire方法,它是不容许重入的,而ReentrantLock是容许重入的:

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;

  2. 若是正在执行任务,则不该该中断线程;

  3. 若是该线程如今不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时能够对该线程进行中断;

  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是不是空闲状态;

  5. 之因此设置为不可重入,是由于咱们不但愿任务在调用像setCorePoolSize这样的线程池控制方法时从新获取锁。若是使用ReentrantLock,它是可重入的,这样若是在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。

因此,Worker继承自AQS,用于判断线程是否空闲以及是否能够被中断。

此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为何这么作呢?是由于AQS中默认的state是0,若是刚建立了一个Worker对象,尚未执行任务时,这时就不该该被中断,看一下tryAquire方法:

protected boolean tryAcquire(int unused) {
    //cas修改state,不可重入
    if (compareAndSetState(0, 1)) { 
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

tryAcquire方法是根据state是不是0来判断的,因此,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断。

正由于如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0。

runWorker方法

在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码以下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 获取第一个任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 容许中断
    w.unlock(); // allow interrupts
    // 是否由于异常退出循环
    boolean completedAbruptly = true;
    try {
        // 若是task为空,则经过getTask来获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

这里说明一下第一个if判断,目的是:

  • 若是线程池正在中止,那么要保证当前线程是中断状态;

  • 若是不是的话,则要保证当前线程不是中断状态;

这里要考虑在执行该if语句期间可能也执行了shutdownNow方法,shutdownNow方法会把状态设置为STOP,回顾一下STOP状态:

不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态。

STOP状态要中断线程池中的全部线程,而这里使用Thread.interrupted()来判断是否中断是为了确保在RUNNING或者SHUTDOWN状态时线程是非中断状态的,由于Thread.interrupted()方法会复位中断的状态。

总结一下runWorker方法的执行过程:

  1. while循环不断地经过getTask()方法获取任务;

  2. getTask()方法从阻塞队列中取任务;

  3. 若是线程池正在中止,那么要保证当前线程是中断状态,不然要保证当前线程不是中断状态;

  4. 调用task.run()执行任务;

  5. 若是task为null则跳出循环,执行processWorkerExit()方法;

  6. runWorker方法执行完毕,也表明着Worker中的run方法执行完毕,销毁线程。

这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。

completedAbruptly变量来表示在执行任务过程当中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。


getTask方法

getTask方法用来从阻塞队列中取任务,代码以下:

private Runnable getTask() {
    // timeOut变量的值表示上次从阻塞队列中取任务时是否超时
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
    /*
     * 若是线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行如下判断:
     * 1. rs >= STOP,线程池是否正在stop;
     * 2. 阻塞队列是否为空。
     * 若是以上条件知足,则将workerCount减1并返回null。
     * 由于若是当前线程池状态的值是SHUTDOWN或以上时,不容许再向阻塞队列中添加任务。
     */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        // timed变量用于判断是否须要进行超时控制。
        // allowCoreThreadTimeOut默认是false,也就是核心线程不容许进行超时;
        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
        // 对于超过核心线程数量的这些线程,须要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    /*
     * wc > maximumPoolSize的状况是由于可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
     * timed && timedOut 若是为true,表示当前操做须要进行超时控制,而且上次从阻塞队列中获取任务发生了超时
     * 接下来判断,若是有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
     * 若是减1失败,则返回重试。
     * 若是wc == 1时,也就说明当前线程是线程池中惟一的一个线程了。
     */
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
        /*
         * 根据timed来判断,若是为true,则经过阻塞队列的poll方法进行超时控制,若是在keepAliveTime时间内没有获取到任务,则返回null;
         * 不然经过take方法,若是这时队列为空,则take方法会阻塞直到队列不为空。
         *
         */
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            // 若是 r == null,说明已经超时,timedOut设置为true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 若是获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
            timedOut = false;
        }
    }
}

这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析能够知道,在执行execute方法时,若是当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,而且workQueue已满时,则能够增长工做线程,但这时若是超时没有获取到任务,也就是timedOut为true的状况,说明workQueue已经为空了,也就说明了当前线程池中不须要那么多线程来执行任务了,能够把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize便可。

何时会销毁?固然是runWorker方法执行完以后,也就是Worker中的run方法执行完,由JVM自动回收。

getTask方法返回null时,在runWorker方法中会跳出while循环,而后会执行processWorkerExit方法。


processWorkerExit方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 若是completedAbruptly值为true,则说明线程执行时出现了异常,须要将workerCount减1;
    // 若是线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操做,这里就没必要再减了。  
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //统计完成的任务数
        completedTaskCount += w.completedTasks;
        // 从workers中移除,也就表示着从线程池中移除了一个工做线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 根据线程池状态进行判断是否结束线程池
    tryTerminate();
    int c = ctl.get();
/*
 * 当线程池是RUNNING或SHUTDOWN状态时,若是worker是异常结束,那么会直接addWorker;
 * 若是allowCoreThreadTimeOut=true,而且等待队列有任务,至少保留一个worker;
 * 若是allowCoreThreadTimeOut=false,workerCount很多于corePoolSize。
 */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

至此,processWorkerExit执行完以后,工做线程被销毁,以上就是整个工做线程的生命周期,从execute方法开始,Worker使用ThreadFactory建立新的工做线程,runWorker经过getTask获取任务,而后执行任务,若是getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:

v2-2298a9eebdfb55479fd90ffb35851c81_hd.jpg


总结

  • 分析了线程的建立,任务的提交,状态的转换以及线程池的关闭;

  • 这里经过execute方法来展开线程池的工做流程,execute方法经过corePoolSize,maximumPoolSize以及阻塞队列的大小来判断决定传入的任务应该被当即执行,仍是应该添加到阻塞队列中,仍是应该拒绝任务。

  • 介绍了线程池关闭时的过程,也分析了shutdown方法与getTask方法存在竞态条件;

  • 在获取任务时,要经过线程池的状态来判断应该结束工做线程仍是阻塞线程等待新的任务,也解释了为何关闭线程池时要中断工做线程以及为何每个worker都须要lock。

最后

欢迎你们一块儿交流,喜欢文章记得点个赞哟,感谢支持!

相关文章
相关标签/搜索