做为Java 8 Concurrency API改进而引入,本文是CompletableFuture类的功能和用例的介绍。同时在Java 9 也有对CompletableFuture有一些改进,以后再进入讲解。设计模式
Future异步计算很难操做,一般咱们但愿将任何计算逻辑视为一系列步骤。可是在异步计算的状况下,表示为回调的方法每每分散在代码中或者深深地嵌套在彼此内部。可是当咱们须要处理其中一个步骤中可能发生的错误时,状况可能会变得更复杂。bash
Futrue接口是Java 5中做为异步计算而新增的,但它没有任何方法去进行计算组合或者处理可能出现的错误。并发
在Java 8中,引入了CompletableFuture类。与Future接口一块儿,它还实现了CompletionStage接口。此接口定义了可与其余Future组合成异步计算契约。框架
CompletableFuture同时是一个组合和一个框架,具备大约50种不一样的构成,结合,执行异步计算步骤和处理错误。异步
如此庞大的API可能会使人难以招架,下文将调一些重要的作重点介绍。函数
首先,CompletableFuture类实现Future接口,所以你能够将其用做Future实现,但须要额外的完成实现逻辑。ui
例如,你可使用无构参构造函数建立此类的实例,而后使用complete
方法完成。消费者可使用get方法来阻塞当前线程,直到get()
结果。this
在下面的示例中,咱们有一个建立CompletableFuture实例的方法,而后在另外一个线程中计算并当即返回Future。spa
计算完成后,该方法经过将结果提供给完整方法来完成Future:线程
public Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture
= new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
复制代码
为了分离计算,咱们使用了Executor API ,这种建立和完成CompletableFuture的方法能够与任何并发包(包括原始线程)一块儿使用。
请注意,该calculateAsync
方法返回一个Future
实例。
咱们只是调用方法,接收Future实例并在咱们准备阻塞结果时调用它的get方法。
另请注意,get方法抛出一些已检查的异常,即ExecutionException(封装计算期间发生的异常)和InterruptedException(表示执行方法的线程被中断的异常):
Future<String> completableFuture = calculateAsync();
// ...
String result = completableFuture.get();
assertEquals("Hello", result);
复制代码
若是你已经知道计算的结果,也能够用变成同步的方式来返回结果。
Future<String> completableFuture =
CompletableFuture.completedFuture("Hello");
// ...
String result = completableFuture.get();
assertEquals("Hello", result);
复制代码
做为在某些场景中,你可能但愿取消Future任务的执行。
假设咱们没有找到结果并决定彻底取消异步执行任务。这能够经过Future的取消方法完成。此方法mayInterruptIfRunning
,但在CompletableFuture的状况下,它没有任何效果,由于中断不用于控制CompletableFuture的处理。
这是异步方法的修改版本:
public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.cancel(false);
return null;
});
return completableFuture;
}
复制代码
当咱们使用Future.get()方法阻塞结果时,cancel()
表示取消执行,它将抛出CancellationException:
Future<String> future = calculateAsyncWithCancellation();
future.get(); // CancellationException
复制代码
上面的代码很简单,下面介绍几个 static 方法,它们使用任务来实例化一个 CompletableFuture 实例。
CompletableFuture.runAsync(Runnable runnable);
CompletableFuture.runAsync(Runnable runnable, Executor executor);
CompletableFuture.supplyAsync(Supplier<U> supplier);
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
复制代码
静态方法runAsync
和supplyAsync
容许咱们相应地从Runnable和Supplier功能类型中建立CompletableFuture实例。
该Runnable的接口是在线程使用旧的接口,它不容许返回值。
Supplier接口是一个不具备参数,并返回参数化类型的一个值的单个方法的通用功能接口。
这容许将Supplier的实例做为lambda表达式提供,该表达式执行计算并返回结果:
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "Hello");
// ...
assertEquals("Hello", future.get());
复制代码
在两个任务任务A,任务B中,若是既不须要任务A的值也不想在任务B中引用,那么你能够将Runnable lambda 传递给thenRun()
方法。在下面的示例中,在调用future.get()方法以后,咱们只需在控制台中打印一行:
模板
CompletableFuture.runAsync(() -> {}).thenRun(() -> {});
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
复制代码
thenRun(Runnable runnable)
,任务 A 执行完执行 B,而且 B 不须要 A 的结果。thenRun(Runnable runnable)
,任务 A 执行完执行 B,会返回resultA
,可是 B 不须要 A 的结果。实战
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenRun(() -> System.out.println("Computation finished."));
future.get();
复制代码
在两个任务任务A,任务B中,若是你不须要在Future中有返回值,则能够用 thenAccept
方法接收将计算结果传递给它。最后的future.get()调用返回Void类型的实例。
模板
CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {});
CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
复制代码
runAsync
不会有返回值,第二个方法thenAccept
,接收到的resultA值为null,同时任务B也不会有返回结果supplyAsync
有返回值,同时任务B不会有返回结果。实战
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenAccept(s -> System.out.println("Computation returned: " + s));
future.get();
复制代码
在两个任务任务A,任务B中,任务B想要任务A计算的结果,能够用thenApply
方法来接受一个函数实例,用它来处理结果,并返回一个Future函数的返回值:
模板
CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");
复制代码
实战
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApply(s -> s + " World");
assertEquals("Hello World", future.get());
复制代码
固然,多个任务的状况下,若是任务 B 后面还有任务 C,往下继续调用 .thenXxx() 便可。
接下来会有一个颇有趣的设计模式;
CompletableFuture API 的最佳场景是可以在一系列计算步骤中组合CompletableFuture实例。
这种组合结果自己就是CompletableFuture,容许进一步再续组合。这种方法在函数式语言中无处不在,一般被称为monadic设计模式
。
简单说,Monad就是一种设计模式,表示将一个运算过程,经过函数拆解成互相链接的多个步骤。你只要提供下一步运算所需的函数,整个运算就会自动进行下去。
在下面的示例中,咱们使用thenCompose方法按顺序组合两个Futures。
请注意,此方法采用返回CompletableFuture实例的函数。该函数的参数是先前计算步骤的结果。这容许咱们在下一个CompletableFuture的lambda中使用这个值:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
assertEquals("Hello World", completableFuture.get());
复制代码
该thenCompose方法连同thenApply同样实现告终果的合并计算。可是他们的内部形式是不同的,它们与Java 8中可用的Stream和Optional类的map和flatMap方法是有着相似的设计思路在里面的。
两个方法都接收一个CompletableFuture并将其应用于计算结果,但thenCompose(flatMap)方法接收一个函数,该函数返回相同类型的另外一个CompletableFuture对象。此功能结构容许将这些类的实例继续进行组合计算。
取两个任务的结果
若是要执行两个独立的任务,并对其结果执行某些操做,能够用Future的thenCombine方法:
模板
CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");
cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});
cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");
复制代码
实战
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(
() -> " World"), (s1, s2) -> s1 + s2));
assertEquals("Hello World", completableFuture.get());
复制代码
更简单的状况是,当你想要使用两个Future结果时,但不须要将任何结果值进行返回时,能够用thenAcceptBoth
,它表示后续的处理不须要返回值,而 thenCombine 表示须要返回值:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
(s1, s2) -> System.out.println(s1 + s2));
复制代码
在前面的部分中,咱们展现了关于thenApply()和thenCompose()的示例。这两个API都是使用的CompletableFuture调用,但这两个API的使用是不一样的。
此方法用于处理先前调用的结果。可是,要记住的一个关键点是返回类型是转换泛型中的类型,是同一个CompletableFuture。
所以,当咱们想要转换CompletableFuture 调用的结果时,效果是这样的 :
CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);
复制代码
该thenCompose()方法相似于thenApply()在都返回一个新的计算结果。可是,thenCompose()使用前一个Future做为参数。它会直接使结果变新的Future,而不是咱们在thenApply()中到的嵌套Future,而是用来链接两个CompletableFuture,是生成一个新的CompletableFuture:
CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
复制代码
所以,若是想要继续嵌套连接CompletableFuture 方法,那么最好使用thenCompose()。
当咱们须要并行执行多个任务时,咱们一般但愿等待全部它们执行,而后处理它们的组合结果。
该CompletableFuture.allOf
静态方法容许等待全部的完成任务:
API
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...}
复制代码
实战
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
// ...
combinedFuture.get();
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
复制代码
请注意,CompletableFuture.allOf()的返回类型是CompletableFuture 。这种方法的局限性在于它不会返回全部任务的综合结果。相反,你必须手动从Futures获取结果。幸运的是,CompletableFuture.join()方法和Java 8 Streams API能够解决:
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);
复制代码
CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是同样的,都是阻塞获取值,它们的区别在于 join() 抛出的是 unchecked Exception。这使得它能够在Stream.map()方法中用做方法引用。
说到这里,咱们顺便来讲下 CompletableFuture 的异常处理。这里咱们要介绍两个方法:
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
复制代码
看下代码
CompletableFuture.supplyAsync(() -> "resultA")
.thenApply(resultA -> resultA + " resultB")
.thenApply(resultB -> resultB + " resultC")
.thenApply(resultC -> resultC + " resultD");
复制代码
上面的代码中,任务 A、B、C、D 依次执行,若是任务 A 抛出异常(固然上面的代码不会抛出异常),那么后面的任务都得不到执行。若是任务 C 抛出异常,那么任务 D 得不到执行。
那么咱们怎么处理异常呢?看下面的代码,咱们在任务 A 中抛出异常,并对其进行处理:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException();
})
.exceptionally(ex -> "errorResultA")
.thenApply(resultA -> resultA + " resultB")
.thenApply(resultB -> resultB + " resultC")
.thenApply(resultC -> resultC + " resultD");
System.out.println(future.join());
复制代码
上面的代码中,任务 A 抛出异常,而后经过.exceptionally()
方法处理了异常,并返回新的结果,这个新的结果将传递给任务 B。因此最终的输出结果是:
errorResultA resultB resultC resultD
String name = null;
// ...
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
})}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
assertEquals("Hello, Stranger!", completableFuture.get());
复制代码
固然,它们也能够都为 null,由于若是它做用的那个 CompletableFuture 实例没有返回值的时候,s 就是 null。
CompletableFuture类中的API的大多数方法都有两个带有Async后缀的附加修饰。这些方法表示用于异步线程。
没有Async后缀的方法使用调用线程运行下一个执行线程阶段。不带Async方法使用ForkJoinPool.commonPool()线程池的fork / join实现运算任务。带有Async方法使用传递式的Executor任务去运行。
下面附带一个案例,能够看到有thenApplyAsync方法。在程序内部,线程被包装到ForkJoinTask实例中。这样能够进一步并行化你的计算并更有效地使用系统资源。
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApplyAsync(s -> s + " World");
assertEquals("Hello World", future.get());
复制代码
在Java 9中, CompletableFuture API经过如下更改获得了进一步加强:
引入了新的实例API:
还有一些静态实用方法:
最后,为了解决超时问题,Java 9又引入了两个新功能:
在本文中,咱们描述了CompletableFuture类的方法和典型用例。