Java8 CompletableFuture组合式的编程(笔记)

    * 实现异步API


public double getPrice(String product) {
    return calculatePrice(product);
}

/**
 * 同步计算商品价格的方法
 *
 * @param product 商品名称
 * @return 价格
 */
private double calculatePrice(String product) {
    delay();
    return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
/**
 * 模拟计算,查询数据库等耗时
 */
public static void delay() {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

    * 将同步方法装换为异步方法

/**
 * 异步计算商品的价格.
 *
 * @param product 商品名称
 * @return 价格
 */
public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        double price = calculatePrice(product);
        futurePrice.complete(price);
    }).start();
    return futurePrice;
}

使用异步API 模拟客户端
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long incocationTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("执行时间:" + incocationTime + " msecs");
try {
    Double price = futurePrice.get();
    System.out.printf("Price is %.2f%n", price);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
long retrievalTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("retrievalTime:" + retrievalTime + " msecs");

* 错误处理

     上述代码,若是没有意外,能够正常工做,可是若是价格计算过程当中生产了错误会怎样呢?很是不幸,这种状况下你会获得一个至关糟糕的结果:用于提示错误的异常会限制在视图计算商品的价格的当前线程的范围内,最终会杀死该线程,而这会致使等待get方法放回结果的客户端永久的被阻塞,
     客户端可使用重载的get方法,它给定了一个超时时间,这是一种推荐作法!
     为了让客户端能了解商店没法提供请求商品价格的缘由.咱们对代码优化,!
/**
 * 异步计算商品的价格.
 *
 * @param product 商品名称
 * @return 价格
 */
public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        } catch (Exception e) {
            //不然就抛出异常,完成此次future操做
            futurePrice.completeExceptionally(e);
        }
    }).start();
    return futurePrice;
}


    * 使用工厂方法supplyAsync建立CompletableFuture

/**
 * 异步计算商品的价格.
 *
 * @param product 商品名称
 * @return 价格
 */
public Future<Double> getPriceAsync(String product) {
   /* CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        } catch (Exception e) {
            //不然就抛出异常,完成此次future操做
            futurePrice.completeExceptionally(e);
        }
    }).start();
    return futurePrice;*/

    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}


    * 让代码免受阻塞之苦

案例:最佳价格查询器
private static List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
        new Shop(":LetsSaveBig"),
        new Shop("MyFavoriteShop"),
        new Shop("BuyItAll"));

/**
 * 最佳价格查询器
 *
 * @param product 商品
 * @return
 */
public static List<String> findprices(String product) {
    return shops
            .stream()
            .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
            .collect(Collectors.toList());
}

验证findprices的正确性和执行性能
long start = System.nanoTime();
System.out.println(findprices("myPhones27s"));
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Done in " + duration+" msecs");

执行结果:

* 使用平行流对请求进行并行操做

/**
 * 最佳价格查询器(并行流)
 *
 * @param product 商品
 * @return
 */
public static List<String> parallelFindprices(String product) {
    return shops
            .parallelStream()
            .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
            .collect(Collectors.toList());
}

一样的测试代码:
至关不错,看起来这是个简单有效的主意,对4个不一样商店的查询实现了并行.全部完成操做的总耗时只有1秒多一点,让咱们尝试使用CompletableFuture,将findprices方法中对不一样商店的同步调用替换为异步调用.

    * 使用CompletableFuture发起异步请求

/**
 * 最佳价格查询器(异步调用实现)
 * @param product 商品
 * @return
 */
public static List<String> asyncFindprices(String product) {
    //使用这种方式,你会获得一个List<CompletableFuture<String>>,列表中的每个CompletableFuture对象在计算完成后都包含商店的String类型的名称.
    //可是,因为你用CompletableFuture实现了asyncFindprices方法要求返回一个List<String>.你须要等待全部的future执行完毕,将其包含的值抽取出来,填充到列表中才能返回
    List<CompletableFuture<String>> priceFuture = shops
            .stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))))
            .collect(Collectors.toList());
    //为了实现这个效果,我门能够向最初的List<CompletableFuture<String>>施加第二个map操做,对list中的每个future对象执行join操做,一个接一个地等待他们容许结束,join和get方法
    //有相同的含义,不一样的在于join不会抛出任何检测到的异常
    return priceFuture
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

相同的测试代码:
结果让咱们失望了.咱们采用异步调用新版方法,要比并行流慢了一倍.

    * 寻找更好的方案

通过我增长商店数量,而后使用三种方式反复的测试,发现了一个问题,并行流和异步调用的性能不分伯仲,究其缘由都同样,它们内部采用的是一样的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime.availableProcessors()放回值,然而,.CompletableFuture具备必定的优点,由于它容许你对执行器进行配置,尤为是线程池的大小,让它以适合应用需求的方式进行配置,知足程序的要求,而这是并行流API没法提供的.

    * 使用定制的执行器

private static final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100));
/**
 * 最佳价格查询器(异步调用实现,自定义执行器)
 *
 * @param product 商品
 * @return
 */
public static List<String> asyncFindpricesThread(String product) {
    List<CompletableFuture<String>> priceFuture = shops
            .stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor))
            .collect(Collectors.toList());
    return priceFuture
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

通过测试处理5个商店 是1秒多,处理9个商店也是1秒多

并行--使用流仍是CompletableFutures?
     目前为止,咱们已经知道对集合进行并行计算有两种方式,要么将其转化为并行流,利用map这样的操做开展工做,要么枚举出集合中的每个元素,建立新的线程,在CompletableFuture内对其进行操做,后者提供了更多的灵活性,你能够调整线程池大小,两者能帮助你确保总体计算机不会由于线程都在等待I/O而发生阻塞
     咱们使用这些API的建议以下:

    1. 若是你进行的是计算密集型的操做,而且没有I/O,那么推荐使用Stream接口,由于实现简单,同时效率也多是最高的
    2. 反之,若是你并行的工做单元还涉及等待I/O的操做(包括网络链接等待).那么使用CompletableFuture是灵活性更好,你能够像前面讨论的那样,依据等待/计算,或者W/C的比率设定须要使用的线程数,
相关文章
相关标签/搜索