《Java8实战》-第十一章笔记(CompletableFuture:组合式异步编程)

CompletableFuture:组合式异步编程

最近这些年,两种趋势不断地推进咱们反思咱们设计软件的方式。第一种趋势和应用运行的硬件平台相关,第二种趋势与应用程序的架构相关,尤为是它们之间如何交互。咱们在第7章中已经讨论过硬件平台的影响。咱们注意到随着多核处理器的出现,提高应用程序处理速度最有效的方式是编写能充分发挥多核能力的软件。你已经看到经过切分大型的任务,让每一个子任务并行运行,这一目标是可以实现的;你也已经了解相对直接使用线程的方式,使用分支/合并框架(在Java 7中引入)和并行流(在Java 8中新引入)能以更简单、更有效的方式实现这一目标。java

第二种趋势反映在公共API日益增加的互联网服务应用。著名的互联网大鳄们纷纷提供了本身的公共API服务,好比谷歌提供了地理信息服务,Facebook提供了社交信息服务,Twitter提供了新闻服务。如今,不多有网站或者网络应用会以彻底隔离的方式工做。更多的时候,咱们看到的下一代网络应用都采用“混聚”(mash-up)的方式:它会使用来自多个来源的内容,将这些内容聚合在一块儿,方便用户的生活。git

好比,你可能但愿为你的法国客户提供指定主题的热点报道。为实现这一功能,你须要向谷歌或者Twitter的API请求全部语言中针对该主题最热门的评论,可能还须要依据你的内部算法对它们的相关性进行排序。以后,你可能还须要使用谷歌的翻译服务把它们翻译成法语,甚至利用谷歌地图服务定位出评论做者的位置信息,最终将全部这些信息汇集起来,呈如今你的网站上。github

固然,若是某些外部网络服务发生响应慢的状况,你但愿依旧能为用户提供部分信息,好比提供带问号标记的通用地图,以文本的方式显示信息,而不是呆呆地显示一片空白屏幕,直到地图服务器返回结果或者超时退出。算法

要实现相似的服务,你须要与互联网上的多个Web服务通讯。但是,你并不但愿由于等待某些服务的响应,阻塞应用程序的运行,浪费数十亿宝贵的CPU时钟周期。好比,不要由于等待Facebook的数据,暂停对来自Twitter的数据处理。数据库

这些场景体现了多任务程序设计的另外一面。第7章中介绍的分支/合并框架以及并行流是实现并行处理的宝贵工具;它们将一个操做切分为多个子操做,在多个不一样的核、CPU甚至是机器上并行地执行这些子操做。编程

与此相反,若是你的意图是实现并发,而非并行,或者你的主要目标是在同一个CPU上执行几个松耦合的任务,充分利用CPU的核,让其足够忙碌,从而最大化程序的吞吐量,那么你其实真正想作的是避免由于等待远程服务的返回,或者对数据库的查询,而阻塞线程的执行,浪费宝贵的计算资源,由于这种等待的时间极可能至关长。经过本章中你会了解,Future接口,尤为是它的新版实现CompletableFuture,是处理这种状况的利器。服务器

Future 接口

Future接口在Java 5中被引入,设计初衷是对未来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些潜在耗时的操做把调用线程解放出来,让它能继续执行其余有价值的工做,再也不须要呆呆等待耗时的操做完成。打个比方,你能够把它想象成这样的场景:你拿了一袋子衣服到你中意的干洗店去洗。干洗店的员工会给你张发票,告诉你何时你的衣服会洗好(这就是一个Future事件)。衣服干洗的同时,你能够去作其余的事情。Future的另外一个优势是它比更底层的Thread更易用。要使用Future,一般你只须要将耗时的操做封装在一个Callable对象中,再将它提交给ExecutorService,就万事大吉了。下面这段代码展现了Java 8以前使用Future的一个例子。网络

ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {
    public Double call() {
        return doSomeLongComputation();
    }
});
// 异步操做进行的同时,你能够作其余的事情
doSomethingElse();

try {
    Double result = future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException ee) {
    // 计算抛出一个异常
} catch (InterruptedException ie) {
    // 当前线程在等待过程当中被中断
} catch (TimeoutException te) {
    // 在Future对象完成以前超过已过时
}

这种编程方式让你的线程能够在ExecutorService以并发方式调用另外一个线程执行耗时操做的同时,去执行一些其余的任务。接着,若是你已经运行到没有异步操做的结果就没法继续任何有意义的工做时,能够调用它的get方法去获取操做的结果。若是操做已经完成,该方法会马上返回操做的结果,不然它会阻塞你的线程,直到操做完成,返回相应的结果。架构

你能想象这种场景存在怎样的问题吗?若是该长时间运行的操做永远不返回了会怎样?为了处理这种可能性,虽然Future提供了一个无需任何参数的get方法,咱们仍是推荐你们使用重载版本的get方法,它接受一个超时的参数,经过它,你能够定义你的线程等待Future结果的最长时间,而不是样永无止境地等待下去。并发

Future 接口的局限性

经过第一个例子,咱们知道Future接口提供了方法来检测异步计算是否已经结束(使用isDone方法),等待异步操做结束,以及获取计算的结果。可是这些特性还不足以让你编写简洁的并发代码。好比,咱们很难表述Future结果之间的依赖性;从文字描述上这很简单,“当长时间计算任务完成时,请将该计算的结果通知到另外一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另外一个查询操做结果合并”。可是,使用Future中提供的方法完成这样的操做又是另一回事。这也是咱们须要更具描述能力的特性的缘由,好比下面这些。

  • 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
  • 等待Future集合中的全部任务都完成。
  • 仅等待Future集合中最快结束的任务完成(有可能由于它们试图经过不一样的方式计算同一个值),并返回它的结果。
  • 经过编程方式完成一个Future任务的执行(即以手工设定异步操做结果的方式)。
  • 应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操做,不仅是简单地阻塞等待操做的结果)。

这一章中,你会了解新的CompletableFuture类(它实现了Future接口)如何利用Java 8的新特性以更直观的方式将上述需求都变为可能。Stream和CompletableFuture的设计都遵循了相似的模式:它们都使用了Lambda表达式以及流水线的思想。从这个角度,你能够说CompletableFuture和Future的关系就跟Stream和Collection的关系同样。

使用CompletableFuture 构建异步应用

为了展现CompletableFuture的强大特性,咱们会建立一个名为“最佳价格查询器”(best-price-finder)的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格。这个过程当中,你会学到几个重要的技能。

  • 首先,你会学到如何为你的客户提供异步API(若是你拥有一间在线商店的话,这是很是有帮助的)。
  • 其次,你会掌握如何让你使用了同步API的代码变为非阻塞代码。你会了解如何使用流水线将两个接续的异步操做合并为一个异步计算操做。这种状况确定会出现,好比,在线商店返回了你想要购买商品的原始价格,并附带着一个折扣代码——最终,要计算出该商品的实际价格,你不得不访问第二个远程折扣服务,查询该折扣代码对应的折扣比率。
  • 你还会学到如何以响应式的方式处理异步操做的完成事件,以及随着各个商店返回它的商品价格,最佳价格查询器如何持续地更新每种商品的最佳推荐,而不是等待全部的商店都返回他们各自的价格(这种方式存在着必定的风险,一旦某家商店的服务中断,用户可能遭遇白屏)。

实现异步API

为了实现最佳价格查询器应用,让咱们从每一个商店都应该提供的API定义入手。首先,商店应该声明依据指定产品名称返回价格的方法:

public double getPrice(String product) {
    // 待实现
}

该方法的内部实现会查询商店的数据库,但也有可能执行一些其余耗时的任务,好比联系其余外部服务(好比,商店的供应商,或者跟制造商相关的推广折扣)。咱们在本章剩下的内容中,采用delay方法模拟这些长期运行的方法的执行,它会人为地引入1秒钟的延迟,方法声明以下。

public class Util {
    public static void delay() {
        int delay = 1000;
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

为了介绍本章的内容,getPrice方法会调用delay方法,并返回一个随机计算的值,代码清单以下所示。返回随机计算的价格这段代码看起来有些取巧。它使用charAt,依据产品的名称,生成一个随机值做为价格。

public class Shop {
    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
    }

    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

很明显,这个API的使用者(这个例子中为最佳价格查询器)调用该方法时,它依旧会被阻塞。为等待同步事件完成而等待1秒钟,这是没法接受的,尤为是考虑到最佳价格查询器对网络中的全部商店都要重复这种操做。本章接下来的小节中,你会了解如何以异步方式使用同步API解决这个问题。可是,出于学习如何设计异步API的考虑,咱们会继续这一节的内容,伪装咱们还在深受这一困难的烦扰:你是一个睿智的商店店主,你已经意识到了这种同步API会为你的用户带来多么痛苦的体验,你但愿以异步API的方式重写这段代码,让用户更流畅地访问你的网站。

将同步方法转换为异步方法

为了实现这个目标,你首先须要将getPrice转换为getPriceAsync方法,并修改它的返回值:

public class Shop {
    ...
    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        }).start();
        return futurePrice;
    }
    ...
}

在这段代码中,你建立了一个表明异步计算的CompletableFuture对象实例,它在计算完成时会包含计算的结果。接着,你调用fork建立了另外一个线程去执行实际的价格计算工做,不等该耗时计算任务结束,直接返回一个Future实例。当请求的产品价格最终计算得出时,你可使用它的complete方法,结束completableFuture对象的运行,并设置变量的值。很显然,这个新版Future的名称也解释了它所具备的特性。使用这个API的客户端,能够经过下面的这段代码对其进行调用。

public class ShopMain {

    public static void main(String[] args) {
        Shop shop = new Shop("最好的商店");
        long start = System.nanoTime();
        Future<Double> futurePrice = shop.getPriceAsync("我最喜欢的商品");
        long invocationTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("调用时间 " + invocationTime);
        // 这里能够作其余的事情,好比查询其余的商店
        doSomethingElse();
        // 计算商品价格
        try {
            double price = futurePrice.get();
            System.out.printf("价格是 %.2f%n", price);
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
        long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("计算价格时间 " + retrievalTime);
    }

    private static void doSomethingElse() {
        System.out.println("正在查询其余的商店...");
    }
}

咱们看到这段代码中,客户向商店查询了某种商品的价格。因为商店提供了异步API,该次调用马上返回了一个Future对象,经过该对象客户能够在未来的某个时刻取得商品的价格。这种方式下,客户在进行商品价格查询的同时,还能执行一些其余的任务,好比查询其余家商店中商品的价格,不会呆呆地阻塞在那里等待第一家商店返回请求的结果。最后,若是全部有意义的工做都已经完成,客户全部要执行的工做都依赖于商品价格时,再调用Future的get方法。执行了这个操做后,客户要么得到Future中封装的值(若是异步任务已经完成),要么发生阻塞,直到该异步任务完成,指望的值可以访问。上面的代码中,输出的结果:

调用时间 116
正在查询其余的商店...
价格是 49107.07
计算价格时间 1172

你必定已经发现getPriceAsync方法的调用时间远远早于最终价格计算完成的时间,在以前的代码,你还会知道咱们有可能避免发生客户端被阻塞的风险。实际上这很是简单,Future执行完毕能够发送一个通知,仅在计算结果可用时执行一个由Lambda表达式或者方法引用定义的回调函数。不过,咱们当下不会对此进行讨论,如今咱们要解决的是另外一个问题:如何正确地管理异步任务执行过程当中可能出现的错误。

错误处理

若是没有意外,咱们目前开发的代码工做得很正常。可是,若是价格计算过程当中产生了错误会怎样呢?很是不幸,这种状况下你会获得一个至关糟糕的结果:用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会致使等待get方法返回结果的客户端永久地被阻塞。

客户端可使用重载版本的get方法,它使用一个超时参数来避免发生这样的状况。这是一种值得推荐的作法,你应该尽可能在你的代码中添加超时判断的逻辑,避免发生相似的问题。使用这种方法至少能防止程序永久地等待下去,超时发生时,程序会获得通知发生了TimeoutException。不过,也由于如此,你不会有机会发现计算商品价格的线程内到底发生了什么问题才引起了这样的失效。为了让客户端能了解商店没法提供请求商品价格的缘由,你须要使用CompletableFuture的completeExceptionally方法将致使CompletableFuture内发生问题的异常抛出。对代码优化后的结果以下所示。

public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try {
            double price = calculatePrice(product);
            // 若是价格计算正常结束,完成Future操做并设置商品价格
            futurePrice.complete(price);
        } catch (Exception e) {
            // 不然就抛出致使失败的异常,完成此次Future操做
            futurePrice.completeExceptionally(e);
        }
    }).start();
    return futurePrice;
}

客户端如今会收到一个ExecutionException异常,该异常接收了一个包含失败缘由的Exception参数,即价格计算方法最初抛出的异常。因此,举例来讲,若是该方法抛出了一个运行时异常“product not available”,客户端就会获得像下面这样一段ExecutionException:

java.util.concurrent.ExecutionException: java.lang.RuntimeException: product
    not availableat java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2237)
    at xin.codedream.java8.chap11.AsyncShopClient.main(AsyncShopClient.java:14)
    ... 5 more
Caused by: java.lang.RuntimeException: product not available
    at xin.codedream.java8.chap11.AsyncShop.calculatePrice(AsyncShop.java:36)
    atxin.codedream.java8.chap11.AsyncShop.lambda$getPrice$0(AsyncShop.java:23)
    at xin.codedream.java8.chap11.AsyncShop$$Lambda$1/24071475.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:744)

使用工厂方法supplyAsync建立CompletableFuture
目前为止咱们已经了解了如何经过编程建立CompletableFuture对象以及如何获取返回值,虽然看起来这些操做已经比较方便,但还有进一步提高的空间,CompletableFuture类自身提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,还不用担忧实现的细节。好比,采用supplyAsync方法后,你能够用一行语句重写getPriceAsync方法,以下所示。

public Future<Double> getPriceAsync(String product) {
    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

太棒了!七八行才能实现的功能,咱们如今只须要一行就能够搞定了!supplyAsync方法接受一个生产者(Supplier)做为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,可是你也可使用supplyAsync方法的重载版本,传递第二个参数指定不一样的执行线程执行生产者方法。通常而言,向CompletableFuture的工厂方法传递可选参数,指定生产者方法的执行线程是可行的,在后面,你会使用这一能力,后面咱们将使用适合你应用特性的执行线程改善程序的性能。

接下来剩余部分中,咱们会假设你很是不幸,没法控制Shop类提供API的具体实现,最终提供给你的API都是同步阻塞式的方法。这也是当你试图使用服务提供的HTTP API时最常发生的状况。你会学到如何以异步的方式查询多个商店,避免被单一的请求所阻塞,并由此提高你的“最佳价格查询器”的性能和吞吐量。

让你的代码免受阻塞之苦

因此,你已经被要求进行“最佳价格查询器”应用的开发了,不过你须要查询的全部商店都如上面开始时介绍的那样,只提供了同步API。换句话说,你有一个商家的列表,以下所示:

public class BestPriceFinder {
    private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"));
    ...
}

你须要使用下面这样的签名实现一个方法,它接受产品名做为参数,返回一个字符串列表,这个字符串列表中包括商店的名称、该商店中指定商品的价格:

public List<String> findPrices(String product);

你的第一个想法多是使用咱们在前面的章节中学习的Stream特性。你可能试图写出相似下面这个代码(是的,做为第一个方案,若是你想到这些已经至关棒了!)。

好吧,这段代码看起来很是直白。如今试着用该方法去查询你最近这些天疯狂着迷的惟一产品(是的,你已经猜到了,它就是Old-Mi-Mix3)。此外,也请记录下方法的执行时间,经过这些数据,咱们能够比较优化以后的方法会带来多大的性能提高,具体的代码以下。

public class BestPriceFinder {
    private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"));

    public static void main(String[] args) {
        BestPriceFinder finder = new BestPriceFinder();
        finder.testFindPrices();
    }

    public void testFindPrices() {
        long start = System.nanoTime();
        System.out.println(findPrices("Old-Mi-Mix3"));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("完成时间 " + duration);
    }

    public List<String> findPrices(String product) {
        return shops.stream()
                .map(shop -> String.format("%s 价格 %.2f",
                        shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }
}

输出结果:

[BestPrice 价格 109.64, LetsSaveBig 价格 143.13, MyFavoriteShop 价格 175.50, BuyItAll 价格 154.20]
完成时间 4184

正如你预期的,findPrices方法的执行时间仅比4秒钟多了那么几百毫秒,由于对这4个商店的查询是顺序进行的,而且一个查询操做会阻塞另外一个,每个操做都要花费大约1秒左右的时间计算请求商品的价格。你怎样才能改进这个结果呢?

使用并行流对请求进行并行操做

若是你看了第七章的笔记,那么你应该想到的第一个,可能也是最快的改善方法是使用并行流来避免顺序计算,以下所示。

public List<String> findPricesParallel(String product) {
    return shops.parallelStream()
            .map(shop -> String.format("%s 价格 %.2f",
                    shop.getName(), shop.getPrice(product)))
            .collect(toList());
}

运行代码,与最初的代码执行结果相比较,你发现了新版findPrices的改进了吧。

[BestPrice 价格 109.64, LetsSaveBig 价格 143.13, MyFavoriteShop 价格 175.50, BuyItAll 价格 154.20]
完成时间 1248

至关不错啊!看起来这是个简单但有效的主意:如今对四个不一样商店的查询实现了并行,因此完成全部操做的总耗时只有1秒多一点儿。你能作得更好吗?让咱们尝试使用刚学过的CompletableFuture,将findPrices方法中对不一样商店的同步调用替换为异步调用。

使用CompletableFuture 发起异步请求

你已经知道咱们可使用工厂方法supplyAsync建立CompletableFuture对象。让咱们把它利用起来:

public List<CompletableFuture<String>> findPricesFuture(String product) {
    return shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 价格 %.2f",
                    shop.getName(), shop.getPrice(product))))
            .collect(toList());
}

使用这种方式,你会获得一个List<CompletableFuture<String>>,列表中的每一个CompletableFuture对象在计算完成后都包含商店的String类型的名称。可是,因为你用CompletableFutures实现的findPrices方法要求返回一个List<String>,你须要等待全部的future执行完毕,将其包含的值抽取出来,填充到列表中才能返回。

为了实现这个效果,你能够向最初的List<CompletableFuture<String>>施加第二个map操做,对List中的全部future对象执行join操做,一个接一个地等待它们运行结束。注意CompletableFuture类中的join方法和Future接口中的get有相同的含义,而且也声明在Future接口中,它们惟一的不一样是join不会抛出任何检测到的异常。使用它你再也不须要使用try/catch语句块让你传递给第二个map方法的Lambda表达式变得过于臃肿。全部这些整合在一块儿,你就能够从新实现findPrices了,具体代码以下。

public List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures = shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 价格 %.2f",
                    shop.getName(), shop.getPrice(product))))
            .collect(toList());

    return priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

运行下代码了解下第三个版本findPrices方法的性能,你会获得下面这几行输出:

[BestPrice 价格 109.64, LetsSaveBig 价格 143.13, MyFavoriteShop 价格 175.50, BuyItAll 价格 154.20]
完成时间 2207

这个结果让人至关失望,不是吗?超过2秒意味着利用CompletableFuture实现的版本,比刚开始的代码中的原生顺序执行且会发生阻塞的版本快。可是它的用时也差很少是使用并行流的前一个版本的两倍。尤为是,考虑到从顺序执行的版本转换到并行流的版本只作了很是小的改动,就让人更加沮丧。

与此造成鲜明对比的是,咱们为采用CompletableFutures完成的新版方法作了大量的工做!但,这就是所有的真相吗?这种场景下使用CompletableFutures真的是浪费时间吗?或者咱们可能漏掉了某些重要的东西?继续往下探究以前,让咱们休息几分钟,尤为是想一想你测试代码的机器是否足以以并行方式运行四个线程。

寻找更好的方案

并行流的版本工做得很是好,那是由于它能并行地执行四个任务,因此它几乎能为每一个商家分配一个线程。可是,若是你想要增长第五个商家到商店列表中,让你的“最佳价格查询”应用对其进行处理,这时会发生什么状况?

public class BestPriceFinder {
    private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"),
            new Shop("ShopEasy"));
    ...

    public List<String> findPricesParallel(String product) {
        return shops.parallelStream()
                .map(shop -> String.format("%s 价格 %.2f",
                        shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }

    public List<String> findPricesSequential(String product) {
        return shops.stream()
                .map(shop -> String.format("%s 价格 %.2f",
                        shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }


    public List<String> findPricesFuture(String product) {
        List<CompletableFuture<String>> priceFutures = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 价格 %.2f",
                        shop.getName(), shop.getPrice(product))))
                .collect(toList());

        return priceFutures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
}

public class BestPriceFinderMain {

    private static BestPriceFinder bestPriceFinder = new BestPriceFinder();

    public static void main(String[] args) {
        execute("sequential", () -> bestPriceFinder.findPricesSequential("Old-Mi-Mix3"));
    }

    private static void execute(String msg, Supplier<List<String>> s) {
        long start = System.nanoTime();
        System.out.println(s.get());
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println(msg + " 完成时间 " + duration);
    }
}

绝不意外,顺序执行版本的执行仍是须要大约5秒多钟的时间,下面是执行的输出:

[BestPrice 价格 109.64, LetsSaveBig 价格 143.13, MyFavoriteShop 价格 175.50, BuyItAll 价格 154.20, ShopEasy 价格 147.92]
sequential 完成时间 5139

很是不幸,并行流版本的程序此次比以前也多消耗了差很少1秒钟的时间,由于能够并行运行(通用线程池中处于可用状态的)的四个线程如今都处于繁忙状态,都在对前4个商店进行查询。第五个查询只能等到前面某一个操做完成释放出空闲线程才能继续,它的运行结果以下:

[BestPrice 价格 163.19, LetsSaveBig 价格 141.77, MyFavoriteShop 价格 159.81, BuyItAll 价格 165.02, ShopEasy 价格 165.81]
parallel 完成时间 2106

CompletableFuture版本的程序结果如何呢?咱们也试着添加第5个商店对其进行了测试,结果以下:

[BestPrice 价格 144.31, LetsSaveBig 价格 142.49, MyFavoriteShop 价格 146.99, BuyItAll 价格 132.52, ShopEasy 价格 139.15]
composed CompletableFuture 完成时间 2004

CompletableFuture版本的程序彷佛比并行流版本的程序还快那么一点儿。可是最后这个版本也不太使人满意。好比,若是你试图让你的代码处理9个商店,并行流版本耗时3143毫秒,而CompletableFuture版本耗时3009毫秒。它们看起来不相伯仲,究其缘由都同样:它们内部采用的是一样的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime().availableProcessors()的返回值。然而,CompletableFuture具备必定的优点,由于它容许你对执行器(Executor)进行配置,尤为是线程池的大小,让它以更适合应用需求的方式进行配置,知足程序的要求,而这是并行流API没法提供的。让咱们看看你怎样利用这种配置上的灵活性带来实际应用程序性能上的提高。

使用定制的执行器

就这个主题而言,明智的选择彷佛是建立一个配有线程池的执行器,线程池中线程的数目取决于你预计你的应用须要处理的负荷,可是你该如何选择合适的线程数目呢?

调整线程池的大小

你的应用99%的时间都在等待商店的响应,因此估算出的W/C比率为100。这意味着若是你指望的CPU利用率是100%,你须要建立一个拥有400个线程的线程池。实际操做中,若是你建立的线程数比商店的数目更多,反而是一种浪费,由于这样作以后,你线程池中的有些线程根本没有机会被使用。出于这种考虑,咱们建议你将执行器使用的线程数,与你须要查询的商店数目设定为同一个值,这样每一个商店都应该对应一个服务线程。不过,为了不发生因为商店的数目过多致使服务器超负荷而崩溃,你仍是须要设置一个上限,好比100个线程。代码清单以下所示。

private final Executor executor = Executors.newFixedThreadPool(100, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });

注意,你如今正建立的是一个由守护线程构成的线程池。Java程序没法终止或者退出一个正在运行中的线程,因此最后剩下的那个线程会因为一直等待没法发生的事件而引起问题。与此相反,若是将线程标记为守护进程,意味着程序退出时它也会被回收。这两者之间没有性能上的差别。如今,你能够将执行器做为第二个参数传递给supplyAsync工厂方法了。好比,你如今能够按照下面的方式建立一个可查询指定商品价格的CompletableFuture对象:

CompletableFuture.supplyAsync(() -> String.format("%s 价格 %.2f",
                        shop.getName(), shop.getPrice(product)), executor)

改进以后,使用CompletableFuture方案的程序处理5个商店结果:

[BestPrice 价格 144.31, LetsSaveBig 价格 142.49, MyFavoriteShop 价格 146.99, BuyItAll 价格 132.52, ShopEasy 价格 139.15]
composed CompletableFuture 完成时间 1004

这个例子证实了要建立更适合你的应用特性的执行器,利用CompletableFutures向其提交任务执行是个不错的主意。处理需大量使用异步操做的状况时,这几乎是最有效的策略。

并行——使用流仍是CompletableFutures?

目前为止,你已经知道对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操做开展工做,要么枚举出集合中的每个元素,建立新的线程,在CompletableFuture内对其进行操做。后者提供了更多的灵活性,你能够调整线程池的大小,而这能帮助你确保总体的计算不会由于线程都在等待I/O而发生阻塞。书中使用这些API的建议以下。

  • 若是你进行的是计算密集型的操做,而且没有I/O,那么推荐使用Stream接口,由于实现简单,同时效率也多是最高的(若是全部的线程都是计算密集型的,那就没有必要建立比处理器核数更多的线程)。
  • 反之,若是你并行的工做单元还涉及等待I/O的操做(包括网络链接等待),那么使用CompletableFuture灵活性更好,你能够像前文讨论的那样,依据等待/计算,或者W/C的比率设定须要使用的线程数。这种状况不使用并行流的另外一个缘由是,处理流的流水线中若是发生I/O等待,流的延迟特性会让咱们很难判断到底何时触发了等待。

如今你已经了解了如何利用CompletableFuture为你的用户提供异步API,以及如何将一个同步又缓慢的服务转换为异步的服务。不过到目前为止,咱们每一个Future中进行的都是单次的操做。

代码

Github: chap11

Gitee: chap11

相关文章
相关标签/搜索