在 我会手动建立线程,为何要使用线程池? 中详细的介绍了 ExecutorService,能够将整块任务拆分作简单的并行处理;html
在 不会用Java Future,我怀疑你泡茶没我快 中又详细的介绍了 Future 的使用,填补了 Runnable 不能获取线程执行结果的空缺java
将两者结合起来使用看似要一招吃天下了(Java有并发,并发之大,一口吃不下), but ~~ 是我太天真编程
假设咱们有 4 个任务(A, B, C, D)用来执行复杂的计算,每一个任务的执行时间随着输入参数的不一样而不一样,若是将任务提交到 ExecutorService, 相信你已经能够“信手拈来”并发
ExecutorService executorService = Executors.newFixedThreadPool(4); List<Future> futures = new ArrayList<Future<Integer>>(); futures.add(executorService.submit(A)); futures.add(executorService.submit(B)); futures.add(executorService.submit(C)); futures.add(executorService.submit(D)); // 遍历 Future list,经过 get() 方法获取每一个 future 结果 for (Future future:futures) { Integer result = future.get(); // 其余业务逻辑 }
先直入主题,用 CompletionService 实现一样的场景异步
ExecutorService executorService = Executors.newFixedThreadPool(4); // ExecutorCompletionService 是 CompletionService 惟一实现类 CompletionService executorCompletionService= new ExecutorCompletionService<>(executorService ); List<Future> futures = new ArrayList<Future<Integer>>(); futures.add(executorCompletionService.submit(A)); futures.add(executorCompletionService.submit(B)); futures.add(executorCompletionService.submit(C)); futures.add(executorCompletionService.submit(D)); // 遍历 Future list,经过 get() 方法获取每一个 future 结果 for (int i=0; i<futures.size(); i++) { Integer result = executorCompletionService.take().get(); // 其余业务逻辑 }
两种方式在代码实现上几乎一毛同样,咱们曾经说过 JDK 中不会重复造轮子,若是要造一个新轮子,一定是原有的轮子在某些场景的使用上有致命缺陷异步编程
既然新轮子出来了,两者到底有啥不一样呢? 在 搞定 CompletableFuture,并发异步编程和编写串行程序还有什么区别? 文中,咱们提到了 Future get()
方法的致命缺陷:函数
若是 Future 结果没有完成,调用 get() 方法,程序会 阻塞在那里,直至获取返回结果
先来看第一种实现方式,假设任务 A 因为参数缘由,执行时间相对任务 B,C,D 都要长不少,可是按照程序的执行顺序,程序在 get() 任务 A 的执行结果会阻塞在那里,致使任务 B,C,D 的后续任务没办法执行。又由于每一个任务执行时间是不固定的,因此不管怎样调整将任务放到 List 的顺序,都不合适,这就是致命弊端高并发
新轮子天然要解决这个问题,它的设计理念就是哪一个任务先执行完成,get() 方法就会获取到相应的任务结果,这么作的好处是什么呢?来看个图你就瞬间理解了源码分析
两张图一对比,执行时长高下立判了,在当今高并发的时代,这点时间差,在吞吐量上起到的效果可能不是一点半点了性能
那 CompletionService 是怎么作到获取最早执行完的任务结果的呢?
若是你使用过消息队列,你应该秒懂我要说什么了,CompletionService 实现原理很简单
就是一个将异步任务的生产和任务完成结果的消费解耦的服务
用人话解释一下上面的抽象概念我只能再画一张图了
说白了,哪一个任务执行的完,就直接将执行结果放到队列中,这样消费者拿到的结果天然就是最先拿到的那个了
从上图中看到,有任务,有结果队列,那 CompletionService
天然也要围绕着几个关键字作文章了
带着这些线索,咱们走进 CompletionService 源码看一看
CompletionService
是一个接口,它简单的只有 5 个方法:
Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result); Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
关于 2 个 submit 方法, 我在 不会用Java Future,我怀疑你泡茶没我快 文章中作了很是详细的分析以及案例使用说明,这里再也不过多赘述
另外 3 个方法都是从阻塞队列中获取并移除阻塞队列第一个元素,只不过他们的功能略有不一样
因此说,按大类划分上面5个方法,其实就是两个功能
CompletionService
只是接口,ExecutorCompletionService
是该接口的惟一实现类
先来看一下类结构, 实现类里面并无多少内容
<fancybox></fancybox>
ExecutorCompletionService
有两种构造函数:
private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
两个构造函数都须要传入一个 Executor 线程池,由于是处理异步任务的,咱们是不被容许手动建立线程的,因此这里要使用线程池也就很好理解了
另一个参数是 BlockingQueue,若是不传该参数,就会默认队列为 LinkedBlockingQueue
,任务执行结果就是加入到这个阻塞队列中的
因此要完全理解 ExecutorCompletionService
,咱们只须要知道一个问题的答案就能够了:
它是如何将异步任务结果放到这个阻塞队列中的?
想知道这个问题的答案,那只须要看它提交任务以后都作了些什么?
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }
咱们前面也分析过,execute 是提交 Runnable 类型的任务,自己得不到返回值,但又能够将执行结果放到阻塞队列里面,因此确定是在 QueueingFuture 里面作了文章
从上图中看一看出,QueueingFuture 实现的接口很是多,因此说也就具有了相应的接口能力。
重中之重是,它继承了 FutureTask ,FutureTask 重写了 Runnable 的 run() 方法 (方法细节分析能够查看FutureTask源码分析 ) 文中详细说明了,不管是set() 正常结果,仍是setException() 结果,都会调用 finishCompletion()
方法:
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } // 重点 重点 重点 done(); callable = null; // to reduce footprint }
上述方法会执行 done() 方法,而 QueueingFuture 恰巧重写了 FutureTask 的 done() 方法:
方法实现很简单,就是将 task 放到阻塞队列中
protected void done() { completionQueue.add(task); }
执行到此的 task 已是前序步骤 set 过结果的 task,因此就能够经过消费阻塞队列获取相应的结果了
相信到这里,CompletionService 在你面前应该没什么秘密可言了
在 JDK docs 上明确给了两个例子来讲明 CompletionService 的用途:
假设你有一组针对某个问题的solvers,每一个都返回一个类型为Result的值,而且想要并发地运行它们,处理每一个返回一个非空值的结果,在某些方法使用(Result r)
其实就是文中开头的使用方式
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); for (Callable<Result> s : solvers) ecs.submit(s); int n = solvers.size(); for (int i = 0; i < n; ++i) { Result r = ecs.take().get(); if (r != null) use(r); } }
假设你想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消全部其余任务
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); int n = solvers.size(); List<Future<Result>> futures = new ArrayList<Future<Result>>(n); Result result = null; try { for (Callable<Result> s : solvers) futures.add(ecs.submit(s)); for (int i = 0; i < n; ++i) { try { Result r = ecs.take().get(); if (r != null) { result = r; break; } } catch (ExecutionException ignore) {} } } finally { for (Future<Result> f : futures) // 注意这里的参数给的是 true,详解一样在前序 Future 源码分析文章中 f.cancel(true); } if (result != null) use(result); }
这两种方式都是很是经典的 CompletionService 使用 范式 ,请你们仔细品味每一行代码的用意
范式没有说明 Executor 的使用,使用 ExecutorCompletionService,须要本身建立线程池,看上去虽然有些麻烦,但好处是你可让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险 (这也是咱们反复说过屡次的,不要全部业务共用一个线程池)
CompletionService 的应用场景仍是很是多的,好比
CompletionService 不但能知足获取最快结果,还能起到必定 "load balancer" 做用,获取可用服务的结果,使用也很是简单, 只须要遵循范式便可
并发系列 讲了这么多,分析源码的过程也碰到各类队列,接下来咱们就看看那些让人眼花缭乱的队列
日拱一兵 | 原创