线程池是很是重要的工具,若是你要成为一个好的工程师,仍是得比较好地掌握这个知识。即便你为了谋生,也要知道,这基本上是面试必问的题目,并且面试官很容易从被面试者的回答中捕捉到被面试者的技术水平。java
本文略长,建议在 pc 上阅读,边看文章边翻源码(Java7 和 Java8 都同样),建议想好好看的读者抽出至少 15 至 30 分钟的整块时间来阅读。固然,若是读者仅为面试准备,能够直接滑到最后的总结部分。面试
开篇来一些废话。下图是 java 线程池几个相关类的继承结构:多线程
先简单说说这个继承结构,Executor 位于最顶层,也是最简单的,就一个 execute(Runnable runnable) 接口方法定义。并发
ExecutorService 也是接口,在 Executor 接口的基础上添加了不少的接口方法,因此通常来讲咱们会使用这个接口。app
而后再下来一层是 AbstractExecutorService,从名字咱们就知道,这是抽象类,这里实现了很是有用的一些方法供子类直接使用,以后咱们再细说。工具
而后才到咱们的重点部分 ThreadPoolExecutor 类,这个类提供了关于线程池所需的很是丰富的功能。oop
另外,咱们还涉及到下图中的这些类:源码分析
同在并发包中的 Executors 类,类名中带字母 s,咱们猜到这个是工具类,里面的方法都是静态方法,如如下咱们最经常使用的用于生成 ThreadPoolExecutor 的实例的一些方法:性能
public static ExecutorService newCachedThreadPool() {ui
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
另外,因为线程池支持获取线程执行的结果,因此,引入了 Future 接口,RunnableFuture 继承自此接口,而后咱们最须要关心的就是它的实现类 FutureTask。到这里,记住这个概念,在线程池的使用过程当中,咱们是往线程池提交任务(task),使用过线程池的都知道,咱们提交的每一个任务是实现了 Runnable 接口的,其实就是先将 Runnable 的任务包装成 FutureTask,而后再提交到线程池。这样,读者才能比较容易记住 FutureTask 这个类名:它首先是一个任务(Task),而后具备 Future 接口的语义,便可以在未来(Future)获得执行的结果。
固然,线程池中的 BlockingQueue 也是很是重要的概念,若是线程数达到 corePoolSize,咱们的每一个任务会提交到等待队列中,等待线程池中的线程来取任务并执行。这里的 BlockingQueue 一般咱们使用其实现类 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue,每一个实现类都有不一样的特征,使用场景以后会慢慢分析。想要详细了解各个 BlockingQueue 的读者,能够参考个人前面的一篇对 BlockingQueue 的各个实现类进行详细分析的文章。
把事情说完整:除了上面说的这些类外,还有一个很重要的类,就是定时任务实现类 ScheduledThreadPoolExecutor,它继承自本文要重点讲解的 ThreadPoolExecutor,用于实现定时执行。不过本文不会介绍它的实现,我相信读者看完本文后能够比较容易地看懂它的源码。
以上就是本文要介绍的知识,废话很少说,开始进入正文。
/*
*/
public interface Executor {
void execute(Runnable command);
}
咱们能够看到 Executor 接口很是简单,就一个 void execute(Runnable command) 方法,表明提交一个任务。为了让你们理解 java 线程池的整个设计方案,我会按照 Doug Lea 的设计思路来多说一些相关的东西。
咱们常常这样启动一个线程:
new Thread(new Runnable(){
// do something
}).start();
用了线程池 Executor 后就能够像下面这么使用:
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
若是咱们但愿线程池同步执行每个任务,咱们能够这么实现这个接口:
class DirectExecutor implements Executor {
public void execute(Runnable r) { r.run();// 这里不是用的new Thread(r).start(),也就是说没有启动任何一个新的线程。 }
}
咱们但愿每一个任务提交进来后,直接启动一个新的线程来执行这个任务,咱们能够这么实现:
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); // 每一个任务都用一个新的线程来执行 }
}
咱们再来看下怎么组合两个 Executor 来使用,下面这个实现是将全部的任务都加到一个 queue 中,而后从 queue 中取任务,交给真正的执行器执行,这里采用 synchronized 进行并发控制:
class SerialExecutor implements Executor {
// 任务队列 final Queue<Runnable> tasks = new ArrayDeque<Runnable>(); // 这个才是真正的执行器 final Executor executor; // 当前正在执行的任务 Runnable active; // 初始化的时候,指定执行器 SerialExecutor(Executor executor) { this.executor = executor; } // 添加任务到线程池: 将任务添加到任务队列,scheduleNext 触发执行器去任务队列取任务 public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { // 具体的执行转给真正的执行器 executor executor.execute(active); } }
}
固然了,Executor 这个接口只有提交任务的功能,太简单了,咱们想要更丰富的功能,好比咱们想知道执行结果、咱们想知道当前线程池有多少个线程活着、已经完成了多少任务等等,这些都是这个接口的不足的地方。接下来咱们要介绍的是继承自 Executor 接口的 ExecutorService 接口,这个接口提供了比较丰富的功能,也是咱们最常使用到的接口。
通常咱们定义一个线程池的时候,每每都是使用这个接口:
ExecutorService executor = Executors.newFixedThreadPool(args...);
ExecutorService executor = Executors.newCachedThreadPool(args...);
由于这个接口中定义的一系列方法大部分状况下已经能够知足咱们的须要了。
那么咱们简单初略地来看一下这个接口中都有哪些方法:
public interface ExecutorService extends Executor {
// 关闭线程池,已提交的任务继续执行,不接受继续提交新任务 void shutdown(); // 关闭线程池,尝试中止正在执行的全部任务,不接受继续提交新任务 // 它和前面的方法相比,加了一个单词“now”,区别在于它会去中止当前正在进行的任务 List<Runnable> shutdownNow(); // 线程池是否已关闭 boolean isShutdown(); // 若是调用了 shutdown() 或 shutdownNow() 方法后,全部任务结束了,那么返回true // 这个方法必须在调用shutdown或shutdownNow方法以后调用才会返回true boolean isTerminated(); // 等待全部任务完成,并设置超时时间 // 咱们这么理解,实际应用中是,先调用 shutdown 或 shutdownNow, // 而后再调这个方法等待全部的线程真正地完成,返回值意味着有没有超时 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 提交一个 Callable 任务 <T> Future<T> submit(Callable<T> task); // 提交一个 Runnable 任务,第二个参数将会放到 Future 中,做为返回值, // 由于 Runnable 的 run 方法自己并不返回任何东西 <T> Future<T> submit(Runnable task, T result); // 提交一个 Runnable 任务 Future<?> submit(Runnable task); // 执行全部任务,返回 Future 类型的一个 list <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; // 也是执行全部任务,可是这里设置了超时时间 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; // 只有其中的一个任务结束了,就能够返回,返回执行完的那个任务的结果 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; // 同上一个方法,只有其中的一个任务结束了,就能够返回,返回执行完的那个任务的结果, // 不过这个带超时,超过指定的时间,抛出 TimeoutException 异常 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
这些方法都很好理解,一个简单的线程池主要就是这些功能,能提交任务,能获取结果,能关闭线程池,这也是为何咱们常常用这个接口的缘由。
在继续往下层介绍 ExecutorService 的实现类以前,咱们先来讲说相关的类 FutureTask。
Future -> RunnableFuture -> FutureTask
Runnable -> RunnableFuture
FutureTask 经过 RunnableFuture 间接实现了 Runnable 接口,
因此每一个 Runnable 一般都先包装成 FutureTask,
而后调用 executor.execute(Runnable command) 将其提交给线程池
咱们知道,Runnable 的 void run() 方法是没有返回值的,因此,一般,若是咱们须要的话,会在 submit 中指定第二个参数做为返回值:
<T> Future<T> submit(Runnable task, T result);
其实到时候会经过这两个参数,将其包装成 Callable。
Callable 也是由于线程池的须要,因此才有了这个接口。它和 Runnable 的区别在于 run() 没有返回值,而 Callable 的 call() 方法有返回值,同时,若是运行出现异常,call() 方法会抛出异常。
public interface Callable<V> {
V call() throws Exception;
}
在这里,就不展开说 FutureTask 类了,由于本文篇幅原本就够大了,这里咱们须要知道怎么用就好了。
下面,咱们来看看 ExecutorService 的抽象实现 AbstractExecutorService 。
AbstractExecutorService 抽象类派生自 ExecutorService 接口,而后在其基础上实现了几个实用的方法,这些方法提供给子类进行调用。
这个抽象类实现了 invokeAny 方法和 invokeAll 方法,这里的两个 newTaskFor 方法也比较有用,用于将任务包装成 FutureTask。定义于最上层接口 Executor中的 void execute(Runnable command) 因为不须要获取结果,不会进行 FutureTask 的包装。
须要获取结果(FutureTask),用 submit 方法,不须要获取结果,能够用 execute 方法。
下面,我将一行一行源码地来分析这个类,跟着源码来看看其实现吧:
Tips: invokeAny 和 invokeAll 方法占了这整个类的绝大多数篇幅,读者能够选择适当跳过,由于它们可能在你的实践中使用的频次比较低,并且它们不带有承前启后的做用,不用担忧会漏掉什么致使看不懂后面的代码。
public abstract class AbstractExecutorService implements ExecutorService {
// RunnableFuture 是用于获取执行结果的,咱们经常使用它的子类 FutureTask // 下面两个 newTaskFor 方法用于将咱们的任务包装成 FutureTask 提交到线程池中执行 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } // 提交任务 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); // 1. 将任务包装成 FutureTask RunnableFuture<Void> ftask = newTaskFor(task, null); // 2. 交给执行器执行,execute 方法由具体的子类来实现 // 前面也说了,FutureTask 间接实现了Runnable 接口。 execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); // 1. 将任务包装成 FutureTask RunnableFuture<T> ftask = newTaskFor(task, result); // 2. 交给执行器执行 execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 1. 将任务包装成 FutureTask RunnableFuture<T> ftask = newTaskFor(task); // 2. 交给执行器执行 execute(ftask); return ftask; } // 此方法目的:将 tasks 集合中的任务提交到线程池执行,任意一个线程执行完后就能够结束了 // 第二个参数 timed 表明是否设置超时机制,超时时间为第三个参数, // 若是 timed 为 true,同时超时了尚未一个线程返回结果,那么抛出 TimeoutException 异常 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) throw new NullPointerException(); // 任务数 int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); // List<Future<T>> futures= new ArrayList<Future<T>>(ntasks); // ExecutorCompletionService 不是一个真正的执行器,参数 this 才是真正的执行器 // 它对执行器进行了包装,每一个任务结束后,将结果保存到内部的一个 completionQueue 队列中 // 这也是为何这个类的名字里面有个 Completion 的缘由吧。 ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this); try { // 用于保存异常信息,此方法若是没有获得任何有效的结果,那么咱们能够抛出最后获得的一个异常 ExecutionException ee = null; long lastTime = timed ? System.nanoTime() : 0; Iterator<? extends Callable<T>> it = tasks.iterator(); // 首先先提交一个任务,后面的任务到下面的 for 循环一个个提交 futures.add(ecs.submit(it.next())); // 提交了一个任务,因此任务数量减 1 --ntasks; // 正在执行的任务数(提交的时候 +1,任务结束的时候 -1) int active = 1; for (;;) { // ecs 上面说了,其内部有一个 completionQueue 用于保存执行完成的结果 // BlockingQueue 的 poll 方法不阻塞,返回 null 表明队列为空 Future<T> f = ecs.poll(); // 为 null,说明刚刚提交的第一个线程尚未执行完成 // 在前面先提交一个任务,加上这里作一次检查,也是为了提升性能 if (f == null) { if (ntasks > 0) { --ntasks; futures.add(ecs.submit(it.next())); ++active; } // 这里是 else if,不是 if。这里说明,没有任务了,同时 active 为 0 说明 // 任务都执行完成了。其实我也没理解为何这里作一次 break? // 由于我认为 active 为 0 的状况,必然从下面的 f.get() 返回了 // 2018-02-23 感谢读者 newmicro 的 comment, // 这里的 active == 0,说明全部的任务都执行失败,那么这里是 for 循环出口 else if (active == 0) break; // 这里也是 else if。这里说的是,没有任务了,可是设置了超时时间,这里检测是否超时 else if (timed) { // 带等待的 poll 方法 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); // 若是已经超时,抛出 TimeoutException 异常,这整个方法就结束了 if (f == null) throw new TimeoutException(); long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; } // 这里是 else。说明,没有任务须要提交,可是池中的任务没有完成,尚未超时(若是设置了超时) // take() 方法会阻塞,直到有元素返回,说明有任务结束了 else f = ecs.take(); } /* * 我感受上面这一段并非很好理解,这里简单说下。 * 1. 首先,这在一个 for 循环中,咱们设想每个任务都没那么快结束, * 那么,每一次都会进到第一个分支,进行提交任务,直到将全部的任务都提交了 * 2. 任务都提交完成后,若是设置了超时,那么 for 循环其实进入了“一直检测是否超时” 这件事情上 * 3. 若是没有设置超时机制,那么没必要要检测超时,那就会阻塞在 ecs.take() 方法上, 等待获取第一个执行结果 * 4. 若是全部的任务都执行失败,也就是说 future 都返回了, 可是 f.get() 抛出异常,那么从 active == 0 分支出去(感谢 newmicro 提出) // 固然,这个须要看下面的 if 分支。 */ // 有任务结束了 if (f != null) { --active; try { // 返回执行结果,若是有异常,都包装成 ExecutionException return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } }// 注意看 for 循环的范围,一直到这里 if (ee == null) ee = new ExecutionException(); throw ee; } finally { // 方法退出以前,取消其余的任务 for (Future<T> f : futures) f.cancel(true); } } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { try { return doInvokeAny(tasks, false, 0); } catch (TimeoutException cannotHappen) { assert false; return null; } } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return doInvokeAny(tasks, true, unit.toNanos(timeout)); } // 执行全部的任务,返回任务结果。 // 先不要看这个方法,咱们先想一想,其实咱们本身提交任务到线程池,也是想要线程池执行全部的任务 // 只不过,咱们是每次 submit 一个任务,这里以一个集合做为参数提交 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { // 这个很简单 for (Callable<T> t : tasks) { // 包装成 FutureTask RunnableFuture<T> f = newTaskFor(t); futures.add(f); // 提交任务 execute(f); } for (Future<T> f : futures) { if (!f.isDone()) { try { // 这是一个阻塞方法,直到获取到值,或抛出了异常 // 这里有个小细节,其实 get 方法签名上是会抛出 InterruptedException 的 // 但是这里没有进行处理,而是抛给外层去了。此异常发生于还没执行完的任务被取消了 f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; // 这个方法返回,不像其余的场景,返回 List<Future>,其实执行结果还没出来 // 这个方法返回是真正的返回,任务都结束了 return futures; } finally { // 为何要这个?就是上面说的有异常的状况 if (!done) for (Future<T> f : futures) f.cancel(true); } } // 带超时的 invokeAll,咱们找不一样吧 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null || unit == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { for (Callable<T> t : tasks) futures.add(newTaskFor(t)); long lastTime = System.nanoTime(); Iterator<Future<T>> it = futures.iterator(); // 提交一个任务,检测一次是否超时 while (it.hasNext()) { execute((Runnable)(it.next())); long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; // 超时 if (nanos <= 0) return futures; } for (Future<T> f : futures) { if (!f.isDone()) { if (nanos <= 0) return futures; try { // 调用带超时的 get 方法,这里的参数 nanos 是剩余的时间, // 由于上面其实已经用掉了一些时间了 f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } catch (TimeoutException toe) { return futures; } long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; } } done = true; return futures; } finally { if (!done) for (Future<T> f : futures) f.cancel(true); } }
}
到这里,咱们发现,这个抽象类包装了一些基本的方法,但是像 submit、invokeAny、invokeAll 等方法,它们都没有真正开启线程来执行任务,它们都只是在方法内部调用了 execute 方法,因此最重要的 execute(Runnable runnable) 方法还没出现,须要等具体执行器来实现这个最重要的部分,这里咱们要说的就是 ThreadPoolExecutor 类了。
鉴于本文的篇幅,我以为看到这里的读者应该已经很少了,快餐文化使然啊!我写的每篇文章都力求让读者能够经过个人一篇文章而记住全部的相关知识点,因此篇幅难免长了些。其实,工做了不少年的话,会有一个感受,好比说线程池,即便看了 20 篇各类总结,也不如一篇长文实实在在讲解清楚每个知识点,有点少便是多,多便是少的意味了。
ThreadPoolExecutor 是 JDK 中的线程池实现,这个类实现了一个线程池须要的各个方法,它实现了任务提交、线程管理、监控等等方法。
咱们能够基于它来进行业务上的扩展,以实现咱们须要的其余功能,好比实现定时任务的类 ScheduledThreadPoolExecutor 就继承自 ThreadPoolExecutor。固然,这不是本文关注的重点,下面,仍是赶忙进行源码分析吧。
首先,咱们来看看线程池实现中的几个概念和处理流程。
咱们先回顾下提交任务的几个方法:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;
}
一个最基本的概念是,submit 方法中,参数是 Runnable 类型(也有Callable 类型),这个参数不是用于 new Thread(runnable).start() 中的,此处的这个参数不是用于启动线程的,这里指的是任务,任务要作的事情是 run() 方法里面定义的或 Callable 中的 call() 方法里面定义的。
初学者每每会搞混这个,由于 Runnable 老是在各个地方出现,常常把一个 Runnable 包到另外一个 Runnable 中。请把它想象成有个 Task 接口,这个接口里面有一个 run() 方法(我想做者只是不想由于这个再定义一个彻底能够用 Runnable 来代替的接口,Callable 的出现,彻底是由于 Runnable 不能知足须要)。
咱们回过神来继续往下看,我画了一个简单的示意图来描述线程池中的一些主要的构件:
固然,上图没有考虑队列是否有界,提交任务时队列满了怎么办?什么状况下会建立新的线程?提交任务时线程池满了怎么办?空闲线程怎么关掉?这些问题下面咱们会一一解决。
咱们常常会使用 Executors 这个工具类来快速构造一个线程池,对于初学者而言,这种工具类是颇有用的,开发者不须要关注太多的细节,只要知道本身须要一个线程池,仅仅提供必需的参数就能够了,其余参数都采用做者提供的默认值。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
这里先不说有什么区别,它们最终都会导向这个构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); // 这几个参数都是必需要有的 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
基本上,上面的构造方法中列出了咱们最须要关心的几个属性了,下面逐个介绍下构造方法中出现的这几个属性:
corePoolSize
核心线程数,不要抠字眼,反正先记着有这么个属性就能够了。
maximumPoolSize
最大线程数,线程池容许建立的最大线程数。
workQueue
任务队列,BlockingQueue 接口的某个实现(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)。
keepAliveTime
空闲线程的保活时间,若是某线程的空闲时间超过这个值都没有任务给它作,那么能够被关闭了。注意这个值并不会对全部线程起做用,若是线程池中的线程数少于等于核心线程数 corePoolSize,那么这些线程不会由于空闲太长时间而被关闭,固然,也能够经过调用 allowCoreThreadTimeOut(true)使核心线程数内的线程也能够被回收。
threadFactory
用于生成线程,通常咱们能够用默认的就能够了。一般,咱们能够经过它将咱们的线程的名字设置得比较可读一些,如 Message-Thread-1, Message-Thread-2 相似这样。
handler:
当线程池已经满了,可是又有新的任务提交的时候,该采起什么策略由这个来指定。有几种方式可供选择,像抛出异常、直接拒绝而后返回等,也能够本身实现相应的接口实现本身的逻辑,这个以后再说。
除了上面几个属性外,咱们再看看其余重要的属性。
Doug Lea 采用一个 32 位的整数来存放线程池的状态和当前池中的线程数,其中高 3 位用于存放线程池状态,低 29 位表示线程数(即便只有 29 位,也已经不小了,大概 5 亿多,如今尚未哪一个机器能起这么多线程的吧)。咱们知道,java 语言在整数编码上是统一的,都是采用补码的形式,下面是简单的移位操做和布尔操做,都是挺简单的。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 这里 COUNT_BITS 设置为 29(32-3),意味着前三位用于存放线程状态,后29位用于存放线程数
// 不少初学者很喜欢在本身的代码中写不少 29 这种数字,或者某个特殊的字符串,而后分布在各个地方,这是很是糟糕的
private static final int COUNT_BITS = Integer.SIZE - 3;
// 000 11111111111111111111111111111
// 这里获得的是 29 个 1,也就是说线程池的最大线程数是 2^29-1=536870911
// 以咱们如今计算机的实际状况,这个数量仍是够用的
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 咱们说了,线程池的状态存放在高 3 位中
// 运算结果为 111跟29个0:111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 将整数 c 的低 29 位修改成 0,就获得了线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 将整数 c 的高 3 为修改成 0,就获得了线程池中的线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
上面就是对一个整数的简单的位操做,几个操做方法将会在后面的源码中一直出现,因此读者最好把方法名字和其表明的功能记住,看源码的时候也就不须要来来回回翻了。
在这里,介绍下线程池中的各个状态和状态变化的转换过程:
RUNNING:这个没什么好说的,这是最正常的状态:接受新的任务,处理等待队列中的任务
SHUTDOWN:不接受新的任务提交,可是会继续处理等待队列中的任务
STOP:不接受新的任务提交,再也不处理等待队列中的任务,中断正在执行任务的线程
TIDYING:全部的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
TERMINATED:terminated() 方法结束后,线程池的状态就会变成这个
RUNNING 定义为 -1,SHUTDOWN 定义为 0,其余的都比 0 大,因此等于 0 的时候不能提交任务,大于 0 的话,连正在执行的任务也须要中断。
看了这几种状态的介绍,读者大致也能够猜到十之八九的状态转换了,各个状态的转换过程有如下几种:
RUNNING -> SHUTDOWN:当调用了 shutdown() 后,会发生这个状态转换,这也是最重要的
(RUNNING or SHUTDOWN) -> STOP:当调用 shutdownNow() 后,会发生这个状态转换,这下要清楚 shutDown() 和 shutDownNow() 的区别了
SHUTDOWN -> TIDYING:当任务队列和线程池都清空后,会由 SHUTDOWN 转换为 TIDYING
STOP -> TIDYING:当任务队列清空后,发生这个转换
TIDYING -> TERMINATED:这个前面说了,当 terminated() 方法结束后
上面的几个记住核心的就能够了,尤为第一个和第二个。
另外,咱们还要看看一个内部类 Worker,由于 Doug Lea 把线程池中的线程包装成了一个个 Worker,翻译成工人,就是线程池中作任务的线程。因此到这里,咱们知道任务是 Runnable(内部叫 task 或 command),线程是 Worker。
Worker 这里又用到了抽象类 AbstractQueuedSynchronizer。题外话,AQS 在并发中真的是处处出现,并且很是容易使用,写少许的代码就能实现本身须要的同步方式(对 AQS 源码感兴趣的读者请参看我以前写的几篇文章)。
private final class Worker
extends AbstractQueuedSynchronizer implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L; // 这个是真正的线程,任务靠你啦 final Thread thread; // 前面说了,这里的 Runnable 是任务。为何叫 firstTask?由于在建立线程的时候,若是同时指定了 // 这个线程起来之后须要执行的第一个任务,那么第一个任务就是存放在这里的(线程可不止执行这一个任务) // 固然了,也能够为 null,这样线程起来了,本身到任务队列(BlockingQueue)中取任务(getTask 方法)就好了 Runnable firstTask; // 用于存放此线程彻底的任务数,注意了,这里用了 volatile,保证可见性 volatile long completedTasks; // Worker 只有这一个构造方法,传入 firstTask,也能够传 null Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 调用 ThreadFactory 来建立一个新的线程 this.thread = getThreadFactory().newThread(this); } // 这里调用了外部类的 runWorker 方法 public void run() { runWorker(this); } ...// 其余几个方法没什么好看的,就是用 AQS 操做,来获取这个线程的执行权,用了独占锁
}
前面虽然啰嗦,可是简单。有了上面的这些基础后,咱们终于能够看看 ThreadPoolExecutor 的 execute 方法了,前面源码分析的时候也说了,各类方法都最终依赖于 execute 方法:
public void execute(Runnable command) {
if (command == null) throw new NullPointerException(); // 前面说的那个表示 “线程池状态” 和 “线程数” 的整数 int c = ctl.get(); // 若是当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务, // 建立一个新的线程,并把当前任务 command 做为这个线程的第一个任务(firstTask) if (workerCountOf(c) < corePoolSize) { // 添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就能够返回了 // 至于执行的结果,到时候会包装到 FutureTask 中。 // 返回 false 表明线程池不容许提交任务 if (addWorker(command, true)) return; c = ctl.get(); } // 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了 // 若是线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { /* 这里面说的是,若是任务进入了 workQueue,咱们是否须要开启新的线程 * 由于线程数在 [0, corePoolSize) 是无条件开启新的线程 * 若是线程数已经大于等于 corePoolSize,那么将任务添加到队列中,而后进到这里 */ int recheck = ctl.get(); // 若是线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,而且执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 若是线程池仍是 RUNNING 的,而且线程数为 0,那么开启新的线程 // 到这里,咱们知道了,这块代码的真正意图是:担忧任务提交到队列中了,可是线程都关闭了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是 workQueue 队列满了,那么进入到这个分支 // 以 maximumPoolSize 为界建立新的 worker, // 若是失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略 else if (!addWorker(command, false)) reject(command);
}
对建立线程的错误理解:若是线程数少于 corePoolSize,建立一个线程,若是线程数在 [corePoolSize, maximumPoolSize] 之间那么能够建立线程或复用空闲线程,keepAliveTime 对这个区间的线程有效。
从上面的几个分支,咱们就能够看出,上面的这段话是错误的。
上面这些一时半会也不可能所有消化搞定,咱们先继续往下吧,到时候再回头看几遍。
这个方法很是重要 addWorker(Runnable firstTask, boolean core) 方法,咱们看看它是怎么建立新的线程的:
// 第一个参数是准备提交给这个线程执行的任务,以前说了,能够为 null
// 第二个参数为 true 表明使用核心线程数 corePoolSize 做为建立线程的界线,也就说建立这个线程的时候,
// 若是线程池中的线程总数已经达到 corePoolSize,那么不能响应此次建立线程的请求
// 若是是 false,表明使用最大线程数 maximumPoolSize 做为界线
private boolean addWorker(Runnable firstTask, boolean core) {
retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 这个很是很差理解 // 若是线程池已关闭,并知足如下条件之一,那么不建立新的 worker: // 1. 线程池状态大于 SHUTDOWN,其实也就是 STOP, TIDYING, 或 TERMINATED // 2. firstTask != null // 3. workQueue.isEmpty() // 简单分析下: // 仍是状态控制的问题,当线程池处于 SHUTDOWN 的时候,不容许提交任务,可是已有的任务继续执行 // 当状态大于 SHUTDOWN 时,不容许提交任务,且中断正在执行的任务 // 多说一句:若是线程池处于 SHUTDOWN,可是 firstTask 为 null,且 workQueue 非空,那么是容许建立 worker 的 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 若是成功,那么就是全部建立线程前的条件校验都知足了,准备建立线程执行任务了 // 这里失败的话,说明有其余线程也在尝试往线程池中建立线程 if (compareAndIncrementWorkerCount(c)) break retry; // 因为有并发,从新再读取一下 ctl c = ctl.get(); // 正常若是是 CAS 失败的话,进到下一个里层的for循环就能够了 // 但是若是是由于其余线程的操做,致使线程池的状态发生了变动,若有其余线程关闭了这个线程池 // 那么须要回到外层的for循环 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } /* * 到这里,咱们认为在当前这个时刻,能够开始建立线程来执行任务了, * 由于该校验的都校验了,至于之后会发生什么,那是之后的事,至少当前是知足条件的 */ // worker 是否已经启动 boolean workerStarted = false; // 是否已将这个 worker 添加到 workers 这个 HashSet 中 boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; // 把 firstTask 传给 worker 的构造方法 w = new Worker(firstTask); // 取 worker 中的线程对象,以前说了,Worker的构造方法会调用 ThreadFactory 来建立一个新的线程 final Thread t = w.thread; if (t != null) { // 这个是整个类的全局锁,持有这个锁才能让下面的操做“瓜熟蒂落”, // 由于关闭一个线程池须要这个锁,至少我持有锁的期间,线程池不会被关闭 mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); // 小于 SHUTTDOWN 那就是 RUNNING,这个自没必要说,是最正常的状况 // 若是等于 SHUTDOWN,前面说了,不接受新的任务,可是会继续执行等待队列中的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // worker 里面的 thread 可不能是已经启动的 if (t.isAlive()) throw new IllegalThreadStateException(); // 加到 workers 这个 HashSet 中 workers.add(w); int s = workers.size(); // largestPoolSize 用于记录 workers 中的个数的最大值 // 由于 workers 是不断增长减小的,经过这个值能够知道线程池的大小曾经达到的最大值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 添加成功的话,启动这个线程 if (workerAdded) { // 启动线程 t.start(); workerStarted = true; } } } finally { // 若是线程没有启动,须要作一些清理工做,如前面 workCount 加了 1,将其减掉 if (! workerStarted) addWorkerFailed(w); } // 返回线程是否启动成功 return workerStarted;
}
简单看下 addWorkFailed 的处理:
// workers 中删除掉相应的 worker
// workCount 减 1
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); // rechecks for termination, in case the existence of this worker was holding up termination tryTerminate(); } finally { mainLock.unlock(); }
}
回过头来,继续往下走。咱们知道,worker 中的线程 start 后,其 run 方法会调用 runWorker 方法:
// Worker 类的 run() 方法
public void run() {
runWorker(this);
}
继续往下看 runWorker 方法:
// 此方法由 worker 线程启动后调用,这里用一个 while 循环来不断地从等待队列中获取任务并执行
// 前面说了,worker 在初始化的时候,能够指定 firstTask,那么第一个任务也就能够不须要从队列中获取
final void runWorker(Worker w) {
// Thread wt = Thread.currentThread(); // 该线程的第一个任务(若是有的话) Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 循环调用 getTask 获取任务 while (task != null || (task = getTask()) != null) { w.lock(); // 若是线程池状态大于等于 STOP,那么意味着该线程也要中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 这是一个钩子方法,留给须要的子类实现 beforeExecute(wt, task); Throwable thrown = null; try { // 到这里终于能够执行任务了 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { // 这里不容许抛出 Throwable,因此转换为 Error thrown = x; throw new Error(x); } finally { // 也是一个钩子方法,将 task 和异常做为参数,留给须要的子类实现 afterExecute(task, thrown); } } finally { // 置空 task,准备 getTask 获取下一个任务 task = null; // 累加完成的任务数 w.completedTasks++; // 释放掉 worker 的独占锁 w.unlock(); } } completedAbruptly = false; } finally { // 若是到这里,须要执行线程关闭: // 1. 说明 getTask 返回 null,也就是说,这个 worker 的使命结束了,执行关闭 // 2. 任务执行过程当中发生了异常 // 第一种状况,已经在代码处理了将 workCount 减 1,这个在 getTask 方法分析中会说 // 第二种状况,workCount 没有进行处理,因此须要在 processWorkerExit 中处理 // 限于篇幅,我不许备分析这个方法了,感兴趣的读者请自行分析源码 processWorkerExit(w, completedAbruptly); }
}
咱们看看 getTask() 是怎么获取任务的,这个方法写得真的很好,每一行都很简单,组合起来却全部的状况都想好了:
// 此方法有三种可能:
// 1. 阻塞直到获取到任务返回。咱们知道,默认 corePoolSize 以内的线程是不会被回收的,
// 它们会一直等待任务
// 2. 超时退出。keepAliveTime 起做用的时候,也就是若是这么多时间内都没有任务,那么应该执行关闭
// 3. 若是发生了如下条件,此方法必须返回 null:
// - 池中有大于 maximumPoolSize 个 workers 存在(经过调用 setMaximumPoolSize 进行设置)
// - 线程池处于 SHUTDOWN,并且 workQueue 是空的,前面说了,这种再也不接受新的任务
// - 线程池处于 STOP,不只不接受新的线程,连 workQueue 中的线程也再也不执行
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 两种可能 // 1. rs == SHUTDOWN && workQueue.isEmpty() // 2. rs >= STOP if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // CAS 操做,减小工做线程数 decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); // 容许核心线程数内的线程回收,或当前线程数超过了核心线程数,那么有可能发生超时关闭 timed = allowCoreThreadTimeOut || wc > corePoolSize; // 这里 break,是为了避免往下执行后一个 if (compareAndDecrementWorkerCount(c)) // 两个 if 一块儿看:若是当前线程数 wc > maximumPoolSize,或者超时,都返回 null // 那这里的问题来了,wc > maximumPoolSize 的状况,为何要返回 null? // 换句话说,返回 null 意味着关闭线程。 // 那是由于有可能开发者调用了 setMaximumPoolSize 将线程池的 maximumPoolSize 调小了 if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl // compareAndDecrementWorkerCount(c) 失败,线程池中的线程数发生了改变 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } // wc <= maximumPoolSize 同时没有超时 try { // 到 workQueue 中获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { // 若是此 worker 发生了中断,采起的方案是重试 // 解释下为何会发生中断,这个读者要去看 setMaximumPoolSize 方法, // 若是开发者将 maximumPoolSize 调小了,致使其小于当前的 workers 数量, // 那么意味着超出的部分线程要被关闭。从新进入 for 循环,天然会有部分线程会返回 null timedOut = false; } }
}
到这里,基本上也说完了整个流程,读者这个时候应该回到 execute(Runnable command) 方法,看看各个分支,我把代码贴过来一下:
public void execute(Runnable command) {
if (command == null) throw new NullPointerException(); // 前面说的那个表示 “线程池状态” 和 “线程数” 的整数 int c = ctl.get(); // 若是当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务, // 建立一个新的线程,并把当前任务 command 做为这个线程的第一个任务(firstTask) if (workerCountOf(c) < corePoolSize) { // 添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就能够返回了 // 至于执行的结果,到时候会包装到 FutureTask 中。 // 返回 false 表明线程池不容许提交任务 if (addWorker(command, true)) return; c = ctl.get(); } // 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了 // 若是线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { /* 这里面说的是,若是任务进入了 workQueue,咱们是否须要开启新的线程 * 由于线程数在 [0, corePoolSize) 是无条件开启新的线程 * 若是线程数已经大于等于 corePoolSize,那么将任务添加到队列中,而后进到这里 */ int recheck = ctl.get(); // 若是线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,而且执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 若是线程池仍是 RUNNING 的,而且线程数为 0,那么开启新的线程 // 到这里,咱们知道了,这块代码的真正意图是:担忧任务提交到队列中了,可是线程都关闭了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是 workQueue 队列满了,那么进入到这个分支 // 以 maximumPoolSize 为界建立新的 worker, // 若是失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略 else if (!addWorker(command, false)) reject(command);
}
上面各个分支中,有两种状况会调用 reject(command) 来处理任务,由于按照正常的流程,线程池此时不能接受这个任务,因此须要执行咱们的拒绝策略。接下来,咱们说一说 ThreadPoolExecutor 中的拒绝策略。
final void reject(Runnable command) {
// 执行拒绝策略 handler.rejectedExecution(command, this);
}
此处的 handler 咱们须要在构造线程池的时候就传入这个参数,它是 RejectedExecutionHandler 的实例。
RejectedExecutionHandler 在 ThreadPoolExecutor 中有四个已经定义好的实现类可供咱们直接使用,固然,咱们也能够实现本身的策略,不过通常也没有必要。
// 只要线程池没有被关闭,那么由提交任务的线程本身来执行这个任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
}
// 无论怎样,直接抛出 RejectedExecutionException 异常
// 这个是默认的策略,若是咱们构造线程池的时候不传相应的 handler 的话,那就会指定使用这个
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
}
// 不作任何处理,直接忽略掉这个任务
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
}
// 这个相对霸道一点,若是线程池没有被关闭的话,
// 把队列队头的任务(也就是等待了最长时间的)直接扔掉,而后提交这个任务到等待队列中
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
}
到这里,ThreadPoolExecutor 的源码算是分析结束了。单纯从源码的难易程度来讲,ThreadPoolExecutor 的源码还算是比较简单的,只是须要咱们静下心来好好看看罢了。
这节其实也不是分析 Executors 这个类,由于它仅仅是工具类,它的全部方法都是 static 的。
生成一个固定大小的线程池:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
最大线程数设置为与核心线程数相等,此时 keepAliveTime 设置为 0(由于这里它是没用的,即便不为 0,线程池默认也不会回收 corePoolSize 内的线程),任务队列采用 LinkedBlockingQueue,无界队列。
过程分析:刚开始,每提交一个任务都建立一个 worker,当 worker 的数量达到 nThreads 后,再也不建立新的线程,而是把任务提交到 LinkedBlockingQueue 中,并且以后线程数始终为 nThreads。
生成只有一个线程的固定线程池,这个更简单,和上面的同样,只要设置线程数为 1 就能够了:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
生成一个须要的时候就建立新的线程,同时能够复用以前建立的线程(若是这个线程当前没有任务)的线程池:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue。
这种线程池对于任务能够比较快速地完成的状况有比较好的性能。若是线程空闲了 60 秒都没有任务,那么将关闭此线程并从线程池中移除。因此若是线程池空闲了很长时间也不会有问题,由于随着全部的线程都会被关闭,整个线程池不会占用任何的系统资源。
过程分析:我把 execute 方法的主体黏贴过来,让你们看得明白些。鉴于 corePoolSize 是 0,那么提交任务的时候,直接将任务提交到队列中,因为采用了 SynchronousQueue,因此若是是第一个任务提交的时候,offer 方法确定会返回 false,由于此时没有任何 worker 对这个任务进行接收,那么将进入到最后一个分支来建立第一个 worker。以后再提交任务的话,取决因而否有空闲下来的线程对任务进行接收,若是有,会进入到第二个 if 语句块中,不然就是和第一个任务同样,进到最后的 else if 分支。
int c = ctl.get();
// corePoolSize 为 0,因此不会进到这个 if 分支
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return; c = ctl.get();
}
// offer 若是有空闲线程恰好能够接收此任务,那么返回 true,不然返回 false
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
SynchronousQueue 是一个比较特殊的 BlockingQueue,其自己不储存任何元素,它有一个虚拟队列(或虚拟栈),无论读操做仍是写操做,若是当前队列中存储的是与当前操做相同模式的线程,那么当前操做也进入队列中等待;若是是相反模式,则配对成功,从当前队列中取队头节点。具体的信息,能够看个人另外一篇关于 BlockingQueue 的文章。
我一贯不喜欢写总结,由于我把全部须要表达的都写在正文中了,写小篇幅的总结并不能真正将话说清楚,本文的总结部分为准备面试的读者而写,但愿能帮到面试者或者没有足够的时间看彻底文的读者。
java 线程池有哪些关键属性?
corePoolSize,maximumPoolSize,workQueue,keepAliveTime,rejectedExecutionHandler
corePoolSize 到 maximumPoolSize 之间的线程会被回收,固然 corePoolSize 的线程也能够经过设置而获得回收(allowCoreThreadTimeOut(true))。
workQueue 用于存听任务,添加任务的时候,若是当前线程数超过了 corePoolSize,那么往该队列中插入任务,线程池中的线程会负责到队列中拉取任务。
keepAliveTime 用于设置空闲时间,若是线程数超出了 corePoolSize,而且有些线程的空闲时间超过了这个值,会执行关闭这些线程的操做
rejectedExecutionHandler 用于处理当线程池不能执行此任务时的状况,默认有抛出 RejectedExecutionException 异常、忽略任务、使用提交任务的线程来执行此任务和将队列中等待最久的任务删除,而后提交此任务这四种策略,默认为抛出异常。
说说线程池中的线程建立时机?
若是当前线程数少于 corePoolSize,那么提交任务的时候建立一个新的线程,并由这个线程执行这个任务;
若是当前线程数已经达到 corePoolSize,那么将提交的任务添加到队列中,等待线程池中的线程去队列中取任务;
若是队列已满,那么建立新的线程来执行任务,须要保证池中的线程数不会超过 maximumPoolSize,若是此时线程数超过了 maximumPoolSize,那么执行拒绝策略。
Executors.newFixedThreadPool(…) 和 Executors.newCachedThreadPool() 构造出来的线程池有什么差异?
细说太长,往上滑一点点,在 Executors 的小节进行了详尽的描述。
任务执行过程当中发生异常怎么处理?
若是某个任务执行出现异常,那么执行任务的线程会被关闭,而不是继续接收其余任务。而后会启动一个新的线程来代替它。
何时会执行拒绝策略?
workers 的数量达到了 corePoolSize(任务此时须要进入任务队列),任务入队成功,与此同时线程池被关闭了,并且关闭线程池并无将这个任务出队,那么执行拒绝策略。这里说的是很是边界的问题,入队和关闭线程池并发执行,读者仔细看看 execute 方法是怎么进到第一个 reject(command) 里面的。
workers 的数量大于等于 corePoolSize,将任务加入到任务队列,但是队列满了,任务入队失败,那么准备开启新的线程,但是线程数已经达到 maximumPoolSize,那么执行拒绝策略。
由于本文实在太长了,因此我没有说执行结果是怎么获取的,也没有说关闭线程池相关的部分,这个就留给读者吧。
本文篇幅是有点长,若是读者发现什么不对的地方,或者有须要补充的地方,请不吝提出,谢谢。
(全文完)