前面两篇文章已经整理了CompletableFuture大部分的特性,本文会整理完CompletableFuture余下的特性,以及将它跟RxJava进行比较。java
Either 表示的是两个CompletableFuture,当其中任意一个CompletableFuture计算完成的时候就会执行。编程
方法名 | 描述 |
---|---|
acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) | 当任意一个CompletableFuture完成的时候,action这个消费者就会被执行。 |
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) | 当任意一个CompletableFuture完成的时候,action这个消费者就会被执行。使用ForkJoinPool |
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) | 当任意一个CompletableFuture完成的时候,action这个消费者就会被执行。使用指定的线程池 |
Random random = new Random();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "from future1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "from future2";
});
CompletableFuture<Void> future = future1.acceptEither(future2,str->System.out.println("The future is "+str));
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}复制代码
执行结果:The future is from future1 或者 The future is from future2。
由于future1和future2,执行的顺序是随机的。架构
applyToEither 跟 acceptEither 相似。并发
方法名 | 描述 |
---|---|
applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) | 当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会看成新的CompletableFuture<U>的计算结果。 |
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn) | 当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会看成新的CompletableFuture<U>的计算结果。使用ForkJoinPool |
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor) | 当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会看成新的CompletableFuture<U>的计算结果。使用指定的线程池 |
Random random = new Random();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "from future1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "from future2";
});
CompletableFuture<String> future = future1.applyToEither(future2,str->"The future is "+str);
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}复制代码
执行结果也跟上面的程序相似。app
allOf、anyOf是CompletableFuture的静态方法。框架
方法名 | 描述 |
---|---|
allOf(CompletableFuture<?>... cfs) | 在全部Future对象完成后结束,并返回一个future。 |
allOf()方法所返回的CompletableFuture,并不能组合前面多个CompletableFuture的计算结果。因而咱们借助Java 8的Stream来组合多个future的结果。dom
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "tony");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "cafei");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "aaron");
CompletableFuture.allOf(future1, future2, future3)
.thenApply(v ->
Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" ")))
.thenAccept(System.out::print);复制代码
执行结果:异步
tony cafei aaron复制代码
方法名 | 描述 |
---|---|
anyOf(CompletableFuture<?>... cfs) | 在任何一个Future对象结束后结束,并返回一个future。 |
Random rand = new Random();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "from future1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "from future2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "from future3";
});
CompletableFuture<Object> future = CompletableFuture.anyOf(future1,future2,future3);
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}复制代码
使用anyOf()时,只要某一个future完成,就结束了。因此执行结果多是"from future1"、"from future2"、"from future3"中的任意一个。async
anyOf 和 acceptEither、applyToEither的区别在于,后二者只能使用在两个future中,而anyOf可使用在多个future中。异步编程
CompletableFuture在运行时若是遇到异常,可使用get()并抛出异常进行处理,但这并非一个最好的方法。CompletableFuture自己也提供了几种方式来处理异常。
方法名 | 描述 |
---|---|
exceptionally(Function fn) | 只有当CompletableFuture抛出异常的时候,才会触发这个exceptionally的计算,调用function计算值。 |
CompletableFuture.supplyAsync(() -> "hello world")
.thenApply(s -> {
s = null;
int length = s.length();
return length;
}).thenAccept(i -> System.out.println(i))
.exceptionally(t -> {
System.out.println("Unexpected error:" + t);
return null;
});复制代码
执行结果:
Unexpected error:java.util.concurrent.CompletionException: java.lang.NullPointerException复制代码
对上面的代码稍微作了一下修改,修复了空指针的异常。
CompletableFuture.supplyAsync(() -> "hello world")
.thenApply(s -> {
// s = null;
int length = s.length();
return length;
}).thenAccept(i -> System.out.println(i))
.exceptionally(t -> {
System.out.println("Unexpected error:" + t);
return null;
});复制代码
执行结果:
11复制代码
whenComplete 在上一篇文章其实已经介绍过了,在这里跟exceptionally的做用差很少,能够捕获任意阶段的异常。若是没有异常的话,就执行action。
CompletableFuture.supplyAsync(() -> "hello world")
.thenApply(s -> {
s = null;
int length = s.length();
return length;
}).thenAccept(i -> System.out.println(i))
.whenComplete((result, throwable) -> {
if (throwable != null) {
System.out.println("Unexpected error:"+throwable);
} else {
System.out.println(result);
}
});复制代码
执行结果:
Unexpected error:java.util.concurrent.CompletionException: java.lang.NullPointerException复制代码
跟whenComplete类似的方法是handle,handle的用法在上一篇文章中也已经介绍过。
CompletableFuture 有不少特性跟RxJava很像,因此将CompletableFuture、Java 8 Stream和RxJava作一个相互的比较。
composable | lazy | resuable | async | cached | push | back pressure | |
---|---|---|---|---|---|---|---|
CompletableFuture | 支持 | 不支持 | 支持 | 支持 | 支持 | 支持 | 不支持 |
Stream | 支持 | 支持 | 不支持 | 不支持 | 不支持 | 不支持 | 不支持 |
Observable(RxJava1) | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
Observable(RxJava2) | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | 不支持 |
Flowable(RxJava2) | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
Java 8提供了一种函数风格的异步和事件驱动编程模型CompletableFuture,它不会形成堵塞。CompletableFuture背后依靠的是fork/join框架来启动新的线程实现异步与并发。固然,咱们也能经过指定线程池来作这些事情。
CompletableFuture特别是对微服务架构而言,会有很大的做为。举一个具体的场景,电商的商品页面可能会涉及到商品详情服务、商品评论服务、相关商品推荐服务等等。获取商品的信息时(/productdetails?productid=xxx),须要调用多个服务来处理这一个请求并返回结果。这里可能会涉及到并发编程,咱们彻底可使用Java 8的CompletableFuture或者RxJava来实现。
先前的文章:
Java8新的异步编程方式 CompletableFuture(一)
Java8新的异步编程方式 CompletableFuture(二)