ThreadPoolExecutor 核心源码解析

本文只介绍 ThreadPoolExecutor 源码的关键部分,开篇会先介绍 ThreadPoolExecutor 中的一些核心常量定义,而后选取线程池工做周期中的几个关键方法分析其源码实现。其实,看 JDK 源码的最好途径就是看类文件注释,做者把想说的全都写在里面了。java

一些重要的常量

ThreadPoolExecutor 内部做者采用了一个 32bitint 值来表示线程池的运行状态(runState)和当前线程池中的线程数目(workerCount),这个变量取名叫 ctlcontrol 的缩写),其中高 3bit 表示容许状态,低 29bit表示线程数目(最多容许 2^29 - 1 个线程)。程序员

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3; // 29 位
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 线程池最大容量

    // runState is stored in the high-order bits
	// 定义的线程池状态常量
	// 111+29个0,值为 -4 + 2 + 1 = -1(不懂的面壁)
    private static final int RUNNING    = -1 << COUNT_BITS; 
	// 000+29个0
    private static final int SHUTDOWN   =  0 << COUNT_BITS; 
	// 001+29个0
    private static final int STOP       =  1 << COUNT_BITS; 
	// 010+29个0
    private static final int TIDYING    =  2 << COUNT_BITS; 
	// 011+29个0
    private static final int TERMINATED =  3 << COUNT_BITS; 

    // Packing and unpacking ctl
    private static int runStateOf(int c) { return c & ~CAPACITY; } // 获得线程池状态
    private static int workerCountOf(int c) { return c & CAPACITY; } // 获得线程池线程数
    private static int ctlOf(int rs, int wc) { return rs | wc; } // 反向构造 ctl 的值
复制代码

由于表明线程池状态的常量能够经过值的大小来表示前后关系(order),所以后续源码中会有:并发

rs >= SHUTDOWN // 那就表示SHUTDOWN、 STOP or TIDYING or TERMINATED,反正不是 RUNNING
复制代码

理解上述的常量意义有助于后面理解源码。异步

讨论线程池的状态转换

从第一节咱们已经知道了线程池分为五个状态,下面咱们聊聊这五个状态分别限制了线程池能执行怎样的行为:函数

  1. RUNNING:能够接受新任务,且执行 Queue 中的任务
  2. SHUTDOWN:再也不接受新的任务,但能继续执行 Queue 中已有的任务
  3. STOP:再也不接受新的任务,且也再也不执行 Queue 中已有的任务
  4. TIDYING:全部任务完成,workCount=0,线程池状态转为 TIDYING 且会执行 hook method,即 terminated()
  5. TERMINATED:``hook method terminated() 执行完毕以后进入的状态

线程池的关键逻辑

上图总结了 ThreadPoolExecutor 源码中的关键性步骤,正好对应咱们这次解析的核心源码(上图出处见水印)。oop

  1. execute 方法用来向线程池提交 task,这是用户使用线程池的第一步。若是线程池内未达到 corePoolSize 则新建一个线程,将该 task 设置为这个线程的 firstTask,而后加入 workerSet 等待调度,这步须要获取全局锁 mainLock
  2. 已达到 corePoolSize 后,将 task 放入阻塞队列
  3. 若阻塞队列放不下,则新建新的线程来处理,这一步也须要获取全局锁 mainLock
  4. 当前线程池 workerCount 超出 maxPoolSize 后用 rejectHandler 来处理

咱们能够看到,线程池的设计使得在 2 步骤时避免了使用全局锁,只须要塞进队列返回等待异步调度就能够,仅剩下 13 建立线程时须要获取全局锁,这有利于线程池的效率提高,由于一个线程池老是大部分时间在步骤 2 上,不然这线程池也没什么存在的意义。源码分析

源码分析

本文只分析 executeaddWorkerrunWorker,三个核心方法和一个 Worker 类,看懂了这几个,其实其余的代码都能看懂。ui

Worker 类

// 继承自 AQS 实现简单的锁控制
 	private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        // worker 运行所在的线程
        final Thread thread;
        // 赋予该线程的第一个 task,多是 null,若是不是 null 就运行这个,
		// 若是是 null 就经过 getTask 方法去 Queue 里取任务
        Runnable firstTask;
        // 线程完成的任务数量
        volatile long completedTasks;

        Worker(Runnable firstTask) {
		// 限制线程直到 runWorker 方法前都不容许被打断
            setState(-1); 
            this.firstTask = firstTask;
			// 线程工厂建立线程
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker */
        public void run() {
			// 线程内部的 run 方法调用了 runWorker 方法
            runWorker(this);
        }
	}
复制代码

execute 方法

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
		// 若是当前线程数小于 corePoolSize
        if (workerCountOf(c) < corePoolSize) {
		// 调用 addWorker 方法新建线程,若是新建成功返回 true,那么 execute 方法结束
            if (addWorker(command, true))
                return;
			// 这里意味着 addWorker 失败,向下执行,由于 addWorker 可能改变 ctl 的值,
			// 因此这里从新获取下 ctl
            c = ctl.get();
        }
		
		// 到这步要么是 corePoolSize 满了,要么是 addWorker 失败了
		// 前者很好理解,后者为何会失败呢?addWorker 中会讲
		
		// 若是线程池状态为 RUNNING 且 task 插入 Queue 成功
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
			// 若是已不处于 RUNNING 状态,那么删除已经入队的 task,而后执行拒绝策略
			// 这里主要是担忧并发场景下有别的线程改变了线程池状态,因此 double-check 下
            if (! isRunning(recheck) && remove(command))
                reject(command);
			// 这个分支有点难以理解,意为若是当前 workerCount=0 的话就建立一个线程
			// 那为何方法开头的那个 addWorker(command, true) 会返回 false 呢,其实
			// 这里有个场景就是 newCachedThreadPool,corePoolSize=0,maxPoolSize=MAX 的场景,
			// 就会进到这个分支,以 maxPoolSize 为界建立临时线程,firstTask=null
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
		// 这个分支很好理解,workQueue 满了那么要根据 maxPoolSize 建立线程了
		// 若是无法建立说明 maxPoolSize 满了,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }
复制代码

addWorker 方法

// core 表示以 corePoolSize 仍是 maxPoolSize 为界
	private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 看看 addWorker 何时返回 false
			// 这里的 if 逻辑有点难懂,用下数学上的分配率,将第一个逻辑表达式放进括号里就好懂了
			// 一、rs >= SHUTDOWN && rs != SHUTDOWN 其实就表示当线程池状态是 STOP、TIDYING, 或 TERMINATED 的时候,固然不能添加 worker 了,任务都不执行了还想加 worker?
			// 二、rs >= SHUTDOWN && firstTask != null 表示当提交一个非空任务,但线程池状态已经不是 RUNNING 的时候,固然也不能 addWorker,由于你最多只能执行完 Queue 中已有的任务
			// 三、rs >= SHUTDOWN && workQueue.isEmpty() 若是 Queue 已经空了,那么不容许新增
			// 须要注意的是,若是 rs=SHUTDOWN && firstTask=null 或者 rs=SHUTDOWN && workQueue 非空的状况下,仍是能够新增 worker 的,须要建立临时线程处理 Queue 里的任务
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
				// 这里也是一个返回 false 的状况,但很简单,就是数目溢出了
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
				// CAS 成功了,就跳出 loop
                if (compareAndIncrementWorkerCount(c))
                    break retry;
				// CAS 失败的话,check 下目前线程池状态,若是发生改变就回到外层 loop 再来一遍,这个也好理解,不然单纯 CAS 失败可是线程池状态不变的话,就只要继续内层 loop 就好了
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
				// 这是全局锁,必须持有才能进行 addWorker 操做
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
					// 启动线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
复制代码

runWorker 方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
			// 循环直至 task = null,多是因为线程池关闭、等待超时等
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 下面这个 if 逻辑没怎么读懂。。。翻译了下注释
				// 若是线程池中止,确保线程中断;
				// 若是没有,确保线程不中断。这须要在第二种状况下进行从新获取ctl,以便在清除中断时处理shutdownNow竞争
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
					// 前置钩子函数,能够自定义
                    beforeExecute(wt, task); 
                    Throwable thrown = null;
                    try {
						// 运行 run 方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
						// 线程的 run 不容许抛出 Throwable,因此转换为 Error 
                        thrown = x; throw new Error(x);
                    } finally {
					// 后置钩子函数,也能够自定义
                        afterExecute(task, thrown);
                    }
                } finally {
					// 获取下一个任务
                    task = null;
					// 增长完成的任务数目
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
复制代码

总结

看完 ThreadPoolExecutor 的源码,不得不惊叹于代码写得真优雅,可是正由于写的太简洁优雅甚至找不到一句啰嗦的代码,因此让人有点难懂。看源码的建议是先仔细阅读一遍类注释,而后再配合 debug,理清关键性的步骤在作什么,有些 corner case 夹杂在主逻辑里面,若是一开始看不懂能够直接略过,过后再来反思。this

写在最后

这是一个不定时更新的、披着程序员外衣的文青小号。既分享极客技术,也记录人间烟火。spa

相关文章
相关标签/搜索