总结!Java8 加强的Future:CompletableFuture

CompletableFuture是Java8新增的一个超大型工具类,为何说她大呢?由于一方面它实现了Future接口,更重要的是,它实现了CompletionStage接口.这个接口也是Java8新增长的,而CompletionStage拥有多达约40种方法,javascript

经过CompletableFuture提供进一步封装,咱们很容易实现Future模式那样的异步调用,例如:java

public static Integer cale(Integer para) {
    try {
        Thread.sleep(1000);
 
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return para * para;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
 
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> cale(50));
    System.out.println(future.get());
}

上述代码中CompletableFuture.supplyAsync()方法构造了一个CompletableFuture实例,在supplyAsync()函数中,他会在一个新的线程中,执行传入的参数.在这里,他会执行calc()方法,而calc()方法的执行多是比较慢的,可是不影响CompletableFuture实例的构造速度,所以supplyAsync()会当即返回,他返回的CompletableFuture对象实例就能够做为此次调用的契约,在未来任何场合,用于得到最终的计算结果.在CompletableFuture中,相似的工厂方法有如下几个:数据库

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

其中supplyAsync()方法用于那些须要返回值的场景,好比计算某个数据,而runAsync()方法用于没有返回值的场景,好比,仅仅是简单地执行某一个异步动做.编程

首先说明一下已Async结尾的方法都是能够异步执行的,若是指定了线程池,会在指定的线程池中执行,若是没有指定,默认会在ForkJoinPool.commonPool()中执行网络

     在这两对方法中,都有一个方法能够接手一个Executor参数,这使咱们可让Supplier<U>或者Runnable在指定的线程池工做,若是不指定,则在默认的系统公共的ForkJoinPool.common线程池中执行.dom

* 流式调用

在前文中我已经简单的提到,CompletionStage的约40个接口为函数式编程作准备的,在这里,就让咱们看一下,若是使用这些接口进行函数式的流式API调用异步

public static Integer cale(Integer para) {
    try {
        Thread.sleep(1000);
 
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return para * para;
}
CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(() -> cale(50))
        .thenApply(i -> Integer.toString(i))
        .thenApply(str -> "\"" + str + "\"")
        .thenAccept(System.out::println);
future.get();

上述代码中,使用supplyAsync()函数执行了一个异步任务,接着连续使用流式调用对任务处理结果进行在加工,直到最后的结果输出:async

* CompletableFuture中的异常处理

public static Integer cale(Integer para) {
    try {
        Thread.sleep(1000);
 
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return para * para;
}
CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(() -> cale(50))
        .exceptionally(ex -> {
            System.out.println("ex.toString() = " + ex.toString());
            return 0;
        })
        .thenApply(i -> Integer.toString(i))
        .thenApply(str -> "\"" + str + "\"")
        .thenAccept(System.out::println);
future.get();

* 组合多个CompletableFuture

CompletableFuture还容许你将多个CompletableFuture进行组合,一种方法是使用thenCompose(),它的签名以下:
public <U>CompletableFuture<U>thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
函数式编程

CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(() -> cale(50))
        .thenCompose(i -> CompletableFuture
                .supplyAsync(() -> cale(i)))
        .thenApply(i -> Integer.toString(i))
        .thenApply(str -> "\"" + str + "\"")
        .thenAccept(System.out::println);
future.get();

另一种组和多个CompletableFuture的方法是thenCombine(),它的签名以下:
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)函数

/*方法thenCombine()首先完成当前CompletableFuture和other的执行,
接着,将这二者的执行结果传递给BiFunction(该接口接受两个参数,并有一个返回值),
并返回表明BiFuntion实例的CompletableFuture对象:*/
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> cale(50));
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> cale(25));
 
CompletableFuture<Void> fu = future1.thenCombine(future2, (i, j) -> (i + j))
        .thenApply(str -> "\"" + str + "\"")
        .thenAccept(System.out::println);
fu.get();

 * 实现异步API

public double getPrice(String product) {
    return calculatePrice(product);
}

/**
 * 同步计算商品价格的方法
 *
 * @param product 商品名称
 * @return 价格
 */
private double calculatePrice(String product) {
    delay();
    return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
/**
 * 模拟计算,查询数据库等耗时
 */
public static void delay() {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

* 将同步方法装换为异步方法

/**
 * 异步计算商品的价格.
 *
 * @param product 商品名称
 * @return 价格
 */
public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        double price = calculatePrice(product);
        futurePrice.complete(price);
    }).start();
    return futurePrice;
}

使用异步API 模拟客户端
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long incocationTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("执行时间:" + incocationTime + " msecs");
try {
    Double price = futurePrice.get();
    System.out.printf("Price is %.2f%n", price);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
long retrievalTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("retrievalTime:" + retrievalTime + " msecs");

//>执行时间:37 msecs
//>Price is 125.79
//>retrievalTime:1055 msecs

* 错误处理

上述代码,若是没有意外,能够正常工做,可是若是价格计算过程当中生产了错误会怎样呢?很是不幸,这种状况下你会获得一个至关糟糕的结果:
用于提示错误的异常会限制在视图计算商品的价格的当前线程的范围内,最终会杀死该线程,
而这会致使等待get方法放回结果的客户端永久的被阻塞,
     客户端可使用重载的get方法,它给定了一个超时时间,这是一种推荐作法!
     为了让客户端能了解商店没法提供请求商品价格的缘由.咱们对代码优化,!

/**
 * 异步计算商品的价格.
 *
 * @param product 商品名称
 * @return 价格
 */
public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        } catch (Exception e) {
            //不然就抛出异常,完成此次future操做
            futurePrice.completeExceptionally(e);
        }
    }).start();
    return futurePrice;
}

* 使用工厂方法supplyAsync建立CompletableFuture

/**
 * 异步计算商品的价格.
 *
 * @param product 商品名称
 * @return 价格
 */
public Future<Double> getPriceAsync(String product) {
   /* CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        } catch (Exception e) {
            //不然就抛出异常,完成此次future操做
            futurePrice.completeExceptionally(e);
        }
    }).start();
    return futurePrice;*/

    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

* 让代码免受阻塞之苦

案例:最佳价格查询器
private static List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
        new Shop(":LetsSaveBig"),
        new Shop("MyFavoriteShop"),
        new Shop("BuyItAll"));

/**
 * 最佳价格查询器
 *
 * @param product 商品
 * @return
 */
public static List<String> findprices(String product) {
    return shops
            .stream()
            .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
            .collect(Collectors.toList());
}

验证findprices的正确性和执行性能
long start = System.nanoTime();
System.out.println(findprices("myPhones27s"));
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Done in " + duration+" msecs");
/*
[BestPrice price is 197.76, :
LetsSaveBig price is 155.39, 
MyFavoriteShop price is 124.21,
BuyItAll price is 139.23]
Done in 4071 msecs
*/

* 使用平行流对请求进行并行操做

/**
 * 最佳价格查询器(并行流)
 *
 * @param product 商品
 * @return
 */
public static List<String> parallelFindprices(String product) {
    return shops
            .parallelStream()
            .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
            .collect(Collectors.toList());
}
/*
[BestPrice price is 201.41, :
LetsSaveBig price is 153.64,
MyFavoriteShop price is 224.65, 
BuyItAll price is 211.83]
Done in 1064 msecs
*/
至关不错,看起来这是个简单有效的主意,对4个不一样商店的查询实现了并行.全部完成操做的总耗时只有1秒多一点,
让咱们尝试使用CompletableFuture,将findprices方法中对不一样商店的同步调用替换为异步调用.

* 使用CompletableFuture发起异步请求

/**
 * 最佳价格查询器(异步调用实现)
 * @param product 商品
 * @return
 */
public static List<String> asyncFindprices(String product) {
    //使用这种方式,你会获得一个List<CompletableFuture<String>>,
    //列表中的每个CompletableFuture对象在计算完成后都包含商店的String类型的名称.
    //可是,因为你用CompletableFuture实现了asyncFindprices方法要求返回一个List<String>.
    //你须要等待全部的future执行完毕,将其包含的值抽取出来,填充到列表中才能返回
    List<CompletableFuture<String>> priceFuture = shops
            .stream()
            .map(shop ->CompletableFuture.supplyAsync(() ->
     String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))))
                 .collect(Collectors.toList());
    //为了实现这个效果,我门能够向最初的List<CompletableFuture<String>>
    //施加第二个map操做,对list中的每个future对象执行join操做,一个接一个地等待他们容许结束,join和get方法
    //有相同的含义,不一样的在于join不会抛出任何检测到的异常
    return priceFuture
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

[BestPrice price is 187.24, :
LetsSaveBig price is 158.26, 
MyFavoriteShop price is 169.78, 
BuyItAll price is 170.59]
Done in 1061 msecs
结果让咱们失望了.咱们采用异步调用新版方法,和并行差很少

* 寻找更好的方案

通过我增长商店数量,而后使用三种方式反复的测试,发现了一个问题,并行流和异步调用的性能不分伯仲,究其缘由都同样,
它们内部采用的是一样的通用线程池,默认都使用固定数目的线程,
具体线程数取决于Runtime.getRuntime.availableProcessors()反回值,然而,
.CompletableFuture具备必定的优点,由于它容许你对执行器进行配置,
尤为是线程池的大小,让它以适合应用需求的方式进行配置,知足程序的要求,而这是并行流API没法提供的.
private static final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100));
/**
 * 最佳价格查询器(异步调用实现,自定义执行器)
 *
 * @param product 商品
 * @return
 */
public static List<String> asyncFindpricesThread(String product) {
    List<CompletableFuture<String>> priceFuture = shops
            .stream()
            .map(shop -> CompletableFuture.supplyAsync(
() -> shop.getName() + " price is " + shop.getPrice(product), executor))
            .collect(Collectors.toList());
    return priceFuture
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

通过测试处理5个商店 是1秒多,处理9个商店也是1秒多

并行--使用流仍是CompletableFutures?

目前为止,咱们已经知道对集合进行并行计算有两种方式,要么将其转化为并行流,利用map这样的操做开展工做,要么枚举出集合中的每个元素,建立新的线程,在CompletableFuture内对其进行操做,后者提供了更多的灵活性,你能够调整线程池大小,两者能帮助你确保总体计算机不会由于线程都在等待I/O而发生阻塞 咱们使用这些API的建议以下:

  1.  若是你进行的是计算密集型的操做,而且没有I/O,那么推荐使用Stream接口,由于实现简单,同时效率也多是最高的
  2.  反之,若是你并行的工做单元还涉及等待I/O的操做(包括网络链接等待).那么使用CompletableFuture是灵活性更好,你能够像前面讨论的那样,依据等待/计算,或者W/C的比率设定须要使用的线程数,

上一篇讲的<总结,总结!!!!java回调,future>能够先看看

创造靠智慧,处世靠常识;有常识而无智慧,谓之平庸,有智慧而无常识,谓之笨拙。智慧是一切力量中最强大的力量,是世界上惟一自觉活着力量。 —— 高尔基

相关文章
相关标签/搜索