欢迎关注:本篇主要是介绍CompletableFuture的特性php
最近看到有地方使用CompletableFuture这个类,本身没有使用过,而后就花点时间学习了一下。对于jdk新的并发工具必定要掌握,理论也要有,实战也不能少。
java
介绍web
首先简单的介绍一下CompletableFuture,咱们知道1.5之后jdk就开始支持Future异步编程,今天咱们看看java8中,并发包新增的一个并发工具CompletableFuture。它依然放在并发包中,喜欢研究高并发和多线程的朋友,想必早已经熟悉了一遍,做者依然是Doug Lea并发大神编写。Java8新增了不少基于事件驱动的异步调用框架,好比CompletableFuture 经过回调函数,实现非阻塞的IO的异步调用。它拥有更强大的Future功能,而且实现了CompletionStage接口,这个接口中的方法很是多,每一个方法都有不同的功能和实现,你们能够去尝试的测试一下。sql
方法列表编程
CompletableFutures中的方法列表介绍,这里就不逐一测试,其中每一种类型基本有三个方法:一种是没有使用线程池模式,一种是使用默认线程池,还有一种是使用自定义线程池。后端
//建立CompletableFuture对象
public static <U> CompletableFuture<U> completedFuture(U value)
//supplyAsync支持返回值异步任务,runAsync 不支持返回值的异步任务
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
//thenApply当一个线程依赖另外一个线程时,可使用 thenApply 方法来把这两个线程串行化,进行转换
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
//thenAccept:接收任务的处理结果,并处理,无返回结果,能够注册一个action,而后消费结果
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
//thenAcceptBoth:当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
//thenRun:跟 thenAccept 方法不同的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenAccept
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
//thenCompose 方法容许你对两个 CompletionStage 进行流水线操做,第一个操做完成时,将其结果做为参数传递给第二个操做。
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
//whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务,计算结果完成时的处理
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
//handle 也是结果处理
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
//thenCombine:把两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
//acceptEither,当任意一个CompletableFuture计算完成的时候就会执行
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
//allOf是等待全部任务完成,构造后CompletableFuture完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
//anyOf是只要有一个任务完成,构造后CompletableFuture就完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
简单示例安全
简单的例子来理解一下,CompletableFuture的使用。supplyAsync提交一个Supplier异步任务;而后使用thenAccept异步处理结果,无需阻塞等待,至关于只要异步任务完成,使用回调的方式实现了非阻塞异步调用;其次使用exceptionally进行异常处理,比起传统的Future是否是处理起来更加方便快捷。微信
@Test
public void completableFutureTest1() throws ExecutionException, InterruptedException {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(new Supplier() {
@Override
public String get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int a = 1/0;
return "supplyAsync";
}
}).thenAccept(result -> System.out.println(result))
.exceptionally((Function<Throwable, Void>) t -> {
System.out.println("error message:"+t.getMessage());
return null;
});
//阻塞式的获取结果
//completableFuture.get();
System.out.println("xsxsx");
}
传统的Future,若是捕获异步任务异常,整个线程就会中断,而且须要阻塞的等待结果,这里CompletableFuture无需等待,程序继续执行,当supplyAsync执行成功结果直接传递到thenAccept回调处理结果,CPU能够继续执行其余的逻辑,而且能够对异常状况进行单独处理。网络
结合业务场景多线程
下面针对实际的项目来谈一谈completableFuture的使用,以前作过一个小需求,就是一个用户管理详情页页信息:
1.加载用户的基础信息,银行卡信息,积分信息,余额查询,还有风控安全等级信息、征信、还有修改变动记录等;
2.因为不少东西沿用vm模版渲染,没有作js单独加载,其实这里异步加载是最好的;
3.因为有的接口还相互依赖,上一个接口的查询结果,要传递到下面去查新的数据;
处理方式
既然不想采用js异步加载,我就后端把全部的接口都查询好,而且返回,发现同步的结果,这张页面可能要十几秒。后来采用Future+Callable异步处理,把结果放到一个采用阻塞的方式放到modelAndView中,时间确实缩短到了正常范围.可是想想若是使用completableFuture是否方便快捷一些呢?后边发现completableFuture只是帮咱们实现了非阻塞式的回调,和异常处理等,在开启自定义线程池时,Future异步执行的效果和completableFuture差很少,只不过completableFuture功能更强大,方法库更丰富。
测试
下面我模拟一下简单的场景,userSourceService中定义了查询基础信息、帐户信息、和征信信息的服务接口。其中每一个接口中MOCK一点数据,而后再sleep1秒.模拟IO操做。假如咱们须要依赖获取queryUserInfo中的结果,而后再去查询后面两个用户信息,这样就可使用异步的Future去实现。
interface UserSourceService {
/**
* 查询用户基础信息
* @param userId
* @return
*/
UserBase queryUserInfo(String userId);
/**
* 查询用户帐户信息
* @param userId
* @return
*/
UserAccount queryUserAccountInfo(String userId);
/**
* 查询征信信息
* @param userId
* @return
*/
UserCredit queryUserCreditInfo(String userId);
一、Future+stream
这里使用串行的Stream 而后使用executorService.submit提交任务
static List<String> userIdList = Arrays.asList("000001", "000002", "000003", "000004");
static ExecutorService executorService = Executors.newCachedThreadPool();
private static UserSourceServiceImpl userSourceService = new UserSourceServiceImpl();
@Test
public void FutureTest() {
Instant start = Instant.now();
System.out.println(JSON.toJSONString(futureTest()));
Instant end = Instant.now();
System.out.println("耗时:"+ Duration.between( start,end ).toMillis());
}
public List<UserCredit> futureTest() {
List<Future<UserCredit>> userCreditFuture = userIdList.stream()
.map(userId -> executorService.submit(() -> userSourceService.queryUserInfo(userId)))
.map(future -> executorService.submit(() -> userSourceService.queryUserAccountInfo(future.get().getUserId())))
.map(future -> executorService.submit(() -> userSourceService.queryUserCreditInfo(future.get().getUserId())))
.collect(toList());
List<UserCredit> creditInfos = new ArrayList<>();
userCreditFuture.parallelStream().forEach(future -> {
try {
creditInfos.add(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
return creditInfos;
}
二、parallelStream
这里使用并行的Stream ,而后去掉自定义的线程池。
@Test
public void findUserByParallelStreamTest() {
Instant start = Instant.now();
System.out.println(JSON.toJSONString(findUserByParallelStream()));
Instant end = Instant.now();
System.out.println("耗时:"+ Duration.between( start,end ).toMillis());
}
public static List<UserCredit> findUserByParallelStream() {
return userIdList.parallelStream()
.map(userId -> userSourceService.queryUserInfo(userId))
.map(user -> userSourceService.queryUserAccountInfo(user.getUserId()))
.map(userAccount -> userSourceService.queryUserCreditInfo(userAccount.getUserId()))
.collect(toList());
}
3.CompletableFuture
@Test
public void findUserByCompletableFutureTest() {
Instant start = Instant.now();
System.out.println(JSON.toJSONString(findUserByCompletableFuture4()));
Instant end = Instant.now();
System.out.println("耗时:"+ Duration.between( start,end ).toMillis());
}
public static List<UserCredit> findUserByCompletableFutureTest() {
List<CompletableFuture<UserCredit>> queryUserFuture = userIdList.stream()
.map(userId -> CompletableFuture.supplyAsync(() -> userSourceService.queryUserInfo(userId)))
.map(future -> future.thenApply(user -> userSourceService.queryUserAccountInfo(user.getUserId())))
.map(future -> future.thenCompose(userAccountInfo -> CompletableFuture.supplyAsync(() -> userSourceService.queryUserCreditInfo(userAccountInfo.getUserId()))))
.collect(toList());
//join 操做和get操做有相同的含义,等待全部异步操做的结果。
return queryUserFuture.stream()
.map(CompletableFuture::join)
.collect(toList());
}
第一组:userIdList中4个元素
1.futureTest耗时:3181毫秒
2.findUserByParallelStreamTest
耗时:3176毫秒3.findUserByCompletableFuture4Test,使用自定义executorService
耗时:3177毫秒
第二组:userIdList中8个元素
1.futureTest
耗时:3184毫秒
2.findUserByParallelStreamTest
耗时:3187毫秒
3.findUserByCompletableFuture4Test ,使用自定义executorService
耗时:3158毫秒
第三组:userIdList中16个元素
1.futureTest
耗时:3210毫秒
2.findUserByParallelStreamTest
耗时:6190毫秒
3.findUserByCompletableFuture4Test ,使用自定义executorService
耗时:3177毫秒
由上面的测试结果能够看出,使用内置的线程池时,在并发数量比较少的状况下ParallelStream和CompletableFuture 执行时间差很少,当数量递增后,原生的Future+线程池和CompletableFuture 自定义线程池,性能更佳.发现ParallelStream内置的池线程数是根据CPU核数有关,因此通常只适合cpu密集型的计算模式,而自定义的线程池,能够知足不一样的线程数量需求,当IO密集型的操做比较多时,CUP的资源其实并无彻底利用,因此当使executorService的时候,IO型操做效果更佳。
传统的Future痛点
1.经过使用Stream+传统的Future的,虽然Future能够知足咱们的异步处理,可是不支持上一个future依赖,而且转换输出,以及结果合并。
2.get是阻塞的,若是某一个异步任务中有没有catch的运行时异常,这回致使get抛出异常退出运行,若是咱们再一个遍历操做中,没有显示的处理,致使整个线程中断.可能产生意想不到的风险,因此是否可以自动的帮咱们检测异常,而且处理异常呢?
3.没有一个比较好的回调机制,例如响应式的根据上一个异步任务结果触发某个Action。
4.上面说的阻塞式结果传递,须要硬编码保证,是否不须要这样的显示get,直接拿到结果,而且进行流式的计算,使得编码更加流畅、优雅。
其实经过测试发现,在IO密集型的任务中使用线程池的方式效率是最好的, 性能测试比较Future和CompletableFuture,并无质的差别,只是CompletableFuture支持更好的函数式和响应式编程,以及非阻塞的处理结果,回调机制更加健全等。
上面说了一堆Future的缺点,其实这都不算啥致命的缺点,怎么方便怎么来就行,至少操做简单,不少不肯接受新的工具,认为有风险,有时候在没有经验时,保守一点,对项目也是一种负责。CompletableFuture也存在缺点, CompletableFuture并不能优雅地处理cancel(),由于他建立的时候能够不和线程绑定在一块儿,取消的时候,并不会去中断线程,因此使用cancel的时候须要注意.可是传统的Future并无这个问题。
总结
1.今天对于java8的Stream计算和并发工具CompletableFutures有了新的认识,项目中有相关的场景,咱们就能够尝试的运用;
2.计算密集型操做,而且没有I/O,推荐使用Stream接口。由于实现简单,同时效率也多是最高的(若是全部的线程都是计算密集型的,那就没有必要建立比处理器核数更多的线程;
3.若是并行的工做单元还涉及等待I/O的操做(包括网络链接等待),那么使用CompletableFuture灵活性更好。这种状况下处理流的流水线中若是发生I/O等待,流的延迟特性会让咱们很难判断到底何时触发了等待;
4.针对不太了解的功能,咱们须要多作测试,不要盲目的追求新工具,若是是使用不当可能达不到效果,甚是有更可怕的事情发生。
参考
1.《Java CompletableFuture 详解》
2.《CompletableFuture 不能被中断》

历史文章



1.JVM-GC调优必会的知识
2.Mysql事务隔离以及MVCC实现原理
3.一次性了解Mysql的几种日志
4.再谈synchronized
本文分享自微信公众号 - MyClass社区(MyClass_ZZ)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。