ThreadPoolExecutor 原理及源码详细分析

总览线程池工做机制

新任务来到的时候,先去判断核心线程池数量(corePoolSize)满了没有,若是没有满那么建立一个 Worker 去执行这个任务。java

某一时刻发现核心线程池数量满了,那么这个任务就暂时放到 workQueue 中去了。后续只要线程池中的 Worker 空闲下来了(任务执行完了)就去 workQueue 中取任务执行。多线程

随着任务不断的增长,核心线程池中 Worker 的消费能力跟不上了,即 workQueue 中堆积的任务开始变得愈来愈多,某一时刻 workQueue 满了。this

这个时候新来的任务就要去检查线程池的容许建立的最大容量是多少了(maximumPoolSize),好比核心线程池数量限制为 5 它满了,队列长度限制为 10 也满了,最大线程池数量限制为 10,则容许再建立 5 个非核心的 Worker 去执行任务。spa

某一时刻 maximumPoolSize 也满了,那么新来的任务就将被采起对应的拒绝策略来执行了线程

这时候线程池逐渐闲下来了(好比到了半夜之类的用户请求不多了),此时队列里面的任务可能已经被消费完了,或者是不多,即须要执行的任务远远小于线程池 Worker 数量,那么最初建立的 10 个 Worker 中大部分的 Worker 都没法取得任务执行了,这个时候就要将部分闲置的 Worker 释放了好比说某个 Worker 在 keepAliveTime 的时间尚未获取到任务执行,那么就认为它空闲时间知足释放要求了,释放对应的 Workerrest

须要注意的是在底层实现中 Worker,并无一个状态标识他是核心仍是非核心的,在线程池这里仅仅是一个数量的概念,即在释放的时候会看状况释放某几个 Worker 而无论他建立的时候是不是核心 Worker,若是此时已经没有任务须要执行了,那么释放 Worker 后就能保证线程池中剩下的 worker 数量与 corePoolSize 一致。可是有一点须要注意的是若是将 allowCoreThreadTimeOut 设置为 true 了后,核心线程池中的 Worker 也会释放即最终 worker 数量可能为 0,可是 allowCoreThreadTimeOut 通常是默认为 false 的code

一些关键字段含义

// 线程池状态,高 3 位为线程池状态,低 29 位为线程池容量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池最大容量为 2^29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 取出线程池的运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 取出目前的线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 得到 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }    
复制代码
线程池状态 程序移位 高 3 位 低 29 位 说明
RUNNING -1 << 29 111 000... 运行状态
SHUTDOWN 0 << 29 000 000... 关闭状态
STOP 1 << 29 001 000... 中止状态
TIDYING 2 << 29 010 000... 全部线程终止了后调用 terminated()
TERMINATED 3 << 29 011 000... terminated() 调用结束

其中高 3 位表示线程池目前处于什么状态,低 29 位用来表示线程池目前线程的数量cdn

构造方法

// 核心线程池数量
    private volatile int corePoolSize;
    // 最大线程池数量
    private volatile int maximumPoolSize;
    // 非核心线程空闲时间,超过该时间后会被回收
    private volatile long keepAliveTime;
    // 工做队列,当核心线程池数量满了后,新的任务会放入工做队列中
    private final BlockingQueue<Runnable> workQueue;
    // 拒绝策略,当线程池没法再执行新的任务的时候调用
    private volatile RejectedExecutionHandler handler;
    // 默认的拒绝策略
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
复制代码

Worker

线程池中执行任务的线程,它会将 Runnable 封装进去,当执行完了某一个任务后,它会继续从队列里面取任务执行,能够经过 prestartAllCoreThreads() 预先初始化建立好 corePoolSize 个数量的 Worker,后续任务来了就直接用建立好的 Worker 执行就好,或者懒加载的方式每当一个任务过来时候建立一个 Worker 去执行。blog

当池中 Worker 数量达到 corePoolSize 的时候,新来任务的那么就直接添加到 workQueue 当中(注意这时候只是添加的 Runnable 没有新的 Worker 产生,即 Worker 是用来执行任务的线程),这时线程池里面全部的 Worker 只要谁闲下来了那么就去工做队列中取出任务来执行便可。队列

当 workQueue 满了后(意味着核心线程池里面的 Worker 们忙不过来了,都在不停的执行任务都执行不完),那么就会添加非核心的 Worker 去执行任务。

而后任务继续过来达到 maximumPoolSize 限制的线程池最大容量后(核心 Worker 和非核心 Worker 所有都忙不过来了没法处理新的任务了)这个时候就选择对应的策略处理,拒绝新的任务,仍是将其忽略等等。

非核心 Worker 若是空闲时间超过了 keepAliveTime 那么就会释放这个 Worker,这里须要注意的一点是,释放的时候只是保证释放一个 Worker 并不必定是释放以前 addWorker(task, false) 的这个,即若是目前比较空闲的话,会释放几个空闲的 Worker 最终保证核心 Worker 数量与 corePoolSize 一致便可

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        // 用以执行任务的线程
        final Thread thread;
        // 须要执行的任务
        Runnable firstTask;
        // 统计每一个 Worker 完成了多少任务
        volatile long completedTasks;
        
        Worker(Runnable firstTask) {
            // 新建立的 Worker 没有执行是不容许中断的
            // 后面会提到为何 setState(-1); 就没法中断
            setState(-1);
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        
        public void run() {
            // 执行任务
            runWorker(this);
        }
}
复制代码

这里先大概介绍下 Worker 由于任务执行的时候须要用它,便于理解后续的分析,runWorker(this) 后面会详细分析是如何运行,如何释放非核心 Worker 等

执行线程池的任务

public void execute(Runnable command) {
        if (command == null)
                throw new NullPointerException();
        int c = ctl.get();
        // 检测正在运行的 Worker 数量是否小于核心 Worker 数
        if (workerCountOf(c) < corePoolSize) {
            // 若是核心 worker 数没有到 corePoolSize 那么建立一个新的 Worker 来执行任务
            if (addWorker(command, true))
                return;
            // 若是核心 worker 添加失败则再次获取一下线程池状态值
            // 为何会失败?addWorker() 中会说明
            c = ctl.get();
        }
        // 若是线程池处于运行状态,而且任务可以放入 workQueue 中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次检查线程池的状态若是已经不处于活动状态
            // 删除刚刚放入队列里面的任务
            // 而且使用对应的拒绝策略拒绝任务
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 若是说在任务添加队列成功后,没有 Worker 了
            // 那么就添加一个非核心的 Worker 用来取队列里面的任务来执行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 虽然队列已经满了可是若是尚未达到最大线程数 maximum 那么继续添加 Worker
        // 若是达到 maximum 没法添加了则采起对应的拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }
复制代码

addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            // 获取线程池状态
            int c = ctl.get();
            // 获取线程池运行状态
            int rs = runStateOf(c);
            // @1
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 获取线程池 worker 数量
                int wc = workerCountOf(c);
                // 若是 worker 数量达到了最大线程池容许的数量则拒绝添加 worker
                // 不然如果添加核心线程 worker 的话不能超过 corePoolSize 核心线程数
                // 若不是核心线程的话则不能超过限制的 maximumPoolSize 最大线程池数
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 线程池中 worker 数量 + 1
                // 若是添加成功的话则跳出准备执行 worker 线程
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 检查运行状态若是和以前的状态不一致那么重试添加
                if (runStateOf(c) != rs)
                    continue retry;
                // 若是状态一直而且添加失败则说明同时有有不少线程进来那么 CAS 循环重试直到成功
            }
        }
        
        // worker 启动成功标识
        boolean workerStarted = false;
        // worker 添加成功标识
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 建立 Worker
            w = new Worker(firstTask);
            // 获取对应的线程
            final Thread t = w.thread;
            // 线程不为 Null 的话
            if (t != null) {
                // 重入锁锁住一个一个的添加而且启动
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    // 再次检查状态
                    // 线程池处于运行状态或者
                    // 线程池处于 shutdown 可是 firstTask 为 Null
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 预检查线程是否能够启动不能的话抛出异常
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // 添加 worker
                        workers.add(w);
                        // 记录 worker 数量而且标识为添加成功
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // worker 添加成功那么则将 worker 标识为启动成功
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 若是 worker 启动失败
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
复制代码

这个方法主要的功能是添加核心、非核心 Worker 同时校验其合法性,而且增长 ctl 中 worker 计数,若是添加成功的话就执行对应的 Worker 来处理任务,若是添加失败的话则须要保证从 worker 池中移除而且 ctl 中 worker 统计数量 -1.

咱们来看下 @1:

  • 线程池此时的状态为 > SHUTDOWN 的状态即 STOP、TIDYING、TERMINATED 不容许添加新的 Worker
  • 线程池此时的状态为 SHUTDOWN 可是却添加了一个新的任务要来执行,即 firstTask != null 这种状况是不容许的
  • 线程池此时的状态为 SHUTDOWN 添加了一个 Worker (firstTask == null) 可能须要用来执行队列中尚未终止的任务,可是却发现队列里面没有任务了!(队列里面没有任务了还添加新的 Worker 来作什么都 SHUTDOWN 了,我都准备关闭了,因此添加失败)

addWorkerFailed()

private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 从线程池 workers 中删除对应的 worker
            if (w != null)
                workers.remove(w);
            // 线程池 worker 数量 -1
            decrementWorkerCount();
            // 尝试去终止线程池后面再分析
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
复制代码

这个方法比较简单,在 addWorker() 中分别对线程池 worker 数量+1 而后将 worker 放入池中这两部,若是后者添加失败的话就从 workers 中移除,而后保证 worker 数量 -1,而后去尝试调用 tryTerminate() 终止线程池,这个方法后面再分析

runWorker()

Worker 添加成功而且成功启动后就会调用 runWorker 了就是调用上面 Worker 的 run() 方法而后调用 runWorker()

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 取出 worker 的第一个任务准备执行
        Runnable task = w.firstTask;
        w.firstTask = null;
        // @1 unlock 容许中断
        w.unlock(); // allow interrupts
        // 标识任务是否是被异常终止的默认是
        boolean completedAbruptly = true;
        try {
            // 执行添加的任务或者从队列里面取出任务不断的执行
            while (task != null || (task = getTask()) != null) { // @2
                // lock 锁住一个个的执行任务
                w.lock();
                // @3
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 执行以前能够作一些事情留给子类调用
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 运行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 执行任务以后能够作一些事情留给子类调用
                        afterExecute(task, thrown);
                    }
                } finally {
                    // task 置位 null
                    task = null;
                    // 完成任务 + 1
                    w.completedTasks++;
                    // 解锁
                    w.unlock();
                    // 开启又一轮循环取任务执行等过程
                }
            }
            // 任务是正常结束的,即处理完了队列里面的全部任务
            completedAbruptly = false;
        } finally {
            // @4 处理退出逻辑
            processWorkerExit(w, completedAbruptly);
        }
    }

复制代码

这个方法的主要功能是,用 Worker 去不断的取 workQueue 里面的任务来执行。

@1 处,为何 unlock 就能够容许中断了呢,首先 unlock 保证这个 Worker 后续能够成功的 lock 而后执行任务,其次就是在 shutdown() 的时候,会去中断全部的 Worker 的时候会去尝试获取 Worker 的锁,若是这里释放了那么就能成功中断,在后面 shutdown() 会详细分析。

@2 处,关键就是全部的 Worker 都会经过 getTask() 去获取任务执行,若是获取到了那么就执行后续操做,若是任务没有呢?是否是就说明目前工做队列里面没有任务了,Worker 们好像有点闲啊,若是非核心的 Worker 达到了 keepAliveTime 这个时间尚未任务作,是否是该被释放了呢?这个操做就是 getTask() 和 processWorkerExit() 共同完成的,后面会详细分析。

@3 处,若是线程池状态 >= STOP 确保线程被正确中断,不然的话清除中断标记

@4 处 processWorkerExit() 的时候分析

getTask()

private Runnable getTask() {
        // 判断是否 poll() 超时
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // @1
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 若是线程池中的 worker 数量大于 corePoolSize 
            // 则说明可能须要淘汰一些空闲的线程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            // @2
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 在 keepAliveTime 时间内是否获取到了任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 获取到了成功返回
                if (r != null)
                    return r;
                // 若是没有获取到设置 timedOut 为 true
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
复制代码

这个方法的主要功能是不断的从 workQueue 中取出任务,若是在指定的 keepAliveTime 时间尚未取到任务的话则返回 null 须要考虑是否将这个外层没有取到任务的 Worker 释放了,由于他暂时没有事作了。

@1 处,若是线程池状态为 SHUTDOWN 而且 workQueue 为空,则返回为 Null。若是线程池状态 > SHUTDOWN 则返回 Null。同时执行 decrementWorkerCount 减小 ctl 中低 29 位表示的 workers 数量

@2 处,若是线程池 worker 数量大于 maximumPoolSize 或者当前线程池 worker 数量大于了 corePoolSize 而且其它 worker 在 keepAliveTime 都没有获取到任务,那么返回 null 而且减小 workCount 数量

当 getTask() 返回为 null 的时候 completedAbruptly = false; 表示任务是正常结束的,最终执行 processWorkerExit() 方法

processWorkerExit()

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 若是 runWorker() 上述操做是正常完成的即 completedAbruptly = false
        // 那么在 getTask() 逻辑里面已经进行了线程池 worker 数量 -1 的操做了
        // 这里就不须要再次执行了
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 完成的任务数量 +1
            completedTaskCount += w.completedTasks;
            // 将空闲的 worker 从线程池中移除
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        
        // 尝试终止线程池
        tryTerminate();

        int c = ctl.get();
        // 若是线程池状态处于 RUNNING 和 SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 不是异常结束的 runWorker()
            if (!completedAbruptly) {
                // 默认返回 corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 若是线程池 worker 数量已经大于了 corePoolSize
                // 那么就直接返回
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 若是是异常结束的或者线程池 worker 数量没有大于 corePoolSize
            // 那么能够再添加一个 worker 去执行任务
            addWorker(null, false);
        }
    }
复制代码

这个方法的主要功能是若是一个 Worker 没有事作了,那么就能够将其释放了(主要是在线程池数量 > 核心线程池数量,而且队列任务比较少执行 Worker 大都比较闲),在释放的同时看下是否须要终止线程池。

shutdown()

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 将线程池状态设置为 SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 而后去中断全部的 worker
            interruptIdleWorkers();
            // 留给子类使用的 hook 关闭后作一些事情
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 尝试终止
        tryTerminate();
    }
复制代码

这个方法的主要功能是将线程池的状态标记为 SHUTDOWN,此时拒绝接收新的任务,可是若是发现工做队列中还有任务,是能够添加一个不携带任务的非核心的 Wroker 去将队列里面的任务执行完成的,而后去中断全部的 worker

shutdownNow()

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 将线程池状态设置为 STOP
            advanceRunState(STOP);
            // 中断全部的 worker
            interruptWorkers();
            // 抛弃 workQueue 里面全部的任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 尝试终止
        tryTerminate();
        return tasks;
    }
复制代码

这个方法的主要功能是将线程池的状态标记为 STOP,此时拒绝接收新的任务,同时抛弃 workQueue 里面的全部任务,而后去中断全部的 worker

tryTerminate()

for (;;) {
            int c = ctl.get();
            // 只有在线程池状态为 SHUTDOWN 而且队列不为空的时候才会继续执行
            // 或者线程池状态位 STOP 的时候才会继续执行
            // 不然的话就直接返回
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // 若是线程池 worker 数量不为 0 依次中断对应的 worker
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 将线程池的状态改成 TIDYING
                // 从这里也能看出为何上面要判断只能线程池状态位 SHUTDOWN 和 STOP
                // 才能执行该方法 SHUTDOWN < STOP < TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 而后调用 terminated 
                        terminated();
                    } finally {
                        // 最终将线程池状态改成 TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // 不然的话 CAS 不断循环处理
            // else retry on failed CAS
        }
复制代码

从代码能够看到调用了 shutdown() 后线程池状态从 RUNNING 变为 SHUTDOWN,而后调用 tryTerminate() 若是 workQueue 不为空的话表示还有任务没有执行完那么不能终止须要等待队列里面的任务执行完毕后才能终止。

若是调用了 shutdownNow() 后线程池状态从 RUNNING 变为 STOP,而且将 workQueue 里面的任务所有移除了,最终线程池 worker 数量为 0 了后调用 terminated() 后将线程池状态设置为 TERMINATED

总结下状态的转换主要为

调用 shutdown() 后线程池状态从 RUNNING -> SHUTDOWN,调用以后的队列和线程池的任务都执行完成后那么 SHUTDOWN -> TIDYING

调用 shuwdownNow() 后线程池状态从 (RUNNING or SHUTDOWN) -> STOP,调用以后会抛弃队列里面等待执行的任务,而后等待线程池里面的任务执行完成后 STOP -> TIDYING

terminated() 方法执行完毕后 TIDYING -> TERMINATED

可是若是说线程不能正常结束或者不能响应中断那么意味着 shutdown() 和 shutdownNow() 都没法将线程池正常结束,以下。 调用 shutdownNow() 后

public class Test {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(5,
                10,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5));
        fillThreadPoll(threadPoolExecutor, 5);
        System.out.println("此时核心线程数已经满了");
        fillThreadPoll(threadPoolExecutor, 5);
        System.out.println("此时队列已经已经满了");
        fillThreadPoll(threadPoolExecutor, 5);
        System.out.println("此时最大线程数已经满了");
        threadPoolExecutor.shutdownNow();
    }

    private static void fillThreadPoll(ThreadPoolExecutor threadPoolExecutor, int size) {
        for (int i = 0; i < size; i++) {
            threadPoolExecutor.execute(() -> {
                while (true) {
                }
            });
        }
    }
}
复制代码

调整下代码就能正常结束

private static void fillThreadPoll(ThreadPoolExecutor threadPoolExecutor, int size) {
        for (int i = 0; i < size; i++) {
            threadPoolExecutor.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {

                }
            });
        }
    }
复制代码

可是调用 shutdown() 发现仍是没法正常结束,由于他会去调用 interruptIdleWorkers(false);

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 这里没法得到锁
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
复制代码

由于 w.tryLock() 没法得到锁,全部的 worker 都处于忙碌状态,每个 worker 执行对应的任务,那个任务都没有结束,锁都没有是释放,因此 shutdown() 没法结束。

而 shutdownNow() 能够结束的缘由是它调用的是

private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
    
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
复制代码

能够看到就算 Worker 正在执行中没有释放锁也能够直接对其进行中断

拒绝任务

当ThreadPoolExecutor关闭、队列和线程池饱和时,会拒绝新提交的任务,同时调用RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)方法。

ThreadPoolExecutor预置了如下四种策略:

  • ThreadPoolExecutor.AbortPolicy,默认策略,在拒绝任务时,会抛出RejectedExecutionException。
  • ThreadPoolExecutor.CallerRunsPolicy,由提交的线程本身来执行(execute)当前提交的任务。这种策略提供了简单的反馈控制机制,可以下降新的任务提交的速率。
  • ThreadPoolExecutor.DiscardPolicy,简单粗暴的抛弃不能执行的任务。
  • ThreadPoolExecutor.DiscardOldestPolicy,若是ThreadPoolExecutor没有被关闭,那么删除队列头部的任务,而且再次尝试提交任务,若是仍然被拒绝,那么再删除队列头部任务,如此反复。 能够自定义RejectedExecutionHandler拒绝策略,可是要当心处理好策略生效时须要知足的条件,例如队列和线程池大小等等。
相关文章
相关标签/搜索