并发——组合式异步编程

简介

本章节的内容:java

一、建立异步计算并获取计算结果;算法

二、使用非阻塞操做提高吞吐量;数据库

三、设计和实现异步API;编程

同步API只是对传统方法调用的另外一种称呼:你调用了某个方法,调用方在被调用方运行的过程会等待,被调用方法运行结束返回,调用方取得被调用方的返回值并继续运行。即便调用方法和被调用方在不一样的线程运行,调用方仍是须要等待被调用方结束后才能继续运行,这就是阻塞式调用的由来。api

四、如何以异步的方式使用同步的API;数组

异步API会直接返回,或者至少在被调用方计算完成以前,将他剩余的计算任务交给另外一个线程去作,该线程和调用方是异步的——这就是非阻塞式调用的由来。执行剩余计算任务的线程会将他的计算结果返回给调用方。返回的方式要么是经过回调函数,要么是由调用方再次执行一个“等待,直到计算完成(Future的get方法)”的方法调用。这种方式的计算在I/O系统程序设计中很是常见。网络

五、如何对两个或多个异步操做进行流水线和合并操做;app

六、如何处理异步操做的完成状态。dom

一、Future接口

Future接口在Java5中被引入,设计初衷是对未来某个时刻会发生的结果进行建模。他建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些潜在耗时的操做吧调用线程解放出来,让他能继续执行其余有价值的工做,再也不须要呆呆等待耗时的操做完成。异步

打个比方,你去一些快餐店吃饭,付钱以后,他不会让你在排队窗口那里呆呆的等。而是给你一张号码牌,告诉你何时你的饭菜会作好。作饭的同时,你能够作其余的事情,和同事坐在餐桌边聊聊今年为何不涨工资种种。等到饭作好了,你再去取饭。

Future的另外一个有点是他比更底层的Thread更易用。要使用Future,一般只须要将耗时的操做封装在一个Callable对象中,再将它提交给ExecutorService就好了。例如

package ch11;
import java.util.concurrent.*;
public class FutureTest {
    public static void main(String[] args) {
        // 建立ExecutorService,经过他你能够像线程池提交任务
        ExecutorService executorService = Executors.newCachedThreadPool();

        // 提交任务到线程池中异步执行;
        // 1.提交的是一个Callable(有返回值)或Raunable(无返回值)对象;
        // 2.你的主要任务应该是写在Callable对象的call方法之中;
        // 3.泛型参数即为计算结果的类型;
        // 4.该执行过程是异步的,也就是说,此处的代码不会产生阻塞,你能够在以后的代码中运行其余的操做;
        Future<Integer> future = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() {
                Integer result = 0;
                System.out.println("Run some compute that can use more time...");
                return result;
            }
        });

        // 异步操做进行的同时,你能够作其余事情
        System.out.println("Do other things...");

        // 获取异步操做的结果,此处会产生阻塞,但能够设置等待时间
        try {
            // V get(long timeout, TimeUnit unit)
            // 获取异步操做的结果,若是最终被阻塞,没法获得结果,那么将会
            // 在对坐等待1秒以后退出。
            future.get(1,TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // 当前线程在等待过程当中被中断
            e.printStackTrace();
        } catch (ExecutionException e) {
            // 计算抛出一个异常
            e.printStackTrace();
        } catch (TimeoutException e) {
            // 超时异常
            e.printStackTrace();
        }
    }
}

若是你已经运行到没有异步操做的结果就没法在进行下一步操做的时候,就能够考虑调用Future的get方法去获取操做的结果。若是操做已经完成该方法会当即获得操做结果,不然他会阻塞你的线程,直到操做完成,返回相应的结果。

可是无参的get方法是没有超时参数的,这可能会致使线程的永久阻塞,所以,推荐使用带有超时参数的get方法。除非你对你的程序有着“不可能计算出现问题”的自信。

1.一、Future接口的局限性

很明显,若是有多个Future在程序中出现,那么咱们很难表述Future之间的依赖性。从文字上虽然能够这样表述:“当长时间计算任务完成时,请将该计算的结果通知到另外一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另外一个查询操做结果合并”。可是,使用Future中提供的方法完成这样的操做又是另外一回事儿。这也是咱们须要更具描述能力的特性的缘由,好比:

一、两个两个异步计算你们的结果合并为一个 —— 这两个异步计算之间相互独立,同时第二个有依赖于第一个的结果;

二、等待Future集合中的全部任务都完成;

三、仅等待Future集合中最快结束的任务完成(有可能由于他们试图经过不一样的方式计算同一个值);

四、经过编程方式完成一个Future任务的执行(即以手工设定异步操做的方式)

五、应对Future的完成事件。(即当Future的完成事件发生时会受到通知,并能使用Future计算的结果进行下一步的操做,不仅是简单的橘色等待操做的结果)。

1.二、使用CompletableFuture构建异步应用

接下来咱们会使用CompletableFuture构建异步应用,该应用大体就是一个获取顾客服务、商店打折价等。

二、实现异步API

若是您开发过集成式的应用程序,会深有感悟,不少获取结果的操做一般不是咱们练习的时候信手捏来的值,而是经过各类外部服务获取,例如:

  • 数据库访问
  • ES访问
  • 分布式文件系统访问
  • 复杂算法推导

等等,当目标查询量、数据集过大时,这一般是一个耗时的操做。因为咱们的重点不在这里,所以解析来会使用delay方法模拟这个耗时的过程,从而体现出使用异步任务相比较传统的同步方法的优点。delay方法以下所示,其实就是将当前线程休眠一段时间

public static void delay(){
    try{
        // sleep 1 sec.
        Thread.sleep(1000L);
    }catch(InterruptedExcetion e){
        throw new RuntimeException(e);
    }
}

接下来咱们编写一个同步方法的应用方法,该方法用于查询一件商品的价格

package ch11;

import java.util.Random;

/**
 * 商店类
 */
public class Shop {
    private Random random;
    private String shopName;

    public Shop(String shopName) {
        this.shopName = shopName;
        this.random = new Random();
    }

    // 获取商店名称
    public String getShopName() {
        return shopName;
    }

    /**
     * 模拟延迟操做
     */
    public static void delay(){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            System.out.println("sleep now thread error:" + e.getMessage());
            e.printStackTrace();
        }
    }

    public double getPrice(String product){
        return calculatePrice(product);
    }

    /**
     * 经过商品的前两个字符和一个随机数作乘积得到价格
     * @param product
     * @return
     */
    private double calculatePrice(String product){
        // delay times.
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

记住,delay模拟的是一个耗时的过程。很明显,这个类的使用者每次调用getPrice方法时,都会为的古代这个同步事件完成而等待1秒钟。这是没法接收的,尤为是考虑到价格查询器对网络中的全部的商店都重复这样的操做。

所以,咱们但愿使用异步的API重写这段代码。

2.一、将同步方法转换为异步方法

使用咱们提到的Future转换方法

public Future<Double> getPriceAsync(String product){
    ...
}

Future能够理解为一个暂时还不可知道结果的处理器,这个结果在计算完成后,能够经过调用该Future对象的get方法取得。

由于这样的设计,getPriceAsync能够当即返回,调用线程能够有机会在同一时间去执行其余的有价值的计算任务。

新的CompleteableFuture提供了不少的方法,支持咱们执行各类各样的操做。例如:

public Future<Double> getPriceAsync(String product){
        // 建立一个CompletableFuture对象,他会包含计算的结果
        CompletableFuture<Double> future = new CompletableFuture<>();

        // 在另外一个线程中以异步的方法执行计算
        new Thread(() -> {
            double price = calculatePrice(product);
            // 需长时间计算的任务结束并得出结果时,设置Future的返回值。
            future.complete(price);
        }).start();
        // 无需等待结果,直接返回Future对象
        return future;
    }

接下来咱们测试该异步任务的表现

package ch11;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class CFutureTest {
    public static void main(String[] args) {
        Shop kfc = new Shop("KFC");

        // 得到任意时间(纳秒)
        long start = System.nanoTime();

        Future<Double> hamburgerPriceFuture = kfc.getPriceAsync("hamburger");

        long usedTimes = System.nanoTime() - start;

        //
        System.out.println("异步任务返回花费了" + usedTimes/1_000_000 + " 毫秒的时间;");

        System.out.println("执行其余的任务...");

        try {
            Double price = hamburgerPriceFuture.get();
            System.out.println("Price is " + price);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        usedTimes = System.nanoTime() - start;
        System.out.println("得到异步任务结果花费了" + usedTimes/1_000_000 + "毫秒的时间");
        /**
         * 异步任务返回花费了55 毫秒的时间;
         * 执行其余的任务...
         * Price is 117.5786113274089
         * 得到异步任务结果花费了1058毫秒的时间
         */
    }
}

2.二、错误处理

若是价格计算的过程当中出现了错误,咱们该怎么办?

很是不幸的是,这种状况下,咱们会获得一个至关严重的结果,用于提示错误的异常会被抑制(java core 异常章节有讲述抑制的概念)在视图计算商品价格的当前线程的范围内。最终会杀死该线程,而这会致使等待get方法返回结果的客户端永久阻塞。

为此,客户端须要使用重载版本的get方法。这样,在指定的时间内没有计算出结果时,提早获知超时消息。

但这也有一个问题,那就是咱们没法获知到底发生了什么错误,这对于应用程序开发者来讲是一个很严重的问题。为了让客户端知道发生了什么错误,咱们可使用CompletableFuture的completeExceptionally方法将致使CompletableFuture内发生的问题异常抛出。

基于此,咱们来改写以前的代码(新增一个优化方法)

public Future<Double> getPriceAsync2(String product){
        CompletableFuture<Double> future = new CompletableFuture<>();
        new Thread(() -> {
            try{
                double price = calculatePrice(product);
                // 若是计算正常结束,则完成complete操做并设置商品价格;
                future.complete(price);
            }catch (Exception e){
                // 不然,抛出致使失败的异常,完成此次Future操做。
                future.completeExceptionally(e);
            }
        }).start();
        return future;
    }

咱们能够传入一个空的字符串做为商品名称参数来求取结果,显然会发生数组越界的异常。

Future<Double> hamburgerPriceFuture = kfc.getPriceAsync2("");

若是咱们调用以前的getPriceAsync方法,此时程序会被阻塞。

可是若是咱们调用getPriceAsync2则会获得异常的缘由,以及主线程及时的终止

异步任务返回花费了55 毫秒的时间;
执行其余的任务...
得到异步任务结果花费了1057毫秒的时间
java.util.concurrent.ExecutionException: java.lang.StringIndexOutOfBoundsException: String index out of range: 0
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at ch11.CFutureTest.main(CFutureTest.java:23)
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 0
	at java.lang.String.charAt(String.java:658)
	at ch11.Shop.calculatePrice(Shop.java:76)
	at ch11.Shop.lambda$getPriceAsync2$1(Shop.java:57)
	at java.lang.Thread.run(Thread.java:745)

Process finished with exit code 0

2.三、更优雅的编码方式

前面调用CompletableFutre的过程当中,咱们会明显的感受到调用比较繁琐。其实还有优雅的建立、调用方式。即便用工厂方式取代以前的建立方式,咱们改写以前的方法:

public Future<Double> getPriceAsync3(String product){
        return CompletableFuture.supplyAsync(() -> calculatePrice(product));
    }

supplyAsync接受一个生产者做为参数,返回一个CompletableFuture对象,该对象是完成异步执行后会读取调用生产者方法的返回值。

须要注意的是,该使用方式的效果和咱们提供了错误管理机制的getPriceAsync2是同样的,很明显,他优雅的许多。

三、让代码免受阻塞之苦

如今咱们解决了查询一个商店的需求了,但新的需求来了,须要咱们查询一个列表里面的商店对于指定的商品,到底哪一家的更便宜。

显然,这须要咱们分别取访问全部商店提供的查询指定商品价格的API。

假设如今有5家商店:

List<Shop> shops = Arrays.asList(new Shop("shop1"),
        new Shop("shop2"),
        new Shop("shop3"),
        new Shop("shop4"),
        new Shop("shop5"));

针对该问题,咱们来了三位参赛者A、B和C。他们提出了各自的解决方案,咱们分别来看看他们的方案以及效果如何。

3.一、前两位参赛者:顺序流、并行流

A为咱们提出了一个很是直接的解决方案,即便用java8的顺序流,咱们来看看其方案的源码:

package ch11;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class CFutureTest2 {
    public static void main(String[] args) {
        String product = "hamburger";
        List<Shop> shops = Arrays.asList(new Shop("shop1"),
                new Shop("shop2"),
                new Shop("shop3"),
                new Shop("shop4"),
                new Shop("shop5"));

        long start = System.nanoTime();

        List<String> list = shops.stream().map(shop -> {
            return String.format("%s 的对应的价格为 %.2f", shop.getShopName(), shop.getPrice(product));
        }).collect(Collectors.toList());

        System.out.println(list);

        System.out.println("=== 执行效果 ===");
        System.out.println("该方式共花费时间(毫秒): " + (System.nanoTime() - start)/1_000_000);
        /**
         * [shop1 的对应的价格为 145.81, shop2 的对应的价格为 136.21, shop3 的对应的价格为 191.72, shop4 的对应的价格为 158.95, shop5 的对应的价格为 152.92]
         * === 执行效果 ===
         * 该方式共花费时间(毫秒): 5110
         */
    }
}

由于是顺序执行的缘由,所以5s+的时间大概是顺序的从这5家商店中依次获取商品价格所花费的时间。

咱们的第一位方案提出者A,完成需求花费时间大约为5110毫秒

接下来,B很是的志得意满,对A的结果嗤之以鼻。他说,这种状况彻底可使用java 8的并行流,A呐,既然学过java 8,不会不知道这东西吧,只须要修改你的stream方法为parallelStream就能够获得意想不到的结果了:

List<String> list = shops.parallelStream().map(shop -> {
            return String.format("%s 的对应的价格为 %.2f", shop.getShopName(), shop.getPrice(product));
        }).collect(Collectors.toList());
        /**
         * [shop1 的对应的价格为 176.59, shop2 的对应的价格为 184.50, shop3 的对应的价格为 141.36, shop4 的对应的价格为 143.61, shop5 的对应的价格为 131.53]
         * === 执行效果 ===
         * 该方式共花费时间(毫秒): 2098
         */

取得了不错的效果,只花费了2098毫秒。看来B并非装腔做势,由于对5个商店的查询采起了并行操做,足足比以前的顺序流快了不止一倍的时间。

咱们的第二位方案提出者B,花费时间为2098毫秒

因为价格是随机生成的,所以显然和以前的例子价格会有所不一样。但这不是重点。

问题来了,咱们还能够作得更好吗?C会带来什么样的解决方案了,咱们下一节拭目以待。

3.二、第三位参赛者:使用CompletableFuture发起异步请求

C很是同意并行的处理方式,不过他以为,B的这种方式有点太过极端,并且应对一些比较特殊的状况没法优雅的处理,甚至没法知足需求。

咱们还能够作得更好。可使用CompletableFuture,再进一步的对查询方法进行优化。

咱们不妨看看C的源码,看看是否能取得更好的成绩。

List<String> list = shops.stream().map(shop -> {
    return CompletableFuture.supplyAsync(() -> {
        return String.format("%s 的对应的价格为 %.2f", shop.getShopName(), shop.getPrice(product));
    });
}).map(CompletableFuture::join).collect(Collectors.toList());
System.out.println(list); 
// 该方式共花费时间(毫秒): 5095

join和get的效果基本一致,惟一的区别在于,使用join方法不会抛出任何检测到的异常,所以咱们没必要在使用try/catch语句。

5秒了,但C立马解释说,之因此花费这么多时间是因为stream的延迟特性致使的,所以,须要使用两个不一样的流水线,而不是一个,一个流水线会致使发现不一样商家的请求只能以同步、顺序执行的方式才会成功。所以,每一个建立CompletableFuture对象只能在前一个操做结束以后执行查询指定商家的动做、通知join方法返回计算结果。

为了不这种状况的发生,咱们应该使用两个不一样的map流水线。以下所示:

List<CompletableFuture<String>> futures = shops.stream().map(shop -> {
    return CompletableFuture.supplyAsync(() -> {
        return String.format("%s 的对应的价格为 %.2f", shop.getShopName(), shop.getPrice(product));
    });
}).collect(Collectors.toList());

List<String> list = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
// === 执行效果 ===
// 该方式共花费时间(毫秒): 2133

2133毫秒,期待着获得不错成绩的咱们以为有些失望。这意味着C的解决方案也不见得比B好到哪里去,甚至显得有些昂长。

C真的不比B好吗?C的作法不少余吗?咱们接着往下看。

3.三、寻求更好的方案

咱们知道,并行处理任务数,通常是和CPU的核数对等的,例如,我当前的这台电脑的核数是4

System.out.print("CPU核数:" + Runtime.getRuntime().availableProcessors());
 // CPU个数:4

这也就是意味着,从1开始计数,每当个人商店数增长4个,那么针对上述的方案B和C,执行时间都会增长一秒。由于B和C内部采用的都是一样的通用线程池,默认都是用固定数目的线程,具体数目取决于上述测试代码,即Runtime.getRuntime().availableProcessors()的返回值。然而CompletableFuture具备必定的优点,由于他容许你对执行器(Executor)进行配置,尤为是线程池的大小,让他以更适合应用需求的方式进行配置,知足程序的要求,这些,咱们的并行流是没法提供的。

3.四、使用定制的执行器

前面提到,咱们能够建立一个配有线程池的执行器,线程池中线程的数目取决于你预计你的应用须要处理的符合,可是你该如何选择合适的线程数目呢?

线程数目的多少是很是讲究的?设置的太小,如咱们以前所面临的问题同样,没法充分的利用处理器的性能,若是业务比较频繁,还会致使过多的任务面临排队过长的情况;相反,设置的过大,会致使他们竞争稀缺的处理器和内存资源,浪费大量的时间用在上下文的切换上。

估算线程池大小的公式 $$ N = N_{cpu} * U_{cpu} * (1 + \frac{W}{C}) $$ 其中:

  • $N_{cpu}$:是CPU的核数,例如咱们上面看到的4核;
  • $U_{cpu}$: 是指望的CPU利用率
  • $\frac{W}{C}$ 是等待时间(wait time)与计算时间(compute time)的比率

针对该公式,显然咱们的计算时间W/C约为100倍,假设须要CPU利用率为100%,咱们显然要建立包含400个线程的线程池。

实际操做中,其实咱们只需商店数目的线程数就能够了。

不过,为了不商店数目过多,咱们能够给其设定一个上限值100。以下所示:

final ExecutorService executorService = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                // 设置为守护进程
                t.setDaemon(true);
                return t;
            }
        });

List<CompletableFuture<String>> futures = shops.stream().map(shop -> {
    //  public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
    return CompletableFuture.supplyAsync(() -> {
        return String.format("%s 的对应的价格为 %.2f", shop.getShopName(), shop.getPrice(product));
    },executorService);
}).collect(Collectors.toList());

也就是说,咱们将执行器做为第二个参数传递给了supplyAsync方法。

改进以后,看一下时间的花费

=== 执行效果 ===
该方式共花费时间(毫秒): 1091

只需1091毫秒。

3.五、使用并行流仍是CompletableFuture?

目前为止,咱们知道了对集合进行并行计算的两种方式:

  • 转化为并行流,利用map展开工做;

  • 枚举集合的每个元素,建立新的线程,在CompletableFuture内对其进行操做。

后者提供了更好的灵活性,支持咱们调整线程池的大小,也就是觉得这,确保总体的计算不会由于线程都在等待I/P而发生阻塞。

那么,在实际状况中该选择哪种呢?建议以下:

  1. 若是进行的是计算密集型的操做,而且没有I/O,那么推荐使用stream接口。由于实现简单,同时效率也是最高的。

  2. 若是并行的工做单元还涉及到等到I/O的操做,例如网络链接等,那么使用CompletableFuture更好,你能够像咱们讨论的那样,依据等待/计算,或者W/C的比率设定须要使用的线程数。这种状况下不适用并行流的缘由还有一个,那就是处理刘的流水线中若是发生I/O等待,流的延迟特性会让咱们很难判断到底何时触发了等待。

四、对多个异步任务进行流水线操做

接下来咱们定义一个折扣服务,该折扣服务提供了五个不一样的折扣代码,每一个折扣代码对应不一样的折扣率。

具体代码以下所示:

package ch12;

public class Discount {
    public enum Code{
        NONE(0),SILVER(5),GOLD(10),PLATINUM(15),DIAMOND(20);
        private final int percentage;
        Code(int percentage){
            this.percentage = percentage;
        }
    }
    // other code...
}

接下来,咱们修改以前的getPrice方法,将其返回的价格形式改变

public String getPrice(String product){
        double price = calculatePrice(product);
        // 从五中折扣中随即选取一种
        Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
        // shopName:price:discountCode
        return String.format("%s:%.2f:%s",shopName,price,code);
    }

4.一、实现折扣服务

咱们从商店的getPrice方法中能够得到shopName:price:discountCode,接下来咱们须要知道具体的折扣价格,所以编写一个类来进行解析:

package ch12;

public class Quote {
    private final String shopName;
    private final double price;
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code discountCode) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = discountCode;
    }

    /**
     * 解析方法
     * @param str shopName:price:discountCode
     * @return
     */
    public static Quote parse(String str){
        String[] tokens = str.split(":");
        return new Quote(tokens[0],Double.parseDouble(tokens[1]),Discount.Code.valueOf(tokens[2]));
    }

    public String getShopName() {
        return shopName;
    }

    public double getPrice() {
        return price;
    }

    public Discount.Code getDiscountCode() {
        return discountCode;
    }
}

另外,咱们还在Discount类中添加了一个方法,他接收一个Quote对象,返回一个字符串,表示生成该Quote的商店中的折扣价格:

/**
     * 将折扣应用于商品最初的原始价格
     * @param q
     * @return
     */
    public static String applyDiscount(Quote q){
        return q.getShopName() + "的价格是:" + Discount.apply(q.getPrice(),q.getDiscountCode());
    }

    /**
     * 获取折扣以后的价格,模拟了延迟
     * @param price
     * @param code
     * @return
     */
    private static double apply(double price, Code code){
        delay();
        return price * (100 - code.percentage)/100;
    }

    /**
     * 模拟延迟操做
     */
    public static void delay(){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            System.out.println("sleep now thread error:" + e.getMessage());
            e.printStackTrace();
        }
    }

4.二、使用折扣服务

因为Discount也是一种远程服务,所以咱们在以前他的时候模拟了一秒钟的延迟时间。接下来咱们使用和以前同样的几种方式来使用该服务。

一、同步执行:

package ch12;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class Test1 {

    List<Shop> shops = Arrays.asList(new Shop("shop1"),
            new Shop("shop2"),
            new Shop("shop3"),
            new Shop("shop4"));

    public static void main(String[] args) {
        long start = System.nanoTime();
        System.out.println(new Test1().getPrices("sss"));
        System.out.println("花费时间为(毫秒): " + (System.nanoTime() - start) / 1_000_000);
        /**
         * [shop1的价格是:190.06850000000003, shop2的价格是:218.77, shop3的价格是:134.43, shop4的价格是:202.89149999999998]
         * 花费时间为(毫秒): 8174
         */
    }

    public List<String> getPrices(String product){
        return shops.stream().map(shop -> shop.getPrice(product))
                .map(Quote::parse)
                .map(Discount::applyDiscount)
                .collect(Collectors.toList());
    }
}

差很少8秒的时间,在预料以内。

二、使用并行流优化

很明显,当前4个商店,咱们的电脑拥有4个核,所以,并行流操做花费的时间应该是1+1=2S左右,咱们修改代码以下:

public static void main(String[] args) {
        long start = System.nanoTime();
        System.out.println(new Test2().getPrices("sss"));
        System.out.println("花费时间为(毫秒): " + (System.nanoTime() - start) / 1_000_000);
        /**
         * [shop1的价格是:206.37, shop2的价格是:224.38, shop3的价格是:160.1315, shop4的价格是:178.02900000000002]
         * 花费时间为(毫秒): 2105
         */
    }

和预估的差很少。但一样的,并行流的方式没法控制线程数量,所以会在商店数目上升以后变得力不从心,也就是扩展性并非很好。

4.三、构造同步和异步操做

接下来使用Completable提供的特性改造代码。

public List<String> getPrices(String product){
    ExecutorService executor = Executors.newFixedThreadPool(shops.size(), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });

    List<CompletableFuture<String>> priceFuture = shops.stream()
            // 以异步方式取得每一个shop中指定产品的shopName:xxx
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product),executor))
            // 而后解析返回的串
            .map(f -> f.thenApply(Quote::parse))
            // 使用另外一个异步任务构造指望的future,申请折扣
            .map(f -> f.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote),executor)))
            .collect(Collectors.toList());

    // 等待流中的全部Future执行完毕,并提取各自的返回值
    return priceFuture.stream().map(CompletableFuture::join).collect(Collectors.toList());
    /**
    * [shop1的价格是:111.682, shop2的价格是:164.6875, shop3的价格是:97.20800000000001, shop4的价格是:147.77]
    * 花费时间为(毫秒): 2115
    */
}

须要注意里面出现的两个新方法,thenApply以及thenCompose:

  • thenApply() 是返回的是CompletableFuture类型:它的功能至关于将CompletableFuture<T>转换成CompletableFuture<U>

  • thenCompose() 用来链接两个CompletableFuture,返回值是新的CompletableFuture

也就是说,thenApply() 转换的是泛型中的类型,是同一个CompletableFuture; thenCompose()用来链接两个CompletableFuture,是生成一个新的CompletableFuture。也就是说,在thenCompose()中通常会设计到CompletableFuture的建立代码。新生成的CompletableFuture使用先前的CompletableFuture做为输入。

4.四、链接两个CompletableFuture对象(不管是否存在依赖)

更常见的状况是将两个彻底不相干的CompletableFuture对象的结果整合起来(thenCombine)。

public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)

例如,若是某个商店返回的价格是以欧元为单位的,咱们的需求是获得美圆的结果,那么,咱们同时还须要去一个提供汇率的服务获取美圆和欧元之间的汇率,最后,将这两个结果进行组合(thenCombine),从而获得最终的以美圆为单位的结果。

package ch11;

import ch12.ExchangeService;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class CFutureTest5 {
    public static void main(String[] args) {
        Shop shop = new Shop("KFC");

        // 得到任意时间(纳秒)
        long start = System.nanoTime();
        CompletableFuture<Double> future = CompletableFuture
                .supplyAsync(() -> shop.getPrice("sss"))
                // 组合获取汇率的调用结果,第二个参数是 BiFunction类型,结合二者的结果,最终返回最后的答案。
                .thenCombine(CompletableFuture.supplyAsync(() -> ExchangeService.getRate("usa", "europe")), (price, rate) -> price * rate);
        try {
            System.out.printf("结果为:%.2f\r\n" ,future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("得到异步任务结果花费了" + (System.nanoTime() - start)/1_000_000 + "毫秒的时间");

    }
}

为了演示,咱们添加了一个计算汇率的简便方法

package ch12;

public class ExchangeService {
    public static double getRate(String firstCountry, String secondContry){
        // 1欧元=1.1233美圆
        // 其实这里应该模拟一下延迟的:delay()
        return 1.1233;
    }
}

固然,你也可使用theCombineAsync方法来启动一个新的线程执行结果的整合操做,可是这个计算显然很快速,所以没有必要在开启一个新的线程。

4.五、Future和CompletableFuture的回顾

如今咱们已经能够说,CompletableFuture相比较直接使用Java 7的Future的优点。不少时候,咱们若是采用java 7去编写相同的案例,状况会变得复杂不少。

可是如今还有一个小小的问题,那就是咱们调用get或者join时还说会形成线程阻塞,直到CompletableFuture完成后才会继续往下执行。

而接下来,咱们针对这个问题,来学习一下CompletableFuture的completion事件。

五、completion事件

所有完成

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {

一个完成便可

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {

thenAccept:在返回结果上应用一个回调操做,也就是说,该方法定义的操做会在CompletableFuture获得计算结果以后执行。

CompletableFuture<Void> thenAccept(Consumer<? super T> action)

因为thenAccept已经定义了对返回结果的操做,一旦计算到结果,thenAccept返回的就是一个CompletableFuture<Void>类型的结果,这对于咱们没有什么做用。

CompletableFuture[] completableFutures = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 的对应的价格为 %.2f", shop.getShopName(), shop.getPrice(product)),
                        executorService))
                .map(f -> f.thenAccept(s -> System.out.println(s)))
                .toArray(size -> new CompletableFuture[size]);
        System.out.println("===");
        CompletableFuture.anyOf(completableFutures).join();
        /**
         * ===
         * shop5 的对应的价格为 186.41
         */

注意delay的时间设置成为了一个随机的值。这样咱们使用anyOf以后,则该计算会在第一个完成的任务以后直接再也不等待,往下执行。

总结

这一章节,咱们学习到的内容以下:

一、执行比较耗时的操做时,尤为是那些依赖一个或多个远程服务的操做,使用异步任务能够改善程序的性能,加快程序的响应速度;

二、你应该尽量的为客户提供异步API,使用CompletableFuture类提供的特性,可以轻松地实现这一目标;

三、CompetableFuture类还提供了异常管理的机制,让咱们有机会抛出或者管理异步任务执行时产生的异常;

四、将同步api的调用封装到一个CompletableFuture中,你可以以异步的方式使用其结果;

五、若是异步任务之间相互独立,或者他们之间某一些的结果是另外一些的输入,你能够将这些异步任务构造或者合并成一个;

六、你能够为CompletableFuture注册一个回调函数,在Future执行完毕或者他们计算的结果可用时,针对性的执行一些程序;

七、你能够决定在何时结束程序的运行,是等待由CompletableFuture对象构成的列表中全部的对象都执行完毕,仍是只要其中任何一个首先完成就终止程序的运行。

相关文章
相关标签/搜索