Java8新增的并发工具类-CompletableFuture



欢迎关注:本篇主要是介绍CompletableFuture的特性php

最近看到有地方使用CompletableFuture这个类本身没有使用过,而后就花点时间学习了一下对于jdk新的并发工具必定要握,理论也要有,实战也能少。
java

介绍web

        首先简单的介绍一下CompletableFuture,咱们知道1.5之后jdk就开始支持Future异步编程,今天咱们看看java8中,并发包新增的一个并发工具CompletableFuture。它依然放在并发包中,喜欢研究高并发和多线程的朋友,想必早已经熟悉了一遍,做者依然是Doug Lea并发大神编写。Java8新增了不少基于事件驱动的异步调用框架,好比CompletableFuture 经过回调函数,实现非阻塞的IO的异步调用。它拥有更强大的Future功能,而且实现了CompletionStage接口,这个接口中的方法很是多,每一个方法都有不同的功能和实现,你们能够去尝试的测试一下。sql

方法列表编程

         CompletableFutures中的方法列表介绍,这里就不逐一测试,其中每一种类型基本有三个方法:一种是没有使用线程池模式,一种是使用默认线程池,还有一种是使用自定义线程池。后端

//建立CompletableFuture对象
public static <U> CompletableFuture<U> completedFuture(U value)
//supplyAsync支持返回值异步任务,runAsync 不支持返回值的异步任务
public static CompletableFuture<Void>     runAsync(Runnable runnable)
public static CompletableFuture<Void>     runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier, Executor executor)


//thenApply当一个线程依赖另外一个线程时,可使用 thenApply 方法来把这两个线程串行化,进行转换
public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

//thenAccept:接收任务的处理结果,并处理,无返回结果,能够注册一个action,而后消费结果
public CompletableFuture<Void>     thenAccept(Consumer<? super T> action)
public CompletableFuture<Void>     thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void>     thenAcceptAsync(Consumer<? super T> action, Executor executor)

//thenAcceptBoth:当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗
public <U> CompletableFuture<Void>     thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>     thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>     thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public     CompletableFuture<Void>     runAfterBoth(CompletionStage<?> other,  Runnable action)

//thenRun:跟 thenAccept 方法不同的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenAccept
public CompletableFuture<Void>     thenRun(Runnable action)
public CompletableFuture<Void>     thenRunAsync(Runnable action)
public CompletableFuture<Void>     thenRunAsync(Runnable action, Executor executor)

//thenCompose 方法容许你对两个 CompletionStage 进行流水线操做,第一个操做完成时,将其结果做为参数传递给第二个操做。
public <U> CompletableFuture<U>     thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

//whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务,计算结果完成时的处理
public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

//handle 也是结果处理
public <U> CompletableFuture<U>     handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

//thenCombine:把两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
public <U,V> CompletableFuture<V>     thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>     thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>     thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

//acceptEither,当任意一个CompletableFuture计算完成的时候就会执行
public CompletableFuture<Void>     acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void>     acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void>     acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
public <U> CompletableFuture<U>     applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U>     applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U>     applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

//allOf是等待全部任务完成,构造后CompletableFuture完成
public static CompletableFuture<Void>         allOf(CompletableFuture<?>... cfs)
//anyOf是只要有一个任务完成,构造后CompletableFuture就完成
public static CompletableFuture<Object>     anyOf(CompletableFuture<?>... cfs)

简单示例安全

        简单的例子来理解一下,CompletableFuture的使用。supplyAsync提交一个Supplier异步任务;而后使用thenAccept异步处理结果,无需阻塞等待,至关于只要异步任务完成,使用回调的方式实现了非阻塞异步调用;其次使用exceptionally进行异常处理,比起传统的Future是否是处理起来更加方便快捷。微信

@Test
public void completableFutureTest1() throws ExecutionException, InterruptedException {
    CompletableFuture completableFuture = CompletableFuture.supplyAsync(new Supplier() {
        @Override
        public String get() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int a = 1/0;
            return "supplyAsync";
        }
    }).thenAccept(result -> System.out.println(result))
            .exceptionally((Function<Throwable, Void>) t -> {
                System.out.println("error message:"+t.getMessage());
                return null;
            });
    //阻塞式的获取结果
    //completableFuture.get();
    System.out.println("xsxsx");
}

        传统的Future,若是捕获异步任务异常,整个线程就会中断,而且须要阻塞的等待结果,这里CompletableFuture无需等待,程序继续执行,当supplyAsync执行成功结果直接传递到thenAccept回调处理结果,CPU能够继续执行其余的逻辑,而且能够对异常状况进行单独处理。网络

结合业务场景多线程

        下面针对实际的项目来谈一谈completableFuture的使用,以前作过一个小需求,就是一个用户管理详情页页信息:
1.加载用户的基础信息,银行卡信息,积分信息,余额查询,还有风控安全等级信息、征信、还有修改变动记录等;
2.因为不少东西沿用vm模版渲染,没有作js单独加载,其实这里异步加载是最好的;
3.因为有的接口还相互依赖,上一个接口的查询结果,要传递到下面去查新的数据;

处理方式

        既然不想采用js异步加载,我就后端把全部的接口都查询好,而且返回,发现同步的结果,这张页面可能要十几秒。后来采用Future+Callable异步处理,把结果放到一个采用阻塞的方式放到modelAndView中,时间确实缩短到了正常范围.可是想想若是使用completableFuture是否方便快捷一些呢?后边发现completableFuture只是帮咱们实现了非阻塞式的回调,和异常处理等,在开启自定义线程池时,Future异步执行的效果和completableFuture差很少,只不过completableFuture功能更强大,方法库更丰富。

测试

        下面我模拟一下简单的场景,userSourceService中定义了查询基础信息、帐户信息、和征信信息的服务接口。其中每一个接口中MOCK一点数据,而后再sleep1秒.模拟IO操做。假如咱们须要依赖获取queryUserInfo中的结果,而后再去查询后面两个用户信息,这样就可使用异步的Future去实现。

interface UserSourceService {

    /**
     * 查询用户基础信息
     * @param userId
     * @return
     */

    UserBase queryUserInfo(String userId);

    /**
     * 查询用户帐户信息
     * @param userId
     * @return
     */

    UserAccount queryUserAccountInfo(String userId);

    /**
     * 查询征信信息
     * @param userId
     * @return
     */

    UserCredit queryUserCreditInfo(String userId);

一、Future+stream

        这里使用串行的Stream 而后使用executorService.submit提交任务

static List<String> userIdList = Arrays.asList("000001""000002""000003""000004");
static ExecutorService executorService = Executors.newCachedThreadPool();
private static UserSourceServiceImpl userSourceService = new UserSourceServiceImpl();

@Test
public void FutureTest() {
    Instant start = Instant.now();
    System.out.println(JSON.toJSONString(futureTest()));
    Instant end = Instant.now();
    System.out.println("耗时:"+ Duration.between( start,end ).toMillis());
}

public List<UserCredit> futureTest() {
    List<Future<UserCredit>> userCreditFuture = userIdList.stream()
            .map(userId -> executorService.submit(() -> userSourceService.queryUserInfo(userId)))
            .map(future -> executorService.submit(() -> userSourceService.queryUserAccountInfo(future.get().getUserId())))
            .map(future -> executorService.submit(() -> userSourceService.queryUserCreditInfo(future.get().getUserId())))
            .collect(toList());
    List<UserCredit> creditInfos = new ArrayList<>();
    userCreditFuture.parallelStream().forEach(future -> {
        try {
            creditInfos.add(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    });
    return creditInfos;
}

二、parallelStream

     这里使用并行的Stream ,而后去掉自定义的线程池。

@Test
public void findUserByParallelStreamTest() 
{
    Instant start = Instant.now();
    System.out.println(JSON.toJSONString(findUserByParallelStream()));
    Instant end = Instant.now();
    System.out.println("耗时:"+ Duration.between( start,end ).toMillis());
}
public static List<UserCredit> findUserByParallelStream() {
    return userIdList.parallelStream()
            .map(userId -> userSourceService.queryUserInfo(userId))
            .map(user -> userSourceService.queryUserAccountInfo(user.getUserId()))
            .map(userAccount -> userSourceService.queryUserCreditInfo(userAccount.getUserId()))
            .collect(toList());
}

3.CompletableFuture

@Test
public void findUserByCompletableFutureTest() {
    Instant start = Instant.now();
    System.out.println(JSON.toJSONString(findUserByCompletableFuture4()));
    Instant end = Instant.now();
    System.out.println("耗时:"+ Duration.between( start,end ).toMillis());
}

public static List<UserCredit> findUserByCompletableFutureTest() {
    List<CompletableFuture<UserCredit>> queryUserFuture = userIdList.stream()
            .map(userId -> CompletableFuture.supplyAsync(() -> userSourceService.queryUserInfo(userId)))
            .map(future -> future.thenApply(user -> userSourceService.queryUserAccountInfo(user.getUserId())))
            .map(future -> future.thenCompose(userAccountInfo -> CompletableFuture.supplyAsync(() -> userSourceService.queryUserCreditInfo(userAccountInfo.getUserId()))))
            .collect(toList());

    //join 操做和get操做有相同的含义,等待全部异步操做的结果。
    return queryUserFuture.stream()
            .map(CompletableFuture::join)
            .collect(toList());
}

第一组:userIdList中4个元素
1.futureTest

耗时:3181毫秒

2.findUserByParallelStreamTest
耗时:3176毫秒

3.findUserByCompletableFuture4Test,使用自定义executorService
耗时:3177毫秒


第二组:userIdList中8个元素

1.futureTest

耗时:3184毫秒

2.findUserByParallelStreamTest
耗时:3187毫秒
3.findUserByCompletableFuture4Test ,使用自定义executorService
耗时:3158毫秒


第三组:userIdList中16个元素

1.futureTest

耗时:3210毫秒

2.findUserByParallelStreamTest
耗时:6190毫秒
3.findUserByCompletableFuture4Test ,使用自定义executorService
耗时:3177毫秒

        由上面的测试结果能够看出,使用内置的线程池时,在并发数量比较少的状况下ParallelStream和CompletableFuture 执行时间差很少,当数量递增后,原生的Future+线程池和CompletableFuture 自定义线程池,性能更佳.发现ParallelStream内置的池线程数是根据CPU核数有关,因此通常只适合cpu密集型的计算模式,而自定义的线程池,能够知足不一样的线程数量需求,当IO密集型的操做比较多时,CUP的资源其实并无彻底利用,因此当使executorService的时候,IO型操做效果更佳。

传统的Future痛点

1.经过使用Stream+传统的Future的,虽然Future能够知足咱们的异步处理,可是不支持上一个future依赖,而且转换输出,以及结果合并。
2.get是阻塞的,若是某一个异步任务中有没有catch的运行时异常,这回致使get抛出异常退出运行,若是咱们再一个遍历操做中,没有显示的处理,致使整个线程中断.可能产生意想不到的风险,因此是否可以自动的帮咱们检测异常,而且处理异常呢?
3.没有一个比较好的回调机制,例如响应式的根据上一个异步任务结果触发某个Action。
4.上面说的阻塞式结果传递,须要硬编码保证,是否不须要这样的显示get,直接拿到结果,而且进行流式的计算,使得编码更加流畅、优雅。

        其实经过测试发现,在IO密集型的任务中使用线程池的方式效率是最好的, 性能测试比较Future和CompletableFuture,并无质的差别,只是CompletableFuture支持更好的函数式响应式编程,以及非阻塞的处理结果,回调机制更加健全等

        上面说了一堆Future的缺点,其实这都不算啥致命的缺点,怎么方便怎么来就行,至少操做简单,不少不肯接受新的工具,认为有风险,有时候在没有经验时,保守一点,对项目也是一种负责。CompletableFuture也存在缺点, CompletableFuture并不能优雅地处理cancel(),由于他建立的时候能够不和线程绑定在一块儿,取消的时候,并不会去中断线程,因此使用cancel的时候须要注意.可是传统的Future并无这个问题。

总结

1.今天对于java8的Stream计算和并发工具CompletableFutures有了新的认识,项目中有相关的场景,咱们就能够尝试的运用;
        2.计算密集型操做,而且没有I/O,推荐使用Stream接口。由于实现简单,同时效率也多是最高的(若是全部的线程都是计算密集型的,那就没有必要建立比处理器核数更多的线程;
        3.若是并行的工做单元还涉及等待I/O的操做(包括网络链接等待),那么使用CompletableFuture灵活性更好。这种状况下处理流的流水线中若是发生I/O等待,流的延迟特性会让咱们很难判断到底何时触发了等待;
        4.针对不太了解的功能,咱们须要多作测试,不要盲目的追求新工具,若是是使用不当可能达不到效果,甚是有更可怕的事情发生。

参考

1.《Java CompletableFuture 详解》

2.《CompletableFuture 不能被中断》


历史文章

1.JVM-GC调优必会的知识 
2.Mysql事务隔离以及MVCC实现原理                                     
3.一次性了解Mysql的几种日志 
4.再谈synchronized 


微信公众号:MyClass社区

若有问题或建议,请公众号留言。

喜欢请关注

本文分享自微信公众号 - MyClass社区(MyClass_ZZ)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索