(手机横屏看源码更方便)java
注:java源码分析部分如无特殊说明均基于 java8 版本。设计模式
注:线程池源码部分如无特殊说明均指ThreadPoolExecutor类。框架
前面咱们一块儿学习了线程池中普通任务的执行流程,但其实线程池中还有一种任务,叫做将来任务(future task),使用它您能够获取任务执行的结果,它是怎么实现的呢?异步
建议学习本章前先去看看彤哥以前写的《死磕 java线程系列之本身动手写一个线程池(续)》,有助于理解本章的内容,且那边的代码比较短小,学起来相对容易一些。源码分析
(1)线程池中的将来任务是怎么执行的?学习
(2)咱们能学到哪些比较好的设计模式?this
(3)对咱们将来学习别的框架有什么帮助?spa
咱们仍是从一个例子入手,来说解来章的内容。线程
咱们定义一个线程池,并使用它提交5个任务,这5个任务分别返回0、一、二、三、4,在将来的某一时刻,咱们再取用它们的返回值,作一个累加操做。设计
public class ThreadPoolTest02 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 新建一个固定5个线程的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(5);
List<Future<Integer>> futureList = new ArrayList<>();
// 提交5个任务,分别返回0、一、二、三、4
for (int i = 0; i < 5; i++) {
int num = i;
// 任务执行的结果用Future包装
Future<Integer> future = threadPool.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("return: " + num);
// 返回值
return num;
});
// 把future添加到list中
futureList.add(future);
}
// 任务所有提交完再从future中get返回值,并作累加
int sum = 0;
for (Future<Integer> future : futureList) {
sum += future.get();
}
System.out.println("sum=" + sum);
}
}复制代码
这里咱们思考两个问题:
(1)若是这里使用普通任务,要怎么写,时间大概是多少?
若是使用普通任务,那么就要把累加操做放到任务里面,并且并非那么好写(final的问题),总时间大概是1秒多一点。可是,这样有一个缺点,就是累加操做跟任务自己的内容耦合到一块儿了,后面若是改为累乘,还要修改任务的内容。
(2)若是这里把future.get()放到for循环里面,时间大概是多少?
这个问题咱们先不回答,先来看源码分析。
submit方法,它是提交有返回值任务的一种方式,内部使用将来任务(FutureTask)包装,再交给execute()去执行,最后返回将来任务自己。
public <T> Future<T> submit(Callable<T> task) {
// 非空检测
if (task == null) throw new NullPointerException();
// 包装成FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 交给execute()方法去执行
execute(ftask);
// 返回futureTask
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
// 将普通任务包装成FutureTask
return new FutureTask<T>(callable);
}复制代码
这里的设计很巧妙,实际上这两个方法都是在AbstractExecutorService这个抽象类中完成的,这是模板方法的一种运用。
咱们来看看FutureTask的继承体系:
FutureTask实现了RunnableFuture接口,而RunnableFuture接口组合了Runnable接口和Future接口的能力,而Future接口提供了get任务返回值的能力。
问题:submit()方法返回的为何是Future接口而不是RunnableFuture接口或者FutureTask类呢?
答:这是由于submit()返回的结果,对外部调用者只想暴露其get()的能力(Future接口),而不想暴露其run()的能力(Runaable接口)。
通过上一章的学习,咱们知道execute()方法最后调用的是task的run()方法,上面咱们传进去的任务,最后被包装成了FutureTask,也就是说execute()方法最后会调用到FutureTask的run()方法,因此咱们直接看这个方法就能够了。
public void run() {
// 状态不为NEW,或者修改成当前线程来运行这个任务失败,则直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
// 真正的任务
Callable<V> c = callable;
// state必须为NEW时才运行
if (c != null && state == NEW) {
// 运行的结果
V result;
boolean ran;
try {
// 任务执行的地方【本文由公从号“彤哥读源码”原创】
result = c.call();
// 已执行完毕
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 处理异常
setException(ex);
}
if (ran)
// 处理结果
set(result);
}
} finally {
// 置空runner
runner = null;
// 处理中断
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}复制代码
能够看到代码也比较简单,先作状态的检测,再执行任务,最后处理结果或异常。
执行任务这里没啥问题,让咱们看看处理结果或异常的代码。
protected void setException(Throwable t) {
// 将状态从NEW置为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 返回值置为传进来的异常(outcome为调用get()方法时返回的)
outcome = t;
// 最终的状态设置为EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// 调用完成方法
finishCompletion();
}
}
protected void set(V v) {
// 将状态从NEW置为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 返回值置为传进来的结果(outcome为调用get()方法时返回的)
outcome = v;
// 最终的状态设置为NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 调用完成方法
finishCompletion();
}
}复制代码
咋一看,这两个方法彷佛差很少,不一样的是出去的结果不同且状态不同,最后都调用了finishCompletion()方法。
private void finishCompletion() {
// 若是队列不为空(这个队列实际上为调用者线程)
for (WaitNode q; (q = waiters) != null;) {
// 置空队列
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
// 调用者线程
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 若是调用者线程不为空,则唤醒它
// 【本文由公从号“彤哥读源码”原创】
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 钩子方法,子类重写
done();
// 置空任务
callable = null; // to reduce footprint
}复制代码
整个run()方法总结下来:
(1)FutureTask有一个状态state控制任务的运行过程,正常运行结束state从NEW->COMPLETING->NORMAL,异常运行结束state从NEW->COMPLETING->EXCEPTIONAL;
(2)FutureTask保存了运行任务的线程runner,它是线程池中的某个线程;
(3)调用者线程是保存在waiters队列中的,它是何时设置进去的呢?
(4)任务执行完毕,除了设置状态state变化以外,还要唤醒调用者线程。
调用者线程是何时保存在FutureTask中(waiters)的呢?查看构造方法:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}复制代码
发现并无相关信息,咱们再试想一下,若是调用者不调用get()方法,那么这种将来任务是否是跟普通任务没有什么区别?确实是的哈,因此只有调用get()方法了才有必要保存调用者线程到FutureTask中。
因此,咱们来看看get()方法中是什么鬼。
get()方法调用时若是任务未执行完毕,会阻塞直到任务结束。
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 若是状态小于等于COMPLETING,则进入队列等待
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 返回结果(异常)
return report(s);
}复制代码
是否是很清楚,若是任务状态小于等于COMPLETING,则进入队列等待。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 咱们这里假设不带超时
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 处理中断
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 4. 若是状态大于COMPLETING了,则跳出循环并返回
// 这是自旋的出口
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 若是状态等于COMPLETING,说明任务快完成了,就差设置状态到NORMAL或EXCEPTIONAL和设置结果了
// 这时候就让出CPU,优先完成任务
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 1. 若是队列为空
else if (q == null)
// 初始化队列(WaitNode中记录了调用者线程)
q = new WaitNode();
// 2. 未进入队列
else if (!queued)
// 尝试入队
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 超时处理
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
// 3. 阻塞当前线程(调用者线程)
else
// 【本文由公从号“彤哥读源码”原创】
LockSupport.park(this);
}
}复制代码
这里咱们假设调用get()时任务还未执行,也就是其状态为NEW,咱们试着按上面标示的一、二、三、4走一遍逻辑:
(1)第一次循环,状态为NEW,直接到1处,初始化队列并把调用者线程封装在WaitNode中;
(2)第二次循环,状态为NEW,队列不为空,到2处,让包含调用者线程的WaitNode入队;
(3)第三次循环,状态为NEW,队列不为空,且已入队,到3处,阻塞调用者线程;
(4)假设过了一会任务执行完毕了,根据run()方法的分析最后会unpark调用者线程,也就是3处会被唤醒;
(5)第四次循环,状态确定大于COMPLETING了,退出循环并返回;
问题:为何要在for循环中控制整个流程呢,把这里的每一步单独拿出来写行不行?
答:由于每一次动做都须要从新检查状态state有没有变化,若是拿出去写也是能够的,只是代码会很是冗长。这里只分析了get()时状态为NEW,其它的状态也能够自行验证,都是能够保证正确的,甚至两个线程交叉运行(断点的技巧)。
OK,这里返回以后,再看看是怎么处理最终的结果的。
private V report(int s) throws ExecutionException {
Object x = outcome;
// 任务正常结束
if (s == NORMAL)
return (V)x;
// 被取消了
if (s >= CANCELLED)
throw new CancellationException();
// 执行异常
throw new ExecutionException((Throwable)x);
}复制代码
还记得前面分析run的时候吗,任务执行异常时是把异常放在outcome里面的,这里就用到了。
(1)若是正常执行结束,则返回任务的返回值;
(2)若是异常结束,则包装成ExecutionException异常抛出;
经过这种方式,线程中出现的异常也能够返回给调用者线程了,不会像执行普通任务那样调用者是不知道任务执行到底有没有成功的。
FutureTask除了能够获取任务的返回值之外,还可以取消任务的执行。
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}复制代码
这里取消任务是经过中断执行线程来处理的,有兴趣的同窗能够本身分析一下。
若是这里把future.get()放到for循环里面,时间大概是多少?
答:大概会是5秒多一点,由于每提交一个任务,都要阻塞调用者线程直到任务执行完毕,每一个任务执行都是1秒多,因此总时间就是5秒多点。
(1)将来任务是经过把普通任务包装成FutureTask来实现的。
(2)经过FutureTask不只可以获取任务执行的结果,还有感知到任务执行的异常,甚至还能够取消任务;
(3)AbstractExecutorService中定义了不少模板方法,这是一种很重要的设计模式;
(4)FutureTask其实就是典型的异常调用的实现方式,后面咱们学习到Netty、Dubbo的时候还会见到这种设计思想的。
RPC框架中异步调用是怎么实现的?
答:RPC框架经常使用的调用方式有同步调用、异步调用,其实它们本质上都是异步调用,它们就是用FutureTask的方式来实现的。
通常地,经过一个线程(咱们叫做远程线程)去调用远程接口,若是是同步调用,则直接让调用者线程阻塞着等待远程线程调用的结果,待结果返回了再返回;若是是异步调用,则先返回一个将来能够获取到远程结果的东西FutureXxx,固然,若是这个FutureXxx在远程结果返回以前调用了get()方法同样会阻塞着调用者线程。
有兴趣的同窗能够先去预习一下dubbo的异步调用(它是把Future扔到RpcContext中的)。
欢迎关注个人公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一块儿畅游源码的海洋。