Java 8 (10) CompletableFuture:组合式异步编程

  随着多核处理器的出现,提高应用程序的处理速度最有效的方式就是能够编写出发挥多核能力的软件,咱们已经能够经过切分大型的任务,让每一个子任务并行运行,使用线程的方式,分支/合并框架(java 7) 和并行流(java 8)来实现。java

如今不少大型的互联网公司都对外提供了API服务,好比百度的地图,微博的新闻,天气预报等等。不多有网站或网络应用汇以彻底隔离的方式工做,而是采用混聚的方式:它会使用来自多个源的内容,将这些内容聚合在一块儿,方便用户使用。数据库

好比实现一个功能,你须要在微博中搜索某个新闻,而后根据当前坐标获取天气预报。这些调用第三方信息的时候,不想由于等待搜索新闻时,放弃对获取天气预报的处理,因而咱们可使用 分支/合并框架 及并行流 来并行处理,将他们切分为多个子操做,在多个不一样的核、CPU甚至是机器上并行的执行这些子操做。数组

相反,若是你想实现并发,而不是并行,或者你的主要目标是在同一个CPU上执行几个松耦合的任务,充分利用CPU的核,让其足够忙碌,从而最大化程序的吞吐量,那么你其实真正想作的是避免由于等待远程服务的返回,或者对数据库的查询,而阻塞线程的执行,浪费宝贵的计算资源,由于这种等待时间可能会很长。Future接口,尤为是它的新版实现CompletableFuture是处理这种状况的利器。网络

 

Future接口多线程

  Future接口在java 5中被引入,设计初衷是对未来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些可能会耗时的操做把调用线程解放出来,让它能继续执行其余工做,不用一直等待耗时的操做完成,好比:你拿了一袋子衣服到洗衣店去洗衣服,洗衣店会给你张发票,告诉你何时会洗好,而后你就能够去作其余的事了。Future的另外一个优势是它比更底层的Thread更容易使用。使用Future只须要讲耗时的操做封装在一个Callable对象中,再将它提交给ExecutorService就能够了。 Java 8以前使用Future的例子:并发

    public static void main(String[] args) {
        //建立Executor-Service,经过他能够向线程池提交任务
        ExecutorService executor = Executors.newCachedThreadPool();
        //向executor-Service提交 Callable对象
        Future<Double> future = executor.submit(new Callable<Double>() {
            @Override
            public Double call() throws Exception {
                //异步的方式执行耗时的操做
                return doSomeLongComputation();
            }
        });
        //异步时,作其余的事情
        doSomethingElse();

        try{
            //获取异步操做的结果,若是被阻塞,没法获得结果,那么最多等待1秒钟以后退出
            Double result = future.get(1, TimeUnit.SECONDS);
            System.out.print(result);
        } catch (InterruptedException e) {
            System.out.print("计算抛出一个异常");
        } catch (ExecutionException e) {
            System.out.print("当前线程在等待过程当中被中断");
        } catch (TimeoutException e) {
            System.out.print("future对象完成以前已过时");
        }

    }

    public static Double doSomeLongComputation() throws InterruptedException {
        Thread.sleep(1000);
        return 3 + 4.5;
    }

    public static void doSomethingElse(){
        System.out.print("else");
    }

这种方式能够再ExecutorService以并发的方式调用另一个线程执行耗时的操做的同时,去执行一些其余任务。接着到已经没有任务运行时,调用它的get方法来获取操做的结果,若是操做完成,就会返回结果,不然会阻塞你的线程,一直到操做完成,返回响应的结果。app

 

CompletableFuture框架

  在java 8 中引入了CompletableFuture类,它实现了Future接口,使用了Lambda表达式以及流水线的思想,经过下面这个例子进行学习,好比:咱们要作一个商品查询,根据折扣来获取价格。dom

public class Shop {
    public double getPrice(String product) throws InterruptedException {
        //查询商品的数据库,或连接其余外部服务获取折扣
        Thread.sleep(1000);
        return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

当调用这个方法时,它会阻塞进程,等待事件完成。异步

将同步方法转换成异步方法

    public Future<Double> getPriceAsync(String product){
        //建立CompletableFuture对象
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();

        new Thread (()->{
            try {
                //在另外一个线程中执行计算
                double price = getPrice(product);
                //须要长时间计算的任务结束并得出结果时,设置future的返回值
                futurePrice.complete(price);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        return futurePrice;
    }

而后能够这样调用:

        System.out.println("begin");
        Future<Double> futurePrice = shop.getPriceAsync("ss");
        System.out.println("doSomething");
        System.out.println(futurePrice.get());
System.out.println("end");

begin
doSomething
171.47509091822835
end

这个例子中,首先会调用接口 当即返回一个Future对象,在这种方式下,在查询价格的同时,还能够处理其余任务。最后全部的工做都已经完成,而后再调用future的get方法。得到Future中封装的值,要么发生阻塞,直到该任务异步任务完成,指望的值可以返回。

 

错误处理

若是没有意外,这个代码工做的会很是正常。可是若是计算价格的过程当中发生了错误,那么get会永久的被阻塞。这时可使用重载的get方法,让它超过一个时间后就强制返回。应该尽可能在代码中使用这种方式来防止程序永久的等待下去。超时会引起TimeoutException。可是这样会致使你没法知道具体什么缘由致使Future没法返回,这时须要使用CompletableFUture的completeExceptionally方法将致使CompletableFuture内发生的问题抛出。

public Future<Double> getPriceAsync(String product){
        //建立CompletableFuture对象
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();

        new Thread (()->{
            try {
                double price = getPrice(product);
                futurePrice.complete(price);
            } catch (Exception ex) {
                //抛出异常
 futurePrice.completeExceptionally(ex);
            }
        }).start();
        return futurePrice;
    }

调用时:

        System.out.println("begin");
        Future<Double> futurePrice = shop.getPriceAsync("ss");
        System.out.println("doSomething");
        try {
            System.out.println(futurePrice.get(1, TimeUnit.SECONDS));
        } catch (TimeoutException e) {
            System.out.print(e);
        }
        System.out.println("end");

设置超时时间,而后会将错误信息打印出来。

 

工厂方法supplyAsync建立CompletableFuture

使用工厂方法能够一句话来建立getPriceAsync方法

    public Future<Double> getPriceAsync(String product) {
        return CompletableFuture.supplyAsync(() -> getPrice(product));
    }

supplyAsync方法接受一个生产者(Supplier)做为参数,返回一个CompletableFuture对象,该对象完成异步执行后悔读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,也能够调用supplyAsync方法的重载版本,传入第二个参数指定不一样的线程执行生产者方法。 工厂方法返回的CompletableFuture对象也提供了一样的错误处理机制。

 

阻塞优化

例如如今有一个商品列表,而后输出一个字符串 商品名,价格 。

        List<Shop> shops = Arrays.asList(
                new Shop("one"),
                new Shop("two"),
                new Shop("three"),
                new Shop("four"));


        long start = System.nanoTime();
        List<String> str = shops.stream().map(shop -> String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))).collect(toList());
        System.out.print(str);
        long end = System.nanoTime();
        System.out.print((end - start) / 1000000);

[one price: 161.83, two price: 126.04, three price: 153.20, four price: 166.06]
4110

 

每次调用getPrice方法都会阻塞1秒钟,对付这种咱们可使用并行流来进行优化:

List<String> str = shops.parallelStream().map(shop -> String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))).collect(toList());

1137

 

明显速度提高了,如今对四个商品查询 实现了并行,因此只耗时1秒多点,下面咱们尝试CompletableFuture

List<CompletableFuture<String>> str2 = shops.stream().map(shop->
                        CompletableFuture.supplyAsync(
                                ()->String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName())))).collect(toList());

咱们使用工厂方法supplyAsync建立CompletableFuture对象,使用这种方式咱们会获得一个List<CompletableFuture<String>>,列表中的每个ComplatableFuture对象在计算完成后都会包含商品的名称。可是咱们要求返回的是List<String>,因此须要等待全部的future执行完毕,再将里面的值提取出来,填充到列表中才能返回。

List<String> str3 =str2.stream().map(CompletableFuture::join).collect(toList());

为了返回List<String> 须要对str2添加第二个map操做,对List中的全部future对象执行join操做,一个接一个的等待他们的运行结束。CompletableFuture类中的join和Future接口中的get方法有相同的含义,而且声明在Future接口中,惟一的不一样是join不会抛出任何检测到的异常。

1149

如今使用了两个不一样的Stream流水线,而不是在同一个处理流的流水线上一个接一个的防治两个map操做。考虑流操做之间的延迟特性,若是你在单一流水线中处理流,发向不一样商家的请求只能以同步、顺序执行的方式才会成功。所以每一个建立CompletableFuture对象只能在前一个操做结束以后,再join返回计算结果。

 

更好的解决方式

并行流的版本工做的很是好,那是由于他能够并行处理8个任务,获取操做系统线程数量:

System.out.print(Runtime.getRuntime().availableProcessors());

可是若是列表是9个呢?那么执行结果就会2秒。由于他最多只能让8个线程处于繁忙状态。 可是使用CompletableFuture容许你对执行器Executor进行配置,尤为是线程池的大小,这是并行流API没法实现的。

 

定制执行器

//建立一个线程池,线程池的数目为100何商店数目两者中较小的一个值
        final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setDaemon(true); //使用守护线程 ---这种方式不会阻止程序的关停
                        return t;
                    }
                });

这个线程池是一个由守护线程构成的线程池,Java程序没法终止或退出正在运行中的线程,因此最后剩下的那个线程会因为一直等待没法发生的事件而引起问题。与此相反,若是将线程标记为守护进程,意味着程序退出时它也会被回收。这两者之间没有性能上的差别。如今能够将执行器做为第二个参数传递给supplyAsync方法了。

CompletableFuture.supplyAsync(
                                ()->String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))
                                ,executor)

这时,执行9个商品时,执行速度只有1秒。 执行18个商品时也是1秒。这种状态会一直持续,直到商店的数目达到咱们以前计算的阀值。 处理须要大量使用异步操做的状况时,这几乎是最有效的策略。

 

对多个异步任务进行流水线操做

咱们在商品中增长一个枚举Discount.Code 来表明每一个商品对应不一样的折扣率,建立枚举以下:

public class Discount {
    public enum Code{
        NONE(0),
        SILVER(5),
        GOLD(10),
        PLATINUM(15),
        DIAMOND(20);

        private final int value;

        Code(int value){
            this.value = value;
        }
    }
}

如今咱们修改 getPrice方法的返回格式为:ShopName:price:DiscountCode 使用  : 进行分割的返回值。

    public String getPrice(String product){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Double price = new Random().nextDouble() * product.charAt(0) + product.charAt(1);
        Discount.Code code = Discount.Code.values()[new Random().nextInt(Discount.Code.values().length)];
        return String.format("%s:%.2f:%s",name,price,code);
    }

返回值: one:120.10:GOLDD

将返回结果封装到 Quote 类中:

public class Quote {
    private final String shopName;
    private final double price;
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code code) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = code;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }

    public String getShopName() {
        return shopName;
    }

    public double getPrice() {
        return price;
    }

    public Discount.Code getDiscountCode() {
        return discountCode;
    }
}

parse方法 经过getPrice的方法 返回的字符串 会返回Quote对象,此外 Discount服务还提供了一个applyDiscount方法,它接收一个Quote对象,返回一个字符串,表示该Quote的shop中的折扣价格:

public class Discount {
    public enum Code{..
    }

    public static String applyDiscount(Quote quote){
        return quote.getShopName() + "price :" + Discount.apply(quote.getPrice() ,quote.getDiscountCode());
    }
    public static double apply(double price,Code code){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return price * (100 - code.value) / 100;
    }
}

Discount中 也模拟了远程操做 睡了1秒钟,首先咱们尝试最直接的方式:

        List<String> str = shops.stream()
                .map(shop->shop.getPrice("hhhhh")) //获取 one:120.10:GOLDD 格式字符串
                .map(Quote::parse) //转换为 Quote 对象
                .map(Discount::applyDiscount) //返回 Quote的shop中的折扣价格
                .collect(toList());
                System.out.print(str);

8146

首先,咱们调用getPrice远程方法将shop对象转换成了一个字符串。每一个1秒

而后,咱们将字符串转换为Quote对象。

最后,咱们将Quote对象 调用 远程 Discount服务获取折扣,返回折扣价格。每一个1秒

顺序执行4个商品是4秒,而后又调用了Discount服务又4秒 因此是8秒。 虽然咱们如今把流转换为并行流 性能会很好 可是数量大于8时也很慢。相反,使用自定义CompletableFuture执行器可以更充分的利用CPU资源。

        List<CompletableFuture<String>> priceFutures = shops.stream()
                //异步获取每一个shop中的价格
                .map(shop -> CompletableFuture.supplyAsync(
                        () -> shop.getPrice("hhhhh", executor)
                ))
                //Quote对象存在时,对其返回值进行转换
                .map(future -> future.thenApply(Quote::parse))
                //使用另外一个异步任务构造指望的future,申请折扣
                .map(future -> future.thenCompose(quote ->
                        CompletableFuture.supplyAsync(
                                () -> Discount.applyDiscount(quote), executor)
                ))
                .collect(toList());
        //等待流中的全部Future执行完毕,提取各自的返回值
        List<String> str = priceFutures.stream().map(CompletableFuture::join).collect(toList());
        System.out.print(str);

2126

使用的这三个map跟同步没有太大的区别,可是使用了CompletableFuture类提供的特性,在须要的地方把他们变成了异步操做。

thenApply方法:当第一个Future运行结束,返回CompletableFuture<String>对象转换为CompleTableFuture<Quote>对象。

thenCompose方法:将两个异步操做进行流水线,当第一个操做完成时,将其结果做为参数传递给第二个操做。换句话说,你能够建立两个CompletableFuture对象,对第一个对象调用thenCompose,并向其传递一个函数。

这个方法也有Async版本:thenComposeAsync,一般带后缀的版本是讲任务移交到一个新线程,不带后缀的在当前线程执行。对于这个例子咱们没有加上后缀,由于对于最终结果,或者大体的时间而言都没有多少差异,少了不少线程切换的开销。

 

合并两个CompletableFuture,不管是否依赖

与上面不一样,第二个CompletableFuture无需等待第一个CompletableFuture运行结束。而是,将两个彻底不相干的CompletableFuture对象整合起来,不但愿等到第一个任务彻底结束才开始第二个任务。

这种状况应该使用thenCombine方法,它接受名为BiFunction的第二个参数,这个参数定义了当两个CompletableFuture对象完成计算后,结果如何合并。同thenCompose方法同样,thenCombine方法也提供了一个Async的版本。使用thenCombineAsync会致使BiFunction中定义的合并操做被提交到线程池中,由另外一个任务以异步的方式执行。

回到这个例子,好比说咱们如今须要第三个CompletableFuture来获取汇率,展现美圆。当前两个CompletableFuture计算出结果,并由BiFunction方法彻底合并后,由它来最终将诶书这一任务:

Future<Double> futurePriceUSD = CompletableFuture.supplyAsync(()->shops.get(0).getPrice("gg"))
                .thenCombine(
                        CompletableFuture.supplyAsync(
                                ()-> 0.66 //远程服务获取 汇率
                        ),(price,rate) -> price * rate
                );

这里 第一个参数price 是 getPrice的返回值 double , 第二个参数 rate 是第二个工厂方法返回的0.66 偷了个懒, 最后是他们的结果进行乘法操做 返回最终结果。

 

响应CompletableFuture的completion事件

在本章中,全部的延迟例子都是延迟1秒钟,可是在现实世界中,有时可能更糟。到目前为止,你所实现的方法必须等待全部的商品返回时才能现实商品的价格。而你但愿的效果是,只要有商品返回商品价格就在第一时间显示出来,不用等待那些尚未返回的商品。

CompletableFuture[] futures = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(
                        () -> shop.getPrice("hhhhh", executor)
                ))
                .map(future -> future.thenApply(Quote::parse))
                .map(future -> future.thenCompose(quote ->
                        CompletableFuture.supplyAsync(
                                () -> Discount.applyDiscount(quote), executor)
                ))
                //在每一个CompletableFuture上注册一个操做,该操做会在CompletableFuture完成后使用它的返回值。
                //使用thenAccept将结果输出,它的参数就是 CompletableFuture的返回值。
                .map(f -> f.thenAccept(System.out::println))
                //你能够把构成的Stream的全部CompletableFuture<void>对象放到一个数组中,等待全部的任务执行完成
                .toArray(size -> new CompletableFuture[size]);
        
        //allOf方法接受一个CompletableFuture构成的数组,数组中全部的COmpletableFuture对象执行完成后,
        //它返回一个COmpletableFuture<Void>对象。因此你须要哦等待最初Stream中的全部CompletableFuture对象执行完毕,
        //对allOf方法返回的CompletableFuture执行join操做
        CompletableFuture.allOf(futures).join();

Connected to the target VM, address: '127.0.0.1:62278', transport: 'socket'
8twoprice :113.31
threeprice :108.15
oneprice :137.844
Disconnected from the target VM, address: '127.0.0.1:62278', transport: 'socket'
fourprice :119.2725
3768

还有一个方法anyOf,对于CompletableFuture对象数组中有任何一个执行完毕就不在等待时使用。

 

小结:

  1.执行比较耗时的操做时,尤为是那些依赖一个或多个远程服务的操做,使用异步任务能够改善程序的性能,加快程序的响应速度。

  2.你应该尽量的为客户提供异步API。使用CompletableFuture类提供的特性,可以轻松的实现这一目标。

  3.CompletableFuture类还提供了异常管理的机制,然给你有机会抛出/管理异步任务执行中发生的异常。

  4.将同步API的调用封装到一个CompletableFuture中,你可以以异步的方式使用其结果。

  5.若是异步任务之间互相独立,或者他们之间某一些的结果是另外一些的输入,你能够讲这些异步任务合并成一个。

  6.你能够为CompletableFuture注册一个回调函数,在Future执行完毕或者他们计算的结果可用时,针对性的执行一些程序。

  7.你能够决定在何时将诶书程序的运行,是等待由CompletableFuture对象构成的列表中全部的对象都执行完毕,仍是只要其中任何一个首先完成就终止程序的运行。

相关文章
相关标签/搜索