ThreadPoolExecutor源码阅读

在阿里巴巴的Java开发手册中看到了线程池比较推荐使用ThreadPoolExecutor,因而每次也都是照葫芦画瓢地使用,对于其中的参数(corePoolSize, maximumPoolSize,keepAliveTime , workQueue)等彻底靠着yy去使用。每次用的是时候都感受心慌慌的,总算是找了个时间来真正地去阅读其源码。java

四个主要参数

在使用ThreadPoolExecutor的时候,咱们一般会使用它的以下构造函数,(此处未考虑拒绝策略)安全

ThreadPoolExecutor(int corePoolSize,
                   int maximumPoolSize,
                   long keepAliveTime,
                   TimeUnit unit,
                   BlockingQueue<Runnable> workQueue)
复制代码

在这里主要有四个参数:核心线程池大小、最大线程池大小、存活时间、工做队列。其实看到这四个参数我是很懵的,好比,核心线程池与最大线程池之间的区别、工做队列又是用来作什么的,存活时间指的是谁的存活时间。在讲解源码以前不妨猜猜。并发

流程总览

image-20190520193752856

这个流程粗看没太大问题,可是有一块一方却异常突兀、反常识,就是workQueue和maximum的顺序,在个人想象中应该是先maximum再workQueue。可是事实上的确是先workQueue,再maximum。能够尝试运行下面这段demo,函数

public class ThreadPoolExecutorMain {
    private static final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));

    public static void main(String[] args) {
        for (int i = 1; i <= 20; i++) {
            final int tmp = i;
            pool.execute(() -> {
                try {
                    Thread.sleep(5000);
                    System.out.println(tmp);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
复制代码

在这段demo中,不发生意外的时候,执行顺序为(1,12,13),(2,3,4),(5,6,7),(8,9,10),11,每组内部顺序能够混乱。(注意:在真正使用的时候,咱们须要将ThreadPoolExecutor看成无序的使用)this

源码解析

execute()

首先直接看execute()方法的源码spa

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 1. 判断core是否塞得下
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. 判断workQueue是否塞得下
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. addWorker中判断max是否塞得下
    else if (!addWorker(command, false))
        reject(command);
}
复制代码

在这里ctl是一个设计很是精巧的状态管理器,它实际上是一个AtomicInteger,它利用int的前三位来存储当前线程池的状态(RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED),后29位用来存储线程数量。线程

在这段代码中,咱们能够看到对线程的执行策略分为了三个部分:1. core部分 2. workQueue部分 3. max部分。其中workQueue部分比较直观,就是直接调用workQueue.offer(command)将线程加入了待执行队列。那么接下来须要关注的是addWorker()方法。设计

addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
    // 判断firstTask可否被执行
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&  // SHUTDOWN状态不会执行新线程,可是能够执行workQueue中的线程
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||  // 最多支持2^29-1个线程
                wc >= (core ? corePoolSize : maximumPoolSize))  // 此处判断max是否塞得下
                return false;
            if (compareAndIncrementWorkerCount(c)) // 利用CAS防止并发问题
                break retry;
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    // 执行新的线程
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {    // 尝试添加线程
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) { // SHUTDOWN状态不会执行新线程,可是能够执行workQueue中的线程
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();         // 执行线程
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
复制代码

addWorker()这段代码看起来比较复杂,可是若是去除掉一些细节和并发安全相关的代码,总体的代码逻辑就是判断线程是否能够执行,若是能够执行则新建线程执行。在这段代码中,咱们能够看到咱们的线程被封装到了一个叫作Worker的类中,接下来,咱们继续探究Worker的源码。code

Worker

在上面的代码中咱们能够看到Worker的执行是经过worker.thread.start()来执行的,先看一下构造函数。cdn

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
复制代码

这里面Worker又做为了Runnable参数传给了Worker.thread。那接下来看run()方法

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {   // 获取Task
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);  // 空方法,扩展使用
                Throwable thrown = null;
                try {
                    task.run();     // 执行Task
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);   // 空方法,扩展使用
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
复制代码

这段run()方法能够看到ThreadPoolExecutor是经过不停地getTask()来复用线程的,可是到这里,其实我还有一个疑问,就是ThreadPoolExecutor如何保持线程一直处于存活状态的。那这个问题一样经过源码来继续解读。

getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 判断是否超时消亡
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 根据超时设置选择不一样的策略获取Task
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
复制代码

在这段代码中咱们能够看到此处利用workQueue是阻塞队列的特性来保持core线程一直处于存活状态(workQueue.take),max线程超时消亡(workQueue.poll)。固然在这段代码中,咱们发现也能够经过设置ThreadPoolExecutor的allowCoreThreadTimeOut来使得core线程超时消亡。至于workQueue的内部实现(take和poll)此处就不继续深究下去了。

总结

至此,咱们已经知道了ThreadPoolExecutor的总体执行流程以及经常使用参数的意义,一样也清楚了流程总览中的demo代码的执行结果为什么具备顺序性。至于workQueue内部的实现就留到下一次,初步看了一下,感受其内部也有不少很是有意思的东西。

相关文章
相关标签/搜索