JAVA线程池原理与源码分析


一、线程池经常使用接口介绍

1.一、Executor

public interface Executor {
void execute(Runnable command);
}

执行提交的Runnable任务。其中的execute方法在未来的某个时候执行给定的任务,该任务能够在新线程、池化线程或调用线程中执行,具体由Executor的实现者决定。java

1.二、ExecutorService

ExecutorService继承自Executor,下面挑几个方法介绍:并发

1.2.一、shutdown()
void shutdown();

启动有序关闭线程池,在此过程当中执行先前提交的任务,但不接受任何新任务。若是线程池已经关闭,调用此方法不会产生额外的效果。此方法不等待之前提交的任务完成执行,可使用awaitTermination去实现。异步

1.2.二、shutdownNow()
List<Runnable> shutdownNow();

尝试中止全部正在积极执行的任务, 中止处理等待的任务,并返回等待执行的任务列表。 此方法不等待之前提交的任务完成执行,可使用awaitTermination去实现。除了尽最大努力中止处理积极执行的任务外,没有任何保证。例如,典型的实现是:经过Thread#interrupt取消任务执行,可是任何未能响应中断的任务均可能永远不会终止。函数

1.2.三、isShutdown()
boolean isShutdown();

返回线程池关闭状态。oop

1.2.四、isTerminated()
boolean isTerminated();

若是关闭后全部任务都已完成,则返回 true。注意,除非首先调用了shutdown或shutdownNow,不然isTerminated永远不会返回true。测试

1.2.五、awaitTermination(long timeout, TimeUnit unit)
boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

线程阻塞阻塞,直到全部任务都在shutdown请求以后执行完毕,或者超时发生,或者当前线程被中断(以先发生的状况为准)。this

1.2.六、submit
<T> Future<T> submit(Callable<T> task);

提交一个value-returning任务以执行,并返回一个表示该任务未决结果的Future。 Future的 get方法将在成功完成任务后返回任务的结果。spa

1.三、ScheduledExecutorService

安排命令在给定的延迟以后运行,或者按期执行,继承自ExecutorService接口由如下四个方法组成:线程

//在给定延迟以后启动任务,返回ScheduledFuture
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
//建立并执行一个周期性操做,该操做在给定的初始延迟以后首次启动,而后在给定的周期内执行;
//若是任务的任何执行遇到异常,则禁止后续执行。不然,任务只会经过执行器的取消或终止而终止。
//若是此任务的任何执行时间超过其周期,则后续执行可能会延迟开始,但不会并发执行。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
//建立并执行一个周期性操做,该操做在给定的初始延迟以后首次启动,而后在一次执行的终止和下一次执行的开始之间使用给定的延迟。
//若是任务的任何执行遇到异常,则禁止后续执行。不然,任务只会经过执行器的取消或终止而终止。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);

1.四、ThreadFactory

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

按需建立新线程的对象。设计

1.五、Callable

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

返回任务结果也可能抛出异常。

1.六、Future

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;

Future表示异步计算的结果。方法用于检查计算是否完成,等待计算完成并检索计算结果。只有当计算完成时,才可使用方法get检索结果,若是须要,能够阻塞,直到准备好为止。取消由cancel方法执行。还提供了其余方法来肯定任务是否正常完成或被取消。一旦计算完成,就不能取消计算。

1.七、Delayed

public interface Delayed extends Comparable<Delayed> {
    //在给定的时间单位中返回与此对象关联的剩余延迟
    long getDelay(TimeUnit unit);
}

一种混合风格的接口,用于标记在给定延迟以后应该执行的对象。

1.八、ScheduledFuture

public interface ScheduledFuture<V> extends Delayed, Future<V> {}

二、线程池工做流程

线程池的主要工做流程.jpg

新任务进来时:

  1. 若是当前运行的线程少于corePoolSize,则建立新线程(核心线程)来执行任务。
  2. 若是运行的线程等于或多于corePoolSize ,则将任务加入BlockingQueue。
  3. 若是BlockingQueue队列已满,则建立新的线程(非核心)来处理任务。
  4. 若是核心线程与非核心线程总数超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler拒绝策略。

三、ThreadPoolExecutor介绍

构造方法:

public ThreadPoolExecutor(
int corePoolSize,int maximumPoolSize,
long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

参数说明:

  • corePoolSize

除非设置了 allowCoreThreadTimeOut,不然要保留在线程池中的线程数(即便它们是空闲的)。

  • maximumPoolSize

线程池中容许的最大线程数。

  • keepAliveTime

当线程数大于corePoolSize时,这是多余的空闲线程在终止新任务以前等待新任务的最长时间。

  • unit

keepAliveTime参数的时间单位。

  • workQueue

用于在任务执行前保存任务的队列。这个队列只包含execute方法提交的Runnable任务。

  • threadFactory

执行程序建立新线程时使用的工厂。

  • handler

因为达到线程边界和队列容量而阻塞执行时使用的处理程序。

3.一、BlockingQueue

  • SynchronousQueue
    不存储元素的阻塞队列,一个插入操做,必须等待移除操做结束,每一个任务一个线程。使用的时候maximumPoolSize通常指定成Integer.MAX_VALUE。
  • LinkedBlockingQueue
    若是当前线程数大于等于核心线程数,则进入队列等待。因为这个队列没有最大值限制,即全部超过核心线程数的任务都将被添加到队列中。
  • ArrayBlockingQueue
    能够限定队列的长度,接收到任务的时候,若是没有达到corePoolSize的值,则新建线程(核心线程)执行任务,若是达到了,则入队等候,若是队列已满,则新建线程(非核心线程)执行任务,又若是总线程数到了maximumPoolSize,而且队列也满了,则执行拒绝策略。
  • DelayQueue
    队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。
  • priorityBlockingQuene
    具备优先级的无界阻塞队列。

3.二、RejectedExecutionHandler

有4个ThreeadPoolExecutor内部类。

  • AbortPolicy

直接抛出异常,默认策略。

  • CallerRunsPolicy

用调用者所在的线程来执行任务。

  • DiscardOldestPolicy

丢弃阻塞队列中靠最前的任务,并执行当前任务。
四、DiscardPolicy
直接丢弃任务。

最好自定义饱和策略,实现RejectedExecutionHandler接口,如:记录日志或持久化存储不能处理的任务。

3.三、线程池大小设置

  • CPU密集型

尽可能使用较小的线程池,减小CUP上下文切换,通常设置为CPU核心数+1。

  • IO密集型

能够适当加大线程池数量,IO多,因此在等待IO的时候,充分利用CPU,通常设置为CPU核心数2倍。
可是对于一些特别耗时的IO操做,盲目的用线程池可能也不是很好,经过异步+单线程轮询,上层再配合上一个固定的线程池,效果可能更好,参考Reactor模型。

  • 混合型

视具体状况而定。

3.四、任务提交

  • Callable

经过submit函数提交,返回Future对象。

  • Runnable

经过execute提交,没有返回结果。

3.五、关闭线程池

  • shutdown()

仅中止阻塞队列中等待的线程,那些正在执行的线程就会让他们执行结束。

  • shutdownNow()

不只会中止阻塞队列中的线程,并且会中止正在执行的线程。

四、线程池实现原理

4.一、 线程池状态

线程池的内部状态由AtomicInteger修饰的ctl表示,其高3位表示线程池的运行状态,低29位表示线程池中的线程数量。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

主池控制状态ctl是一个原子整数,包含两个概念字段:

  • workerCount:指示有效线程数。
  • runState:指示是否运行、关闭等。

为了将这两个字段打包成一个整型,因此将workerCount限制为(2^29)-1个线程,而不是(2^31)-1个线程。

workerCount是工做线程数量。该值可能与实际活动线程的数量存在暂时性差别,例如,当ThreadFactory在被请求时没法建立线程,以及退出的线程在终止前仍在执行bookkeeping时。 用户可见的池大小报告为工做线程集的当前大小。

runState提供了生命周期,具备如下值:

  • RUNNING:接受新任务并处理排队的任务
  • SHUTDOWN:不接受新任务,而是处理队列的任务。
  • STOP:不接受新任务,不处理队列的任务,中断正在进行的任务。
  • TIDYING:全部任务都已终止,workerCount为零,过渡到状态TIDYING的线程将运行terminated()钩子方法。
  • TERMINATED:terminated()方法执行完毕。

为了容许有序比较,这些值之间的数值顺序很重要。运行状态会随着时间单调地增长,但不须要达到每一个状态。转换:
线程池内部状态转换图.png

  • RUNNING -> SHUTDOWN

在调用shutdown()时,能够隐式地在finalize()中调用。

  • (RUNNING or SHUTDOWN) -> STOP

调用shutdownNow()。

  • SHUTDOWN -> TIDYING

当队列和池都为空时。

  • STOP -> TIDYING

当池是空的时候。

  • TIDYING -> TERMINATED

当terminated()钩子方法完成时。

当状态达到TERMINATED时,在awaitTermination()中等待的线程将返回。

下面看如下其余状态信息:

//Integer.SIZE为32,COUNT_BITS为29
private static final int COUNT_BITS = Integer.SIZE - 3;
//2^29-1 最大线程数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
* 111 0 0000 0000 0000 0000 0000 0000 0000
* -1 原码:0000 ... 0001 反码:1111 ... 1110 补码:1111 ... 1111
* 左移操做:后面补 0
* 111 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* 即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
* 000 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* 即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,并且会中断正在* 运行的任务;
* 001 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* 即高3位为010,全部任务都已终止,workerCount为零,过渡到状态TIDYING的线程将运行terminated()钩子方法;
* 010 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* 即高3位为011,terminated()方法执行完毕;
* 011 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int TERMINATED = 3 << COUNT_BITS;
//根据ctl计算runState
private static int runStateOf(int c) {
//2^29   =  001 0 0000 0000 0000 0000 0000 0000 0000
//2^29-1 =  000 1 1111 1111 1111 1111 1111 1111 1111
//~(2^29-1)=111 0 0000 0000 0000 0000 0000 0000 0000
//假设c为 STOP 001 0 0000 0000 0000 0000 0000 0000 0000
// 最终值:    001 0 0000 0000 0000 0000 0000 0000 0000
    return c & ~CAPACITY;
}
//根据ctl计算 workerCount
private static int workerCountOf(int c) {
//2^29-1 =  000 1 1111 1111 1111 1111 1111 1111 1111
//假设c =   000 0 0000 0000 0000 0000 0000 0000 0001  1个线程
//最终值:  000 0 0000 0000 0000 0000 0000 0000 0001  1
    return c & CAPACITY;
}
// 根据runState和workerCount计算ctl
private static int ctlOf(int rs, int wc) {
//假设 rs: STOP  001 0 0000 0000 0000 0000 0000 0000 0000
//假设 wc:       000 0 0000 0000 0000 0000 0000 0000 0001  1个线程
//最终值:       001 0 0000 0000 0000 0000 0000 0000 0001
    return rs | wc;
}
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
//RUNNING状态为负数,确定小于SHUTDOWN,返回线程池是否为运行状态
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
//试图增长ctl的workerCount字段值。
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}
//尝试减小ctl的workerCount字段值。
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
//递减ctl的workerCount字段。这只在线程忽然终止时调用(请参阅processWorkerExit)。在getTask中执行其余递减。
private void decrementWorkerCount() {
    do {
    } while (!compareAndDecrementWorkerCount(ctl.get()));
}

Doug Lea大神的设计啊,感受计算机的基础真的是数学。

4.二、 内部类Worker

Worker继承了AbstractQueuedSynchronizer,而且实现了Runnable接口。
维护了如下三个变量,其中completedTasks由volatile修饰。

//线程这个工做程序正在运行。若是工厂失败,则为空。
final Thread thread;
//要运行的初始任务。多是null。
Runnable firstTask;
//线程任务计数器
volatile long completedTasks;

构造方法:

//使用ThreadFactory中给定的第一个任务和线程建立。
Worker(Runnable firstTask) {
    //禁止中断,直到运行工做程序
    setState(-1); 
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

既然实现了Runnable接口,必然实现run方法:

//Delegates main run loop to outer runWorker
public void run() {
    //核心
    runWorker(this);
}

4.三、runWorker(Worker w)执行任务

先看一眼执行流程图,再看源码,会更清晰一点:
runWorker.png

首先来看runWorker(Worker w)源码:

final void runWorker(Worker w) {
    //获取当前线程
    Thread wt = Thread.currentThread();
    //获取第一个任务
    Runnable task = w.firstTask;
    //第一个任务位置置空
    w.firstTask = null;
    //由于Worker实现了AQS,此处是释放锁,new Worker()是state==-1,此处是调用Worker类的 release(1)方法,将state置为0。Worker中interruptIfStarted()中只有state>=0才容许调用中断
    w.unlock();
    //是否忽然完成,若是是因为异常致使的进入finally,那么completedAbruptly==true就是忽然完成的
    boolean completedAbruptly = true;
    try {
        //先处理firstTask,以后依次处理其余任务
        while (task != null || (task = getTask()) != null) {
            //获取锁
            w.lock();
            //若是池中止,确保线程被中断;若是没有,请确保线程没有中断。这须要在第二种状况下从新检查,以处理清除中断时的shutdownNow竞争
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
                //自定义实现
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    //自定义实现
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //任务完成数+1
                w.completedTasks++;
                //释放锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //Worker的结束后的处理工做
        processWorkerExit(w, completedAbruptly);
    }
}

下面再来看上述源码中的getTask()与processWorkerExit(w, completedAbruptly)方法:

4.3.一、getTask()

根据当前配置设置执行阻塞或定时等待任务,或者若是该worker由于任何缘由必须退出,则返回null,在这种状况下workerCount将递减。

返回空的状况:

  1. 大于 maximumPoolSize 个 workers(因为调用setMaximumPoolSize)
  2. 线程池关闭
  3. 线程池关闭了而且队列为空
  4. 这个worker超时等待任务,超时的worker在超时等待以前和以后均可能终止(即allowCoreThreadTimeOut || workerCount > corePoolSize),若是队列不是空的,那么这个worker不是池中的最后一个线程。
private Runnable getTask() {
    // Did the last poll() time out?
    boolean timedOut = false;
    for (; ; ) {
        //获取线程池状态
        int c = ctl.get();
        int rs = runStateOf(c);
        //仅在必要时检查队列是否为空。
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //递减ctl的workerCount字段
            decrementWorkerCount();
            return null;
        }
        //获取workerCount数量
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //线程超时控制
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            //尝试减小ctl的workerCount字段
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            //若是有超时控制,则使用带超时时间的poll,不然使用take,没有任务的时候一直阻塞,这两个方法都会抛出InterruptedException
            Runnable r = timed ?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) :workQueue.take();
            //有任务就返回
            if (r != null)
                return r;
            //获取任务超时,确定是走了poll逻辑
            timedOut = true;
        } catch (InterruptedException retry) {
            //被中断
            timedOut = false;
        }
    }
}
4.3.一、processWorkerExit(Worker w, boolean completedAbruptly)

为垂死的worker进行清理和bookkeeping。仅从工做线程调用。除非completedAbruptly被设置,不然假定workerCount已经被调整以考虑退出。此方法从工做集中移除线程,若是线程池因为用户任务异常而退出,或者运行的工做池小于corePoolSize,或者队列非空但没有工做池, 则可能终止线程池或替换工做池。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // If abrupt, then workerCount wasn't adjusted
    // true:用户线程运行异常,须要扣减
    // false:getTask方法中扣减线程数量
    if (completedAbruptly)
        //递减ctl的workerCount字段。
        decrementWorkerCount();
    //获取主锁,锁定
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //更新完成任务计数器
        completedTaskCount += w.completedTasks;
        //移除worker
        workers.remove(w);
    } finally {
        //解锁
        mainLock.unlock();
    }
    // 有worker线程移除,多是最后一个线程退出须要尝试终止线程池
    tryTerminate();
    int c = ctl.get();
    // 若是线程为running或shutdown状态,即tryTerminate()没有成功终止线程池,则判断是否有必要一个worker
    if (runStateLessThan(c, STOP)) {
        // 正常退出,计算min:须要维护的最小线程数量
        if (!completedAbruptly) {
            // allowCoreThreadTimeOut 默认false:是否须要维持核心线程的数量
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 若是min ==0 或者workerQueue为空,min = 1
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            // 若是线程数量大于最少数量min,直接返回,不须要新增线程
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 添加一个没有firstTask的worker
        addWorker(null, false);
    }
}

4.四、任务提交

提交有两种:

  • Executor#execute(Runnable command)

Executor接口提供的方法,在未来的某个时候执行给定的命令.该命令能够在新线程、池化线程或调用线程中执行,具体由Executor的实现者决定。

  • ExecutorService#submit(Callable<T> task)

提交一个value-returning任务以执行,并返回一个表示该任务未决结果的Future。Future的get方法将在成功完成任务后返回任务的结果。

4.五、任务执行

4.5.一、 execute(Runnable command)

任务执行流程图:

execute.png

三步处理:

  1. 若是运行的线程小于corePoolSize,则尝试用给定的命令做为第一个任务启动一个新线程。对addWorker的调用原子性地检查runState和workerCount,所以能够经过返回false来防止错误警报,由于错误警报会在不该该添加线程的时候添加线程。
  2. 若是一个任务能够成功排队,那么咱们仍然须要再次检查是否应该添加一个线程 (由于自上次检查以来已有的线程已经死亡),或者池在进入这个方法后关闭。所以,咱们从新检查状态,若是必要的话,若是中止,则回滚队列;若是没有,则启动一个新线程。
  3. 若是没法对任务排队,则尝试添加新线程。 若是它失败了,咱们知道pool被关闭或饱和,因此拒绝任务。
public void execute(Runnable command) {
    //任务为空,抛出异常
    if (command == null)
        throw new NullPointerException();
   //获取线程控制字段的值
   int c = ctl.get();
   //若是当前工做线程数量少于corePoolSize(核心线程数)
   if (workerCountOf(c) < corePoolSize) {
       //建立新的线程并执行任务,若是成功就返回
       if (addWorker(command, true))
            return;
       //上一步失败,从新获取ctl
       c = ctl.get();
    }
    //若是线城池正在运行,且入队成功
    if (isRunning(c) && workQueue.offer(command)) {
        //从新获取ctl
        int recheck = ctl.get();
        //若是线程没有运行且删除任务成功
        if (!isRunning(recheck) && remove(command))
            //拒绝任务
            reject(command);
        //若是当前的工做线程数量为0,只要还有活动的worker线程,就能够消费workerQueue中的任务
        else if (workerCountOf(recheck) == 0)
            //第一个参数为null,说明只为新建一个worker线程,没有指定firstTask
            addWorker(null, false);
    } else if (!addWorker(command, false))
    //若是线程池不是running状态 或者 没法入队列,尝试开启新线程,扩容至maxPoolSize,若是addWork(command, false)失败了,拒绝当前command
        reject(command);
}

下面详细看一下上述代码中出现的方法:addWorker(Runnable firstTask, boolean core)。

4.5.1.一、addWorker(Runnable firstTask, boolean core)

addWorker.jpg

检查是否能够根据当前池状态和给定的界限(核心或最大值)添加新worker,若是是这样,worker计数将相应地进行调整,若是可能,将建立并启动一个新worker, 并将运行firstTask做为其第一个任务。 若是池已中止或有资格关闭,则此方法返回false。若是线程工厂在被请求时没有建立线程,则返回false。若是线程建立失败,要么是因为线程工厂返回null,要么是因为异常 (一般是Thread.start()中的OutOfMemoryError)),咱们将回滚。

private boolean addWorker(Runnable firstTask, boolean core) {
    //很久没见过这种写法了
    retry:
    //线程池状态与工做线程数量处理,worker数量+1
    for (; ; ) {
        //获取当前线程池状态与线程数
        int c = ctl.get();
        //获取当前线程池状态
        int rs = runStateOf(c);
        // 仅在必要时检查队列是否为空。若是池子处于SHUTDOWN,STOP,TIDYING,TERMINATED的时候 不处理提交的任务,判断线程池是否能够添加worker线程
        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;
        //线程池处于工做状态
        for (; ; ) {
            //获取工做线程数量
            int wc = workerCountOf(c);
            //若是线程数量超过最大值或者超过corePoolSize或者超过maximumPoolSize 拒绝执行任务
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //试图增长ctl的workerCount字段
            if (compareAndIncrementWorkerCount(c))
                //中断外层循环
                break retry;
            // Re-read ctl
            c = ctl.get();
            //若是当前线程池状态已经改变
            if (runStateOf(c) != rs)
                //继续外层循环
                continue retry;
            //不然CAS因workerCount更改而失败;重试内循环
        }
    }
    //添加到worker线程集合,并启动线程,工做线程状态
    boolean workerStarted = false;
    boolean workerAdded = false;
    //继承AQS并实现了Runnable接口
    Worker w = null;
    try {
        //将任务封装
        w = new Worker(firstTask);
        //获取当前线程
        final Thread t = w.thread;
        if (t != null) {
            //获取全局锁
            final ReentrantLock mainLock = this.mainLock;
            //全局锁定
            mainLock.lock();
            try {
                //持锁时从新检查。退出ThreadFactory故障,或者在获取锁以前关闭。
                int rs = runStateOf(ctl.get());
                //若是当前线程池关闭了
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                   //测试该线程是否活动。若是线程已经启动而且尚未死,那么它就是活的。                                       
                   if (t.isAlive())
                        throw new IllegalThreadStateException();
                   //入工做线程池
                   workers.add(w);
                   int s = workers.size();
                   //跟踪最大的池大小
                   if (s > largestPoolSize)
                        largestPoolSize = s;
                   //状态
                   workerAdded = true;
                }
            } finally {
                //释放锁
                mainLock.unlock();
            }
            //若是工做线程加入成功,开始线程的执行,并设置状态
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //判断工做线程是否启动成功
        if (!workerStarted)
            //回滚工做线程建立
            addWorkerFailed(w);
    }
    //返回工做线程状态
    return workerStarted;
}

再分析回滚工做线程建立逻辑方法:addWorkerFailed(w)。
回滚工做线程建立,若是存在,则从worker中移除worker, 递减ctl的workerCount字段。,从新检查终止,以防这个worker的存在致使终止。

private void addWorkerFailed(Worker w) {
    //获取全局锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //若是存在,则从worker中移除worker
        if (w != null)
            workers.remove(w);
        //递减ctl的workerCount字段。
        decrementWorkerCount();
        //从新检查终止
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

其中的tryTerminate()方法:
若是是SHUTDOWN或者STOP 且池子为空,转为TERMINATED状态。若是有条件终止,可是workerCount不为零,则中断空闲worker,以确保关机信号传播。必须在任何可能使终止成为可能的操做以后调用此方法--在关机期间减小worker数量或从队列中删除任务。该方法是非私有的,容许从ScheduledThreadPoolExecutor访问。

final void tryTerminate() {
    for (; ; ) {
        int c = ctl.get();
        //若是线程池处于运行中,或者阻塞队列中仍有任务,返回
        if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
           return;
        //还有工做线程
        if (workerCountOf(c) != 0) {
            //中断空闲工做线程
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        //获取全局锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //设置ctl状态TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //方法在执行程序终止时调用,默认什么都不执行
                    terminated();
                } finally {
                    //完成terminated()方法,状态为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    //唤醒全部等待条件的节点
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

//方法在执行程序终止时调用,默认什么都不执行
protected void terminated() {}

4.5.1.二、 reject(Runnable command)拒绝策略

为给定的命令调用被拒绝的执行处理程序。

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

tencent.jpg

相关文章
相关标签/搜索