自Java 5开始添加了Future
,用来描述一个异步计算的结果。获取一个结果时方法较少,要么经过轮询isDone()
,确认完成后调用get()
获取值,要么调用get()
设置一个超时时间。可是get()
方法会阻塞调用线程,这种阻塞的方式显然和咱们的异步编程的初衷相违背。如:html
package com.common.future; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class FutureRunnerTest { public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newSingleThreadExecutor(); //提交一个 Callable Future<Integer> f = es.submit(() -> { // 长时间的异步计算 Thread.sleep(2000L); System.out.println("长时间的异步计算"); return 100; }); // 轮询 while (true) { System.out.println("f.isDone"); if (f.isDone()) { try { System.out.println(f.get()); es.shutdown(); break; } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } Thread.sleep(100L); } } }
虽然Future
以及相关使用方法提供了异步执行任务的能力,可是对于结果的获取倒是很不方便,只能经过阻塞或者轮询的方式获得任务的结果。阻塞的方式显然和咱们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,并且也不能及时地获得计算结果。java
get()
方法通知你结果。你没法给 Future 植入一个回调函数,当 Future 结果可用的时候,用该回调函数自动的调用 Future 的结果。CompletableFuture 实现了 Future
和 CompletionStage
接口,而且提供了许多关于建立,链式调用和组合多个 Future 的便利方法集,并且有普遍的异常处理支持。web
在该类中提供了四个静态方法建立CompletableFuture对象:编程
runAsync()
运行异步计算supplyAsync()
运行一个异步任务而且返回结果package com.common.future; import java.util.concurrent.*; public class CompletableFutureRunnerTest { // 建立一个固定大小的线程池子 static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() { int count = 1; @Override public Thread newThread(Runnable runnable) { return new Thread(runnable, "custom-executor-" + count++); } }); public static void sleep() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws ExecutionException, InterruptedException { // 直接使用new 建立 CompletableFuture newCompletable = new CompletableFuture(); // 经过给定的值 建立一个 已经完成的 future CompletableFuture<String> cf = CompletableFuture.completedFuture("message"); System.out.println(cf.isDone()); System.out.println(cf.getNow(null)); // 使用 runAsync() 运行异步计算 CompletableFuture<Void> completableFuture01 = CompletableFuture.runAsync(() -> { sleep(); System.out.println("runAsync..."); }); CompletableFuture<Void> completableFuture02 = CompletableFuture.runAsync(() -> { sleep(); System.out.println("runAsync..."); }, executor); // 使用 supplyAsync() 运行一个异步任务而且返回结果 CompletableFuture<String> completableFuture03 = CompletableFuture.supplyAsync(() -> { sleep(); System.out.println("supplyAsync03..."); return "hello world"; }); System.out.println(completableFuture03.isDone()); // Block and wait for the future to complete System.out.println(completableFuture03.get()); CompletableFuture<String> completableFuture04 = CompletableFuture.supplyAsync(() -> { sleep(); System.out.println("supplyAsync04..."); return "hello world"; }, executor); System.out.println(completableFuture04.isDone()); System.out.println(completableFuture04.get()); } }
上面示例中的isDone() ,get() 方法都是 继承于 Future 接口中的,通常状况下,使用CompletableFuture 不须要调用isDone() 方法判断是否完成,也不须要调用get 方法获取异步执行的结果的。缓存
当CompletableFuture的计算结果完成时,有以下三个方法进行处理:异步
CompletableFuture<String> completableFuture03 = CompletableFuture.supplyAsync(() -> { sleep(); System.out.println("supplyAsync03..."); return "hello world"; }); System.out.println(completableFuture03.isDone()); System.out.println(completableFuture03.get()); completableFuture03.whenComplete((t, ex) -> { if (t != null) { System.out.println(t); } else { ex.printStackTrace(); } }); completableFuture03.whenCompleteAsync((t, ex) -> { if (t != null) { System.out.println(t); } else { ex.printStackTrace(); } }); completableFuture03.whenCompleteAsync((t, ex) -> { if (t != null) { System.out.println(t); } else { ex.printStackTrace(); } }, executor);
若是抛出了异常,对异常的处理以下所示,ide
CompletableFuture<Integer> completableFuture05 = CompletableFuture.supplyAsync(() -> { sleep(); return 1 / 0; }); completableFuture05.whenComplete((t, ex) -> { if (t != null) { System.out.println(t); } else { ex.printStackTrace(); } });
这里首先判断 t 的值是否为空,若是为空直接打印异常的堆栈信息异步编程
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Caused by: java.lang.ArithmeticException: / by zero at com.common.future.CompletableFutureRunnerTest.lambda$main$4(CompletableFutureRunnerTest.java:93) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ... 5 more
还有一个专门处理异常状况的方法,如 exceptionally,函数
CompletableFuture<Integer> completableFuture05 = CompletableFuture.supplyAsync(() -> { sleep(); return 1 / 0; }); completableFuture05.exceptionally((e) -> { e.printStackTrace(); return 0; }).whenComplete((t, e) -> { if (t != null) { System.out.println(t); } else { e.printStackTrace(); } }).join();
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
这一组函数的功能是当原来的CompletableFuture计算完后,将结果传递给函数fn
,将fn
的结果做为新的CompletableFuture
计算结果。所以它的功能至关于将CompletableFuture<T>
转换成CompletableFuture<U>
。网站
不以Async
结尾的方法由原来的线程计算,以Async
结尾的方法由默认的线程池ForkJoinPool.commonPool()
或者指定的线程池executor
运行。看下面的例子,
package com.common.future; import java.util.concurrent.CompletableFuture; public class ThenApplyTest { public static void main(String[] args) { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); String f = future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString()).join(); System.out.println(f); CompletableFuture.supplyAsync(() -> "hello world".substring(0, 5)) .thenApply(String::length) .whenComplete((r, t) -> { if (t == null) { System.out.println("the length = " + r); } }); } }
若是你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片断,你可使用thenAccept()
和 thenRun()
方法,这些方法常常在调用链的最末端的最后一个回调函数中使用。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action); public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action); public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor); public CompletableFuture<Void> thenRun(Runnable action); public CompletableFuture<Void> thenRunAsync(Runnable action); public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);
CompletableFuture.thenAccept()
持有一个Consumer<T>
,返回一个CompletableFuture<Void>
。它能够访问CompletableFuture
的结果:
// thenAccept() example CompletableFuture.supplyAsync(() -> { return ProductService.getProductDetail(productId); }).thenAccept(product -> { System.out.println("Got product detail from remote service " + product.getName()) });
thenAcceptBoth
以及相关方法提供了相似的功能,当两个CompletionStage都正常完成计算的时候,就会执行提供的action,它用来组合另一个异步的结果。
//thenAccept public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action); public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action); public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor);
以下面的使用示例,
CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } return "first"; }).thenAcceptBoth(CompletableFuture.completedFuture("second"), (first, second) -> System.out.println(first + " : " + second)).join();
虽然thenAccept()
能够访问CompletableFuture的结果,但thenRun()
不能访Future的结果,它持有一个Runnable返回CompletableFuture<Void>:
// thenRun() example CompletableFuture.supplyAsync(() -> { // Run some computation }).thenRun(() -> { // Computation Finished. });
假设你想从一个远程API中获取一个用户的详细信息,一旦用户信息可用,你想从另一个服务中获取他的贷款利率。
考虑下如下两个方法getUserDetail()
和getCreditRating()
的实现:
static CompletableFuture<User> getUsersDetail(String userId) { return CompletableFuture.supplyAsync(() -> { return new User("12312121", "xiaoming", 12); }); } static CompletableFuture<Double> getCreditRating(User user) { return CompletableFuture.supplyAsync(() -> { if (user.getUserId() == null || user.getUserId().equals("")) { return 0.0; } else { return 0.1; } }); }
如今让咱们弄明白当使用了thenApply()
后是否会达到咱们指望的结果-
// 若是使用 thenApply ,则返回了一个 最终结果是一个嵌套的CompletableFuture。 CompletableFuture<CompletableFuture<Double>> res = getUsersDetail("testUserId").thenApply(user -> getCreditRating(user));
在更早的示例中,Supplier
函数传入thenApply
将返回一个简单的值,可是在本例中,将返回一个CompletableFuture。以上示例的最终结果是一个嵌套的CompletableFuture。
若是你想获取最终的结果给最顶层future,使用 thenCompose()
方法代替-
// 若是你想获取最终的结果给最顶层future,使用 thenCompose()方法代替- CompletableFuture<Double> result = getUsersDetail("testUserId") .thenCompose(user -> getCreditRating(user));
所以,规则就是-若是你的回调函数返回一个CompletableFuture,可是你想从CompletableFuture链中获取一个直接合并后的结果,这时候你可使用thenCompose()
。
虽然thenCompose()
被用于当一个future依赖另一个future的时候用来组合两个future。thenCombine()
被用来当两个独立的Future
都完成的时候,用来作一些事情。
System.out.println("Retrieving weight."); CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 65.0; }); System.out.println("Retrieving height."); CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 177.8; }); System.out.println("Calculating BMI."); CompletableFuture<Double> combinedFuture = weightInKgFuture .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> { Double heightInMeter = heightInCm / 100; return weightInKg / (heightInMeter * heightInMeter); }); System.out.println("Your BMI is - " + combinedFuture.get());
当两个Future都完成的时候,传给``thenCombine()的回调函数将被调用。
咱们使用thenCompose()
和 thenCombine()
把两个CompletableFuture组合在一块儿。如今若是你想组合任意数量的CompletableFuture,应该怎么作?咱们可使用如下两个方法组合任意数量的CompletableFuture。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs); public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
CompletableFuture.allOf
的使用场景是当你有一个列表的独立future,而且你想在它们都完成后并行的作一些事情。
假设你想下载一个网站的100个不一样的页面。你能够串行的作这个操做,可是这很是消耗时间。所以你想写一个函数,传入一个页面连接,返回一个CompletableFuture,异步的下载页面内容。
public static CompletableFuture<String> downloadWebPage(String pageLink) { return CompletableFuture.supplyAsync(() -> { // Code to download and return the web page's content try { String html = Jsoup.connect(pageLink).execute().body(); return html; } catch (IOException e) { e.printStackTrace(); } return ""; }); }
如今,当全部的页面已经下载完毕,你想计算包含关键字CompletableFuture
页面的数量。可使用CompletableFuture.allOf()
达成目的。
List<String> webPageLinks = Arrays.asList("https://my.oschina.net/xinxingegeya/blog/674006", "https://my.oschina.net/xinxingegeya/blog/637773", "https://my.oschina.net/xinxingegeya/blog/2222079"); List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream() .map(webPageLink -> downloadWebPage(webPageLink)) .collect(Collectors.toList()); // Create a combined Future using allOf() CompletableFuture<Void> allFutures = CompletableFuture.allOf(pageContentFutures.toArray(new CompletableFuture[0]));
使用CompletableFuture.allOf()
的问题是它返回CompletableFuture<Void>。可是咱们能够经过写一些额外的代码来获取全部封装的CompletableFuture结果。
// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list - CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> { return pageContentFutures.stream() .map(pageContentFuture -> pageContentFuture.join()) .collect(Collectors.toList()); });
花一些时间理解下以上代码片断。当全部future完成的时候,咱们调用了future.join()
,所以咱们不会在任何地方阻塞。
join()
方法和get()
方法很是相似,这惟一不一样的地方是若是最顶层的CompletableFuture完成的时候发生了异常,它会抛出一个未经检查的异常。
如今让咱们计算包含关键字页面的数量。
// Count the number of web pages having the "CompletableFuture" keyword. CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> { return pageContents.stream() .filter(pageContent -> pageContent.contains("CompletableFuture")) .count(); }); System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());
完整的代码以下,
package common.future; import org.jsoup.Jsoup; import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; public class MoreCompletableFutureTest { public static CompletableFuture<String> downloadWebPage(String pageLink) { return CompletableFuture.supplyAsync(() -> { // Code to download and return the web page's content try { String html = Jsoup.connect(pageLink).execute().body(); return html; } catch (IOException e) { e.printStackTrace(); } return ""; }); } public static void main(String[] args) throws ExecutionException, InterruptedException { List<String> webPageLinks = Arrays.asList("https://my.oschina.net/xinxingegeya/blog/674006", "https://my.oschina.net/xinxingegeya/blog/637773", "https://my.oschina.net/xinxingegeya/blog/2222079"); List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream() .map(webPageLink -> downloadWebPage(webPageLink)) .collect(Collectors.toList()); // Create a combined Future using allOf() CompletableFuture<Void> allFutures = CompletableFuture.allOf(pageContentFutures.toArray(new CompletableFuture[0])); // When all the Futures are completed, call `future.join()` to get their results and collect the results in a list - CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> { return pageContentFutures.stream() .map(pageContentFuture -> pageContentFuture.join()) .collect(Collectors.toList()); }); // Count the number of web pages having the "CompletableFuture" keyword. CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> { return pageContents.stream() .filter(pageContent -> pageContent.contains("CompletableFuture")) .count(); }); System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get()); } }
CompletableFuture.anyOf()
和其名字介绍的同样,当任何一个CompletableFuture完成的时候【相同的结果类型】,返回一个新的CompletableFuture。如下示例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 1"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 2"; }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 3"; }); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3); System.out.println(anyOfFuture.get()); // Result of Future 2
在以上示例中,当三个中的任何一个CompletableFuture完成, anyOfFuture
就会完成。由于future2
的休眠时间最少,所以她最早完成,最终的结果将是future2
的结果。
CompletableFuture.anyOf()
传入一个Future可变参数,返回CompletableFuture<Object>。CompletableFuture.anyOf()
的问题是若是你的CompletableFuture返回的结果是不一样类型的,这时候你讲会不知道你最终CompletableFuture是什么类型。
======END======