《Java 8 in Action》Chapter 11:CompletableFuture:组合式异步编程

某个网站的数据来自Facebook、Twitter和Google,这就须要网站与互联网上的多个Web服务通讯。但是,你并不但愿由于等待某些服务的响应,阻塞应用程序的运行,浪费数十亿宝贵的CPU时钟周期。好比,不要由于等待Facebook的数据,暂停对来自Twitter的数据处理。 java

第7章中介绍的分支/合并框架以及并行流是实现并行处理的宝贵工具;它们将一个操做切分为多个子操做,在多个不一样的核、CPU甚至是机器上并行地执行这些子操做。与此相反,若是你的意图是实现并发,而非并行,或者你的主要目标是在同一个CPU上执行几个松耦合的任务,充分利用CPU的核,让其足够忙碌,从而最大化程序的吞吐量,那么你其实真正想作的是避免由于等待远程服务的返回,或者对数据库的查询,而阻塞线程的执行,浪费宝贵的计算资源,由于这种等待的时间极可能至关长。 git

1. Future接口

Future接口在Java 5中被引入,设计初衷是对未来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些潜在耗时的操做把调用线程解放出来,让它能继续执行其余有价值的工做,再也不须要等待耗时的操做完成。Future的另外一个优势是它比更底层的Thread更易用。要使用Future,一般你只须要将耗时的操做封装在一个Callable对象中,再将它提交给ExecutorService。使用Future以异步的方式执行一个耗时的操做: github

线程能够在ExecutorService以并发方式调用另外一个线程执行耗时操做的同时,去执行一些其余的任务。接着,若是你已经运行到没有异步操做的结果就没法继续任何有意义的工做时,能够调用它的get方法去获取操做的结果。若是操做已经完成,该方法会马上返回操做的结果,不然它会阻塞你的线程,直到操做完成,返回相应的结果。若是该长时间运行的操做永远不返回了会怎样?Future提供了一个无需任何参数的get方法,推荐使用重载版本的get方法,它接受一个超时的参数,能够定义线程等待Future结果的最长时间,避免无休止的等待。下图是Future异步执行线程原理图。 数据库

2. 使用CompletableFuture构建异步应用

Future接口有必定的局限性,好比,咱们很难表述Future结果之间的依赖性。所以咱们引入了CompletableFuture。接下来经过一个“最佳价格查询器“的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格,来展示CompletableFuture实现异步应用。经过此例你能学到这些:网络

  • 如何编写异步API
  • 如何让使用同步API的代码变为非阻塞代码
  • 如何使用流水线将两个接续的异步操做合并为一个异步计算操做
  • 如何以响应式的方式处理异步操做的完成事件

同步API和异步API:并发

  • 同步API其实只是对传统方法调用的另外一种称呼:你调用了某个方法,调用方在被调用方运行的过程当中会等待,被调用方运行结束返回,调用方取得被调用方的返回值并继续运行。即便调用方和被调用方在不一样的线程中运行,调用方仍是须要等待被调用方结束运行,这就是阻塞式调用这个名词的由来。
  • 异步API会直接返回,或者至少在被调用方计算完成以前,将它剩余的计算任务交给另外一个线程去作,该线程和调用方是异步的——这就是非阻塞式调用的由来。执行剩余计算任务的线程会将它的计算结果返回给调用方。返回的方式要么是经过回调函数,要么是由调用方再次执行一个“等待,直到计算完成”的方法调用。

2.1 实战:实现异步API

2.1.1 同步方法

同步操做中会为等待同步事件完成而等待1s,这种是没法接受的,对于程序体验来讲是很是很差的。框架

2.1.2 将同步方法转换为异步方法

Java 5引入了java.util.concurrent.Future接口表示一个异步计算(即调用线程能够继续运行,不会由于调用方法而阻塞)的结果。这意味着Future是一个暂时还不可知值的处理器,这个值在计算完成后,能够经过调用它的get方法取得。这种方式下,在进行价格查询的同时,还能执行一些其余的任务,好比查询其余商店中商品的价格,不会阻塞在那里等待第一家商店返回请求的结果。最后,若是全部有意义的工做都已经完成,全部要执行的工做都依赖于商品价格时,再调用Future的get方法。执行了这个操做后,要么得到Future中封装的值(若是异步任务已经完成),要么发生阻塞,直到该异步任务完成,指望的值可以访问。同时,若是某个商品价格计算发生异常,会将当前线程杀死,从而致使等待get方法返回结果的客户端永久地被阻塞。客户端可使用重载版本的get方法,设置超时参数来避免。为了让客户端能了解没法提供请求商品价格的缘由,你须要使用CompletableFuture的completeExceptionally方法将致使CompletableFuture内发生问题的异常抛出。异步

2.1.3 使用工厂方法supplyAsync建立CompletableFuture对象

supplyAsync方法接受一个生产者(Supplier)做为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,可是你也可使用supplyAsync方法的重载版本,传递第二个参数指定不一样的执行线程执行生产者方法。函数

3. 消除代码阻塞问题

3.1 顺序同步请求

3.2 使用并行流对请求进行并行操做

3.3 使用CompletableFuture发起异步请求

CompletableFuture版本的程序彷佛比并行流版本的程序还快那么一点儿。可是最后这个版本也不太使人满意。它们看起来不相伯仲,究其缘由都同样:它们内部采用的是一样的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime().availableProcessors()的返回值。然而,CompletableFuture具备必定的优点,由于它容许你对执行器(Executor)进行配置,尤为是线程池的大小,让它以更适合应用需求的方式进行配置,知足程序的要求,而这是并行流API没法提供的。 顺序执行和并行执行的原理对比: 工具

图11-4的上半部分展现了使用单一流水线处理流的过程,咱们看到,执行的流程(以虚线标识)是顺序的。事实上,新的CompletableFuture对象只有在前一个操做彻底结束以后,才能建立。与此相反,图的下半部分展现了如何先将CompletableFutures对象汇集到一个列表中(即图中以椭圆表示的部分),让对象们能够在等待其余对象完成操做以前就能启动。

3.4 使用CompletableFuture发起异步请求WithExecutor

3.5 调用结果:

3.6 并行——使用流仍是CompletableFutures

目前为止,你已经知道对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操做开展工做,要么枚举出集合中的每个元素,建立新的线程,在CompletableFuture内对其进行操做。后者提供了更多的灵活性,你能够调整线程池的大小,而这能帮助你确保总体的计算不会由于线程都在等待I/O而发生阻塞。

  • 若是你进行的是计算密集型的操做,而且没有I/O,那么推荐使用Stream接口,由于实现简单,同时效率也多是最高的(若是全部的线程都是计算密集型的,那就没有必要建立比处理器核数更多的线程)。
  • 若是你并行的工做单元还涉及等待I/O的操做(包括网络链接等待),那么使用CompletableFuture灵活性更好,你能够像前文讨论的那样,依据等待/计算,或者 W/C的比率设定须要使用的线程数。这种状况不使用并行流的另外一个缘由是,处理流的流水线中若是发生I/O等待,流的延迟性会让咱们很难判断到底何时触发了等待。

4. 对多个异步任务进行流水线操做

4.1 案例

经过在shop构成的流上采用流水线方式执行三次map操做,咱们获得告终果。

  • 第一个操做将每一个shop对象转换成了一个字符串,该字符串包含了该 shop中指定商品的价格和折扣代码。
  • 第二个操做对这些字符串进行了解析,在Quote对象中对它们进行转换。
  • 第三个map会操做联系远程的Discount服务,计算出最终的折扣价格,并返回该价格及提供该价格商品的shop。

代码如图:

原理图:

Java 8的CompletableFuture API提供了名为thenCompose的方法,它就是专门为这一目的而设计的,thenCompose方法容许你对两个异步操做进行流水线,第一个操做完成时,将其结果做为参数传递给第二个操做。换句话说,你能够建立两个CompletableFutures对象,对第一个CompletableFuture对象调用thenCompose,并向其传递一个函数。当第一个 CompletableFuture执行完毕后,它的结果将做为该函数的参数,这个函数的返回值是以第一 个CompletableFuture的返回作输入计算出的第二个CompletableFuture对象。thenCompose方法像CompletableFuture类中的其余方法同样,也提供了一个以Async后缀结尾的版本thenComposeAsync。一般而言,名称中不带Async的方法和它的前一个任务同样,在同一个线程中运行;而名称以Async结尾的方法会将后续的任务提交到一个线程池,因此每一个任务是由不一样的线程处理的。

4.2 thenCombine方法

将两个CompletableFuture对象结合起来,不管他们是否存在依赖。thenCombine方法,它接收名为BiFunction的第二参数,这个参数 定义了当两个CompletableFuture对象完成计算后,结果如何合并。同thenCompose方法同样, thenCombine方法也提供有一个Async的版本。这里,若是使用thenCombineAsync会致使BiFunction中定义的合并操做被提交到线程池中,由另外一个任务以异步的方式执行。

代码图:

原理图:

4.3 响应CompletableFuture的completion事件

Java 8的CompletableFuture经过thenAccept方法提供了这一功能,它接收 CompletableFuture执行完毕后的返回值作参数。thenAccept方法也提供 了一个异步版本,名为thenAcceptAsync。异步版本的方法会对处理结果的消费者进行调度, 从线程池中选择一个新的线程继续执行,再也不由同一个线程完成CompletableFuture的全部任 务。由于你想要避免没必要要的上下文切换,更重要的是你但愿避免在等待线程上浪费时间,尽快响应CompletableFuture的completion事件,因此这里没有采用异步版本。

4.3.1 实战

5. 小结

  • 执行比较操做时,尤为是那些依赖一个或多个远程服务的操做,使用异步任务能够改善程序的性能,加快程序的响应速度。
  • 你应该尽量地为客户提供异步API。使用CompletableFuture类提供的特性,你可以轻松地实现这一目标。
  • CompletableFuture类还提供了异常管理的机制,让你有机会抛出/管理异步任务执行中发生的异常。
  • 将同步API的调用封装到一个CompletableFuture中,你可以以异步的方式使用其结果。
  • 若是异步任务之间相互独立,或者它们之间某一些的结果是另外一些的输入,你能够将这些异步任务构造或者合并成一个。
  • 你能够为CompletableFuture注册一个回调函数,在Future执行完毕或者它们计算的结果可用时,针对性地执行一些程序。
  • 你能够决定在何时结束程序的运行,是等待由CompletableFuture对象构成的列表中全部的对象都执行完毕,仍是只要其中任何一个首先完成就停止程序的运行。

Tips

本文同步发表在公众号,欢迎你们关注!😁 后续笔记欢迎关注获取第一时间更新!

相关文章
相关标签/搜索