在Markdown的语法中,<u>下划线</u>
中的文字会被解析器加上下划线,为了避免影响阅读,本文中JDK文档涉及到<U>
都会替换为<N>
,请各位注意。html
Java 1.8 新增长的 CompletableFuture 类内部是使用 ForkJoinPool 来实现的,CompletableFuture 实现了 Future接口 和 CompletionStage接口。java
在以前的Future的介绍和基本用法一文中,咱们了解到 Future 表示异步计算的结果。编程
CompletionStage 表明了一个特定的计算的阶段,能够同步或者异步的被完成。api
CompletionStage 能够被看做一个计算流水线上的一个单元,最终会产生一个最终结果,这意味着几个 CompletionStage 能够串联起来,一个完成的阶段能够触发下一阶段的执行,接着触发下一次,直到完成全部阶段。bash
Future是Java 5添加的类,用来描述一个异步计算的结果。你可使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可使用cancel方法中止任务的执行。oracle
虽然Future以及相关使用方法提供了异步执行任务的能力,可是对于结果的获取倒是很不方便,只能经过阻塞或者轮询的方式获得任务的结果。阻塞的方式显然和咱们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,并且也不能及时地获得计算结果。异步
package net.ijiangtao.tech.concurrent.jsd.future.completable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/** * java5 future * * @author ijiangtao * @create 2019-07-22 9:40 **/
public class Java5Future {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//经过 while 循环等待异步计算处理成功
ExecutorService pool = Executors.newFixedThreadPool(10);
Future<Integer> f1 = pool.submit(() -> {
// 长时间的异步计算 ……
Thread.sleep(1);
// 而后返回结果
return 1001;
});
while (!f1.isDone())
System.out.println("is not done yet");
;
System.out.println("while isDone,result=" + f1.get());
//经过阻塞的方式等待异步处理成功
Future<Integer> f2 = pool.submit(() -> {
// 长时间的异步计算 ……
Thread.sleep(1);
// 而后返回结果
return 1002;
});
System.out.println("after blocking,result=" + f2.get());
}
}
复制代码
前面咱们提到 CompletableFuture 为咱们提供了异步计算的实现,而这些实现都是经过它的方法实现的。async
若是你打开它的文档CompletableFuture-Java8Docs,你会发现CompletableFuture提供了将近60个方法。虽然方法不少,若是你仔细看的话,你会这些方法其中不少都是有类似性的。ide
只要你熟练掌握了这些方法,就可以驾轻就熟地使用 CompletableFuture 来进行异步计算了。异步编程
Java的CompletableFuture类老是遵循这样的原则:
下面再也不一一赘述了。
返回值 | 方法名及参数 | 方法说明 |
---|---|---|
static CompletableFuture | supplyAsync(Supplier supplier) | Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier. |
static CompletableFuture | supplyAsync(Supplier supplier, Executor executor) | Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier. |
supplyAsync方法以Supplier函数式接口类型为参数,CompletableFuture的计算结果类型为U。由于该方法的参数类型都是函数式接口,因此可使用lambda表达式实现异步任务。后面讲解其余方法的时候,会举例子。
runAsync方法也好理解,它以Runnable函数式接口类型为参数,因此CompletableFuture的计算结果也为空(Runnable的run方法返回值为空)。这里就再也不一一介绍了,感兴趣的小伙伴能够查看API文档。
返回值 | 方法名及参数 | 方法说明 |
---|---|---|
T | get() | Waits if necessary for this future to complete, and then returns its result. |
T | get(long timeout, TimeUnit unit) | Waits if necessary for at most the given time for this future to complete, and then returns its result, if available. |
T | getNow(T valueIfAbsent) | Returns the result value (or throws any encountered exception) if completed, else returns the given valueIfAbsent. |
T | join() | Returns the result value when complete, or throws an (unchecked) exception if completed exceptionally. |
你能够像使用Future同样使用CompletableFuture,来进行阻塞式的计算(虽然不推荐使用)。其中getNow方法比较特殊:结果已经计算完则返回结果或者抛出异常,不然返回给定的valueIfAbsent值。
join和get方法均可以阻塞到计算完成而后得到返回结果,但二者的处理流程有所不一样,能够参考下面一段代码来比较二者处理异常的不一样之处:
public static void main(String[] args) {
try {
new CompletableFutureDemo().test2();
new CompletableFutureDemo().test3();
} catch (Exception e) {
e.printStackTrace();
}
}
public void test2() throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return 100;
});
future.join();
}
public void test3() throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return 100;
});
future.get();
}
复制代码
输出以下:
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.ArithmeticException: / by zero
at net.ijiangtao.tech.concurrent.jsd.future.completable.CompletableFutureDemo.lambda$test2$0(CompletableFutureDemo.java:32)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 5 more
复制代码
返回值 | 方法名及参数 | 方法说明 |
---|---|---|
CompletableFuture | thenApply(Function<? super T,? extends U> fn) | Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function. |
CompletableFuture | thenApplyAsync(Function<? super T,? extends U> fn) | Returns a new CompletionStage that, when this stage completes normally, is executed using this stage's default asynchronous execution facility, with this stage's result as the argument to the supplied function. |
CompletableFuture | thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) | Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function. |
使用CompletableFuture,咱们没必要由于等待一个计算完成而阻塞着调用线程,而是告诉CompletableFuture当计算完成的时候请执行某个function。并且咱们还能够将这些操做串联起来,或者将CompletableFuture组合起来。
这一组函数的功能是当原来的CompletableFuture计算完后,将结果传递给函数fn,将fn的结果做为新的CompletableFuture计算结果。所以它的功能至关于将CompletableFuture 转换成 CompletableFuture 。
请看下面的例子:
public static void main(String[] args) throws Exception {
try {
// new CompletableFutureDemo().test1();
} catch (Exception e) {
e.printStackTrace();
}
try {
//new CompletableFutureDemo().test2();
//new CompletableFutureDemo().test3();
} catch (Exception e) {
e.printStackTrace();
}
new CompletableFutureDemo().test4();
}
public void test4() throws Exception {
// Create a CompletableFuture
CompletableFuture<Integer> calculateFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("1");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("2");
return 1 + 2;
});
// Attach a callback to the Future using thenApply()
CompletableFuture<String> resultFuture = calculateFuture.thenApply(number -> {
System.out.println("3");
return "1 + 2 is " + number;
});
// Block and get the result of the future.
System.out.println(resultFuture.get());
}
复制代码
输出结果为:
1
2
3
1 + 2 is 3
复制代码
返回值 | 方法名及参数 | 方法说明 |
---|---|---|
CompletableFuture | thenAccept(Consumer<? super T> action) | Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied action. |
CompletableFuture | thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) | Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied action. |
篇幅缘由,Async和Executor方法再也不列举。
只对结果执行Action,而不返回新的计算值,所以计算值为Void。这就好像生产者生产了消息,消费者消费消息之后再也不进行消息的生产同样,所以thenAccept是对计算结果的纯消费。
例如以下方法:
public void test5() throws Exception {
// thenAccept() example
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return "ijiangtao";
}).thenAccept(name -> {
System.out.println("Hi, " + name);
});
System.out.println(future.get());
}
复制代码
thenAccept的get返回为null:
Hi, ijiangtao
null
复制代码
thenAcceptBoth能够消费二者(生产和消费)的结果,下面提供一个例子:
public void test6() throws Exception {
CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "ijiangtao";
}), (s1, s2) -> {
System.out.println(s1 + " " + s2);
});
while (true){
}
}
复制代码
输出以下:
hello ijiangtao
复制代码
返回值 | 方法名及参数 | 方法说明 |
---|---|---|
<U,V> CompletableFuture | thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied function. |
<U,V> CompletableFuture | thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using this stage's default asynchronous execution facility, with the two results as arguments to the supplied function. |
从功能上来说, thenCombine的功能更相似thenAcceptBoth,只不过thenAcceptBoth是纯消费,它的函数参数没有返回值,而thenCombine的函数参数fn有返回值。
public void test7() throws Exception {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
return 1+2;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "1+2 is";
});
CompletableFuture<String> f = future1.thenCombine(future2, (x, y) -> y + " " + x);
System.out.println(f.get()); // 输出:1+2 is 3
}
复制代码
返回值 | 方法名及参数 | 方法说明 |
---|---|---|
CompletableFuture | thenCompose(Function<? super T,? extends CompletionStage> fn) | Returns a new CompletionStage that, when this stage completes normally, is executed with this stage as the argument to the supplied function. |
因为篇幅缘由,Async和Executor方法再也不列举。
thenCompose方法接受一个Function做为参数,这个Function的输入是当前的CompletableFuture的计算值,返回结果将是一个新的CompletableFuture。
假如你须要将两个CompletableFutures相互整合,若是使用thenApply,则结果会是嵌套的CompletableFuture:
CompletableFuture<String> getUsersName(Long id) {
return CompletableFuture.supplyAsync(() -> {
return "ijiangtao";
});
}
CompletableFuture<Integer> getUserAge(String userName) {
return CompletableFuture.supplyAsync(() -> {
return 20;
});
}
public void test8(Long id) throws Exception {
CompletableFuture<CompletableFuture<Integer>> result1 = getUsersName(id)
.thenApply(userName -> getUserAge(userName));
}
复制代码
这时候可使用thenCompose来得到第二个计算的CompletableFuture:
public void test9(Long id) throws Exception {
CompletableFuture<Integer> result2 = getUsersName(id)
.thenCompose(userName -> getUserAge(userName));
}
复制代码
返回值 | 方法名及参数 | 方法说明 |
---|---|---|
CompletableFuture | whenComplete(BiConsumer<? super T,? super Throwable> action) | Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes. |
CompletableFuture | whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) | Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using this stage's default asynchronous execution facility when this stage completes. |
CompletableFuture | whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) | Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using the supplied Executor when this stage completes. |
当CompletableFuture的计算结果完成,或者抛出异常的时候,咱们能够执行特定的Action。whenComplete的参数Action的类型是BiConsumer<? super T,? super Throwable>,它能够处理正常的计算结果,或者异常状况。注意这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。
whenComplete方法不以Async结尾,意味着Action使用相同的线程执行,而以Async结尾的方法可能会使用其它的线程去执行(若是使用相同的线程池,也可能会被同一个线程选中执行)。
下面演示一下异常状况:
public void test10() throws Exception {
String result = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (1 == 1) {
throw new RuntimeException("an RuntimeException");
}
return "s1";
}).whenComplete((s, t) -> {
System.out.println("whenComplete s:"+s);
System.out.println("whenComplete exception:"+t.getMessage());
}).exceptionally(e -> {
System.out.println("exceptionally exception:"+e.getMessage());
return "hello ijiangtao";
}).join();
System.out.println(result);
}
复制代码
输出:
whenComplete s:null
whenComplete exception:java.lang.RuntimeException: an RuntimeException
exceptionally exception:java.lang.RuntimeException: an RuntimeException
hello ijiangtao
复制代码
Java5新增的Future类,能够实现阻塞式的异步计算,但这种阻塞的方式显然和咱们的异步编程的初衷相违背。为了解决这个问题,JDK吸取了Guava的设计思想,加入了Future的诸多扩展功能造成了CompletableFuture。
本文重点介绍了CompletableFuture的不一样类型的API,掌握了这些API对于使用非阻塞的函数式异步编程进行平常开发很是有帮助,同时也为下面深刻了解异步编程的各类原理和特性打下了良好的基础。
CompletableFuture - javase 8 docs
CompletableFuture - Guide To CompletableFuture
CompletableFuture - Java CompletableFuture Tutorial with Examples
CompletableFuture - Java 8: Writing asynchronous code with CompletableFuture
《写给大忙人的JavaSE9核心技术》- 10.2 异步计算
CompletableFuture - 经过实例理解 JDK8 的 CompletableFuture
CompletableFuture - CompletableFuture 详解
CompletableFuture - 使用 Java 8 的 CompletableFuture 实现函数式的回调
CompletableFuture - Java CompletableFuture 详解
CompletableFuture - [译]20个使用 Java CompletableFuture的例子