了解CompletableFuture须要理解java8的函数式接口,不了解的同窗能够先移步:http://www.javashuo.com/article/p-nswjqwms-kw.htmljava
//比较特殊,他入参就是返回值,也就是说他能够用来执行须要其余返回值的异步任务。 public static <U> CompletableFuture<U> completedFuture(U value) //无返回值,使用默认线程池 public static CompletableFuture<Void> runAsync(Runnable runnable) //无返回值,使用自定义线程池 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) //有返回值,使用默认线程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) //有返回值,使用自定义线程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) // public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
举例:app
//supplyAsync方法无入参,可是返回一个String对象。此方法使用了默认的线程池执行异步任务 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { //长时间的计算任务 return "·00"; });
下面将经过具体例子来展现各个方法的使用。dom
当原始的CompletableFuture任务执行完后,不论是否成功计算出结果,仍是抛出异常,都会会执行 whenComplete* 或 exceptionally 的方法中的任务。
该操做执行完毕后:异步
BiConsumer<T,U> 函数接口有两个参数,无返回值。 Function<T,R> 函数接口有一个输入参数,返回一个结果。 //无Async,同步处理正常计算结果或异常,使用执行任务的那个线程来执行该方法,因此这个方法是同步的。 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) //有Async,异步处理正常计算结果或异常,使用执行任务的那个线程池中的线程来执行该方法!因此这个方法是异步的。 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) //有Async,异步处理正常计算结果或异常,使用自定义线程池来执行该方法,因此这个方法是异步的。 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? superThrowable> action, Executor executor) //处理异常。 public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
注意:当没有异常抛出来的时候,上面的Throwable参数为空!函数
计算逻辑: private static Random random = new Random(); private static long time = System.currentTimeMillis(); public static int getMoreData(){ System.out.println("begin to start compute"); try { Thread.sleep(10000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("end to compute, passed:" + System.currentTimeMillis()); return random.nextInt(1000); } public static int throwException(){ System.out.println("准备抛出异常"); try { Thread.sleep(10000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("抛了"); throw new RuntimeException("主动抛出异常"); }
whenComplete: //若是使用这段代码,则会是和当前线程同步执行 public static void main(String[] args) throws Exception{ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> getMoreData()); CompletableFuture<Integer> future2 = future.whenComplete((result, excetion) -> { System.out.println("执行到whenComplete了,result:" + result); System.out.println("执行到whenComplete了,exception:" + (excetion == null ? "无异常" : excetion.getClass())); }); System.out.println("执行到最后一段代码了,future result:" + future.get()); System.out.println("执行到最后一段代码了,future2 result:" + future2.get()); } > 打印执行结果: begin to start compute end to compute, passed:1551182552193 执行到whenComplete了,result:625 执行到whenComplete了,exception:无异常 执行到最后一段代码了,future result:625 执行到最后一段代码了,future2 result:625 >从打印结果可知,whenComplete使用原始的执行的任务的线程,因此能够当作是同步执行的,而且新的CompletableFuture对象的结果和原始的一致
whenCompleteAsync: //若是使用这段代码,则会是和当前线程同步执行 public static void main(String[] args) throws Exception{ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> getMoreData()); future.whenCompleteAsync((result, exception) -> { System.out.println("计算已执行完毕,result:" + result); System.out.println("计算已执行完毕,exception:" + (excetion == null ? "无异常" : excetion.getClass())); }); System.out.println("执行到最后一段代码了,result:" + future.get()); } > 打印执行结果: begin to start compute end to compute, passed:1551180611064 执行到最后一段代码了,result:323 执行到whenComplete了,result:323 执行到whenComplete了,exception:无异常 >从打印结果可知,whenCompleteAsync是异步执行的
exceptionally比较复杂,须要经过4个实例才能真正明白:.net
exceptionally实例1: //这段代码,因为会抛出异常,会先走whenCompleteAsync,而后再走exceptionally,并且是没法获取到返回值的。 public static void main(String[] args) throws Exception{ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> throwException()); future.whenCompleteAsync((result, exception) -> { System.out.println("计算已执行完毕,result:" + result); System.out.println("计算已执行完毕,exception:" + (exception == null ? "无异常" : exception.getClass())); }).exceptionally(exception -> { System.out.println("计算执行过程当中发生了异常,exception:" + exception.getClass()); return 0; }); System.out.println("执行到最后一段代码了,future result:" + future.get()); } > 打印执行结果: 准备抛出异常 抛了 计算已执行完毕,result:null 计算已执行完毕,result:null 计算已执行完毕,exception:class java.util.concurrent.CompletionException 计算已执行完毕,exception:class java.util.concurrent.CompletionException 计算执行过程当中发生了异常,exception:class java.util.concurrent.CompletionException 计算执行过程当中发生了异常,exception:class java.util.concurrent.CompletionException Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: 主动抛出异常 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at me.ele.ecs.eapp.service.impl.Main.main(Main.java:69) Caused by: java.lang.RuntimeException: 主动抛出异常 at me.ele.ecs.eapp.service.impl.Main.throwException(Main.java:37) at me.ele.ecs.eapp.service.impl.Main.lambda$main$0(Main.java:44)
exceptionally实例2: //这里的打印结果是和上面相似的,但是为何此次要获取新的CompletableFuture对象呢?看下面的exceptionally实例3后,再回来对比吧 public static void main(String[] args) throws Exception{ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> throwException()); CompletableFuture<Integer> future2 = future.whenCompleteAsync((result, exception) -> { System.out.println("计算已执行完毕,result:" + result); System.out.println("计算已执行完毕,exception:" + (exception == null ? "无异常" : exception.getClass())); }); CompletableFuture<Integer> future3 = future2.exceptionally(exception -> { System.out.println("计算执行过程当中发生了异常,exception:" + exception.getClass()); return 0; }); System.out.println("执行到最后一段代码了,future result:" + future.get()); //由于上面的执行过程当中,已经抛出了异常了,那么下面的这两段代码是没法执行到的, System.out.println("执行到最后一段代码了,future2 result:" + future2.get()); System.out.println("执行到最后一段代码了,future3 result:" + future3.get()); } > 打印执行结果: 准备抛出异常 抛了 计算已执行完毕,result:null 计算已执行完毕,exception:class java.util.concurrent.CompletionException 计算执行过程当中发生了异常,exception:class java.util.concurrent.CompletionException Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: 主动抛出异常 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at me.ele.ecs.eapp.service.impl.Main.main(Main.java:69) Caused by: java.lang.RuntimeException: 主动抛出异常 at me.ele.ecs.eapp.service.impl.Main.throwException(Main.java:37) at me.ele.ecs.eapp.service.impl.Main.lambda$main$0(Main.java:44)
exceptionally实例3: // public static void main(String[] args) throws Exception{ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> throwException()); CompletableFuture<Integer> future2 = future.whenCompleteAsync((result, exception) -> { System.out.println("计算已执行完毕,result:" + result); System.out.println("计算已执行完毕,exception:" + (exception == null ? "无异常" : exception.getClass())); }); CompletableFuture<Integer> future3 = future2.exceptionally(exception -> { System.out.println("计算执行过程当中发生了异常,exception:" + exception.getClass()); //这里的返回值实际其是没有用处的。由于若是抛出了异常,future的get方法是执行不到的;而若是没有抛出异常的话,仍是会返回原始的CompletableFuture的值的 //因此这个exceptionally就是仅仅用来处理异常的。 return 0; }); //System.out.println("执行到最后一段代码了,future result:" + future.get()); //System.out.println("执行到最后一段代码了,future2 result:" + future2.get()); //和上面实例2惟一的区别就是注释掉了上面两段代码,可是执行结果却不同了,并且整个main方法都没有抛出来异常,缘由就在于future和future2是异步执行的,因此是在别的线程抛了异常,而main方法是不会抛出来的。并且在获取future3的结果时,能够发现,返回了future3对象自定义的返回值 System.out.println("执行到最后一段代码了,future3 result:" + future3.get()); } > 打印执行结果: 准备抛出异常 抛了 计算已执行完毕,result:null 计算已执行完毕,exception:class java.util.concurrent.CompletionException 计算执行过程当中发生了异常,exception:class java.util.concurrent.CompletionException 执行到最后一段代码了,future3 result:0
exceptionally实例4: public static void main(String[] args) throws Exception{ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData); CompletableFuture<Integer> future2 = future.whenCompleteAsync((result, exception) -> { System.out.println("计算已执行完毕,result:" + result); System.out.println("计算已执行完毕,exception:" + (exception == null ? "无异常" : exception.getClass())); }); CompletableFuture<Integer> future3 = future2.exceptionally(exception -> { System.out.println("计算执行过程当中发生了异常,exception:" + exception.getClass()); return 0; }); System.out.println("执行到最后一段代码了,future result:" + future.get()); System.out.println("执行到最后一段代码了,future2 result:" + future2.get()); //原始的计算逻辑不变,exceptionally返回的新的CompletableFuture对象的结果和原始计算逻辑返回的结果一致。 System.out.println("执行到最后一段代码了,future3 result:" + future3.get()); } > 打印执行结果: begin to start compute end to compute, passed:1551239497158 getMoreData: 679 执行到最后一段代码了,future result:679 计算已执行完毕,result:679 计算已执行完毕,exception:无异常 执行到最后一段代码了,future2 result:679 执行到最后一段代码了,future3 result:679
和 whenComplete* 方法同样,都是在任务执行完后,执行该方法的逻辑,可是和 whenComplete* 不一样的是:
该操做执行完毕后,它返回的新CompletableFuture对象的计算结果是handle*方法的返回值,并非原始计算逻辑的返回值线程
//同步 public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn) //异步,使用原始CompletableFuture的线程 public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn) //异步,使用自定义线程池的线程 public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
实例: public static void main(String[] args) throws Exception{ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData); CompletableFuture<Integer> future2 = future.handleAsync((result, exception) -> { System.out.println("计算已执行完毕,result:" + result); System.out.println("计算已执行完毕,exception:" + (exception == null ? "无异常" : exception.getClass())); return result + 1; }); System.out.println("执行到最后一段代码了,future result:" + future.get()); System.out.println("执行到最后一段代码了,future2 result:" + future2.get()); } > 打印执行结果: begin to start compute end to compute, passed:1551243326193 getMoreData: 395 执行到最后一段代码了,future result:395 计算已执行完毕,result:395 计算已执行完毕,exception:无异常 执行到最后一段代码了,future2 result:396
thenApply* 能够链接多个CompletableFuture对象,至关于将一个一个的CompletableFuture串联起来了,第一个CompletableFuture对象的结果会传递到下一个对象中,而且下一个CompletableFuture对象的结算结果会做为上一个对象的CompletableFuture结果,依次类推,也就是说会改变原始CompletableFuture对象的结果。
注:它和 handle 方法有点相似,都会拿到上一个CompletableFuture对象的结果进行计算,可是区别就是thenApply 会改变原始对象的计算结果,而 handle* 并不会**。code
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return 100; }); //因为这里同时链接了多个thenApplyAsync,第一个是异步的,第二个是同步的,而且都没有处理异常,因此异常会直接在执行计算的线程上抛出来。 CompletableFuture<String> f = future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString()); System.out.println(f.get()); //"1000" }
thenAccept* 返回的新的CompletableFuture对象不返回结果,若是使用get方法,会返回一个null。对象
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData); CompletableFuture<Void> future2 = future.thenAccept(result -> { System.out.println("执行到thenAccept了, result:" + result); }); System.out.println("执行到最后一段代码了,future result:" + future.get()); System.out.println("执行到最后一段代码了,future2 result: " + future2.get()); } > 打印执行结果: begin to start compute end to compute, passed:1551341977684 getMoreData: 171 执行到thenAccept了, result:171 执行到最后一段代码了,future result:171 执行到最后一段代码了,future2 result: null
它和 thenAccept 同样,都是纯消费,可是thenAccept*只能消费一个CompletableFuture对象,而thenAcceptBoth* 能在两个不一样的CompletableFuture对象执行完成后,消费他们两个的计算结果。
并且他仅仅在原始的两个CompletableFuture对象都计算成功以后,才开始执行。blog
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor) //runAfterBoth和上面三个的区别就是它不消费原始的CompletableFuture结果 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(Main::getMoreData); future.thenAcceptBothAsync(future2, (x, y) -> { System.out.println("future1 和 future都执行完成了,结果分别是:" + x + "," + y); }); System.out.println("执行到最后一段代码了,future result:" + future.get()); System.out.println("执行到最后一段代码了,future2 result: " + future2.get()); } > 打印执行结果: begin to start compute begin to start compute end to compute, passed:1551342475808 getMoreData: 920 执行到最后一段代码了,future result:920 end to compute, passed:1551342475811 getMoreData: 747 执行到最后一段代码了,future2 result: 747 future1 和 future都执行完成了,结果分别是:920,747
在原始CompletableFuture执行任务结束后,并且执行指定的任务,不消费,也不产生结果。
public CompletableFuture<Void> thenRun(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData); CompletableFuture<Void> future2 = future.thenRunAsync(() -> { System.out.println("future执行完成了"); }); System.out.println("执行到最后一段代码了,future result:" + future.get()); System.out.println("执行到最后一段代码了,future2 result:" + future2.get()); } > 打印执行结果: begin to start compute end to compute, passed:1551344347162 getMoreData: 688 执行到最后一段代码了,future result:688 future执行完成了 执行到最后一段代码了,future2 result:null
runAfterBoth是当两个CompletableFuture都计算完成后才执行。
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
runAfterBoth是当两个CompletableFuture都计算完成后才执行。而 acceptEither* 没有返回值。
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn) public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
若是你用过Guava的Future类,你就会知道它的Futures辅助类提供了不少便利方法,用来处理多个Future,而不像Java的CompletableFuture,只提供了allOf、anyOf两个方法。 好比有这样一个需求,将多个CompletableFuture组合成一个CompletableFuture,这个组合后的CompletableFuture的计算结果是个List,它包含前面全部的CompletableFuture的计算结果,guava的Futures.allAsList能够实现这样的功能,可是对于java CompletableFuture,咱们须要一些辅助方法:
public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) { CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList())); } public static <T> CompletableFuture<Stream<T>> sequence(Stream<CompletableFuture<T>> futures) { List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList()); return sequence(futureList); }
Java Future转CompletableFuture:
public static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor executor) { return CompletableFuture.supplyAsync(() -> { try { return future.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }, executor); }
本文大量参考了鸟窝的文章,本人只是将实例便于理解。
地址:https://colobu.com/2016/02/29/Java-CompletableFuture/