Java并发编程(六) CompletableFuture与CompletionStage使用

一.前言

最近复习了些java线程池和并发的一点东西,正好看到Future,Future是在JDK5中新加入的用来获取异步执行结果,可是局限就是除了采用get()阻塞获取或者采用轮训isDone(),别无他法,代码也不够优雅,JDK8中新特性CompletableFuture与CompletionStage,是对于Future的补充,下面来分享一下;java

这里只是对这个类方法进行演示说明,使得快速入门,实际使用中有不少场景和灵活使用;并发

二. CompletableFuture

2.1 提交任务执行、获取执行结果

static CompletableFuture<Void> runAsync(Runnable runnable) 
//1. 返回一个新的CompletableFuture,它在运行给定操做后由运行在 ForkJoinPool.commonPool()中的任务 异步完成。 
 
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) 
//2. 返回一个新的CompletableFuture,它在运行给定操做以后由在给定执行程序中运行的任务异步完成。  

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 
//3. 返回一个新的CompletableFuture,它经过在 ForkJoinPool.commonPool()中运行的任务与经过调用给定的供应商得到的值 异步完成。  

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) 
//4. 返回一个新的CompletableFuture,由给定执行器中运行的任务异步完成,并经过调用给定的供应商得到的值。

说明:异步

  • 前二者用于无返回值的异步执行,后二者用于可得到返回值的异步执行 ;
  • 2、四 可执行特定线程池或自定义线程池执行,不然默认使用ForkJoinPool.commonPool()线程池;
public void test1() throws ExecutionException, InterruptedException {
        //1. 提交一个一部执行的任务,无结果返回值
        CompletableFuture.runAsync(() -> {
            log.info("I have done Nothing");
        });

        CompletableFuture.runAsync(() -> {
            log.info("Me, too");
        },EnvirmentThreadPool.getThreadPool());

        //2. 提交一个一部执行的任务,有结果返回值
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "future 1");
        //若future执行完毕,则future.get();返回执行结果'future 1',若未执行完毕,则返回给定值'111'
        future.complete("111");

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "future 1",
                EnvirmentThreadPool.getThreadPool());

        //返回一个指定结果的CompletableFuture对象
        CompletableFuture<String> future2 = CompletableFuture.completedFuture("future 2");
    }

2.2 allOf 、 anyOf

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) 
//5. 返回一个新的CompletableFuture,当全部给定的CompletableFutures完成时,完成。  

static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 
//6. 返回一个新的CompletableFuture,当任何一个给定的CompletableFutures完成时,完成相同的结果。

说明:函数

  • 前者为执行完全部提交任务后进行后面的操做,无返回值;
  • 后者用于有返回值的,当任一执行完返回结果时便可进行后续的任务执行;
@Test
    public void test1() throws ExecutionException, InterruptedException {
        //1. 提交一个一部执行的任务,无结果返回值
        CompletableFuture<Void> void1 = CompletableFuture.runAsync(() -> {
            log.info("I have done Nothing");
        });

        CompletableFuture<Void> void2 = CompletableFuture.runAsync(() -> {
            log.info("Me, too");
        }, EnvirmentThreadPool.getThreadPool());

        CompletableFuture.allOf(void1, void2).thenRun(() -> {log.info("success");}).join();



        //2. 提交一个一部执行的任务,有结果返回值
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "future 1");
        
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "future 3",
                EnvirmentThreadPool.getThreadPool());

         //后面须要join(),不然void1和void2将异步执行,这里不会阻塞,也就拿不到执行结果
         CompletableFuture.allOf(void1, void2).thenRun(() -> {log.info("success");}).join();

        CompletableFuture.anyOf(future,future1).thenAccept((value)-> System.out.println(value)).join(); //输出结果future 1与future 3不定



    }
  • 二者均须要join()或者get()方法配合使用才能达到同步入参的返回值执行后面的操做,不然将异步执行;工具

/**
         * 在这里重点对比一下下面的方法,下面举例用allOf,实际allOf与anyOf用法相同,下面提到的join(),换成get()也一样;
         */
        //1.allOf没有join()配合使用,allOf后面的结果集若没有执行完毕则直接执行log.info();结果集继续异步执行,完成后当前异步线程继续执行thenAccept()方法;
        CompletableFuture.allOf(future, future1).thenAccept((value) -> {
            System.out.println(value);
        });
        //2.allOf配合join()使用,线程将会阻塞等待结果集执行完毕获取结果后执行thenAccept();
        CompletableFuture.allOf(future, future1).thenAccept((value) -> {
            System.out.println(value);
        }).join();

        CompletableFuture.allOf(future, future1).thenAccept((value) -> {
            System.out.println(value);
        }).get();

        CompletableFuture.allOf(future, future1).thenAccept((value) -> {
            System.out.println(value);
        }).get(2000, TimeUnit.MILLISECONDS);

        log.info("do nothing");

2.3 异步结果的处理

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) 
//7.返回一个新的CompletionStage,当此阶段正常完成时,将以该阶段的结果做为所提供函数的参数执行。 
 
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) 
//8. 返回一个新的CompletionStage,当该阶段正常完成时,将使用此阶段的默认异步执行工具执行此阶段的结果做为所提供函数的参数。
  
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 
//9. 返回一个新的CompletionStage,当此阶段正常完成时,将使用提供的执行程序执行此阶段的结果做为提供函数的参数。
CompletableFuture<Void> thenAccept(Consumer<? super T> action) 
//10. 返回一个新的CompletionStage,当此阶段正常完成时,将以该阶段的结果做为提供的操做的参数执行。
  
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) 
//11. 返回一个新的CompletionStage,当此阶段正常完成时,将使用此阶段的默认异步执行工具执行,此阶段的结果做为提供的操做的参数。  

CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) 
//12. 返回一个新的CompletionStage,当此阶段正常完成时,将使用提供的执行程序执行此阶段的结果做为提供的操做的参数。

 说明: 上面两块方法,都用于获取前一步执行结果后,对其进行接下来的操做,前者用于又返回值的,后者用于无返回值的结果消耗;入参都支持lamdba,可指定线程池;默认为ForkJoinPool.commonPool();spa

@Test
    public void test3(){
        CompletableFuture.supplyAsync(() -> "future 1").thenAccept(s -> log.info(s));
        CompletableFuture.supplyAsync(() -> "future 1").thenAcceptAsync(s -> log.info(s),EnvirmentThreadPool.getThreadPool());
        
        
        String c = CompletableFuture.supplyAsync(() -> "future 1").thenApply(a -> a).join();
        try {
            String d = CompletableFuture.supplyAsync(() -> "future 1").thenApply(a -> a).get();
        } catch (InterruptedException e) {
            
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        
    }

2.4. 获取执行结果get(),join()

public T join() {
        Object r;
        return reportJoin((r = result) == null ? waitingGet(false) : r);
    }

    public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }

上面的代码示例有演示,join()没有异常抛出,而get()方法抛出了2个异常,从异常能够看出,两者都是阻塞的等待获取执行结果,但get()的阻塞能够被interrupt(),而join()不能够;两者对于异常的处理也不相同;.net

get()方法咱们能够看到异常抛出的位置在68行,也就是在调用get()方法的时候抛出的异常,同时异常中显示异常位置在66行;get()须要处理检查异常,必须try..catch..处理或者抛出到外层;线程

下面使用join():3d

此处异常抛出的位置在66行;join()没有强制处理异常;code

2.5 任务串行执行,不处理上一步执行结果

CompletableFuture<Void> thenRun(Runnable action) 
//13. 返回一个新的CompletionStage,当此阶段正常完成时,执行给定的操做。  

CompletableFuture<Void> thenRunAsync(Runnable action) 
//14. 返回一个新的CompletionStage,当此阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操做。 
 
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) 
//15. 返回一个新的CompletionStage,当此阶段正常完成时,使用提供的执行程序执行给定的操做。

说明: 显而易见,执行完上一个任务后执行下一个,支持异步执行,而且指定线程池;

2.6 合并2个结果

<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 
//16. 返回一个新的CompletionStage,当这个和另外一个给定的阶段都正常完成时,两个结果做为提供函数的参数执行。
  
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 
//17. 返回一个新的CompletionStage,当这个和另外一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中两个结果做为提供函数的参数。  

<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) 
//18. 返回一个新的CompletionStage,当这个和另外一个给定阶段正常完成时,使用提供的执行器执行,其中两个结果做为提供的函数的参数。
@Test
    public void test4(){
        CompletableFuture<Integer> integerCompletableFuture =
                CompletableFuture.supplyAsync(() -> 3)
                .thenCombine(CompletableFuture.supplyAsync(() -> 8), (one, two) -> one + two);
        System.out.println(integerCompletableFuture.join());
    }

 说明: 上面是合并2个串行结果并返回值,下面的对应的是无返回值的方法

<U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) 
//19. 返回一个新的CompletionStage,当这个和另外一个给定的阶段都正常完成时,两个结果做为提供的操做的参数被执行。  
<U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) 
//20. 返回一个新的CompletionStage,当这个和另外一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中两个结果做为提供的操做的参数。  
<U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor) 
//21. 返回一个新的CompletionStage,当这个和另外一个给定阶段正常完成时,使用提供的执行器执行,其中两个结果做为提供的函数的参数。
@Test
    public void test4(){
            CompletableFuture.supplyAsync(() -> 3)
                    .thenAcceptBoth(CompletableFuture.supplyAsync(() -> 8), (one, two) -> System.out.println(one + two));
    }

输出11;

2.7 

CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) 
//22. 返回一个新的CompletionStage,当这个和另外一个给定的阶段都正常完成时,执行给定的动做。  
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) 
//23. 返回一个新的CompletionStage,当这个和另外一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操做。  
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) 
//24. 返回一个新CompletionStage,当这和其余特定阶段正常完成,使用附带的执行见执行给定的动做CompletionStage覆盖特殊的完成规则的文档。  

CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) 
//25. 返回一个新的CompletionStage,当这个或另外一个给定阶段正常完成时,执行给定的操做。  
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) 
//26. 返回一个新的CompletionStage,当这个或另外一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操做。  
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor) 
//27. 返回一个新的CompletionStage,当这个或另外一个给定阶段正常完成时,使用提供的执行器执行给定的操做。

说明: 前3个方法用于2个任务完成以后串行执行第三个任务,后3个方法用于只要前2个任务有一个完成就开始执行第三个任务,都不关心前面的执行结果;

CompletableFuture.supplyAsync(() -> 3)
                .runAfterBoth(CompletableFuture.supplyAsync(()->9),
                        ()->System.out.println()
                );

        CompletableFuture.supplyAsync(() -> 3)
                .runAfterEither(CompletableFuture.supplyAsync(()->9),
                        ()->System.out.println()
                );

2.8

CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) 
//28. 返回与此阶段相同的结果或异常的新的CompletionStage,当此阶段完成时,使用结果(或 null若是没有))和此阶段的异常(或 null若是没有))执行给定的操做。  
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) 
//29. 返回一个与此阶段相同结果或异常的新CompletionStage,当此阶段完成时,执行给定操做将使用此阶段的默认异步执行工具执行给定操做,结果(或 null若是没有))和异常(或 null若是没有)这个阶段做为参数。  
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) 
//30. 返回与此阶段相同的结果或异常的新的CompletionStage,当此阶段完成时,使用提供的执行者执行给定的操做,若是没有,则使用结果(或 null若是没有))和异常(或 null若是没有))做为论据。

 说明; 当前者任务完成时,将执行结果和异常做为参数传入方法,因为后续处理;

CompletableFuture.supplyAsync(() -> 3).whenComplete((r,throwable)->System.out.println(r));

2.9

<U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn) 
//31. 返回一个新的CompletionStage,当这个阶段正常完成时,这个阶段将做为提供函数的参数执行。  
<U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn) 
//32. 返回一个新的CompletionStage,当此阶段正常完成时,将使用此阶段的默认异步执行工具执行,此阶段做为提供的函数的参数。  
<U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor) 
//33. 返回一个新的CompletionStage,当此阶段正常完成时,将使用提供的执行程序执行此阶段的结果做为提供函数的参数。

 说明: 将前一执行结果做为参数传入后面的任务中;

CompletableFuture.supplyAsync(() -> 3).thenCompose(i-> CompletableFuture.supplyAsync(()->i));

2.10

<U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn) 
//34. 返回一个新的CompletionStage,当此阶段正常或异常完成时,将使用此阶段的结果和异常做为所提供函数的参数执行。  
<U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn) 
//35. 返回一个新的CompletionStage,当该阶段完成正常或异常时,将使用此阶段的默认异步执行工具执行,此阶段的结果和异常做为提供函数的参数。  
<U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor) 
//36. 返回一个新的CompletionStage,当此阶段完成正常或异常时,将使用提供的执行程序执行此阶段的结果和异常做为提供的函数的参数。

与2.9不一样的是,此处将前者执行结果包括异常做为参数入参执行后续的方法;可与thenApply方法进行对比;多了对异常的处理

CompletableFuture.supplyAsync(() -> 3).thenApply(r->r+1);
        CompletableFuture.supplyAsync(() -> 3).handle((r,throwable)-> r+2);
        CompletableFuture<String> handle = CompletableFuture.supplyAsync(() -> 3).handle((r, throwable) -> throwable.getMessage());

2.11 获取执行结果

boolean complete(T value) 
//若是不是已经完成,将返回的值 get()种相关方法为给定值。  
boolean completeExceptionally(Throwable ex) 
//若是还没有完成,则调用 get()和相关方法来抛出给定的异常。  
T get() 
//等待这个将来完成的必要,而后返回结果。  
T get(long timeout, TimeUnit unit) 
//若是有必要等待这个将来完成的给定时间,而后返回其结果(若是有的话)。  
T getNow(T valueIfAbsent) 
//若是已完成,则返回结果值(或抛出任何遇到的异常),不然返回给定的值IfAbsent。  
T join() 
//返回结果值,若是完成异常,则返回(未检查)异常。

2.12 

boolean completeExceptionally(Throwable ex) 
//若是还没有完成,则调用 get()和相关方法来抛出给定的异常。  
CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn) 
//返回一个新的CompletableFuture,当CompletableFuture完成时完成,结果是异常触发此CompletableFuture的完成特殊功能的给定功能; 不然,若是此CompletableFuture正常完成,则返回的CompletableFuture也会以相同的值正常完成。  
boolean isCompletedExceptionally() 
//若是此CompletableFuture以任何方式完成异常完成,则返回 true 。
CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {

            try {
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            throw new RuntimeException("hahahahh");
        });
        String join = result.exceptionally(throwable -> "123").join();
        System.out.println(join);
        System.out.println(result.isCompletedExceptionally());
        System.out.println(result.completeExceptionally(new RuntimeException()));
        result.get();
相关文章
相关标签/搜索