Java5中,Future以及相关使用方法提供了异步执行任务的能力,但对于结果的获取却不是很方便,只能经过get()方法阻塞住调用线程直至计算完成返回结果或者isDone()方法轮询的方式获得任务结果,也能够用cancel方法来中止任务的执行,阻塞的方式与咱们理解的异步编程实际上是相违背的,而轮询又会耗无谓的CPU资源,并且还不能及时获得计算结果,为何不能用观察者设计模式当计算结果完成及时通知监听者呢?java
不少语言像Node.js,采用回调的方式实现异步编程。Java的一些框架像Netty,本身扩展Java的Future接口,提供了addListener等多个扩展方法:编程
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); future.addListener((channelFuture) -> { if (channelFuture.isSuccess()) { // SUCCESS } else { // FAILURE } });
guava里面也提供了通用的扩展Future: ListenableFuture\SettableFuture以及辅助类Futures等,方便异步编程。bootstrap
做为正统Java类库,是否是应该加点什么特性,能够增强一下自身库的功能?设计模式
Java8里面新增长了一个包含50个方法左右的类:CompletableFuture。app
CompletableFuture类实现了CompletionStage和Future接口。提供了很是强大的Future的扩展功能,能够帮助简化异步编程的复杂性,提供了函数式编程能力,能够经过回调的方式计算处理结果,而且提供了转换和组织CompletableFuture的方法。框架
CompletableFuture 类实现了CompletionStage和Future接口,因此仍是能够像之前同样经过阻塞或轮询的方式得到结果。尽管这种方式不推荐使用。异步
public T get() public T get(long timeout, TimeUnit unit) public T getNow(T valueIfAbsent) public T join()
其中的getNow有点特殊,若是结果已经计算完则返回结果或抛异常,不然返回给定的valueIfAbsent的值。 join返回计算的结果或抛出一个uncheckd异常。函数式编程
CompletionStage是一个接口,从命名上看得知是一个完成的阶段,它里面的方法也标明是在某个运行阶段获得告终果以后要作的事情。异步编程
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
以Async结尾的方法都是能够异步执行的,若是指定了线程池,会在指定的线程池中执行,若是没有指定,默认会在ForkJoinPool.commonPool()中执行,下文中将会有好多相似的,都不详细解释了。关键的入参只有一个Function,它是函数式接口,因此使用Lambda表示起来会更加优雅。它的入参是上一个阶段计算后的结果,返回值是通过转化后结果。函数
例如:
@Test public void thenApply() { String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join(); System.out.println(result); // hello world }
public CompletionStage<Void> thenAccept(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
thenAccept是针对结果进行消耗,由于他的入参是Consumer,有入参无返回值。
例如:
@Test public void thenAccept(){ CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s + " world")); }
public CompletionStage<Void> thenRun(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
thenRun它的入参是一个Runnable的实例,表示当获得上一步的结果时的操做。
例如:
@Test public void thenRun(){ CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }).thenRun(() -> System.out.println("hello world")); // hello world while (true){} }
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
它须要原来的处理返回值,而且other表明的CompletionStage也要返回值以后,利用这两个返回值,进行转换后返回指定类型的值。 例如:
@Test public void thenCombine() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }).thenCombine(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "world"; }), (s1, s2) -> s1 + " " + s2).join(); System.out.println(result); // hello world }
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
它须要原来的处理返回值,而且other表明的CompletionStage也要返回值以后,利用这两个返回值,进行消耗。
例如:
@Test public void thenAcceptBoth() { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "world"; }), (s1, s2) -> System.out.println(s1 + " " + s2)); // hello world while (true){} }
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
不关心这两个CompletionStage的结果,只关心这两个CompletionStage执行完毕,以后在进行操做(Runnable)。
例如:
@Test public void runAfterBoth(){ CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s2"; }), () -> System.out.println("hello world")); // hello world while (true){} }
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
咱们现实开发场景中,总会碰到有两种渠道完成同一个事情,因此就能够调用这个方法,找一个最快的结果进行处理。
例如:
@Test public void applyToEither() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).applyToEither(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }), s -> s).join(); System.out.println(result); // hello world }
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
例如:
@Test public void acceptEither() { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).acceptEither(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }), System.out::println); // hello world while (true){} }
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
例如:
@Test public void runAfterEither() { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).runAfterEither(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "s2"; }), () -> System.out.println("hello world")); // hello world while (true) { } }
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
例如:
@Test public void exceptionally() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } if (1 == 1) { throw new RuntimeException("测试一下异常状况"); } return "s1"; }).exceptionally(e -> { System.out.println(e.getMessage()); // java.lang.RuntimeException: 测试一下异常状况 return "hello world"; }).join(); System.out.println(result); // hello world }
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);
例如:
@Test public void whenComplete() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } if (1 == 1) { throw new RuntimeException("测试一下异常状况"); } return "s1"; }).whenComplete((s, t) -> { System.out.println(s); System.out.println(t.getMessage()); }).exceptionally(e -> { System.out.println(e.getMessage()); return "hello world"; }).join(); System.out.println(result); }
结果:
null java.lang.RuntimeException: 测试一下异常状况 java.lang.RuntimeException: 测试一下异常状况 hello world
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
例如: 出现异常时
@Test public void handle() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //出现异常 if (1 == 1) { throw new RuntimeException("测试一下异常状况"); } return "s1"; }).handle((s, t) -> { if (t != null) { return "hello world"; } return s; }).join(); System.out.println(result); // hello world }
未出现异常时
@Test public void handle() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).handle((s, t) -> { if (t != null) { return "hello world"; } return s; }).join(); System.out.println(result); // s1 }
上面就是CompletionStage接口中方法的使用实例,CompletableFuture一样也一样实现了Future,因此也一样可使用get进行阻塞获取值,总的来讲,CompletableFuture使用起来仍是比较爽的,看起来也比较优雅一点。