从0到1玩转线程池

咱们通常不会选择直接使用线程类Thread进行多线程编程,而是使用更方便的线程池来进行任务的调度和管理。线程池就像共享单车,咱们只要在咱们有须要的时候去获取就能够了。甚至能够说线程池更棒,咱们只须要把任务提交给它,它就会在合适的时候运行了。可是若是直接使用Thread类,咱们就须要在每次执行任务时本身建立、运行、等待线程了,并且很难对线程进行总体的管理,这可不是一件轻松的事情。既然咱们已经有了线程池,那仍是把这些麻烦事交给线程池来处理吧。java

以前一篇介绍线程池使用及其源码的文章篇幅太长了、跨度太大了一些,感受不是很好理解。因此我把内容从新组织了一下,拆为了两篇文章,而且补充了一些内容,但愿能让你们更容易地理解相关内容。编程

这篇文章将从线程池的概念与通常使用入手,首先介绍线程池的通常使用。而后详细介绍线程池中经常使用的可配置项,例如任务队列、拒绝策略等,最后会介绍四种经常使用的线程池配置。经过这篇文章,你们能够熟练掌握线程池的使用方式,在实践中游刃有余地使用线程池对线程进行灵活的调度。缓存

阅读本文须要对多线程编程有基本的认识,例如什么是线程、多线程解决的是什么问题等。不了解的读者能够参考一下我以前发布的一篇文章《这一次,让咱们彻底掌握Java多线程(2/10)》bash

通常咱们最经常使用的线程池实现类是ThreadPoolExecutor,咱们接下来会介绍这个类的基本使用方法。JDK已经对线程池作了比较好的封装,相信这个过程会很是轻松。数据结构

线程池的基本使用

建立线程池

既然线程池是一个Java类,那么最直接的使用方法必定是new一个ThreadPoolExecutor类的对象,例如ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() )。那么这个构造器的里每一个参数是什么意思呢?咱们能够暂时不用关心这些细节,继续完成线程池的使用之旅,稍后再回头来研究这个问题。多线程

提交任务

当建立了一个线程池以后咱们就能够将任务提交到线程池中执行了。提交任务到线程池中至关简单,咱们只要把原来传入Thread类构造器的Runnable对象传入线程池的execute方法或者submit方法就能够了。execute方法和submit方法基本没有区别,二者的区别只是submit方法会返回一个Future对象,用于检查异步任务的执行状况和获取执行结果(异步任务完成后)。异步

咱们能够先试试如何使用比较简单的execute方法,代码例子以下:性能

public class ThreadPoolTest {

    private static int count = 0;

    public static void main(String[] args) throws Exception {
        Runnable task = new Runnable() {
            public void run() {
                for (int i = 0; i < 1000000; ++i) {
                    synchronized (ThreadPoolTest.class) {
                        count += 1;
                    }
                }
            }
        };

        // 重要:建立线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L,
        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        // 重要:向线程池提交两个任务
        threadPool.execute(task);
        threadPool.execute(task);

        // 等待线程池中的全部任务完成
        threadPool.shutdown();
        while (!threadPool.awaitTermination(1L, TimeUnit.MINUTES)) {
            System.out.println("Not yet. Still waiting for termination");
        }
        
        System.out.println("count = " + count);
    }
}
复制代码

运行以后获得的结果是两百万,咱们成功实现了第一个使用线程池的程序。那么回到刚才的问题,建立线程池时传入的那些参数有什么做用的呢?ui

深刻解析线程池

建立线程池的参数

下面是ThreadPoolExecutor的构造器定义:spa

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
复制代码

各个参数分别表示下面的含义:

  1. corePoolSize,核心线程池大小,通常线程池会至少保持这么多的线程数量;
  2. maximumPoolSize,最大线程池大小,也就是线程池最大的线程数量;
  3. keepAliveTime和unit共同组成了一个超时时间,keepAliveTime是时间数量,unit是时间单位,单位加数量组成了最终的超时时间。这个超时时间表示若是线程池中包含了超过corePoolSize数量的线程,则在有线程空闲的时间超过了超时时间时该线程就会被销毁;
  4. workQueue是任务的阻塞队列,在没有线程池中没有足够的线程可用的状况下会将任务先放入到这个阻塞队列中等待执行。这里传入的队列类型就决定了线程池在处理这些任务时的策略,具体类型会在下文中介绍;
  5. threadFactory,线程的工厂对象,线程池经过该对象建立线程。咱们能够经过传入自定义的实现了ThreadFactory接口的类来修改线程的建立逻辑,能够不传,默认使用Executors.defaultThreadFactory()做为默认的线程工厂;
  6. handler,拒绝策略,在线程池没法执行或保存新提交的任务时进行处理的对象,经常使用的有如下几种策略类:
    • ThreadPoolExecutor.AbortPolicy,默认策略,行为是直接抛出RejectedExecutionException异常
    • ThreadPoolExecutor.CallerRunsPolicy,用调用者所在的线程来执行任务
    • ThreadPoolExecutor.DiscardOldestPolicy,丢弃阻塞队列中最先提交的任务,并重试execute方法
    • ThreadPoolExecutor.DiscardPolicy,静默地直接丢弃任务,不返回任何错误

看到这里可能大部分读者并不能理解每一个参数具体的做用,接下来咱们就经过线程池源代码中使用了这些参数配置的代码来深刻理解每个参数的意义。

execute方法的实现

咱们通常会使用execute方法提交咱们的任务,那么线程池在这个过程当中作了什么呢?在ThreadPoolExecutor类的execute()方法的源代码中,咱们主要作了四件事:

  1. 若是当前线程池中的线程数小于核心线程数corePoolSize,则经过threadFactory建立一个新的线程,并把入参中的任务做为第一个任务传入该线程;
  2. 若是当前线程池中的线程数已经达到了核心线程数corePoolSize,那么就会经过阻塞队列workerQueueoffer方法来将任务添加到队列中保存,并等待线程空闲后进行执行;
  3. 若是线程数已经达到了corePoolSize且阻塞队列中没法插入该任务(好比已满),那么线程池就会再增长一个线程来执行该任务,除非线程数已经达到了最大线程数maximumPoolSize
  4. 若是确实已经达到了最大线程数,那么就会经过拒绝策略对象handler拒绝这个任务。

整体上的执行流程以下,左侧的实心黑点表明流程开始,下方的黑色同心圆表明流程结束:

上面提到了线程池构造器参数中除了超时时间以外的全部参数的做用,相信你们根据上面的流程已经能够理解每一个参数的意义了。可是有一个名词咱们还一直没有深刻讲解,那就是阻塞队列的含义。

线程池中的阻塞队列

线程池中的阻塞队列专门用于存放须要等待线程空闲的待执行任务,而阻塞队列是这样的一种数据结构,它是一个队列(相似于一个List),能够存放0到N个元素。咱们能够对这个队列进行插入和弹出元素的操做,弹出操做能够理解为是一个获取并从队列中删除一个元素的操做。当队列中没有元素时,对这个队列的获取操做将会被阻塞,直到有元素被插入时才会被唤醒;当队列已满时,对这个队列的插入操做将会被阻塞,直到有元素被弹出后才会被唤醒。

这样的一种数据结构很是适合于线程池的场景,当一个工做线程没有任务可处理时就会进入阻塞状态,直到有新任务提交后才被唤醒。

在线程池中,不一样的阻塞队列类型会被线程池的行为产生不一样的影响,下面是三种咱们最经常使用的阻塞队列类型:

  1. 直连队列,以SynchronousQueue类为表明,队列不会存储任何任务。当有任务提交线程试图向队列中添加待执行任务时会被阻塞,直到有任务处理线程试图从队列中获取待执行任务时会与阻塞状态中的任务提交线程发生直接联系,由任务提交线程把任务直接交给任务执行线程;
  2. 无界队列,以LinkedBlockingQueue类为表明,队列中能够存储无限数量的任务。这种队列永远不会由于队列已满致使任务放入队列失败,因此结合前面介绍的流程咱们能够发现,当使用无界队列时,线程池中的线程最多只能达到核心线程数就不会再增加了,最大线程数maximumPoolSize参数不会产生做用;
  3. 有界队列,以ArrayBlockingQueue类为表明,能够保存固定数量的任务。这种队列在实践中比较经常使用,由于它既不会由于保存太多任务致使资源消耗过多(无界队列),又不会由于任务提交线程被阻塞而影响到系统的性能(直连队列)。整体上来讲,有界队列在实际效果上比较均衡。

阅读execute方法的源码

在IDE中,例如IDEA里,咱们能够点击咱们样例代码里的ThreadPoolExecutor类跳转到JDK中ThreadPoolExecutor类的源代码。在源代码中咱们能够看到不少java.util.concurrent包的缔造者大牛“Doug Lea”所留下的各类注释,下面的图片就是该类源代码的一个截图。

这些注释的内容很是有参考价值,建议有能力的读者朋友能够本身阅读一遍。下面,咱们就一步步地抽丝剥茧,来揭开线程池类ThreadPoolExecutor源代码的神秘面纱。不过这一步并非必须的,能够跳过。

下面是ThreadPoolExecutorexecute方法带有中文解释的源代码,有兴趣的朋友能够和上面的流程对照起来参考一下:

public void execute(Runnable command) {
    // 检查提交的任务是否为空
    if (command == null)
        throw new NullPointerException();
    
    // 获取控制变量值
    int c = ctl.get();
    // 检查当前线程数是否达到了核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 未达到核心线程数,则建立新线程
        // 并将传入的任务做为该线程的第一个任务
        if (addWorker(command, true))
            // 添加线程成功则直接返回,不然继续执行
            return;

        // 由于前面调用了耗时操做addWorker方法
        // 因此线程池状态有可能发生了改变,从新获取状态值
        c = ctl.get();
    }

    // 判断线程池当前状态是不是运行中
    // 若是是则调用workQueue.offer方法将任务放入阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {
        // 由于执行了耗时操做“放入阻塞队列”,因此从新获取状态值
        int recheck = ctl.get();
        // 若是当前状态不是运行中,则将刚才放入阻塞队列的任务拿出,若是拿出成功,则直接拒绝这个任务
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // 若是线程池中没有线程了,那就建立一个
            addWorker(null, false);
    }
    // 若是放入阻塞队列失败(如队列已满),则添加一个线程
    else if (!addWorker(command, false))
        // 若是添加线程失败(如已经达到了最大线程数),则拒绝任务
        reject(command);
}
复制代码

在这段源代码中,咱们能够看到,线程池是经过addWorker方法来建立线程的,这里的这个Worker指的就是ThreadPoolExecutor类中用来对线程进行包装和管理的Worker类对象。若是想了解Worker类的具体执行流程能够阅读一下下一篇深刻剖析线程池的任务执行流程的文章。

超时时间

那么还有一个咱们没有提到的超时时间在这个过程当中发挥了什么做用呢?从前面咱们能够看出,线程数量被划分为了核心线程数和最大线程数。当线程没有任务可执行时会阻塞在从队列中获取新任务这个操做上,这时咱们称这个线程为空闲线程,一旦有新任务被提交,则该线程就会退出阻塞状态并开始执行这个新任务。

若是当前线程池中的线程总数大于核心线程数,那么只要有线程的空闲时间超过了超时时间,那么这个线程就会被销毁;若是线程池中的线程总数小于等于核心线程数,那么超时线程就不会被销毁了(除了一些特殊状况外)。这也就是超时时间参数所发挥的做用了。

其余线程池操做

关闭线程池

在以前使用线程池执行任务的代码中为了等待线程池中的全部任务执行完已经使用了shutdown()方法,这是关闭线程池的一种方法。对于ThreadPoolExecutor,关闭线程池的方法主要有两个:

  1. shutdown(),有序关闭线程池,调用后线程池会让已经提交的任务完成执行,可是不会再接受新任务。
  2. shutdownNow(),直接关闭线程池,线程池中正在运行的任务会被中断,正在等待执行的任务不会再被执行,可是这些还在阻塞队列中等待的任务会被做为返回值返回。

监控线程池运行状态

咱们能够经过调用线程池对象上的一些方法来获取线程池当前的运行信息,经常使用的方法有:

  • getTaskCount,线程池中已完成、执行中、等待执行的任务总数估计值。由于在统计过程当中任务会发生动态变化,因此最后的结果并非一个准确值;
  • getCompletedTaskCount,线程池中已完成的任务总数,这一样是一个估计值;
  • getLargestPoolSize,线程池曾经建立过的最大线程数量。经过这个数据能够知道线程池是否充满过,也就是达到过maximumPoolSize;
  • getPoolSize,线程池当前的线程数量;
  • getActiveCount,当前线程池中正在执行任务的线程数量估计值。

四种经常使用线程池

不少状况下咱们也不会直接建立ThreadPoolExecutor类的对象,而是根据须要经过Executors的几个静态方法来建立特定用途的线程池。目前经常使用的线程池有四种:

  1. 可缓存线程池,使用Executors.newCachedThreadPool方法建立
  2. 定长线程池,使用Executors.newFixedThreadPool方法建立
  3. 延时任务线程池,使用Executors.newScheduledThreadPool方法建立
  4. 单线程线程池,使用Executors.newSingleThreadExecutor方法建立

下面经过这些静态方法的源码来具体了解一下不一样类型线程池的特性与适用场景。

可缓存线程池

JDK中的源码咱们经过在IDE中进行跳转能够很方便地进行查看,下面就是Executors.newCachedThreadPool方法中的源代码。从代码中咱们能够看到,可缓存线程池其实也是经过直接建立ThreadPoolExecutor类的构造器建立的,只是其中的参数都已经被设置好了,咱们能够不用作具体的设置。因此咱们要观察的重点就是在这个方法中具体产生了一个怎样配置的ThreadPoolExecutor对象,以及这样的线程池适用于怎样的场景。

从下面的代码中,咱们能够看到,传入ThreadPoolExecutor构造器的值有: - corePoolSize核心线程数为0,表明线程池中的线程数能够为0 - maximumPoolSize最大线程数为Integer.MAX_VALUE,表明线程池中最多能够有无限多个线程 - 超时时间设置为60秒,表示线程池中的线程在空闲60秒后会被回收 - 最后传入的是一个SynchronousQueue类型的阻塞队列,表明每个新添加的任务都要立刻有一个工做线程进行处理

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
复制代码

因此可缓存线程池在添加任务时会优先使用空闲的线程,若是没有就建立一个新线程,线程数没有上限,因此每个任务都会立刻被分配到一个工做线程进行执行,不须要在阻塞队列中等待;若是线程池长期闲置,那么其中的全部线程都会被销毁,节约系统资源。

  • 优势
    • 任务在添加后能够立刻执行,不须要进入阻塞队列等待
    • 在闲置时不会保留线程,能够节约系统资源
  • 缺点
    • 对线程数没有限制,可能会过量消耗系统资源
  • 适用场景
    • 适用于大量短耗时任务和对响应时间要求较高的场景

定长线程池

传入ThreadPoolExecutor构造器的值有:

  • corePoolSize核心线程数和maximumPoolSize最大线程数都为固定值nThreads,即线程池中的线程数量会保持在nThreads,因此被称为“定长线程池”
  • 超时时间被设置为0毫秒,由于线程池中只有核心线程,因此不须要考虑超时释放
  • 最后一个参数使用了无界队列,因此在全部线程都在处理任务的状况下,能够无限添加任务到阻塞队列中等待执行
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
复制代码

定长线程池中的线程数会逐步增加到nThreads个,而且在以后空闲线程不会被释放,线程数会一直保持在nThreads个。若是添加任务时全部线程都处于忙碌状态,那么就会把任务添加到阻塞队列中等待执行,阻塞队列中任务的总数没有上限。

  • 优势
    • 线程数固定,对系统资源的消耗可控
  • 缺点
    • 在任务量暴增的状况下线程池不会弹性增加,会致使任务完成时间延迟
    • 使用了无界队列,在线程数设置太小的状况下可能会致使过多的任务积压,引发任务完成时间过晚和资源被过分消耗的问题
  • 适用场景
    • 任务量峰值不会太高,且任务对响应时间要求不高的场景

延时任务线程池

与以前的两个方法不一样,Executors.newScheduledThreadPool返回的是ScheduledExecutorService接口对象,能够提供延时执行、定时执行等功能。在线程池配置上有以下特色:

  • maximumPoolSize最大线程数为无限,在任务量较大时能够建立大量新线程执行任务
  • 超时时间为0,线程空闲后会被当即销毁
  • 使用了延时工做队列,延时工做队列中的元素都有对应的过时时间,只有过时的元素才会被弹出
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
复制代码

延时任务线程池实现了ScheduledExecutorService接口,主要用于须要延时执行和定时执行的状况。

单线程线程池

单线程线程池中只有一个工做线程,能够保证添加的任务都以指定顺序执行(先进先出、后进先出、优先级)。可是若是线程池里只有一个线程,为何咱们还要用线程池而不直接用Thread呢?这种状况下主要有两种优势:一是咱们能够经过共享的线程池很方便地提交任务进行异步执行,而不用本身管理线程的生命周期;二是咱们可使用任务队列并指定任务的执行顺序,很容易作到任务管理的功能。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
复制代码

总结

在这篇文章中咱们从线程池的概念和基本使用方法提及,经过execute方法的源码深刻剖析了任务提交的全过程和各个线程池构造器参数在线程池实际运行过程当中所发挥的做用,还真正阅读了线程池类ThreadPoolExecutor的execute方法的源代码。最后,咱们介绍了线程池的其余经常使用操做和四种经常使用的线程池。

到这里咱们的线程池源代码之旅就结束了,但愿你们在看完这篇文章以后能对线程池的使用和运行流程有了一个大概的印象。为何说只是有了一个大概的印象呢?由于我以为不少没有相关基础的读者读到这里可能还只是对线程池有了一个本身的认识,对其中的一些细节可能尚未彻底捕捉到。因此我建议你们在看完这篇文章后不妨再返回到文章的开头多读几遍,相信第二遍的阅读能给你们带来不同的体验,由于我本身也是在第三次读ThreadPoolExecutor类的源代码时才真正打通了其中的一些重要关节的。

引子

在这篇文章中,咱们还只是探究了线程池的基本使用方法,以及提交任务方法execute的源代码。那么在任务提交之后是怎么被线程池所执行的呢?在下一篇文章中咱们就能够找到答案,在下一篇文章中,咱们会深刻剖析线程池的任务执行流程。