Java异步编程工具 CompletableFuture

CompletableFuture简介

JDK 1.8 提供了CompletableFuture来支持异步编程,咱们能够用CompletableFuture来很快的实现异步编程,CompletableFuture提供了串行,并行,汇聚3种模式提供给咱们使用java

使用方法

建立

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

咱们能够经过上面4个API来建立CompletableFuture对象,API分为两大类,一类是无返回值的runAsync,一类是有返回值的supplyAsync,每一个大类下面有分红了两个小类,一种是使用默认的Fork/Join线程池,一种是使用本身定义的线程池编程

串行调用

CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

CompletableFuture<Void> thenAccept(Consumer<? super T> action);
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
                                                
CompletableFuture<Void> thenRun(Runnable action);
CompletableFuture<Void> thenRunAsync(Runnable action);
CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)

CompletableFuture<R> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
CompletableFuture<R> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor)

串行调用提供了上述的8个API,分为4大类,逐一介绍一下:app

  • thenApply系列须要传入一个Function<? super T,? extends U>参数,T表明入参,U表明出参,因此thenApply系列能够传入参数也能够返回结果异步

  • thenAccept系列会传入一个Consumer<? super T>,T是入参,因此thenAccept能够传入参数,可是不会返回结果异步编程

  • thenRun系列须要传入一个Runnale,因此这个系列既不能有入参也不会有结果线程

  • thenCompose系列和thenApply系列结果相同,可是须要开启一个子任务去执行,从传入的参数也能够看出,参数一接受一个CompletionStage的Function,CompletionStage就是CompletableFuture实现的接口,具体到实现类就是在接收一个CompletableFuture对象code

每一个大类都有* 和 *Async两种API,区别就在于带Async的任务会在丢给Fork/Join线程池执行,不带Async就直接由前面任务的线程来执行,带Async还能够本身指定线程池对象

并行

并行比较好理解,就是同时建立多个CompletableFuture,让任务去并行执行接口

汇聚

汇聚又分红两种,一种的AND汇聚,一个是OR汇聚,简单的说就是AND汇聚须要汇聚的任务都完成才能够执行汇聚以后的方法,而OR汇聚只要其中一个任务完成就能够往下执行了,汇聚API能够将并行执行的CompletableFuture汇聚成一个CompletableFutureget

AND汇聚

CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)

CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action));
CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)      

CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)

AND汇聚提供了3类API,API和串行的API功能相似,thenCombine提供了有入参和出参的能力,thenAcceptBoth只提供了入参的能力,没有返回值,runAfterBoth既没有入参也没有出参

OR汇聚

CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)

CompletableFuture<Void> acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)

CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)

OR汇聚也和AND汇聚相似,提供了3类API,功能方法也能够参考AND汇聚执行的方法

异常处理

CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
        
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action));
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor)
        
CompletableFuture<U> handle( BiFunction<? super T, Throwable, ? extends U> fn);
CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

异常处理提供了3类API:

  • exceptionally相似try-catch,若是有异常咱们能够经过入参获取到异常
  • whenComplete能够获取CompletableFuture的结果,而且能够经过第二个参数异常(若是有的话),而且这个异常在主线程也能够捕获
  • handle和whenComplete相似,可是他还能够返回一个结果,和whenComplete不一样的是,里面的异常在主线程不能捕获

例子

package com.demo;

import java.util.concurrent.CompletableFuture;

public class Test {
    
    public static void main(String[] args){

        CompletableFuture<String> f1 = CompletableFuture.runAsync(()->{
            System.out.println("T1:start");
            sleep(1000);
            System.out.println("T1: doing sth");
            sleep(5000);
        }).thenRunAsync(()-> System.out.println("T1 : next task")).thenApply((__)-> {
            System.out.println("T1 task end");
            return " T1 result";
        });

        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
            System.out.println("T2: start");
            sleep(1000);
            System.out.println("T2: doing sth");
            sleep(2000);
            return " T2:result";
        }).thenApply(s-> s+ "!!!").thenCompose(s -> CompletableFuture.supplyAsync(s::toUpperCase));

        CompletableFuture<String> f3 = f1.thenCombine(f2,(r1,r2)->{
            System.out.println("T1 result :" + r1);
            System.out.println("T2 result:" + r2);
            return "t1 t2 end";
        });


        System.out.println(f3.join());

        System.out.println("--------------");

        /**
         * exceptionally处理异常状况
         * result:
         * java.lang.ArithmeticException: / by zero
         * 0
         */
        CompletableFuture<Integer> f4 = CompletableFuture.supplyAsync(()->1/0)
                .thenApply(i->i*i)
                .exceptionally((throwable -> {
                    System.out.println(throwable.getMessage());
                    return 0;
                }));
        System.out.println(f4.join());
        System.out.println("--------------");
        /**
         * whenComplete处理异常状况
         * result : null, error : java.lang.ArithmeticException: / by zero
         * enter exception block
         *
         * Process finished with exit code 0
         *
         */
        try {
            CompletableFuture<Integer> f5 = CompletableFuture.supplyAsync(()->1/0)
                    .thenApply(i->i*i)
                    .whenComplete((i,t)-> {
                        System.out.println("result : " +i+ ", error : " + t.getMessage());
                    });

            System.out.println(f5.join());
        }catch (Exception e){
            System.out.println("enter exception block");
        }
        System.out.println("--------------");
        /**
         * handle处理异常状况
         * result : null, error : java.lang.ArithmeticException: / by zero
         * 0
         *
         * Process finished with exit code 0
         *
         */
        try {
            CompletableFuture<Integer> f6 = CompletableFuture.supplyAsync(()->1/0)
                    .thenApply(i->i*i)
                    .handle((i,t)-> {
                        System.out.println("result : " +i+ ", error : " + t.getMessage());
                        return 0;
                    });

            System.out.println(f6.join());
        }catch (Exception e){
            System.out.println("enter exception block");
        }
    }

    private static void sleep(long time){
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

一次结果

T1:start
T2: start
T1: doing sth
T2: doing sth
T1 : next task
T1 task end
T1 result : T1 result
T2 result: T2:RESULT!!!
t1 t2 end
--------------
java.lang.ArithmeticException: / by zero
0
--------------
result : null, error : java.lang.ArithmeticException: / by zero
enter exception block
--------------
result : null, error : java.lang.ArithmeticException: / by zero
0

Process finished with exit code 0
相关文章
相关标签/搜索