Java线程池ThreadPoolExecutor实现原理剖析 #28

原文连接: github.com/aCoder2013/…java

引言

在Java中,使用线程池来异步执行一些耗时任务是很是常见的操做。最初咱们通常都是直接使用new Thread().start的方式,但咱们知道,线程的建立和销毁都会耗费大量的资源,关于线程能够参考以前的一片博客Java线程那点事儿, 所以咱们须要重用线程资源。git

固然也有其余待解决方案,好比说coroutine, 目前Kotlin已经支持了,JDK也已经有了相关的提案:Project Loom, 目前的实现方式和Kotlin有点相似,都是基于ForkJoinPool,固然目前还有不少限制,以及问题没解决,好比synchronized仍是锁住当前线程等。github

继承结构

image
继承结构看起来很清晰,最顶层的Executor只提供了一个最简单的 void execute(Runnable command)方法,而后是ExecutorService,ExecutorService提供了一些管理相关的方法,例如关闭、判断当前线程池的状态等,另外不一样于 Executor#execute,ExecutorService提供了一系列方法,能够将任务包装成一个Future,从而使得任务提交方能够跟踪任务的状态。而父类AbstractExecutorService则提供了一些默认的实现。

构造器

ThreadPoolExecutor的构造器提供了很是多的参数,每个参数都很是的重要,一不当心就容易踩坑,所以设置的时候,你必需要知道本身在干什么。安全

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
复制代码
  1. corePoolSize、 maximumPoolSize。线程池会自动根据corePoolSize和maximumPoolSize去调整当前线程池的大小。当你经过submit或者execute方法提交任务的时候,若是当前线程池的线程数小于corePoolSize,那么线程池就会建立一个新的线程处理任务, 即便其余的core线程是空闲的。若是当前线程数大于corePoolSize而且小于maximumPoolSize,那么只有在队列"满"的时候才会建立新的线程。所以这里会有不少的坑,好比你的core和max线程数设置的不同,但愿请求积压在队列的时候可以实时的扩容,但若是制定了一个无界队列,那么就不会扩容了,由于队列不存在满的概念。多线程

  2. keepAliveTime。若是当前线程池中的线程数超过了corePoolSize,那么若是在keepAliveTime时间内都没有新的任务须要处理,那么超过corePoolSize的这部分线程就会被销毁。默认状况下是不会回收core线程的,能够经过设置allowCoreThreadTimeOut改变这一行为。并发

  3. workQueue。即实际用于存储任务的队列,这个能够说是最核心的一个参数了,直接决定了线程池的行为,好比说传入一个有界队列,那么队列满的时候,线程池就会根据core和max参数的设置状况决定是否须要扩容,若是传入了一个SynchronousQueue,这个队列只有在另外一个线程在同步remove的时候才能够put成功,对应到线程池中,简单来讲就是若是有线程池任务处理完了,调用poll或者take方法获取新的任务的时候,新提交的任务才会put成功,不然若是当前的线程都在忙着处理任务,那么就会put失败,也就会走扩容的逻辑,若是传入了一个DelayedWorkQueue,顾名思义,任务就会根据过时时间来决定何时弹出,即为ScheduledThreadPoolExecutor的机制。框架

  4. threadFactory。建立线程都是经过ThreadFactory来实现的,若是没指定的话,默认会使用Executors.defaultThreadFactory(),通常来讲,咱们会在这里对线程设置名称、异常处理器等。异步

  5. handler。即当任务提交失败的时候,会调用这个处理器,ThreadPoolExecutor内置了多个实现,好比抛异常、直接抛弃等。这里也须要根据业务场景进行设置,好比说当队列积压的时候,针对性的对线程池扩容或者发送告警等策略。函数

看完这几个参数的含义,咱们看一下Executors提供的一些工具方法,只要是为了方便使用,可是我建议最好少用这个类,而是直接用ThreadPoolExecutor的构造函数,多了解一下这几个参数究竟是什么意思,本身的业务场景是什么样的,好比线程池需不须要扩容、用不用回收空闲的线程等。工具

public class Executors {
    
    /* * 提供一个固定大小的线程池,而且线程不会回收,因为传入的是一个无界队列,至关于队列永远不会满 * 也就不会扩容,所以须要特别注意任务积压在队列中致使内存爆掉的问题 */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }


    /* * 这个线程池会一直扩容,因为SynchronousQueue的特性,若是当前全部的线程都在处理任务,那么 * 新的请求过来,就会致使建立一个新的线程处理任务。若是线程一分钟没有新任务处理,就会被回 * 收掉。特别注意,若是每个任务都比较耗时,并发又比较高,那么可能每次任务过来都会建立一个线 * 程 */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
}

复制代码

源码分析

既然是个线程池,那就必然有其生命周期:运行中、关闭、中止等。ThreadPoolExecutor是用一个AtomicInteger去的前三位表示这个状态的,另外又重用了低29位用于表示线程数,能够支持最大大概5亿多,绝逼够用了,若是之后硬件真的发展到可以启动这么多线程,改为AtomicLong就能够了。 状态这里主要分为下面几种:

  1. RUNNING: 表示当前线程池正在运行中,能够接受新任务以及处理队列中的任务
  2. SHUTDOWN: 再也不接受新的任务,但会继续处理队列中的任务
  3. STOP: 再也不接受新的任务,也不处理队列中的任务了,而且会中断正在进行中的任务
  4. TIDYING: 全部任务都已经处理完毕,线程数为0,转为为TIDYING状态以后,会调用terminated()回调
  5. TERMINATED: terminated()已经执行完毕

同时咱们能够看到全部的状态都是用二进制位表示的,而且依次递增,从而方便进行比较,好比想获取当前状态是否至少为SHUTDOWN等,同时状态以前有几种转换:

  1. RUNNING -> SHUTDOWN。调用了shutdown()以后,或者执行了finalize()
  2. (RUNNING 或者 SHUTDOWN) -> STOP。调用了shutdownNow()以后会转换这个状态
  3. SHUTDOWN -> TIDYING。当线程池和队列都为空的时候
  4. STOP -> TIDYING。当线程池为空的时候
  5. IDYING -> TERMINATED。执行完terminated()回调以后会转换为这个状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    //因为前三位表示状态,所以将CAPACITY取反,和进行与操做便可
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    private static int workerCountOf(int c) { return c & CAPACITY; }
    
    //高三位+第三位进行或操做便可
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    
    //下面三个方法,经过CAS修改worker的数目
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    
    //只尝试一次,失败了则返回,是否重试由调用方决定
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
    
    //跟上一个不同,会一直重试
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
复制代码

下面是比较核心的字段,这里workers采用的是非线程安全的HashSet, 而不是线程安全的版本,主要是由于这里有些复合的操做,好比说将worker添加到workers后,咱们还须要判断是否须要更新largestPoolSize等,workers只在获取到mainLock的状况下才会进行读写,另外这里的mainLock也用于在中断线程的时候串行执行,不然若是不加锁的话,可能会形成并发去中断线程,引发没必要要的中断风暴。

private final ReentrantLock mainLock = new ReentrantLock();

private final HashSet<Worker> workers = new HashSet<Worker>();

private final Condition termination = mainLock.newCondition();

private int largestPoolSize;

private long completedTaskCount;

复制代码

核心方法

拿到一个线程池以后,咱们就能够开始提交任务,让它去执行了,那么咱们看一下submit方法是如何实现的。

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

复制代码

这两个方法都很简单,首先将提交过来的任务(有两种形式:Callable、Runnable )都包装成统一的 RunnableFuture,而后调用execute方法,execute能够说是线程池最核心的一个方法。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        /* 获取当前worker的数目,若是小于corePoolSize那么就扩容, 这里不会判断是否已经有core线程,而是只要小于corePoolSize就会直接增长worker */
        if (workerCountOf(c) < corePoolSize) {
            /* 调用addWorker(Runnable firstTask, boolean core)方法扩容 firstTask表示为该worker启动以后要执行的第一个任务,core表示要增长的为core线程 */
            if (addWorker(command, true))
                return;
            //若是增长失败了那么从新获取ctl的快照,好比可能线程池在这期间关闭了
            c = ctl.get();
        }
        /* 若是当前线程池正在运行中,而且将任务丢到队列中成功了, 那么就会进行一次double check,看下在这期间线程池是否关闭了, 若是关闭了,好比处于SHUTDOWN状态,如上文所讲的,SHUTDOWN状态的时候, 再也不接受新任务,remove成功后调用拒绝处理器。而若是仍然处于运行中的状态, 那么这里就double check下当前的worker数,若是为0,有可能在上述逻辑的执行 过程当中,有worker销毁了,好比说任务抛出了未捕获异常等,那么就会进行一次扩容, 但不一样于扩容core线程,这里因为任务已经丢到队列中去了,所以就不须要再传递firstTask了, 同时要注意,这里扩容的是非core线程 */
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            /* 若是在上一步中,将任务丢到队列中失败了,那么就进行一次扩容, 这里会将任务传递到firstTask参数中,而且扩容的是非core线程, 若是扩容失败了,那么就执行拒绝策略。 */
            reject(command);
    }
复制代码

这里要特别注意下防止队列失败的逻辑,不一样的队列丢任务的逻辑也不同,例如说无界队列,那么就永远不会put失败,也就是说扩容也永远不会执行,若是是有界队列,那么当队列满的时候,会扩容非core线程,若是是SynchronousQueue,这个队列比较特殊,当有另一个线程正在同步获取任务的时候,你才能put成功,所以若是当前线程池中全部的worker都忙着处理任务的时候,那么后续的每次新任务都会致使扩容, 固然若是worker没有任务处理了,阻塞在获取任务这一步的时候,新任务的提交就会直接丢到队列中去,而不会扩容。 上文中屡次提到了扩容,那么咱们下面看一下线程池具体是如何进行扩容的:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            //获取当前线程池的状态
            int rs = runStateOf(c);

            /* 若是状态为大于SHUTDOWN, 好比说STOP,STOP上文说过队列中的任务不处理了,也不接受新任务, 所以能够直接返回false不扩容了,若是状态为SHUTDOWN而且firstTask为null,同时队列非空, 那么就能够扩容 */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                    firstTask == null &&
                    ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                /* 若worker的数目大于CAPACITY则直接返回, 而后根据要扩容的是core线程仍是非core线程,进行判断worker数目 是否超过设置的值,超过则返回 */
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                /* 经过CAS的方式自增worker的数目,成功了则直接跳出循环 */
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //从新读取状态变量,若是状态改变了,好比线程池关闭了,那么就跳到最外层的for循环,
                //注意这里跳出的是retry。
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //建立Worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    /* 获取锁,并判断线程池是否已经关闭 */
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // 若线程已经启动了,好比说已经调用了start()方法,那么就抛异常,
                            throw new IllegalThreadStateException();
                        //添加到workers中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize) //更新largestPoolSize
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //若Worker建立成功,则启动线程,这么时候worker就会开始执行任务了
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                //添加失败
                addWorkerFailed(w);
        }
        return workerStarted;
    } 

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            //每次减小worker或者从队列中移除任务的时候都须要调用这个方法
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
复制代码

这里有个貌似不太起眼的方法tryTerminate,这个方法会在全部可能致使线程池终结的地方调用,好比说减小worker的数目等,若是知足条件的话,那么将线程池转换为TERMINATED状态。另外这个方法没有用private修饰,由于ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,而ScheduledThreadPoolExecutor也会调用这个方法。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /* 若是当前线程处于运行中、TIDYING、TERMINATED状态则直接返回,运行中的没 什么好说的,后面两种状态能够说线程池已经正在终结了,另外若是处于SHUTDOWN状态, 而且workQueue非空,代表还有任务须要处理,也直接返回 */
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            //能够退出,可是线程数非0,那么就中断一个线程,从而使得关闭的信号可以传递下去,
            //中断worker后,worker捕获异常后,会尝试退出,并在这里继续执行tryTerminate()方法,
            //从而使得信号传递下去
            if (workerCountOf(c) != 0) {
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //尝试转换成TIDYING状态,执行完terminated回调以后
                //会转换为TERMINATED状态,这个时候线程池已经完整关闭了,
                //经过signalAll方法,唤醒全部阻塞在awaitTermination上的线程
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

    /** * 中断空闲的线程 * @param onlyOne */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                //遍历全部worker,若以前没有被中断过,
                //而且获取锁成功,那么就尝试中断。
                //锁可以获取成功,那么代表当前worker没有在执行任务,而是在
                //获取任务,所以也就达到了只中断空闲线程的目的。
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
复制代码

image

Worker

下面看一下Worker类,也就是这个类实际负责执行任务,Worker类继承自AbstractQueuedSynchronizer,AQS能够理解为一个同步框架,提供了一些通用的机制,利用模板方法模式,让你可以原子的管理同步状态、blocking和unblocking线程、以及队列,具体的内容以后有时间会再写,仍是比较复杂的。这里Worker对AQS的使用相对比较简单,使用了状态变量state表示是否得到锁,0表示解锁、1表示已得到锁,同时经过exclusiveOwnerThread存储当前持有锁的线程。另外再简单提一下,好比说CountDownLatch, 也是基于AQS框架实现的,countdown方法递减state,await阻塞等待state为0。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
      
        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;

        /** Initial task to run. Possibly null. */
        Runnable firstTask;

        /** Per-thread task counter */
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker */
        public void run() {
            runWorker(this);
        }
       protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock() { acquire(1); }
        public boolean tryLock() { return tryAcquire(1); }
        public void unlock() { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

复制代码

注意这里Worker初始化的时候,会经过setState(-1)将state设置为-1,并在runWorker()方法中置为0,上文说过Worker是利用state这个变量来表示锁的状态,那么加锁的操做就是经过CAS将state从0改为1,那么初始化的时候改为-1,也就是表示在Worker启动以前,都不容许加锁操做,咱们再看interruptIfStarted()以及interruptIdleWorkers()方法,这两个方法在尝试中断Worker以前,都会先加锁或者判断state是否大于0,所以这里的将state设置为-1,就是为了禁止中断操做,并在runWorker中置为0,也就是说只能在Worker启动以后才可以中断Worker。 另外线程启动以后,其实就是调用了runWorker方法,下面咱们看一下具体是如何实现的。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 调用unlock()方法,将state置为0,表示其余操做能够得到锁或者中断worker
        boolean completedAbruptly = true;
        try {
            /* 首先尝试执行firstTask,若没有的话,则调用getTask()从队列中获取任务 */
            while (task != null || (task = getTask()) != null) {
                w.lock();
                /* 若是线程池正在关闭,那么中断线程。 */
                if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                        runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //执行beforeExecute回调
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //实际开始执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //执行afterExecute回调
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    //这里加了锁,所以没有线程安全的问题,volatile修饰保证其余线程的可见性
                    w.completedTasks++;
                    w.unlock();//解锁
                }
            }
            completedAbruptly = false;
        } finally {
            //抛异常了,或者当前队列中已没有任务须要处理等
            processWorkerExit(w, completedAbruptly);
        }
    }

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //若是是异常终止的,那么减小worker的数目
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //将当前worker中workers中删除掉,并累加当前worker已执行的任务到completedTaskCount中
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        //上文说过,减小worker的操做都须要调用这个方法
        tryTerminate();

        /* 若是当前线程池仍然是运行中的状态,那么就看一下是否须要新增另一个worker替换此worker */
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            /* 若是是异常结束的则直接扩容,不然的话则为正常退出,好比当前队列中已经没有任务须要处理, 若是容许core线程超时的话,那么看一下当前队列是否为空,空的话则不用扩容。不然话看一下 是否少于corePoolSize个worker在运行。 */
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

     private Runnable getTask() {
        boolean timedOut = false; // 上一次poll()是否超时了

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 若线程池关闭了(状态大于STOP)
            // 或者线程池处于SHUTDOWN状态,可是队列为空,那么返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            /* 若是容许core线程超时 或者 不容许core线程超时但当前worker的数目大于core线程数, 那么下面的poll()则超时调用 */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            /* 获取任务超时了而且(当前线程池中还有不止一个worker 或者 队列中已经没有任务了),那么就尝试 减小worker的数目,若失败了则重试 */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //从队列中抓取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                //走到这里代表,poll调用超时了
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

复制代码

关闭线程池

关闭线程池通常有两种形式,shutdown()和shutdownNow()

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //经过CAS将状态更改成SHUTDOWN,这个时候线程池不接受新任务,但会继续处理队列中的任务
            advanceRunState(SHUTDOWN);
            //中断全部空闲的worker,也就是说除了正在处理任务的worker,其余阻塞在getTask()上的worker
            //都会被中断
            interruptIdleWorkers();
            //执行回调
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        //这个方法不会等待全部的任务处理完成才返回
    }
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            /* 不一样于shutdown(),会转换为STOP状态,再也不处理新任务,队列中的任务也不处理, 并且会中断全部的worker,而不仅是空闲的worker */
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();//将全部的任务从队列中弹出
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        /* 将队列中全部的任务remove掉,并添加到taskList中, 可是有些队列比较特殊,好比说DelayQueue,若是第一个任务还没到过时时间,则不会弹出, 所以这里经过调用toArray方法,而后再一个一个的remove掉 */
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }
复制代码

从上文中能够看到,调用了shutdown()方法后,不会等待全部的任务处理完毕才返回,所以须要调用awaitTermination()来实现

public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                //线程池若已经终结了,那么就返回
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                //若超时了,也返回掉
                if (nanos <= 0)
                    return false;
                //阻塞在信号量上,等待线程池终结,可是要注意这个方法可能会由于一些未知缘由随时唤醒当前线程,
                //所以须要重试,在tryTerminate()方法中,执行完terminated()回调后,代表线程池已经终结了,
                //而后会经过termination.signalAll()唤醒当前线程
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }
复制代码

一些统计相关的方法

public int getPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //若线程已终结则直接返回0,不然计算works中的数目
           //想一下为何不用workerCount呢?
            return runStateAtLeast(ctl.get(), TIDYING) ? 0
                : workers.size();
        } finally {
            mainLock.unlock();
        }
    }

   public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers)
                if (w.isLocked())//上锁的代表worker当前正在处理任务,也就是活跃的worker
                    ++n;
            return n;
        } finally {
            mainLock.unlock();
        }
    }


    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

    //获取任务的总数,这个方法慎用,如果个无解队列,或者队列挤压比较严重,会很蛋疼
    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;//好比有些worker被销毁后,其处理完成的任务就会叠加到这里
            for (Worker w : workers) {
                n += w.completedTasks;//叠加历史处理完成的任务
                if (w.isLocked())//上锁代表正在处理任务,也算一个
                    ++n;
            }
            return n + workQueue.size();//获取队列中的数目
        } finally {
            mainLock.unlock();
        }
    }


    public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }
复制代码

总结

这篇博客基本上覆盖了线程池的方方面面,但仍然有很是多的细节能够深究,好比说异常的处理,能够参照以前的一篇博客:深度解析Java线程池的异常处理机制 ,另外还有AQS、unsafe等能够以后再单独总结。

Flag Counter

相关文章
相关标签/搜索