Java8新的异步编程方式 CompletableFuture(三)

前面两篇文章已经整理了CompletableFuture大部分的特性,本文会整理完CompletableFuture余下的特性,以及将它跟RxJava进行比较。java

3.6 Either

Either 表示的是两个CompletableFuture,当其中任意一个CompletableFuture计算完成的时候就会执行。编程

方法名 描述
acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) 当任意一个CompletableFuture完成的时候,action这个消费者就会被执行。
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) 当任意一个CompletableFuture完成的时候,action这个消费者就会被执行。使用ForkJoinPool
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) 当任意一个CompletableFuture完成的时候,action这个消费者就会被执行。使用指定的线程池
Random random = new Random();

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{

            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "from future1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{

            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "from future2";
        });

        CompletableFuture<Void> future =  future1.acceptEither(future2,str->System.out.println("The future is "+str));

        try {
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }复制代码

执行结果:The future is from future1 或者 The future is from future2。
由于future1和future2,执行的顺序是随机的。架构

applyToEither 跟 acceptEither 相似。并发

方法名 描述
applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) 当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会看成新的CompletableFuture<U>的计算结果。
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn) 当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会看成新的CompletableFuture<U>的计算结果。使用ForkJoinPool
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor) 当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会看成新的CompletableFuture<U>的计算结果。使用指定的线程池
Random random = new Random();

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{

            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "from future1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{

            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "from future2";
        });

        CompletableFuture<String> future =  future1.applyToEither(future2,str->"The future is "+str);

        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }复制代码

执行结果也跟上面的程序相似。app

3.7 其余方法

allOf、anyOf是CompletableFuture的静态方法。框架

3.7.1 allOf

方法名 描述
allOf(CompletableFuture<?>... cfs) 在全部Future对象完成后结束,并返回一个future。

allOf()方法所返回的CompletableFuture,并不能组合前面多个CompletableFuture的计算结果。因而咱们借助Java 8的Stream来组合多个future的结果。dom

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "tony");

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "cafei");

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "aaron");

        CompletableFuture.allOf(future1, future2, future3)
                .thenApply(v ->
                Stream.of(future1, future2, future3)
                        .map(CompletableFuture::join)
                        .collect(Collectors.joining(" ")))
                .thenAccept(System.out::print);复制代码

执行结果:异步

tony cafei aaron复制代码

3.7.2 anyOf

方法名 描述
anyOf(CompletableFuture<?>... cfs) 在任何一个Future对象结束后结束,并返回一个future。
Random rand = new Random();
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future1";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future2";
        });
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future3";
        });

        CompletableFuture<Object> future =  CompletableFuture.anyOf(future1,future2,future3);

        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }复制代码

使用anyOf()时,只要某一个future完成,就结束了。因此执行结果多是"from future1"、"from future2"、"from future3"中的任意一个。async

anyOf 和 acceptEither、applyToEither的区别在于,后二者只能使用在两个future中,而anyOf可使用在多个future中。异步编程

3.8 CompletableFuture异常处理

CompletableFuture在运行时若是遇到异常,可使用get()并抛出异常进行处理,但这并非一个最好的方法。CompletableFuture自己也提供了几种方式来处理异常。

3.8.1 exceptionally

方法名 描述
exceptionally(Function fn) 只有当CompletableFuture抛出异常的时候,才会触发这个exceptionally的计算,调用function计算值。
CompletableFuture.supplyAsync(() -> "hello world")
                .thenApply(s -> {
                    s = null;
                    int length = s.length();
                    return length;
                }).thenAccept(i -> System.out.println(i))
                .exceptionally(t -> {
                    System.out.println("Unexpected error:" + t);
                    return null;
                });复制代码

执行结果:

Unexpected error:java.util.concurrent.CompletionException: java.lang.NullPointerException复制代码

对上面的代码稍微作了一下修改,修复了空指针的异常。

CompletableFuture.supplyAsync(() -> "hello world")
                .thenApply(s -> {
// s = null;
                    int length = s.length();
                    return length;
                }).thenAccept(i -> System.out.println(i))
                .exceptionally(t -> {
                    System.out.println("Unexpected error:" + t);
                    return null;
                });复制代码

执行结果:

11复制代码

3.8.2 whenComplete

whenComplete 在上一篇文章其实已经介绍过了,在这里跟exceptionally的做用差很少,能够捕获任意阶段的异常。若是没有异常的话,就执行action。

CompletableFuture.supplyAsync(() -> "hello world")
                .thenApply(s -> {
                    s = null;
                    int length = s.length();
                    return length;
                }).thenAccept(i -> System.out.println(i))
                .whenComplete((result, throwable) -> {

                    if (throwable != null) {
                       System.out.println("Unexpected error:"+throwable);
                    } else {
                        System.out.println(result);
                    }

                });复制代码

执行结果:

Unexpected error:java.util.concurrent.CompletionException: java.lang.NullPointerException复制代码

跟whenComplete类似的方法是handle,handle的用法在上一篇文章中也已经介绍过。

四. CompletableFuture VS Java8 Stream VS RxJava1 & RxJava2

CompletableFuture 有不少特性跟RxJava很像,因此将CompletableFuture、Java 8 Stream和RxJava作一个相互的比较。

composable lazy resuable async cached push back pressure
CompletableFuture 支持 不支持 支持 支持 支持 支持 不支持
Stream 支持 支持 不支持 不支持 不支持 不支持 不支持
Observable(RxJava1) 支持 支持 支持 支持 支持 支持 支持
Observable(RxJava2) 支持 支持 支持 支持 支持 支持 不支持
Flowable(RxJava2) 支持 支持 支持 支持 支持 支持 支持

五. 总结

Java 8提供了一种函数风格的异步和事件驱动编程模型CompletableFuture,它不会形成堵塞。CompletableFuture背后依靠的是fork/join框架来启动新的线程实现异步与并发。固然,咱们也能经过指定线程池来作这些事情。

CompletableFuture特别是对微服务架构而言,会有很大的做为。举一个具体的场景,电商的商品页面可能会涉及到商品详情服务、商品评论服务、相关商品推荐服务等等。获取商品的信息时(/productdetails?productid=xxx),须要调用多个服务来处理这一个请求并返回结果。这里可能会涉及到并发编程,咱们彻底可使用Java 8的CompletableFuture或者RxJava来实现。

先前的文章:
Java8新的异步编程方式 CompletableFuture(一)
Java8新的异步编程方式 CompletableFuture(二)

相关文章
相关标签/搜索