以前的两篇文章中,咱们介绍了异步编程,也介绍了线程池的基本概念。也说了,线程池的实现天生也实现了异步任务,容许直接向线程池中进行任务的提交与结果获取。java
可是,咱们始终没有去深刻的了解下,异步任务框架对于任务执行的进度是如何监控的,任务执行的结果该如何获取。git
那么,本篇文章就来详细地探讨下异步框架中,关于任务执行过程当中的一些状态以及执行结果反馈的相关细节。程序员
咱们说过,异步编程的一个好处是:github
我只须要定义好任务,向 ExecutorService 中提交便可,而不用关心何时,什么线程在执行咱们的任务。它会返回一个 Future 对象,咱们经过他了解当前任务的执行细节。编程
Future 接口中定义了如下一些方法:bash
public interface Future<V> {
//取消执行当前任务
boolean cancel(boolean mayInterruptIfRunning);
//当前任务是否被取消了
boolean isCancelled();
//当前任务是否已经完成
boolean isDone();
//返回任务执行的返回结果,若是任务未完成
//将阻塞在 Future 内部队列上等待
V get()
//新增超时限制
V get(long timeout, TimeUnit unit)
}
复制代码
这五个方法,每个都很重要,为咱们监控任务的执行提供有力的支持。而咱们的 ThreadPoolExecutor 使用的是 FutureTask 做为 Future 的实现类。微信
而咱们也不妨看看这个 FutureTask 内部都有些哪些成员:多线程
state 和它可取的这些值共同描述了当前任务的执行状态,是刚开始执行,仍是正在执行中,仍是正常结束,仍是异常结束,仍是被取消了,都由这个 state 来体现。框架
callable 表明当前正在执行的工做内容,这里说一下为何只有 Callable 类型的任务,由于全部的 Runnable 类型任务都会被事先转换成 Callable 类型,我以为主要是统一和抽象实现吧。异步
outcome 是任务执行结束的返回值,runner 是正在执行当前任务的线程,waiters 是一个简单的单链表,维护的是全部在任务执行结束以前尝试调用 get 方法获取执行结果的线程集合。当任务执行结束自当唤醒队列中全部的线程。
除此以外,还有一个稍显重要的方法,就是 run 方法,这个方法会在任务开始时由 ExecutorService 调用,这是一个很核心的方法,虽然方法体有点长,可是逻辑简单,咱们大致上归纳下。
这里须要额外去说一下,第三步中的 set 方法除了会将任务执行的返回结果设置到 FutureTask 的 outcome 字段上,还会调用 finishCompletion 方法完成任务的调用,尝试唤醒全部在等待任务执行结果的线程。
其余的方法就不去看了,也比较多,还算是简单的,若是有所想法,也欢迎你和我探讨交流。
那么,咱们也来看一个最简单的应用示例:
咱们向线程池提交了一个任务,这个任务的工做量不大,就是睡觉而后返回执行结果。而咱们能够直接调用 get 方法去获取任务执行的结果,不过 get 方法是阻塞式的,一旦任务还未执行结束,当前线程将丢失 CPU 进而被阻塞到 Future 的内部队列上。
因此,推荐你们在 get 返回结果以前,先判断下目标任务是否已经执行结束,进而避免当前线程的阻塞唤醒所带来的代价。
到这里,相信你也必定看出来了,FutureTask 实现的 Future 的弊端在 get 方法,这个方法非异步,若是没有成功获取到任务的执行结果就将直接阻塞当前线程,以等待任务的执行完成。
可是,有一种情境,当咱们向线程池中提交了不少任务,可是不清楚各个任务的执行效率,也就是不知道谁先执行结束,若是直接 get 某个未完成的任务,将致使当前线程阻塞等待。
那么咱们能不能阻塞,直接获取已经执行结束的任务 Future,而未完成的任务不容许获取它的 Future?
分析 CompletionService 以前,咱们搬出以前分析过的一张类图:
左半边的类咱们已经在前面的文章中都涉猎了,惟独落下了 CompletionService 这个接口,咱们当时说之后会分析它的,如今咱们来看看这个接口会给咱们带来哪些能力。
首先,从类的继承体系上来看,CompletionService 并不与咱们的 Executor 产生任何直接关系,线程池的实现也没有继承该接口。
实际上来讲,CompletionService 只是利用了 Executor 乃至线程池为本身提供任务的提交与执行能力,而本身不过额外的维护一个队列,保存着全部已经完成的任务的 Future,以致于咱们能够直接在外部调用 take 方法直接获取已完成的任务返回结果,无需阻塞。
废话很少说,咱们写个小 demo,或许你会有更直接的体验:
==要求:使用多线程计算 1-10000 之间的总和==
==思路:分段计算,最后总和相加==
实现:
相信你运行后必定和我是一样的答案:50005000
可能不少人会有疑问,这段代码其实也没什么特别的地方啊,我使用基本的线程池不同也能实现吗?
可是,实际上并无那么简单,由于你不能肯定哪一个任务完成了,哪一个尚未,因此你至少须要写五个循环自旋等待。
而若是你的运气很差,第一个任务特别慢,即使后续的任务已经结束了,主线程也依然因为第一个任务的结果拿不到而阻塞,耽误了对其余已完成任务的返回结果处理。
乍一看,你可能以为差异不大,但仔细分析了才会发现,一旦任务量增大、增多,真的是「差之毫厘,谬以千里」。
其实,原理我也能够带你们一块儿来看看,并不难:
先从你们最关心的 CompletionService 实现子类内部结构开始:
这里,至少能够看出来两点,字段 executor 是一个任务调度器,completionQueue 是一个阻塞队列。
也就是说,Completion 是彻底依赖外部传入的 Executor 来实现任务的提交与执行的。而这个阻塞队列 completionQueue 就是保存的全部已经完成的任务 Future 对象。
除此以外,ExecutorCompletionService 还自定义了一个内部类 QueueingFuture,重写了 FutureTask 的 done 方法。
可能你们对这个 done 没什么印象,可是还记得咱们说过的 finishCompletion 方法吗?
FutureTask 抽象的描述了一个任务,当线程启动后将调用 FutureTask 内部的 run 方法执行任务的核心逻辑,并在执行的最后调用 finishCompletion 唤醒全部阻塞在本身队列上等待返回结果的线程。
而其中 finishCompletion 方法在结束前,会调用一个 done 方法,这个 done 方法在 FutureTask 中是空实现,没有任何的代码实现,表示并无什么用。
可是咱们的 QueueingFuture 充分利用这一点,重写了 done 方法,而逻辑就是将已结束的任务添加到咱们在外部维护的一个新队列 completionQueue 中,供外部获取调用。
这些就是 CompletionService 的秘密。