Java中线程池ThreadPoolExecutor原理探究

1、 前言

线程池主要解决两个问题:一方面当执行大量异步任务时候线程池可以提供较好的性能,这是由于使用线程池可使每一个任务的调用开销减小(由于线程池线程是能够复用的)。另外一方面线程池提供了一种资源限制和管理的手段,好比当执行一系列任务时候对线程的管理,每一个ThreadPoolExecutor也保留了一些基本的统计数据,好比当前线程池完成的任务数目。数组

2、 类图结构

图片描述

Executors实际上是个工具类,里面提供了好多静态方法,根据用户选择返回不一样的线程池实例。
ThreadPoolExecutor继承了AbstractExecutorService,成员变量ctl是个Integer的原子变量用来记录线程池状态 和 线程池线程个数,相似于ReentrantReadWriteLock使用一个变量存放两种信息。
Integer类型是32位二进制标示,其中高3位用来表示线程池状态,后面 29位用来记录线程池线程个数。安全

clipboard.png

clipboard.png

线程池状态含义:多线程

RUNNING:接受新任务而且处理阻塞队列里的任务
SHUTDOWN:拒绝新任务可是处理阻塞队列里的任务
STOP:拒绝新任务而且抛弃阻塞队列里的任务同时会中断正在处理的任务
TIDYING:全部任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用terminated方法
TERMINATED:终止状态。terminated方法调用完成之后的状态
线程池状态转换:并发

RUNNING -> SHUTDOWN
显式调用shutdown()方法,或者隐式调用了finalize(),它里面调用了shutdown()方法。
RUNNING or SHUTDOWN)-> STOP
显式 shutdownNow()方法
SHUTDOWN -> TIDYING
当线程池和任务队列都为空的时候
STOP -> TIDYING
当线程池为空的时候
TIDYING -> TERMINATED
当 terminated() hook 方法执行完成时候
线程池参数:异步

corePoolSize:线程池核心线程个数
workQueue:用于保存等待执行的任务的阻塞队列。
好比基于数组的有界ArrayBlockingQueue、,基于链表的无界LinkedBlockingQueue,最多只有一个元素的同步队列SynchronousQueue,优先级队列PriorityBlockingQueue,具体可参考 https://www.atatech.org/artic...
maximunPoolSize:线程池最大线程数量。
ThreadFactory:建立线程的工厂
RejectedExecutionHandler:饱和策略,当队列满了而且线程个数达到maximunPoolSize后采起的策略,好比AbortPolicy(抛出异常),CallerRunsPolicy(使用调用者所在线程来运行任务),DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务),DiscardPolicy(默默丢弃,不抛出异常)
keeyAliveTime:存活时间。若是当前线程池中的线程数量比基本数量要多,而且是闲置状态的话,这些闲置的线程能存活的最大时间
TimeUnit,存活时间的时间单位
线程池类型:函数

newFixedThreadPool工具

建立一个核心线程个数和最大线程个数都为nThreads的线程池,而且阻塞队列长度为Integer.MAX_VALUE,keeyAliveTime=0说明只要线程个数比核心线程个数多而且当前空闲则回收。oop

clipboard.png

newSingleThreadExecutor
建立一个核心线程个数和最大线程个数都为1的线程池,而且阻塞队列长度为Integer.MAX_VALUE,keeyAliveTime=0说明只要线程个数比核心线程个数多而且当前空闲则回收。源码分析

clipboard.png

newCachedThreadPool
建立一个按需建立线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE,而且阻塞队列为同步队列,keeyAliveTime=60说明只要当前线程60s内空闲则回收。这个特殊在于加入到同步队列的任务会被立刻被执行,同步队列里面最多只有一个任务,而且存在后立刻会拿出执行。性能

clipboard.png

newSingleThreadScheduledExecutor

建立一个最小线程个数corePoolSize为1,最大为Integer.MAX_VALUE,阻塞队列为DelayedWorkQueue的线程池。

clipboard.png

其中Worker继承AQS和Runnable是具体承载任务的对象,Worker继承了AQS本身实现了简单的不可重入独占锁,其中status=0标示锁未被获取状态也就是未被锁住的状态,state=1标示锁已经被获取的状态也就是锁住的状态。

DefaultThreadFactory是线程工厂,newThread方法是对线程的一个分组包裹,其中poolNumber是个静态的原子变量,用来统计线程工厂的个数,threadNumber用来记录每一个线程工厂建立了多少线程。

3、 源码分析

3.1 添加任务到线程池exectue方法

clipboard.png

clipboard.png

若是当前线程池线程个数小于corePoolSize则开启新线程
不然添加任务到任务队列
若是任务队列满了,则尝试新开启线程执行任务,若是线程个数>maximumPoolSize则执行拒绝策略。

重点看addWorkder方法:

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // 检查队列是否只在必要时为空.(1)
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;

    //循环cas增长线程个数
    for (;;) {
        int wc = workerCountOf(c);

        //若是线程个数超限则返回false
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        //cas增长线程个数,同时只有一个线程成功
        if (compareAndIncrementWorkerCount(c))
            break retry;
        //cas失败了,则看线程池状态是否变化了,变化则跳到外层循环重试从新获取线程池状态,否者内层循环从新cas。
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
    }
}

//到这里说明cas成功了,(2)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    //建立worker
    final ReentrantLock mainLock = this.mainLock;
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {

        //加独占锁,为了workers同步,由于可能多个线程调用了线程池的execute方法。
        mainLock.lock();
        try {

            //从新检查线程池状态,为了不在获取锁前调用了shutdown接口(3)
            int c = ctl.get();
            int rs = runStateOf(c);

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                //添加任务
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        //添加成功则启动任务
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;}

代码比较长,主要分两部分,第一部分双重循环目的是经过cas增长线程池线程个数,第二部分主要是并发安全的把任务添加到workers里面,而且启动任务执行。

先看第一部分的(1)

clipboard.png

展开!运算后等价于

clipboard.png

也就是说下面几种状况下会返回false:

当前线程池状态为STOP,TIDYING,TERMINATED
当前线程池状态为SHUTDOWN而且已经有了第一个任务
当前线程池状态为SHUTDOWN而且任务队列为空
内层循环做用是使用cas增长线程个数,若是线程个数超限则返回false,否者进行cas,cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了,若是变了,则从新进入外层循环从新获取线程池状态,否者进入内层循环继续进行cas尝试。

到了第二部分说明CAS成功了,也就是说线程个数加一了,可是如今任务还没开始执行,这里使用全局的独占锁来控制workers里面添加任务,其实也可使用并发安全的set,可是性能没有独占锁好(这个从注释中知道的)。这里须要注意的是要在获取锁后从新检查线程池的状态,这是由于其余线程可可能在本方法获取锁前改变了线程池的状态,好比调用了shutdown方法。添加成功则启动任务执行。

3.2 工做线程Worker的执行

先看下构造函数:

clipboard.png

这里添加一个新状态-1是为了不当前线程worker线程被中断,好比调用了线程池的shutdownNow,若是当前worker状态>=0则会设置该线程的中断标志。这里设置了-1因此条件不知足就不会中断该线程了。运行runWorker时候会调用unlock方法,该方法吧status变为了0,因此这时候调用shutdownNow会中断worker线程。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // status设置为0,容许中断
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {

            w.lock();
            // 若是线程池当前状态至少是stop,则设置中断标志;
            // 若是线程池当前状态是RUNNININ,则重置中断标志,重置后须要从新
            //检查下线程池状态,由于当重置中断标志时候,可能调用了线程池的shutdown方法
            //改变了线程池状态。
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();


            try {
                //任务执行前干一些事情
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();//执行任务
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //任务执行完毕后干一些事情
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //统计当前worker完成了多少个任务
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {

        //执行清了工做
        processWorkerExit(w, completedAbruptly);
    }
}

若是当前task为空,则直接执行,否者调用getTask从任务队列获取一个任务执行,若是任务队列为空,则worker退出。

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // 若是当前线程池状态>=STOP 或者线程池状态为shutdown而且工做队列为空则,减小工做线程个数
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
        decrementWorkerCount();
        return null;
    }

    boolean timed;      // Are workers subject to culling?

    for (;;) {
        int wc = workerCountOf(c);
        timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if (wc <= maximumPoolSize && ! (timedOut && timed))
            break;
        if (compareAndDecrementWorkerCount(c))
            return null;
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }

    try {

        //根据timed选择调用poll仍是阻塞的take
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
            return r;
        timedOut = true;
    } catch (InterruptedException retry) {
        timedOut = false;
    }
}}

private void processWorkerExit(Worker w, boolean completedAbruptly){
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();

//统计整个线程池完成的任务个数
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);
} finally {
    mainLock.unlock();
}

//尝试设置线程池状态为TERMINATED,若是当前是shutdonw状态而且工做队列为空
//或者当前是stop状态当前线程池里面没有活动线程
tryTerminate();

//若是当前线程个数小于核心个数,则增长
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && ! workQueue.isEmpty())
            min = 1;
        if (workerCountOf(c) >= min)
            return; // replacement not needed
    }
    addWorker(null, false);
}}

3.3 shutdown操做
调用shutdown后,线程池就不会在接受新的任务了,可是工做队列里面的任务仍是要执行的,可是该方法马上返回的,并不等待队列任务完成在返回。

public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    //权限检查
    checkShutdownAccess();

    //设置当前线程池状态为SHUTDOWN,若是已是SHUTDOWN则直接返回
    advanceRunState(SHUTDOWN);

    //设置中断标志
    interruptIdleWorkers();
    onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
    mainLock.unlock();
}
//尝试状态变为TERMINATED
tryTerminate();
}

若是当前状态>=targetState则直接返回,否者设置当前状态为targetState
private void advanceRunState(int targetState) {

for (;;) {
    int c = ctl.get();
    if (runStateAtLeast(c, targetState) ||
        ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
        break;
}
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

设置全部线程的中断标志,主要这里首先加了全局锁,同时只有一个线程能够调用shutdown时候设置中断标志,而后尝试获取worker本身的锁,获取成功则设置中断标示

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    for (Worker w : workers) {
        Thread t = w.thread;
        if (!t.isInterrupted() && w.tryLock()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            } finally {
                w.unlock();
            }
        }
        if (onlyOne)
            break;
    }
} finally {
    mainLock.unlock();
}}

3.4 shutdownNow操做
调用shutdown后,线程池就不会在接受新的任务了,而且丢弃工做队列里面里面的任务,正在执行的任务会被中断,可是该方法马上返回的,并不等待激活的任务执行完成在返回。返回队列里面的任务列表。

调用队列的drainTo一次当前队列的元素到taskList,
可能失败,若是调用drainTo后队列海不为空,则循环删除,并添加到taskList
public List<Runnable> shutdownNow() {

List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    checkShutdownAccess();//权限检查
    advanceRunState(STOP);// 设置线程池状态为stop
    interruptWorkers();//中断线程
    tasks = drainQueue();//移动队列任务到tasks
} finally {
    mainLock.unlock();
}
tryTerminate();
return tasks;

}

调用队列的drainTo一次当前队列的元素到taskList,
可能失败,若是调用drainTo后队列海不为空,则循环删除,并添加到taskList
private List<Runnable> drainQueue() {

BlockingQueue<Runnable> q = workQueue;
List<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
    for (Runnable r : q.toArray(new Runnable[0])) {
        if (q.remove(r))
            taskList.add(r);
    }
}
return taskList;

}

3.5 awaitTermination操做

等待线程池状态变为TERMINATED则返回,或者时间超时。因为整个过程独占锁,因此通常调用shutdown或者shutdownNow后使用。

public boolean awaitTermination(long timeout, TimeUnit unit)

throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

4、总结

线程池巧妙的使用一个Integer类型原子变量来记录线程池状态和线程池线程个数,设计时候考虑到将来(2^29)-1个线程可能不够用,到时只须要把原子变量变为Long类型,而后掩码位数变下就能够了,可是为啥如今不一劳永逸的定义为Long那,主要是考虑到使用int类型操做时候速度上比Long类型快些。

经过线程池状态来控制任务的执行,每一个worker线程能够处理多个任务,线程池经过线程的复用减小了线程建立和销毁的开销,经过使用任务队列避免了线程的阻塞从而避免了线程调度和线程上下文切换的开销。

另外须要注意的是调用shutdown方法做用仅仅是修改线程池状态让如今任务失败并中断当前线程,这个中断并非让正在运行的线程终止,而是仅仅设置下线程的中断标志,若是线程内没有使用中断标志作一些事情,那么这个对线程没有影响。

相关文章
相关标签/搜索