[Java并发-15] CompletableFuture: 异步编程

前面咱们不止一次提到,用多线程优化性能,其实不过就是将串行操做变成并行操做。若是仔细观察,你还会发如今串行转换成并行的过程当中,必定会涉及到异步化,例以下面的示例代码,如今是串行的,为了提高性能,咱们得把它们并行化。java

// 如下两个方法都是耗时操做
doBizA();
doBizB();


//建立两个子线程去执行就能够了,两个操做已经被异步化了。
new Thread(()->doBizA())
  .start();
new Thread(()->doBizB())
  .start();

异步化,是并行方案得以实施的基础,更深刻地讲其实就是:利用多线程优化性能这个核心方案得以实施的基础。Java 在 1.8 版本提供了 CompletableFuture 来支持异步编程。编程

CompletableFuture 的核心优点

为了领略 CompletableFuture 异步编程的优点,这里咱们用 CompletableFuture 从新实现前面曾说起的烧水泡茶程序。首先仍是须要先完成分工方案,在下面的程序中,咱们分了 3 个任务:任务 1 负责洗水壶、烧开水,任务 2 负责洗茶壶、洗茶杯和拿茶叶,任务 3 负责泡茶。其中任务 3 要等待任务 1 和任务 2 都完成后才能开始。这个分工以下图所示。多线程

烧水泡茶分工方案app

// 任务 1:洗水壶 -> 烧开水
CompletableFuture<Void> f1 = 
  CompletableFuture.runAsync(()->{
  System.out.println("T1: 洗水壶...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T1: 烧开水...");
  sleep(15, TimeUnit.SECONDS);
});
// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
  System.out.println("T2: 洗茶壶...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T2: 洗茶杯...");
  sleep(2, TimeUnit.SECONDS);

  System.out.println("T2: 拿茶叶...");
  sleep(1, TimeUnit.SECONDS);
  return " 龙井 ";
});
// 任务 3:任务 1 和任务 2 完成后执行:泡茶
CompletableFuture<String> f3 = 
  f1.thenCombine(f2, (__, tf)->{
    System.out.println("T1: 拿到茶叶:" + tf);
    System.out.println("T1: 泡茶...");
    return " 上茶:" + tf;
  });
// 等待任务 3 执行结果
System.out.println(f3.join());

void sleep(int t, TimeUnit u) {
  try {
    u.sleep(t);
  }catch(InterruptedException e){}
}
// 一次执行结果:
T1: 洗水壶...
T2: 洗茶壶...
T1: 烧开水...
T2: 洗茶杯...
T2: 拿茶叶...
T1: 拿到茶叶: 龙井
T1: 泡茶...
上茶: 龙井

从总体上来看,咱们会发现dom

  1. 无需手工维护线程,没有繁琐的手工维护线程的工做,给任务分配线程的工做也不须要咱们关注;
  2. 语义更清晰,例如f3 = f1.thenCombine(f2, ()->{}) 可以清晰地表述“任务 3 要等待任务 1 和任务 2 都完成后才能开始”;
  3. 代码更简练而且专一于业务逻辑,几乎全部代码都是业务逻辑相关的。

领略 CompletableFuture 异步编程的优点以后,下面咱们详细介绍 CompletableFuture 的使用。异步

建立 CompletableFuture 对象

建立 CompletableFuture 对象主要靠下面代码中展现的这 4 个静态方法,咱们先看前两个。在烧水泡茶的例子中,咱们已经使用了runAsync(Runnable runnable)supplyAsync(Supplier<U> supplier),它们之间的区别是:Runnable 接口的 run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。异步编程

前两个方法和后两个方法的区别在于:后两个方法能够指定线程池参数。函数

默认状况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认建立的线程数是 CPU 的核数(也能够经过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。若是全部 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操做,就会致使线程池中全部线程都阻塞在 I/O 操做上,从而形成线程饥饿,进而影响整个系统的性能。因此,强烈建议你要根据不一样的业务类型建立不一样的线程池,以免互相干扰性能

// 使用默认线程池
static CompletableFuture<Void> 
  runAsync(Runnable runnable)
static <U> CompletableFuture<U> 
  supplyAsync(Supplier<U> supplier)
// 能够指定线程池  
static CompletableFuture<Void> 
  runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> 
  supplyAsync(Supplier<U> supplier, Executor executor)

建立完 CompletableFuture 对象以后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法,对于一个异步操做,你须要关注两个问题:一个是异步操做何时结束,另外一个是如何获取异步操做的执行结果。由于 CompletableFuture 类实现了 Future 接口,因此这两个问题你均可以经过 Future 接口来解决。另外,CompletableFuture 类还实现了 CompletionStage 接口,这个接口内容实在是太丰富了,在 1.8 版本里有 40 个方法,这些方法咱们该如何理解呢?优化

理解 CompletionStage 接口

能够站在分工的角度类比一下工做流。任务是有时序关系的,好比有串行关系、并行关系、汇聚关系等。这样说可能有点抽象,这里还举前面烧水泡茶的例子,其中洗水壶和烧开水就是串行关系,洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间就是并行关系,而烧开水、拿茶叶和泡茶就是汇聚关系。

图示123

CompletionStage 接口能够清晰地描述任务之间的这种时序关系,例如前面提到的
f3 = f1.thenCombine(f2, ()->{}) 描述的就是一种汇聚关系。烧水泡茶程序中的汇聚关系是一种 AND 聚合关系,这里的 AND 指的是全部依赖的任务(烧开水和拿茶叶)都完成后才开始执行当前任务(泡茶)。既然有 AND 聚合关系,那就必定还有 OR 聚合关系,所谓 OR 指的是依赖的任务只要有一个完成就能够执行当前任务。

最后就是异常,CompletionStage 接口也能够方便地描述异常处理。

下面咱们就来一一介绍,CompletionStage 接口如何描述串行关系、AND 聚合关系、OR 聚合关系以及异常处理。

1. 描述串行关系

CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。

thenApply 系列函数里参数 fn 的类型是接口 Function<T, R>,这个接口里与 CompletionStage 相关的方法是R apply(T t),这个方法既能接收参数也支持返回值,因此 thenApply 系列方法返回的是CompletionStage<R>

而 thenAccept 系列方法里参数 consumer 的类型是接口Consumer<T>,这个接口里与 CompletionStage 相关的方法是void accept(T t),这个方法虽然支持参数,但却不支持回值,因此 thenAccept 系列方法返回的是CompletionStage<Void>

thenRun 系列方法里 action 的参数是 Runnable,因此 action 既不能接收参数也不支持返回值,因此 thenRun 系列方法返回的也是CompletionStage<Void>

这些方法里面 Async 表明的是异步执行 fn、consumer 或者 action。其中,须要你注意的是 thenCompose 系列方法,这个系列的方法会新建立出一个子流程,最终结果和 thenApply 系列是相同的。

CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);

经过下面的示例代码,你能够看一下 thenApply() 方法是如何使用的。首先经过 supplyAsync() 启动一个异步流程,以后是两个串行操做,总体看起来仍是挺简单的。不过,虽然这是一个异步流程,但任务①②③倒是串行执行的,②依赖①的执行结果,③依赖②的执行结果。

CompletableFuture<String> f0 = 
  CompletableFuture.supplyAsync(
    () -> "Hello World")      //①
  .thenApply(s -> s + " QQ")  //②
  .thenApply(String::toUpperCase);//③

System.out.println(f0.join());
// 输出结果
HELLO WORLD QQ

2. 描述 AND 汇聚关系

CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不一样。

CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

3. 描述 OR 汇聚关系

CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不一样。

CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
CompletableFuture<String> f1 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});

CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});

CompletableFuture<String> f3 = 
  f1.applyToEither(f2,s -> s);

System.out.println(f3.join());

4. 异常处理

虽然上面咱们提到的 fn、consumer、action 它们的核心方法都不容许抛出可检查异常,可是却没法限制它们抛出运行时异常 ,例以下面的代码,执行

CompletableFuture<Integer> 
  f0 = CompletableFuture.
    .supplyAsync(()->(7/0))
    .thenApply(r->r*10);
System.out.println(f0.join());

CompletionStage 接口给咱们提供的方案很是简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操做是同样的,都支持链式编程方式。

CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);

下面的示例代码展现了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用很是相似于 try{}catch{}中的 catch{},可是因为支持链式编程方式,因此相对更简单。

whenComplete() 和 handle() 系列方法就相似于 try{}finally{}中的 finally{},不管是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。

whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。

CompletableFuture<Integer> 
  f0 = CompletableFuture
    .supplyAsync(()->7/0))
    .thenApply(r->r*10)
    .exceptionally(e->0);
System.out.println(f0.join());

总结

不过最近几年,伴随着 ReactiveX 的发展(Java 语言的实现版本是 RxJava),回调地狱已经被完美解决了,Java 语言也开始官方支持异步编程:在 1.8 版本提供了 CompletableFuture,在 Java 9 版本则提供了更加完备的 Flow API,异步编程目前已经彻底工业化。

CompletableFuture 已经可以知足简单的异步编程需求,若是你对异步编程感兴趣,能够重点关注 RxJava 这个项目,利用 RxJava,即使在 Java 1.6 版本也能享受异步编程的乐趣。

相关文章
相关标签/搜索