Java并发——线程池原理解析

前言

能够说,线程池是Java并发场景中应用到的最多并发框架了。几乎全部须要异步或者并发执行的任务程序均可以使用线程池。在开发过程当中,合理的使用线程池会带来如下3个好处:前端

  • 下降资源的消耗。若是了解Java线程的来龙去脉,对于这一点应该很好理解。经过重复利用已建立的线程下降线程建立和销毁形成的消耗。
  • 提升响应速度。当任务到达时,任务能够不一样等到建立线程当即就能当即执行。
  • 提升线程的可管理性。线程是稀缺资源,经过线程池框架能够进行统一的线程分配、监控、调优。

世界上没有完美无瑕的事情,任何事情都有正反两面。若是滥用线程池或者使用不当,也有可能带来安全隐患。所以,必须合理的使用线程池才能使得收益最大化。接下来,咱们就来系统的了解线程池,以便可以达到“合理使用”的境界。java

线程池的原理

当任务提交到线程池以后,线程池是如何处理这些任务的呢?它是这样处理的的:数据库

从以上图中,能够看见,当提交了一个新任务时,线程池的处理过程以下:安全

  • 核心线程数corePoolSize是否已满?若是没有满,则建立线程(全局锁),并执行任务,不然进行下一步;
  • 队列是否已满?若是没有满,则将任务入队,不然进行下一步;
  • 线程池是否已满?若是没有满,则建立线程(全局锁),并执行任务,不然进行下一步;
  • 到了这一步,说明线程池没法接收任务了,此时将执行拒绝策略。

以上2/3步骤是否能够调换顺序呢?实际上,线程池之因此采用以上的设计思路,是由于,1/3步骤都是要获取全局锁的。若是任务频繁提交执行,此时将加重锁的竞争,而2步骤是不须要额外的全局锁的竞争。
带着以上的认知,咱们来剖析一下源码:架构

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        // 首先获取ctl变量的值(ctl变量及其重要,在这里咱们只需知道它能够同时表明线程数和线程池状态便可)
        int c = ctl.get();
        // 基于ctl计算出当前的线程数量,若是小于设定的核心线程数。
        if (workerCountOf(c) < corePoolSize) {
            // 则经过addWorker建立一个线程并执行当前任务,addWorker方法内部须要获取全局锁
            if (addWorker(command, true))
                return;
            // 若是addWorker返回失败标志,则从新获取当前ctl的值。
            c = ctl.get();
        }
        // 基于ctl获取当前线程池状态,若是是RUNNING状态而且任务添加到队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 则从新检查线程池的状态,若是当前不是RUNNING状态了,则移除当前任务。
            if (! isRunning(recheck) && remove(command))
                若是成功,则执行拒绝策略
                reject(command);
            else if (workerCountOf(recheck) == 0) // 若是当前线程数为0
                // 则经过addWorker方法初始化ctl
                addWorker(null, false);
        }
        // 若是失败,则经过addWorker方法建立新的线程来执行任务,
        // 若是addWorker方法返回false标志,说明此时建立新的线程来执行任务失败了。
        // 此时说明线程池已满,或者线程池已经不是RUNNING状态了。
        else if (!addWorker(command, false))
            // 此时将执行拒绝策略
            reject(command);
    }
复制代码

经过以上源码解析,可以清晰的了解一个任务提交到线程池是如何处理的了。源码中有两个重要的地方尚未讲解。一是ctl变量的做用;二是addWorker方法的解析。这两个点能够说是线程池的精髓所在了。并发

ctl变量

ctl变量是一个AtomicInteger类型,它包含了两个概念:框架

  • 线程池中的有效线程数,即当前的工做线程数;
  • 线程池的状态
    这就奇怪了,为何ctl能够同时表示数量和状态呢?其实,若是咱们阅读源码比较多的话,会发现,Java中不少地方都有这种使用方式,其目的就是为了在保证性能时还尽量的高效利用内存空间。所以经常会用一个变量表示多种业务状态。为了更加清晰的理解ctl变量,咱们直接贴出源码的解释:

In order to pack them into one int, we limit workerCount to (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 billion) otherwise representable. If this is ever an issue in the future, the variable can be changed to be an AtomicLong, and the shift/mask constants below adjusted. But until the need arises, this code is a bit faster and simpler using an int.异步

以上大概意思就是:为了让有效线程数和线程池的状态可以用一个int变量表示,将线程数限制在了2^29-1(约为5亿),这样的话就能够用低29位来表示有效线程数,高3位来表示线程的状态。为了更好的理解,咱们直接上源码:函数

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    private static int workerCountOf(int c) { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码

基于源码,能够得出它们的实际值:
COUNT_BITS = Integer.SIZE - 3;即:COUNT_BITS = 32 -3 = 29;
CAPACITY     = (1 << 29) - 1;即:CAPACITY = 2^29 - 1 约等于5亿;
RUNNING     = -1 << 29; --> 高3位为:111
SHUTDOWN =  0 << 29; --> 高3位为:000
STOP             =  1 << 29; --> 高3位为:001
TIDYING        =  2 << 29; --> 高3位为:010
TERMINATED=  3 << 29; --> 高3位为:011性能

有了以上的计算,咱们再来看:
runStateO()方法是获取当前线程池状态的方法,它的计算公式为: ctl & ~ CAPACITY.
~ CAPACITY获取到的值实际上就是高3位为1,低29位为0. 所以 ctl & ~ CAPACITY 获得的实际上就是ctl高3位的值
同理,workerCountOf()方法获取到的实际上就是ctl低29位的值。表示为当前有效的线程数。

经过以上解析,应该就能够理解ctl变量的含义了!

addWorker()方法

在线程池中,线程并非Thread,而是基于Thread包装成了一个Worker。Worker是ThreadPoolExecutor的一个内部类。通所以,addWorker()方法实际上就是基于当前线程池的状态来决定是否构建Worker并执行。Worker执行万当前任务后,并不会直接退出,而是循环获取队列中的任务来执行,从源码中咱们能证实这个结论:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 若是task不为空,则执行task。
            // rugo task为空,则从队列中继续获取task。
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 判断线程池的状态和当前task的中断标志,是否知足继续执行的条件。
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 钩子函数:执行task以前的钩子函数
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 钩子函数:执行task以后调用的钩子函数
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
复制代码

钩子函数

从源码中,咱们能够得知,ThreadPoolExecutor框架预留了几个钩子函数.worker在执行任务的过程当中,会触发钩子函数,若是咱们须要执行一些特殊的业务(好比统计任务的执行时长),则能够继承ThreadPoolExecutor实现钩子函数来达到特定业务的目的。

钩子函数 说明
beforeExecute() 任务执行前触发
afterExecute() 任务执行后触发
terminated() 当线程池状态变成TIDYING时,会触发此方法

线程池的状态

线程池一共有5种状态。

状态 解释
RUNNING 运行状态。当线程池建立成功后,线程池的状态就是RUNNING状态了。此状态下能够接收新的任务和执行队列中的任务
SHUTDOWN 此状态下,线程池再也不接收新的任务了,可是会将执行中的任务和队列中的任务执行完成。
STOP 此状态下,线程池再也不接收新的任务;再也不执行队列中的任务;会中断正在执行中的任务。
TIDYING 此状态下,线程池中的workCount=0,线程池将调用调用terminated()方法。
TERMINATED 当terminated()方法执行完成以后,线程池状态将变成TERMINATED,至此,线程池的生命周期完成。

线程池状态的流转:

线程池状态流转

TIP:线程池状态的流转和线程状态的流转是彻底不同的概念。

线程池的使用

基于以上介绍,咱们对线程池的原理已经了然于胸。接下来,经过一个例子来看看实战中线程池的使用方式和技巧。

快捷建立线程池

建立线程池的方式有不少种。好比:

// 1.建立固定大小的线程池
Executors.newFixedThreadPool(); 
// 2.建立一个基于SynchronousQueue队列的线程池.(此队列的特性在前文是有解析)
Executors.newCachedThreadPool();  
// 3.建立一个具备延时功能的线程池(实际上就是基于DelayQueue实现,此队列在前文中解析)
Executors.newScheduledThreadPool(); 
// 4.建立一个只有一个线程的线程池
Executors.newSingleThreadExecutor();
复制代码

以上是快捷建立线程池的的4种方式,其实若是咱们再深刻理解一波的话,能够发现其实他们底层都是基于ThreadPoolExecutor提供的构造方法构建的线程池。这4种方式建立出来的线程池都具有必定的特性在里面,若是对于队列理解透彻的话,能够发现,它们的本质其实就是选择的队列不一样。从而能够基于队列提供的特色实现特殊的功能。
在实战中,除非应用场景比较简单,任务量不是很大的状况下,咱们能够采用这种快捷建立线程池的方式。但若是咱们在大中型工程中,则最好基于ThreadPoolExecutor自定义建立线程池,这样能够更加贴切实际的场景使用。

基于构造方法建立线程池

ThreadPoolExecutor提供了如下几个构造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
复制代码

能够发现,经过原生构造器来构建线程池,是最灵活的。所以,在《阿里巴巴Java开发规范》中也强烈建议采用这种方式来建立线程池。每一个参数的含义以下:

参数名 含义
corePoolSize 核心线程数
maximumPoolSize 最大线程数
keepAliveTime 当线程数大于核心线程数,多余的线程的存活时长,和unit配合使用
unit 和keepAliveTime配合使用
workQueue 工做队列。当核心线程数满时,任务会先入队。若无特殊要求,应该尽可能选择有界队列!不然当任务激增时,有可能撑爆内存致使性能降低甚至崩溃。
threadFactory 线程工厂类,用于建立线程。若是没有指定,则采用默认的工厂类
handler 任务拒绝策略。当线程池不能再接收新的任务时,将执行任务拒绝策略。若是没有指定,则采用默认的拒绝策略。

任务拒绝策略

当线程池满了以后,将会执行拒绝策略。线程池提供了4种拒绝策略。以下:

拒绝策略 说明
AbortPolicy 当线程池满时,将会拒绝接收新任务,并抛出RejectedExecutionException。若是没有指定拒绝策略,此为默认策略
CallerRunsPolicy 当线程池满时,将会使用任务提交者所在的线程来执行这个任务。线程池自己会丢弃掉这个任务。
DiscardPolicy 当线程池满时,将会默默地丢弃掉这个任务。tips:在实际开发当中,若是任务不影响业务,则能够采用此策略,不然断然不可采起这个策略。
DiscardOldestPolicy 当线程池满时,将会默默地丢弃掉队列最前端的任务。而后执行提交的任务。tips:和以上同样

关闭线程池

关闭线程池是颇有必要的。当应用程序须要退出时,能够经过注册回调函数来关闭线程池。若是咱们暴力关闭应用程序的话,会致使正在执行的任务和队列中的任务丢失。在企业工程中,这一点千万注意。 线程池提供了两种关闭方法:

关闭方式 说明
shutdown() 当调用此方法时,线程池状态会变成SHUTDOWN状态,此时线程池将不会再接收新的任务,但以及接收的任务会执行完毕。
shutdownNow() 当调用此方法时,线程池状态会变成STOP状态,此时线程池将不会再接收新的任务,而且会中断全部正在执行中的任务以及丢弃掉队列中的任务。

在实际工程中,具体采起哪一种方式,应该根据实际状况来抉择。若是任务对业务有影响,则应当选择shutdown(),不然能够视状况选择shutdownNow()。

线程数的设置策略

在Java应用中,线程属于稀有资源。那么线程数是设置的越大越好么?非也。在计算机体系中,若是想让性能发挥极致,应该是各个子系统之间的合理配置使用。对于线程数而言也是如此。要想合理的设置线程数,就必须首先分析人物的特性。能够从如下几个角度来分析:

  • 任务的性质:CPU密集型、IO密集型、混合型。
  • 任务的优先级:高、中、低。
  • 任务的执行时间:长、中、短。
  • 任务的依赖性:是否依赖其余资源,好比数据库链接。

咱们能够根据任务的不一样特性来综合考虑线程数的设置。通常而言。若是是CPU密集型,则应该分配尽量小的线程数:一般状况下,能够设置为CPU核数 + 1;若是是IO密集型,则线程并不老是在执行任务,则应该分配尽量大的线程数:一般状况下,能够设置为2 * CPU核数;若是是混合型,则能够将任务拆分红一个CPU密集型和一个IO密集型,只要两个任务执行的时间不会相差太大,则性能会比串行执行的效率要高,若是拆分后任务执行相差的时间过大,则没有必要拆分。

线程池的监控

经过以上的介绍,如何用好线程池应该不是问题。对于一个完善的应用而言,应当还要有良好的监控能力,以便在任务执行出现问题时,能够快速的定位、分析、解决问题。
ThreadPoolExecutor提供了一些基本且好用的方法来监控线程池的运行状况:

方法名 说明
getTaskCount() 线程池队列中须要执行的任务数量
getCompletedTaskCount() 线程池中已经执行完成的任务数量
getActiveCount() 线程池中正在执行任务的线程数量

若是咱们想要更加全面的监控线程池的运行状态以及任务的执行过程。能够继承ThreadPoolExecutor来自定义线程池。

总结

本篇文章围绕ThreadPoolExecutor,系统介绍了线程池的实现;以及实际项目中如何正确的使用线程池。经过本篇文章的写做,本身对于线程池的认识有多了一些不同的感受。好比clt变量的设计真的很精妙。像Doug Lea大神致以崇高的敬意!

架构师之美
相关文章
相关标签/搜索