ThreadPoolExecutor源码共读

一块儿共读,共同阅步。java

零:序言

源码阅读本就是要贯注全神、戒骄戒躁的沉浸式过程。我本着浮躁至极的心态,单刀直入,从入口方法先杀入“敌军”内部,让你们短期内享受到最大的学习成就感,而后再横向铺开,带你们一窥源码的究竟。有不对之处,轻喷、指出。安全

java1.5引入的线程池的标准类,ThreadPoolExecutor。bash

咱们一般是经过Executors这个工厂类来建立具体的实例,如:多线程

Executors.newCachedThreadPool(...)
Executors.newScheduledThreadPool(...)
复制代码

前者建立的就是咱们要讲的ThreadPoolExecutor实例。后者是有延迟功能的线程池,ScheduledThreadPoolExecutor,有机会再讲吧。ThreadPoolExecutor这个线程池实例,内部维护了一个线程的集合,用来存放线程;有一个存放待执行任务的队列,在池内线程数达到最大值时,任务就暂时入队,等待线程取走运行。因此,目前来看,ThreadPoolExecutor的结构以下: 函数

ThreadPoolExecutor结构图.png

零点一:ThreadPoolExecutor源码阅读思惟导图

我会先列一下该源码涉及到的重要的逻辑方法,而后按使用时一般的调用顺序,挨个讲解,最后合并总结。oop

  • execute:执行任务方法,内部封装了新建线程、任务入队等重要逻辑
  • addWorker:新建线程方法
  • getTask:从任务队列内获取一个任务
  • runWorker:池内线程的主循环逻辑。提醒一下,多线程都会调用这同一个方法,因此尤为注意同步问题。

零点五:ThreadPoolExecutor内的关键变量解释

  • workQueue[BlockingQueue],任务队列,是一个BlockingQueue对象,线程安全
  • ctl[Integer],记录了线程池的运行状态值跟池内的线程数
  • workers[HashSet<Worker>],具体存放线程的set对象
  • corePoolSize[volatile int],线程池核心线程数配置,低于这个数值时,新进来的任务一概以新启动线程处理

一:线程池状态ctl基础知识准备

ThreadPoolExecutor实例表明一个线程池,相应地,这个池子就有一些运行状态,以及池子内线程数量的配置。这二者的实现以下:学习

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码

线程池状态跟池内线程数的统计都记录在ctl这个AtomicInteger(它是谁,先本身查吧。后面估计会专门写下,这里只要记住它的加减是线程安全的就好)中。具体实现是: java内一个整型量是32位,这里AtomicInteger也是。源码做者将ctl的32位中高3位用来记录线程池状态,低29位用来记录线程数量。 验证来看,COUNT_BITS的值是29,方法 runStateOf(int c) {return c & ~CAPACITY;} 这里的c就是ctl变量,而CAPACITY就是一个mask面纱,用来从 ctl中提取上面两个变量的,它是这样的: ui

CAPACITY的位图.png

因此,runState就是ctl取反后CAPACITY相与,也就是只有高4位有效,正好对应线程池状态的记录位。 因此,各类状态下,ctl的值以下: RUNNING:1001 x(28个),-1,最高位是符号位,这个<<位移操做是忽略符号位的位移 SHUTDOWN:0000 x(28), 0 STOP: 0001 x(28), 1 TIDYING: 0010 x(28), 2 TERMINATED: 0011 x(28), 3this

二:入口函数execute(Runnable command)

用过线程池的人应该都用过这个入口函数,它就是用来将一个Runnable任务体放入线程池,下面让咱们来具体看看它的逻辑(代码块无法高亮了,你们看下代码段中注释的翻译部分):atom

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         * 主要的逻辑注释,在这
         * 1. If fewer than corePoolSize threads are running, try to
         * 若是当前池内线程数小于corePoolSize,那就尝试去新建一
         * start a new thread with the given command as its first
         * 条线程,传进来的command参数做为该线程的第一个任务 
         * task.  The call to addWorker atomically checks runState and
         * 体。调用addWorker函数会自动检查线程池的状态和池内活跃的线程数
         * workerCount, and so prevents false alarms that would add
         * 若是在不应或不能新建线程时新建了,那不会抛出异常,会返回false
         * threads when it shouldn't, by returning false. * * 即便任务体成功入队,咱们仍须要再去检查一遍,咱们是否应该是新建线程而不是入队, * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (由于可能自第一次检查后,原池内活跃的线程 * (because existing ones died since last checking) or that * 又死掉啦)又或者线程池的状态发生了变化 * the pool shut down since entry into this method. So we * 因此咱们去二次检查池内状态,若线程池状态变为中止,就去回滚,或者线程池此时一个活跃线程都没有的话, * recheck state and if necessary roll back the enqueuing if * 新建一个线程去执行任务体。(PS:由于ThreadPoolExecutor * stopped, or start a new thread if there are none. * 主要是经过addWorker来建立线程,因此,若是池内一个活跃线程都没有, * 这时咱们任务体入队了,也没有线程去跑...固然为何只检查一遍?我是想,可 * 能就只是做者单纯地在这里想检查一遍,稍微确保下。由于即便这个二次检查 * 没问题,后续的,到池内线程确切地去跑这个任务体以前的代码,每一行 * 代码,都仍有发生这种状况的可能。这,就是多线程...) * 3. If we cannot queue task, then we try to add a new * 若是咱们任务体入队失败,那我尝试新建线程,若是还失败 * 那就说明线程池已经被shutdown了,或者整个池子已经满了,那咱们 * 就去拒绝这个任务体。这个拒绝,就会用到所谓的RejectPolicy对象 * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 获取ctl对象 int c = ctl.get(); // 若是池内活跃线程数小于corePoolSize if (workerCountOf(c) < corePoolSize) { // 新建线程,第二个参数true能够先忽略 if (addWorker(command, true)) return; // 新建线程失败,那咱们获取最新的线程池状态变量ctl c = ctl.get(); } // 若是当前线程池仍在运行并且任务体入队成功。 // (workQueue就是ThreadPoolExecutor具体的任务队列。 // 而这里就是咱们上面注释提到的那段二次检查的逻辑) if (isRunning(c) && workQueue.offer(command)) { // 二次检查。获取最新的线程池状态字段 int recheck = ctl.get(); // 若是线程不在运行状态 而且也成功把入队的任务体删除了 // 那就菜哦用拒绝策略来拒绝 if (! isRunning(recheck) && remove(command)) reject(command); // 或者,在线程池内活跃的线程数为0时,新建一个线程 // 这里传参跟上面不同,先忽略。记录这个新启动一个线程就够了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是上面的If失败了,就尝试新启动线程,启动失败了,那说明 // 上面的失败,是isRunning形成的,因此拒绝任务体。启动成功了,那就是成功了。 else if (!addWorker(command, false)) reject(command); } 复制代码

这里涉及到ThreadPoolExecutor线程池增长线程的一个判断逻辑: 每当ThreadPoolExecutor.execute执行一个任务时,先判断corePoolSize,当池内线程数小于这个时,直接新增线程,若大于这个,则向workQueue任务队列入队,队列满了时,则以maximumPoolSize为界开始继续新建线程,当超过maximumPoolSize时,就采用最后的RejectPolicy进行拒绝处理。

三: 函数addWorker(Runnable firstTask, boolean core)

这个函数主要逻辑是新启动一个线程,firstTask是新启动线程的第一个任务,能够为null,为null时,就是单纯地启动一个线程,记得咱们以前在execute(Runnable command)方法中,在线程池内没有有效线程时,调用firstTasknull的方法来启动一条线程。 第二个参数core是用来辨别,启动一个新线程时,是以corePoolSize这个线程数配置量来做为限制,仍是以maximumPoolSize这个线程数配置量做为限制。 看下源码(逻辑主要放注释里了):

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            // 获取到线程池的运行状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 检查任务队列在它必要的时候才能为空。这里
            // 只能直接翻译下,具体的,if里的判断逻辑也能推断出来
            // 可是目前我也不能肯定说出在这种状况下要false退出的
            // 缘由。若是想搞清它,可能只能完全把线程池的运行状态、
            // 线程池内的线程数、任务队列内的任务数三者全部可能的状况的
            // 前提下才能肯定。这里待大神指出来了。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 第二个参数的做用就在这里产生了!
                // 这里在确保池内线程数不超过ctl极限CAPACITY
                // 以及不超过相应的xxxPoolSize的状况下,经过
                // CAI操做去给线程数加1,成功了,则跳出retry标记后
                // 的循环。至于CAI是什么?先记住它是线程安全的给数值+1的操做就好
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 到此位置,线程池内的线程数标记字段已经加1了
        // 接下来的,就是具体添加一个线程的操做了
        // 
        // 这里就不可避免的涉及到了ThreadPoolSize中的
        // Worker这个内部类了,这个类就是具体的ThreadPoolSize
        // 内部用来表明一个线程的封装对象,他封装了一个线程实例
        // ,用来跑具体的任务;封装了一个Runnable 实例,表明具体的任务;
        // 同时,它继承、实现了AQS(AbstractQueuedSynchronizer)跟Runnable,因此,这个
        // Worker实例能够理解成一个小劳工,有本身的运行线程,有
        // 本身的具体的执行任务体,同时,本身也有同步机制AQS。
        // 这里涉及到AQS,你们伙就暂且理解成AQS赋予Worker同步的性质便可(调用AQS的方法就能实现)
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 初始化一个Worker劳工,同时指定给他的任务。这个任务
            // 能够为null,空。表示什么也不作。同时,初始化的时候,也会
           // 初始化Worker体内的线程对象,这条线程的对象的启动,是
           // 在worker对象的Runnable.run实现方法里
            w = new Worker(firstTask);
            final Thread t = w.thread;
                // 这个mainLock是ThreadPoolExecutor用来同步对
                // workers线程队列的CRUD操做的
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // 当线程池处于RUNNING状态,则能够继续操做;
                    // 或者当线程池处于SHUTDOWN,可是firstTask 为null
                    // 也就是说,这里是为了增长一个线程的,因此,也能够放行
                    // 由于SHUTDOWN状态,是容许启动线程将任务队列内的任务跑完的
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 将新生成的线程劳工加入到线程集合中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 若是线程劳工添加成功,则启动它
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 若是线程启动失败,则运行回滚操做
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
复制代码

四: Worker对象的线程start()

这里讲述的方法是接上面addWorker时,成功调用的t.start(),这里启动了Worker封装的线程。这个线程是Worker构造函数里生成的,以下:

/**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            // 这里设置了AQS的state,用来抑制interrupt直到
            // runWorker方法
            setState(-1); // inhibit interrupts until runWorker
            // 这里传递了线程的任务体,能够为null
            this.firstTask = firstTask;
            // 初始化线程时,给线程指定了worker实例自身这个Runnable,所以,线程在start后,
            // 就是在运行worker当前实例自身的run方法
            this.thread = getThreadFactory().newThread(this);
        }
复制代码

看完上面代码的注释,接着看worker实例自身的run方法

/** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
复制代码

能够看到这里调用了runWorker方法,传参是worker自身。runWorker是同一个ThreadPoolExecutor实例的方法,因此,线程池实例下的全部Worker线程都是在跑这同一个runWorker方法。

五: 线程池的循环体方法runWorker(Worker worker)

/**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them...
     */
final void runWorker(Worker w) {
        // 这里获取到线程对象,其实就是参数Worker对象内封装的Thread对象。
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // allow interrupts,容许Interrupt打断,记得Worker对象的构造函数嘛?
        // 构造函数一开始就调用了setState(-1)去抑制interrupt。这里就是去释放它。
        // 固然,这里具体的抑制interrupt的含义,要结合AQS来了解了,我后面再加吧。
        w.unlock();
        boolean completedAbruptly = true;
        try {
            // 若是Worker中的firstTask对象不是空的,则
            // 直接跑它;若否则,调用getTask从队列中获取一条任务来执行。这里
            // 会一直while循环,因此worker们在任务队列中有任务时
            // 会一直在这个runWorker中循环while取任务执行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 这里有一个很巧妙的判断逻辑,这段代码也主要是实现了
                // 上面英文注释中的逻辑,可是可能比较难理解,稍等
                // 在后面我会专门讲一下
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 任务体的前置函数,如今默认是空函数
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 任务体的执行函数
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; 
                        throw new Error(x);
                    } finally {
                        // 后置函数
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // 这个参数记录worker运行是不是被打断的,若是不是,代码
            // 会安全地走到这里,而后置字段为false。
            // 不然,异常状况下就直接跳到finally中了,值仍为初始化时的true
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
复制代码

这段源码上方我放了一段注释,翻译过来就是: worker对象的主要Loop循环体。从队列(workQueue)中获取任务体,而后执行。

六: 从任务队列获取任务的getTask函数

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        // 这个for(;;)也是个无限循环的实现,它比while(true)的好处是
        // 在字节码层面上,它比while(true)少两行字节码代码,因此
        // 效率更高
        for (;;) {
            // 获取线程池最新状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // allowCoreThreadTimeOut表示coreThread,实际上是判断
            // 线程数在这个coreThreadPoolSize范围内时,线程是否能够超时。
            // 这里的判断逻辑也很巧妙,若是allowxxxTimeOut为true,coreThread
            // 能够超时,则 || 后面判断coreThread的逻辑也就无所谓了,是吧。
            // 但若是allowxxxxTimeOut为false,coreThread不容许超时,
            // 则须要去判断在判断的线程是否实在coreThread范围内,是的话,
            // 则最终结果也为false,符合coreThread不能超时的逻辑;若是大于,
            // 则说明当前方法的线程不是在coreThread,
            // 注意去理解这个是否是coreThread这个概念
            // 因此,timed为true,也就是能够超时,符合逻辑
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 这里我把代码格式化了下,方便你们去看
            // 这里的判断就是,在符合了一些逻辑后,就去直接
            // wokerCount减一,表明当前这个woker就直接干掉了,
            // 而在方法内返回null这个逻辑,在调用getTask的代码处
            // 确实也是去干掉当前的worker实例。可是,woker不能
            // 瞎干掉,必需要确保线程池能正常产生做用,这个正常做用
            // 的实现,要么就是干掉当前的worker还剩下至少一个,
            // 要么就是任务队列空了,这个逻辑就在(wc > 1 || workQueue.isEmpty)
            // 实现了。再来看 && 以前,在当前线程数大于
            // maximumPoolSize限制时,或者当前woker能够超时,
            // 即timed为true,同时,上一次获取任务体时也超时了(timedOut)
            // 则,当前的worker就干掉。这段逻辑有一个timedOut
            // 判断,即上一次当前worker获取任务体时就超时了。
            // 我猜想,加这个逻辑,可能就是纯粹的统计学上的效率
            // 提升。固然,欢迎更多想法。
            //
            // 在符合上述条件后,CAS操做来减小workerCount数
            // 再返回null,去干掉当前worker实例。
            if (
                (wc > maximumPoolSize || (timed && timedOut))
                &&
                 (wc > 1 || workQueue.isEmpty())
                ) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 根据当前的worker是否能够超时,调用BlockingQueue
                // 的不一样方法来获取任务体。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 获取到任务体,则返回
                if (r != null)
                    return r;
                // 超时了,记录标记位
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
复制代码

七: 截止目前,全部的任务入队、线程循环、取任务执行任务的逻辑就已经都看完了

这里咱们总结下:

ThreadPoolExecutor的实际逻辑图

ThreadPoolExecutor结构图.png

workers线程集合中的Worker对象,在runWorker中循环自workQueue中获取Runnable任务体进行执行。对workers线程集合的访问要通过mainLock这个锁。

shutdown等线程池结束等方法,后续会讲。代码讲解中涉及一些跟线程同步相关的细碎的小逻辑,建议在理解了主要逻辑后,去重点理解,这些点的思想是颇有价值的思想。

相关文章
相关标签/搜索