咱们通常不会选择直接使用线程类Thread
进行多线程编程,而是使用更方便的线程池来进行任务的调度和管理。线程池就像共享单车,咱们只要在咱们有须要的时候去获取就能够了。甚至能够说线程池更棒,咱们只须要把任务提交给它,它就会在合适的时候运行了。可是若是直接使用Thread
类,咱们就须要在每次执行任务时本身建立、运行、等待线程了,并且很难对线程进行总体的管理,这可不是一件轻松的事情。既然咱们已经有了线程池,那仍是把这些麻烦事交给线程池来处理吧。java
这篇文章将会从线程池的概念与通常使用入手,首先让你们能够了解线程池的基本使用方法,以后会介绍实践中最经常使用的四种线程池。最后,咱们会经过对JDK源代码的剖析深刻了解线程池的运行过程和具体设计,真正达到知其然而知其因此然的水平。虽然只要了解了API就能够知足通常的平常使用了,可是只有当咱们真正厘清了多线程相关的知识点,才能在面对多线程的实践与面试问题时作到游刃有余、胸有成竹。面试
本文是一系列多线程文章中的第三篇,主要讲解了线程池相关的知识,这个系列总共有十篇文章,前五篇暂定结构以下,感兴趣的读者能够关注一下:编程
通常咱们最经常使用的线程池实现类是ThreadPoolExecutor
,咱们接下来会介绍这个类的基本使用方法。JDK已经对线程池作了比较好的封装,相信这个过程会很是轻松。数组
既然线程池是一个Java类,那么最直接的使用方法必定是new一个ThreadPoolExecutor
类的对象,例如ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() )
。那么这个构造器的里每一个参数是什么意思呢?缓存
下面就是这个构造器的方法签名:安全
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
各个参数分别表示下面的含义:数据结构
keepAliveTime
是时间数量,unit
是时间单位,单位加数量组成了最终的超时时间。这个超时时间表示若是线程池中包含了超过corePoolSize
数量的线程,则在有线程空闲的时间超过了超时时间时该线程就会被销毁;线程池中的阻塞队列专门用于存放待执行的任务,在ThreadPoolExecutor
中一个任务能够经过两种方式被执行:第一种是直接在建立一个新的Worker时被做为第一个任务传入,由这个新建立的线程来执行;第二种就是把任务放入一个阻塞队列,等待线程池中的工做线程捞取任务进行执行。多线程
上面提到的阻塞队列是这样的一种数据结构,它是一个队列(相似于一个List),能够存放0到N个元素。咱们能够对这个队列进行插入和弹出元素的操做,弹出操做能够理解为是一个获取并从队列中删除一个元素的操做。当队列中没有元素时,对这个队列的获取操做将会被阻塞,直到有元素被插入时才会被唤醒;当队列已满时,对这个队列的插入操做将会被阻塞,直到有元素被弹出后才会被唤醒。这样的一种数据结构很是适合于线程池的场景,当一个工做线程没有任务可处理时就会进入阻塞状态,直到有新任务提交后才被唤醒。并发
当建立了一个线程池以后咱们就能够将任务提交到线程池中执行了。提交任务到线程池中至关简单,咱们只要把原来传入Thread
类构造器的Runnable
对象传入线程池的execute
方法或者submit
方法就能够了。execute
方法和submit
方法基本没有区别,二者的区别只是submit
方法会返回一个Future
对象,用于检查异步任务的执行状况和获取执行结果(异步任务完成后)。异步
咱们能够先试试如何使用比较简单的execute
方法,代码例子以下:
public class ThreadPoolTest { private static int count = 0; public static void main(String[] args) throws Exception { Runnable task = new Runnable() { public void run() { for (int i = 0; i < 1000000; ++i) { synchronized (ThreadPoolTest.class) { count += 1; } } } }; // 重要:建立线程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // 重要:向线程池提交两个任务 threadPool.execute(task); threadPool.execute(task); // 等待线程池中的全部任务完成 threadPool.shutdown(); while (!threadPool.awaitTermination(1L, TimeUnit.MINUTES)) { System.out.println("Not yet. Still waiting for termination"); } System.out.println("count = " + count); } }
上面的代码中为了等待线程池中的全部任务执行完已经使用了shutdown()
方法,关闭线程池的方法主要有两个:
shutdown()
,有序关闭线程池,调用后线程池会让已经提交的任务完成执行,可是不会再接受新任务。shutdownNow()
,直接关闭线程池,线程池中正在运行的任务会被中断,正在等待执行的任务不会再被执行,可是这些还在阻塞队列中等待的任务会被做为返回值返回。咱们能够经过调用线程池对象上的一些方法来获取线程池当前的运行信息,经常使用的方法有:
不少状况下咱们也不会直接建立ThreadPoolExecutor
类的对象,而是根据须要经过Executors
的几个静态方法来建立特定用途的线程池。目前经常使用的线程池有四种:
Executors.newCachedThreadPool
方法建立Executors.newFixedThreadPool
方法建立Executors.newScheduledThreadPool
方法建立Executors.newSingleThreadExecutor
方法建立下面经过这些静态方法的源码来具体了解一下不一样类型线程池的特性与适用场景。
JDK中的源码咱们经过在IDE中进行跳转能够很方便地进行查看,下面就是Executors.newCachedThreadPool
方法中的源代码。从代码中咱们能够看到,可缓存线程池其实也是经过直接建立ThreadPoolExecutor
类的构造器建立的,只是其中的参数都已经被设置好了,咱们能够不用作具体的设置。因此咱们要观察的重点就是在这个方法中具体产生了一个怎样配置的ThreadPoolExecutor
对象,以及这样的线程池适用于怎样的场景。
从下面的代码中,咱们能够看到,传入ThreadPoolExecutor
构造器的值有:
- corePoolSize核心线程数为0,表明线程池中的线程数能够为0 - maximumPoolSize最大线程数为Integer.MAX_VALUE,表明线程池中最多能够有无限多个线程 - 超时时间设置为60秒,表示线程池中的线程在空闲60秒后会被回收 - 最后传入的是一个`SynchronousQueue`类型的阻塞队列,表明每个新添加的任务都要立刻有一个工做线程进行处理
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
因此可缓存线程池在添加任务时会优先使用空闲的线程,若是没有就建立一个新线程,线程数没有上限,因此每个任务都会立刻被分配到一个工做线程进行执行,不须要在阻塞队列中等待;若是线程池长期闲置,那么其中的全部线程都会被销毁,节约系统资源。
优势
缺点
适用场景
传入ThreadPoolExecutor
构造器的值有:
nThreads
,即线程池中的线程数量会保持在nThreads
,因此被称为“定长线程池”public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
定长线程池中的线程数会逐步增加到nThreads个,而且在以后空闲线程不会被释放,线程数会一直保持在nThreads
个。若是添加任务时全部线程都处于忙碌状态,那么就会把任务添加到阻塞队列中等待执行,阻塞队列中任务的总数没有上限。
优势
缺点
适用场景
与以前的两个方法不一样,Executors.newScheduledThreadPool
返回的是ScheduledExecutorService
接口对象,能够提供延时执行、定时执行等功能。在线程池配置上有以下特色:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
延时任务线程池实现了ScheduledExecutorService
接口,主要用于须要延时执行和定时执行的状况。
单线程线程池中只有一个工做线程,能够保证添加的任务都以指定顺序执行(先进先出、后进先出、优先级)。可是若是线程池里只有一个线程,为何咱们还要用线程池而不直接用Thread
呢?这种状况下主要有两种优势:一是咱们能够经过共享的线程池很方便地提交任务进行异步执行,而不用本身管理线程的生命周期;二是咱们可使用任务队列并指定任务的执行顺序,很容易作到任务管理的功能。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
经过前面的内容咱们其实已经能够在代码中使用线程池了,可是咱们为何还要去深究线程池的内部实现呢?首先,可能有一个很功利性的目的就是为了面试,在面试时若是能准确地说出一些底层的运行机制与原理那必定能够成为过程当中一个重要的亮点。
可是我认为学习探究线程池的内部实现的做用绝对不只是如此,只有深刻了解并厘清了线程池的具体实现,咱们才能解决实践中须要考虑的各类边界条件。由于多线程编程所表明的并发编程并非一个固定的知识点,而是实践中不断在发展和完善的一个知识门类。咱们也许会须要同时考虑多个维度,最后获得一个特定于应用场景的解决方案,这就要求咱们具有从细节着手构建出解决方案并作好各个考虑维度之间的取舍的能力。
并且我相信只要在某一个点上能突破到至关的深度,那么之后从这个点上向外扩展就会容易得多。也许在刚开始咱们的探究会碰到很是大的阻力,可是咱们要相信,最后咱们能够获得的将不止是一个知识点而是一整个知识面。
在IDE中,例如IDEA里,咱们能够点击咱们样例代码里的ThreadPoolExecutor
类跳转到JDK中ThreadPoolExecutor
类的源代码。在源代码中咱们能够看到不少java.util.concurrent
包的缔造者大牛“Doug Lea”所留下的各类注释,下面的图片就是该类源代码的一个截图。
这些注释的内容很是有参考价值,建议有能力的读者朋友能够本身阅读一遍。下面,咱们就一步步地抽丝剥茧,来揭开线程池类ThreadPoolExecutor
源代码的神秘面纱。
在ThreadPoolExecutor
类定义的开头,咱们能够看到以下的几行代码:
// 控制变量,前3位表示状态,剩下的数据位表示有效的线程数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // Integer的位数减去3位状态位就是线程数的位数 private static final int COUNT_BITS = Integer.SIZE - 3; // CAPACITY就是线程数的上限(含),即2^COUNT_BITS - 1个 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
第一行是一个用来做为控制变量的整型值,即一个Integer。之因此要用AtomicInteger
类是由于要保证多线程安全,在本系列以后的文章中会对AtomicInteger
进行具体介绍。一个整型通常是32位,可是这里的代码为了保险起见,仍是使用了Integer.SIZE
来表示整型的总位数。这里的“位”指的是数据位(bit),在计算机中,8bit = 1字节,1024字节 = 1KB,1024KB = 1MB。每一位都是一个0或1的数字,咱们若是把整型想象成一个二进制(0或1)的数组,那么一个Integer就是32个数字的数组。其中,前三个被用来表示状态,那么咱们就能够表示2^3 = 8个不一样的状态了。剩下的29位二进制数字都会被用于表示当前线程池中有效线程的数量,上限就是(2^29 - 1)个,即常量CAPACITY
。
以后的部分列出了线程池的全部状态:
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
在这里能够忽略数字后面的<< COUNT_BITS
,能够把状态简单地理解为前面的数字部分,这样的简化基本不影响结论。
各个状态的解释以下:
terminated()
方法terminated()
方法调用完成后进入这几个状态所对应的数字值是按照顺序排列的,也就是说线程池的状态只能从小到大变化,这也方便了经过数字比较来判断状态所在的阶段,这种经过数字大小来比较状态值的方法在ThreadPoolExecutor
的源码中会有大量的使用。
下图是这五个状态之间的变化过程:
shutdown()
方法被直接调用,或者在线程池对象被GC回收时经过finalize()
方法隐式调用了shutdown()
方法时,线程池会进入SHUTDOWN状态。该状态下线程池仍然会继续执行完阻塞队列中的任务,只是再也不接受新的任务了。当队列中的任务被执行完后,线程池中的线程也会被回收。当队列和线程都被清空后,线程池将进入TIDYING状态;shutdownNow()
方法,则线程池会进入STOP状态。在STOP状态下,线程池会直接清空阻塞队列中待执行的任务,而后中断全部正在进行中的任务并回收线程。当线程都被清空之后,线程池就会进入TIDYING状态;terminated()
方法,该方法执行完后,线程池就会进入最终的TERMINATED状态,完全结束。到这里咱们就已经清楚地了解了线程从刚被建立时的RUNNING状态一直到最终的TERMINATED状态的整个生命周期了。那么当咱们要向一个RUNNING状态的线程池提交任务时会发生些什么呢?
咱们通常会使用execute
方法提交咱们的任务,那么线程池在这个过程当中作了什么呢?在ThreadPoolExecutor
类的execute()
方法的源代码中,咱们主要作了四件事:
addWorker
方法中的第一个参数是该线程的第一个任务,而第二个参数就是表明是否建立的是核心线程,在execute
方法中addWorker
总共被调用了三次,其中第一次传入的是true,后两次传入的都是false;workQueue.offer()
方法将任务添加到阻塞队列中等待执行;整体上的执行流程以下,下方的黑色同心圆表明流程结束:
这里再重复一次阻塞队列的定义,方便你们阅读:
线程池中的阻塞队列专门用于存放待执行的任务,在ThreadPoolExecutor
中一个任务能够经过两种方式被执行:第一种是直接在建立一个新的Worker时被做为第一个任务传入,由这个新建立的线程来执行;第二种就是把任务放入一个阻塞队列,等待线程池中的工做线程捞取任务进行执行。上面提到的阻塞队列是这样的一种数据结构,它是一个队列(相似于一个List),能够存放0到N个元素。咱们能够对这个队列进行插入和弹出元素的操做,弹出操做能够理解为是一个获取并从队列中删除一个元素的操做。当队列中没有元素时,对这个队列的获取操做将会被阻塞,直到有元素被插入时才会被唤醒;当队列已满时,对这个队列的插入操做将会被阻塞,直到有元素被弹出后才会被唤醒。这样的一种数据结构很是适合于线程池的场景,当一个工做线程没有任务可处理时就会进入阻塞状态,直到有新任务提交后才被唤醒。
下面是带有注释的源代码,你们能够和上面的流程对照起来参考一下:
public void execute(Runnable command) { // 检查提交的任务是否为空 if (command == null) throw new NullPointerException(); // 获取控制变量值 int c = ctl.get(); // 检查当前线程数是否达到了核心线程数 if (workerCountOf(c) < corePoolSize) { // 未达到核心线程数,则建立新线程 // 并将传入的任务做为该线程的第一个任务 if (addWorker(command, true)) // 添加线程成功则直接返回,不然继续执行 return; // 由于前面调用了耗时操做addWorker方法 // 因此线程池状态有可能发生了改变,从新获取状态值 c = ctl.get(); } // 判断线程池当前状态是不是运行中 // 若是是则调用workQueue.offer方法将任务放入阻塞队列 if (isRunning(c) && workQueue.offer(command)) { // 由于执行了耗时操做“放入阻塞队列”,因此从新获取状态值 int recheck = ctl.get(); // 若是当前状态不是运行中,则将刚才放入阻塞队列的任务拿出,若是拿出成功,则直接拒绝这个任务 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) // 若是线程池中没有线程了,那就建立一个 addWorker(null, false); } // 若是放入阻塞队列失败(如队列已满),则添加一个线程 else if (!addWorker(command, false)) // 若是添加线程失败(如已经达到了最大线程数),则拒绝任务 reject(command); }
在前面execute
方法的代码中咱们能够看到线程池是经过addWorker
方法来向线程池中添加新线程的,那么新的线程又是如何运行起来的呢?
这里咱们暂时跳过addWorker
方法的详细源代码,由于虽然这个方法的代码行数较多,可是功能相对比较直接,只是建立一个表明线程的Worker
类对象,并调用这个对象所对应线程对象的start()
方法。咱们知道一旦调用了Thread
类的start()
方法,则这个线程就会开始调用建立线程时传入的Runnable
对象。从下面的Worker
类构造器源代码能够看出,Worker
类正是把本身(this指针)传入了线程的构造器当中,那么这个线程就会运行Worker
类的run()
方法了,这个run()
方法只执行了一行很简单的代码runWorker(this);
。
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); }
咱们看到线程池中的线程在启动时会调用对应的Worker
类的runWorker
方法,而这里就是整个线程池任务执行的核心所在了。runWorker
方法中包含有一个相似无限循环的while语句,让worker对象能够不断执行提交到线程池中的新任务。
你们能够配合代码上带有的注释来理解该方法的具体实现:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 将worker的状态重置为正常状态,由于state状态值在构造器中被初始化为-1 w.unlock(); // 经过completedAbruptly变量的值判断任务是否正常执行完成 boolean completedAbruptly = true; try { // 若是task为null就经过getTask方法获取阻塞队列中的下一个任务 // getTask方法通常不会返回null,因此这个while相似于一个无限循环 // worker对象就经过这个方法的持续运行来不断处理新的任务 while (task != null || (task = getTask()) != null) { // 每一次任务的执行都必须获取锁来保证下方临界区代码的线程安全 w.lock(); // 若是状态值大于等于STOP(状态值是有序的,即STOP、TIDYING、TERMINATED) // 且当前线程尚未被中断,则主动中断线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); // 开始 try { // 执行任务前处理操做,默认是一个空实现 // 在子类中能够经过重写来改变任务执行前的处理行为 beforeExecute(wt, task); // 经过thrown变量保存任务执行过程当中抛出的异常 // 提供给下面finally块中的afterExecute方法使用 Throwable thrown = null; try { // *** 重要:实际执行任务的代码 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { // 由于Runnable接口的run方法中不能抛出Throwable对象 // 因此要包装成Error对象抛出 thrown = x; throw new Error(x); } finally { // 执行任务后处理操做,默认是一个空实现 // 在子类中能够经过重写来改变任务执行后的处理行为 afterExecute(task, thrown); } } finally { // 将循环变量task设置为null,表示已处理完成 task = null; // 累加当前worker已经完成的任务数 w.completedTasks++; // 释放while体中第一行获取的锁 w.unlock(); } } // 将completedAbruptly变量设置为false,表示任务正常处理完成 completedAbruptly = false; } finally { // 销毁当前的worker对象,并完成一些诸如完成任务数量统计之类的辅助性工做 // 在线程池当前状态小于STOP的状况下会建立一个新的worker来替换被销毁的worker processWorkerExit(w, completedAbruptly); } }
在runWorker
方法的源代码中有两个比较重要的方法调用,一个是while条件中对getTask
方法的调用,一个是在方法的最后对processWorkerExit
方法的调用。下面是对这两个方法更详细的解释。
getTask
方法在阻塞队列中有待执行的任务时会从队列中弹出一个任务并返回,若是阻塞队列为空,那么就会阻塞等待新的任务提交到队列中直到超时(在一些配置下会一直等待而不超时),若是在超时以前获取到了新的任务,那么就会将这个任务做为返回值返回。
当getTask
方法返回null时会致使当前Worker退出,当前线程被销毁。在如下状况下getTask
方法才会返回null:
setMaximumPoolSize
修改了最大线程数而致使的结果;线程能够被超时回收的状况下等待新任务超时。线程被超时回收通常有如下两种状况:
processWorkerExit
方法会销毁当前线程对应的Worker对象,并执行一些累加总处理任务数等辅助操做。但在线程池当前状态小于STOP的状况下会建立一个新的Worker来替换被销毁的Worker,有兴趣的读者能够自行参考processWorkerExit
方法源代码。
到这里咱们的线程池源代码之旅就结束了,但愿你们在看完这篇文章以后能对线程池的使用和运行都有一个大概的印象。为何说只是有了一个大概的印象呢?由于我以为不少没有相关基础的读者读到这里可能还只是对线程池有了一个本身的认识,对其中的一些细节可能尚未彻底捕捉到。因此我建议你们在看完下面的总结以后不妨再返回到文章的开头多读几遍,相信第二遍的阅读能给你们带来不同的体验,由于我本身也是在第三次读ThreadPoolExecutor
类的源代码时才真正打通了其中的一些重要关节的。
在这篇文章中咱们从线程池的概念和基本使用方法提及,而后介绍了ThreadPoolExecutor
的构造器参数和经常使用的四种具体配置。最后的一大半篇幅咱们一块儿在TheadPoolExecutor
类的源代码中畅游了一番,了解了从线程池的建立到任务执行的完整执行模型。
在浏览ThreadPoolExexutor
源码的过程当中,有几个点咱们其实并无彻底说清楚,好比对锁的加锁操做、对控制变量的屡次获取、控制变量的AtomicInteger类型。在下一篇文章中,我将会介绍这些以锁、volatile变量、CAS操做、AQS抽象类为表明的一系列线程同步方法,欢迎感兴趣的读者继续关注我后续发布的文章~