CompletableFuture 类声明了 CompletionStage 接口,CompletionStage 接口实际上提供了同步或异步运行计算的舞台。java
所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操做继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不须要等待计算结果。但调用者仍须要获取线程的计算结果。dom
CompletableFuture 提供了以下的异步方法,异步
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
supplyAsync 返回带有任务结果的CompletableFuture,而runAsync返回CompletableFuture<Void>。async
Executor
参数能够手动指定线程池,不然默认ForkJoinPool.commonPool()
系统级公共线程池。ide
注意:ForkJoinPool.commonPool()
是
Daemon Thread(守护线程) 函数只要当前JVM实例中尚存在任何一个非守护线程(用户线程)没有结束,守护线程就所有工做;this
只有当用户线程结束时,JVM推出,守护线程随着JVM一同结束工做。spa
@Test public void test() throws InterruptedException { CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> { System.out.println("runAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println("done=" + cf.isDone()); TimeUnit.SECONDS.sleep(4); System.out.println("done=" + cf.isDone()); }
输出,线程
done=false runAsync=ForkJoinPool.commonPool-worker-1|true done=true
在这段代码中,runAsync 是异步执行的 ,经过 Thread.currentThread().isDaemon() 打印的结果就能够知道是Daemon线程异步执行的。code
CompletableFuture中不带Async的同步方法以下,
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public CompletableFuture<Void> thenAccept(Consumer<? super T> action); public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action); public CompletableFuture<Void> thenRun(Runnable action);
这些方法都是同步执行的
@Test public void testSync11() { CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApply(s -> { randomSleep(); System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 若是成功就返回结果 System.out.println(cf.getNow(null)); // 一直等待成功,而后返回结果 System.out.println(cf.join()); }
输出以下,
thenApply=main|false MESSAGE MESSAGE
首先经过 completedFuture 方法获取一个结果已经完成的Future,而后执行同步方法thenApply,由main线程执行,会阻塞当前的main线程 ,最后getNow方法获取到结果。
CompletableFuture中异步执行的方法都是带Async 结尾的,能够制定执行异步任务的线程池,也能够不指定,若是不指定,默认使用ForkJoinPool.commonPool() 线程池。
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor); public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action); public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor); public CompletableFuture<Void> thenRunAsync(Runnable action); public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);
如下使用的两个方法都是异步执行任务的方法
@Test public void testAsync1() { CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }).thenApplyAsync(s -> { randomSleep(); System.out.println("thenApplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 若是成功就返回结果 System.out.println(cf.getNow(null)); // 一直等待成功,而后返回结果 System.out.println(cf.join()); }
输出以下,
null supplyAsync=ForkJoinPool.commonPool-worker-1|true thenApplyAsync=ForkJoinPool.commonPool-worker-1|true MESSAGE
当执行 cf.gotNow 方法的时候,异步任务尚未执行完成,因此返回 null 。执行 cf.join 方法,阻塞一直等到异步任务结果返回。
thenApply 不带async结尾,是一个同步方法,但可能仍是由执行任务的线程池来执行,或者是当前main线程来执行。
@Test public void testAsync125() { CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { //没有sleep System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }).thenApply(s -> { // thenApplyAsync=main|false 使用调用者线程来进行处理 System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 若是成功就返回结果 System.out.println(cf.getNow(null)); // 一直等待成功,而后返回结果 System.out.println(cf.join()); } @Test public void testAsync126() throws InterruptedException { CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }); TimeUnit.SECONDS.sleep(2); // 使用调用者线程 当前线程main 来进行处理thenApply 转换操做 cf = cf.thenApply(s -> { System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 若是成功就返回结果 System.out.println(cf.getNow(null)); // 一直等待成功,而后返回结果 System.out.println(cf.join()); } @Test public void testAsync124() { CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }).thenApply(s -> { System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 若是成功就返回结果 System.out.println(cf.getNow(null)); // 一直等待成功,而后返回结果 System.out.println(cf.join()); }
输出以下,
supplyAsync=ForkJoinPool.commonPool-worker-1|true thenApplyAsync=main|false MESSAGE MESSAGE ////// supplyAsync=ForkJoinPool.commonPool-worker-1|true thenApply=main|false MESSAGE MESSAGE ////// null supplyAsync=ForkJoinPool.commonPool-worker-1|true thenApply=ForkJoinPool.commonPool-worker-1|true MESSAGE
在testAsync125方法中,thenApply 回调方法是由当前main线程执行的;
在testAsync126方法中,thenApply 回调方法是由当前main线程执行的;
在testAsync124方法中,thenApply 方法是由执行任务的线程池的线程来执行的,thenApply 虽然是一个同步方法,但其调用是经过 ForkJoinPool.commonPool 线程池异步执行的。
因此要注意的是 若是在thenApply 方法中执行比较耗时的操做,会阻塞调用者线程或者主线程。
When we need to execute multiple Futures in parallel, we usually want to wait for all of them to execute and then process their combined results.
The CompletableFuture.allOf static method allows to wait for completion of all of the Futures provided as a var-arg:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "Beautiful"; }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } return "World"; }); System.out.println("f1=" + future1.isDone()); System.out.println("f2=" + future2.isDone()); System.out.println("f3=" + future3.isDone()); CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3); System.out.println("========"); System.out.println("f1=" + future1.isDone()); System.out.println("f2=" + future2.isDone()); System.out.println("f3=" + future3.isDone()); // 等待全部的future 执行完成 combinedFuture.join(); System.out.println("========"); System.out.println("f1=" + future1.isDone()); System.out.println("f2=" + future2.isDone()); System.out.println("f3=" + future3.isDone());
f1=false f2=false f3=false ======== f1=false f2=false f3=false ======== f1=true f2=true f3=true
经过 combinedFuture.join() 方法等待全部的异步任务执行完成。当其全部的CompletableFuture均完成结果时,combinedFuture就会处于完成状态
Notice that the return type of the CompletableFuture.allOf() is a CompletableFuture<Void>. The limitation of this method is that it does not return the combined results of all Futures. Instead you have to manually get results from Futures. Fortunately, CompletableFuture.join() method and Java 8 Streams API makes it simple:
String combined = Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" ")); System.out.println(combined);
更简化后完整连贯的代码,
@Test public void testAllOf2() { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful"); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture.allOf(future1, future2, future3) .thenApply((v) -> Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" "))) .thenAccept(System.out::println); }
========END========