前言编程
在咱们的意识里,同步执行的程序都比较符合人们的思惟方式,而异步的东西一般都很差处理。在异步计算的状况下,以回调表示的动做每每会分散在代码中,也可能相互嵌套在内部,若是须要处理其中一个步骤中可能发生的错误时,状况变得更加糟糕。Java 8 引入了不少的新特性,其中就包含了 CompletableFuture 类的引入,这让咱们编写清晰可读的异步代码变得更加容易,该类功能很是强大,包含了超过 50 多个方法。。。多线程
什么是 CompletableFuture并发
CompletableFuture 类的设计灵感来自于 Google Guava 的 ListenableFuture 类,它实现了 Future 和 CompletionStage 接口而且新增了许多方法,它支持 lambda,经过回调利用非阻塞方法,提高了异步编程模型。它容许咱们经过在与主应用程序线程不一样的线程上(也就是异步)运行任务,并向主线程通知任务的进度、完成或失败,来编写非阻塞代码。异步
为何要引入 CompletableFutureide
Java 的 1.5 版本引入了 Future,你能够把它简单的理解为运算结果的占位符,它提供了两个方法来获取运算结果。异步编程
get():调用该方法线程将会无限期等待运算结果。函数
get(long timeout, TimeUnit unit):调用该方法线程将仅在指定时间 timeout 内等待结果,若是等待超时就会抛出 TimeoutException 异常。源码分析
Future 可使用 Runnable 或 Callable 实例来完成提交的任务,经过其源码能够看出,它存在以下几个问题:this
阻塞 调用 get() 方法会一直阻塞,直到等待直到计算完成,它没有提供任何方法能够在完成时通知,同时也不具备附加回调函数的功能。线程
链式调用和结果聚合处理 在不少时候咱们想连接多个 Future 来完成耗时较长的计算,此时须要合并结果并将结果发送到另外一个任务中,该接口很难完成这种处理。
异常处理 Future 没有提供任何异常处理的方式。
以上这些问题在 CompletableFuture 中都已经解决了,接下来让咱们看看如何去使用 CompletableFuture。
如何建立 CompletableFuture
最简单的建立方式就是调用 CompletableFuture.completedFuture(U value) 方法来获取一个已经完成的 CompletableFuture 对象。
@Test public void testSimpleCompletableFuture() { CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello mghio"); assertTrue(completableFuture.isDone()); try { assertEquals("Hello mghio", completableFuture.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
须要注意的是当咱们对不完整的 CompleteableFuture 调用 get 方法的话,会因为 Future 未完成,所以 get 调用将永远阻塞,此时可使用 CompletableFuture.complete 方法手动完成 Future。
任务异步处理
当咱们想让程序在后台异步执行任务而不关心任务的处理结果时,可使用 runAsync 方法,该方法接收一个 Runnable 类型的参数返回 CompletableFuture。
@Test public void testCompletableFutureRunAsync() { AtomicInteger variable = new AtomicInteger(0); CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process(variable)); runAsync.join(); assertEquals(100, variable.get()); } public void process(AtomicInteger variable) { System.out.println(Thread.currentThread() + " Process..."); variable.set(100); }
若是咱们想让任务在后台异步执行并且须要获取任务的处理结果时,可使用 supplyAsync 方法,该方法接收一个 Supplier类型的参数返回一个 CompletableFuture。
@Test public void testCompletableFutureSupplyAsync() { CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process); try { assertEquals("Hello mghio", supplyAsync.get()); // Blocking } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } } public String process() { return "Hello mghio"; }
看到这里你可能会有个问题,上面执行 runAsync 和 supplyAsync 任务的线程是从哪里来的、谁建立的呢?实际上它和 Java 8 中的 parallelStream 相似, CompletableFuture 也是从全局 ForkJoinPool.commonPool() 得到的线程中执行这些任务的。同时,上面的两个方法也提供了自定义线程池去执行任务,其实你若是去了解过 CompletableFuture 的源码的话,你会发现其 API 中的全部方法都有个重载的版本,有或没有自定义 Executor 执行器。
@Test public void testCompletableFutureSupplyAsyncWithExecutor() { ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2); CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process, newFixedThreadPool); try { assertEquals("Hello mghio", supplyAsync.get()); // Blocking } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } } public String process() { return "Hello mghio"; }
链式调用和结果聚合处理
咱们知道 CompletableFuture 的 get() 方法会一直阻塞直到获取到结果,CompletableFuture 提供了 thenApply、thenAccept 和 thenRun 等方法来避免这种状况,并且咱们还能够添加任务完成后的回调通知。这几个方法的使用场景以下:
thenApply 当咱们若是要在从 Future 接收值后任务以前运行自定义的业务代码,而后要为此任务返回一些值时,则可使用该方法
thenAccept 若是咱们但愿在从 Future 接收到一些值后执行任务以前运行自定义的业务代码而不关心返回结果值时,则可使用该方法
thenRun 若是咱们想在 Future 完成后运行自定义的业务代码,而且不想为此返回任何值时,则可使用该方法
@Test public void testCompletableFutureThenApply() { Integer notificationId = CompletableFuture.supplyAsync(this::thenApplyProcess) .thenApply(this::thenApplyNotify) // Non Blocking .join(); assertEquals(new Integer(1), notificationId); } @Test public void testCompletableFutureThenAccept() { CompletableFuture.supplyAsync(this::processVariable) .thenAccept(this::thenAcceptNotify) // Non Blocking .join(); assertEquals(100, variable.get()); } @Test public void testCompletableFutureThenRun() { CompletableFuture.supplyAsync(this::processVariable) .thenRun(this::thenRunNotify) .join(); assertEquals(100, variable.get()); } private String processVariable() { variable.set(100); return "success"; } private void thenRunNotify() { System.out.println("thenRun completed notify ...."); } private Integer thenApplyNotify(Integer integer) { return integer; } private void thenAcceptNotify(String s) { System.out.println( String.format("Thread %s completed notify ....", Thread.currentThread().getName())); } public Integer thenApplyProcess() { return 1; }
若是有大量的异步计算,那么咱们能够继续将值从一个回调传递到另外一个回调中去,也就是使用链式调用方式,使用方式很简单。
@Test public void testCompletableFutureThenApplyAccept() { CompletableFuture.supplyAsync(this::findAccountNumber) .thenApply(this::calculateBalance) .thenApply(this::notifyBalance) .thenAccept((i) -> notifyByEmail()).join(); } private void notifyByEmail() { // business code System.out.println("send notify by email ..."); } private Double notifyBalance(Double d) { // business code System.out.println(String.format("your balance is $%s", d)); return 9527D; } private Double calculateBalance(Object o) { // business code return 9527D; } private Double findAccountNumber() { // business code return 9527D; }
比较细心的朋友可能注意到在全部前面的几个方法示例中,全部方法都是在同一线程上执行的。若是咱们但愿这些任务在单独的线程上运行时,那么咱们可使用这些方法对应的异步版本。
@Test public void testCompletableFutureApplyAsync() { ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2); ScheduledExecutorService newSingleThreadScheduledExecutor = Executors .newSingleThreadScheduledExecutor(); CompletableFuture<Double> completableFuture = CompletableFuture .supplyAsync(this::findAccountNumber, newFixedThreadPool) // 从线程池 newFixedThreadPool 获取线程执行任务 .thenApplyAsync(this::calculateBalance, newSingleThreadScheduledExecutor) .thenApplyAsync(this::notifyBalance); Double balance = completableFuture.join(); assertEquals(9527D, balance); }
执行结果处理
thenCompose 方法适合有依赖性的任务处理,好比一个计算帐户余额的业务:首先咱们要先找到账号,而后为该账户计算余额,而后计算完成后再发送通知。全部这些任务都是依赖前一个任务的返回 CompletableFuture 结果,此时咱们须要使用 thenCompose 方法,其实有点相似于 Java 8 流的 flatMap 操做。
@Test public void testCompletableFutureThenCompose() { Double balance = this.doFindAccountNumber() .thenCompose(this::doCalculateBalance) .thenCompose(this::doSendNotifyBalance).join(); assertEquals(9527D, balance); } private CompletableFuture<Double> doSendNotifyBalance(Double aDouble) { sleepSeconds(2); // business code System.out.println(String.format("%s doSendNotifyBalance ....", Thread.currentThread().getName())); return CompletableFuture.completedFuture(9527D); } private CompletableFuture<Double> doCalculateBalance(Double d) { sleepSeconds(2); // business code System.out.println(String.format("%s doCalculateBalance ....", Thread.currentThread().getName())); return CompletableFuture.completedFuture(9527D); } private CompletableFuture<Double> doFindAccountNumber() { sleepSeconds(2); // business code System.out.println(String.format("%s doFindAccountNumber ....", Thread.currentThread().getName())); return CompletableFuture.completedFuture(9527D); } private void sleepSeconds(int timeout) { try { TimeUnit.SECONDS.sleep(timeout); } catch (InterruptedException e) { e.printStackTrace(); } }
thenCombine 方法主要是用于合并多个独立任务的处理结果。假设咱们须要查找一我的的姓名和住址,则可使用不一样的任务来分别获取,而后要得到这我的的完整信息(姓名 + 住址),则须要合并这两种方法的结果,那么咱们可使用 thenCombine 方法。
@Test public void testCompletableFutureThenCombine() { CompletableFuture<String> thenCombine = this.findName().thenCombine(this.findAddress(), (name, address) -> name + address); String personInfo = thenCombine.join(); assertEquals("mghio Shanghai, China", personInfo); } private CompletableFuture<String> findAddress() { return CompletableFuture.supplyAsync(() -> { sleepSeconds(2); // business code return "Shanghai, China"; }); } private CompletableFuture<String> findName() { return CompletableFuture.supplyAsync(() -> { sleepSeconds(2); // business code return "mghio "; }); }
等待多个任务执行完成
在许多状况下,咱们但愿并行运行多个任务,并在全部任务完成后再进行一些处理。假设咱们要查找 3 个不一样用户的姓名并将结果合并。此时就可使用 CompletableFuture 的静态方法 allOf,该方法会等待全部任务完成,须要注意的是该方法它不会返回全部任务的合并结果,所以咱们必须手动组合任务的执行结果。
@Test public void testCompletableFutureAllof() { List<CompletableFuture<String>> list = Lists.newArrayListWithCapacity(4); IntStream.range(0, 3).forEach(num -> list.add(findName(num))); CompletableFuture<Void> allFuture = CompletableFuture .allOf(list.toArray(new CompletableFuture[0])); CompletableFuture<List<String>> allFutureList = allFuture .thenApply(val -> list.stream().map(CompletableFuture::join).collect(Collectors.toList())); CompletableFuture<String> futureHavingAllValues = allFutureList .thenApply(fn -> String.join("", fn)); String result = futureHavingAllValues.join(); assertEquals("mghio0mghio1mghio2", result); } private CompletableFuture<String> findName(int num) { return CompletableFuture.supplyAsync(() -> { sleepSeconds(2); // business code return "mghio" + num; }); }
异常处理
在多线程中程序异常其实不太好处理,可是幸运的是在 CompletableFuture 中给咱们提供了很方便的异常处理方式,在咱们上面的例子代码中:
@Test public void testCompletableFutureThenCompose() { Double balance = this.doFindAccountNumber() .thenCompose(this::doCalculateBalance) .thenCompose(this::doSendNotifyBalance).join(); }
在上面的代码中,三个方法 doFindAccountNumber、doCalculateBalance 和 doSendNotifyBalance 只要任意一个发生异常了,则以后调用的方法将不会运行。CompletableFuture 提供了三种处理异常的方式,分别是 exceptionally、handle 和 whenComplete 方法。
第一种方式是使用 exceptionally 方法处理异常,若是前面的方法失败并发生异常,则会调用异常回调。
@Test public void testCompletableFutureExceptionally() { CompletableFuture<Double> thenApply = CompletableFuture.supplyAsync(this::findAccountNumber) .thenApply(this::calculateBalance) .thenApply(this::notifyBalance) .exceptionally(ex -> { System.out.println("Exception " + ex.getMessage()); return 0D; }); Double join = thenApply.join(); assertEquals(9527D, join); }
第二种方式是使用 handle 方法处理异常,使用该方式处理异常比上面的 exceptionally 方式更为灵活,咱们能够同时获取到异常对象和当前的处理结果。
@Test public void testCompletableFutureHandle() { CompletableFuture.supplyAsync(this::findAccountNumber) .thenApply(this::calculateBalance) .thenApply(this::notifyBalance) .handle((ok, ex) -> { System.out.println("最终要运行的代码..."); if (ok != null) { System.out.println("No Exception !!"); } else { System.out.println("Exception " + ex.getMessage()); return -1D; } return ok; }); }
第三种是使用 whenComplete 方法处理异常。
@Test public void testCompletableFutureWhenComplete() { CompletableFuture.supplyAsync(this::findAccountNumber) .thenApply(this::calculateBalance) .thenApply(this::notifyBalance) .whenComplete((result, ex) -> { System.out.println("result = " + result + ", ex = " + ex); System.out.println("最终要运行的代码..."); }); }
总结
在本文中,简要的介绍了 CompletableFuture 类的部分方法和使用方式,这个类的方法不少同时提供的功能也很是强大,在异步编程中使用的比较多,熟悉了基本的使用方法以后要深刻了解仍是要深刻源码分析其实现原理。