CompletableFuture是Java8新增的一个超大型工具类,为何说她大呢?由于一方面它实现了Future接口,更重要的是,它实现了CompletionStage接口.这个接口也是Java8新增长的,而CompletionStage拥有多达约40种方法,javascript
经过CompletableFuture提供进一步封装,咱们很容易实现Future模式那样的异步调用,例如:
java
public static Integer cale(Integer para) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return para * para; } public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> cale(50)); System.out.println(future.get()); }
上述代码中CompletableFuture.supplyAsync()方法构造了一个CompletableFuture实例,在supplyAsync()函数中,他会在一个新的线程中,执行传入的参数.在这里,他会执行calc()方法,而calc()方法的执行多是比较慢的,可是不影响CompletableFuture实例的构造速度,所以supplyAsync()会当即返回,他返回的CompletableFuture对象实例就能够做为此次调用的契约,在未来任何场合,用于得到最终的计算结果.在CompletableFuture中,相似的工厂方法有如下几个:数据库
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)其中supplyAsync()方法用于那些须要返回值的场景,好比计算某个数据,而runAsync()方法用于没有返回值的场景,好比,仅仅是简单地执行某一个异步动做.编程
首先说明一下已Async结尾的方法都是能够异步执行的,若是指定了线程池,会在指定的线程池中执行,若是没有指定,默认会在ForkJoinPool.commonPool()中执行网络
在这两对方法中,都有一个方法能够接手一个Executor参数,这使咱们可让Supplier<U>或者Runnable在指定的线程池工做,若是不指定,则在默认的系统公共的ForkJoinPool.common线程池中执行.dom
在前文中我已经简单的提到,CompletionStage的约
40
个接口为函数式编程作准备的,在这里,就让咱们看一下,若是使用这些接口进行函数式的流式API调用
异步
public static Integer cale(Integer para) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return para * para; } CompletableFuture<Void> future = CompletableFuture .supplyAsync(() -> cale(50)) .thenApply(i -> Integer.toString(i)) .thenApply(str -> "\"" + str + "\"") .thenAccept(System.out::println); future.get();
上述代码中,使用supplyAsync()函数执行了一个异步任务,接着连续使用流式调用对任务处理结果进行在加工,直到最后的结果输出:
async
public static Integer cale(Integer para) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return para * para; } CompletableFuture<Void> future = CompletableFuture .supplyAsync(() -> cale(50)) .exceptionally(ex -> { System.out.println("ex.toString() = " + ex.toString()); return 0; }) .thenApply(i -> Integer.toString(i)) .thenApply(str -> "\"" + str + "\"") .thenAccept(System.out::println); future.get();
CompletableFuture还容许你将多个CompletableFuture进行组合,一种方法是使用thenCompose(),它的签名以下:
函数式编程
public <U>CompletableFuture<U>thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
CompletableFuture<Void> future = CompletableFuture .supplyAsync(() -> cale(50)) .thenCompose(i -> CompletableFuture .supplyAsync(() -> cale(i))) .thenApply(i -> Integer.toString(i)) .thenApply(str -> "\"" + str + "\"") .thenAccept(System.out::println); future.get();
另一种组和多个CompletableFuture的方法是thenCombine(),它的签名以下:
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
函数
/*方法thenCombine()首先完成当前CompletableFuture和other的执行, 接着,将这二者的执行结果传递给BiFunction(该接口接受两个参数,并有一个返回值), 并返回表明BiFuntion实例的CompletableFuture对象:*/ CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> cale(50)); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> cale(25)); CompletableFuture<Void> fu = future1.thenCombine(future2, (i, j) -> (i + j)) .thenApply(str -> "\"" + str + "\"") .thenAccept(System.out::println); fu.get();
public double getPrice(String product) { return calculatePrice(product); } /** * 同步计算商品价格的方法 * * @param product 商品名称 * @return 价格 */ private double calculatePrice(String product) { delay(); return random.nextDouble() * product.charAt(0) + product.charAt(1); } /** * 模拟计算,查询数据库等耗时 */ public static void delay() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }
/** * 异步计算商品的价格. * * @param product 商品名称 * @return 价格 */ public Future<Double> getPriceAsync(String product) { CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread(() -> { double price = calculatePrice(product); futurePrice.complete(price); }).start(); return futurePrice; } 使用异步API 模拟客户端 Shop shop = new Shop("BestShop"); long start = System.nanoTime(); Future<Double> futurePrice = shop.getPriceAsync("my favorite product"); long incocationTime = (System.nanoTime() - start) / 1_000_000; System.out.println("执行时间:" + incocationTime + " msecs"); try { Double price = futurePrice.get(); System.out.printf("Price is %.2f%n", price); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } long retrievalTime = (System.nanoTime() - start) / 1_000_000; System.out.println("retrievalTime:" + retrievalTime + " msecs"); //>执行时间:37 msecs //>Price is 125.79 //>retrievalTime:1055 msecs
上述代码,若是没有意外,能够正常工做,可是若是价格计算过程当中生产了错误会怎样呢?很是不幸,这种状况下你会获得一个至关糟糕的结果:
用于提示错误的异常会限制在视图计算商品的价格的当前线程的范围内,最终会杀死该线程,
而这会致使等待get方法放回结果的客户端永久的被阻塞,
客户端可使用重载的get方法,它给定了一个超时时间,这是一种推荐作法!
为了让客户端能了解商店没法提供请求商品价格的缘由.咱们对代码优化,!
/** * 异步计算商品的价格. * * @param product 商品名称 * @return 价格 */ public Future<Double> getPriceAsync(String product) { CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread(() -> { try { double price = calculatePrice(product); futurePrice.complete(price); } catch (Exception e) { //不然就抛出异常,完成此次future操做 futurePrice.completeExceptionally(e); } }).start(); return futurePrice; }
/** * 异步计算商品的价格. * * @param product 商品名称 * @return 价格 */ public Future<Double> getPriceAsync(String product) { /* CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread(() -> { try { double price = calculatePrice(product); futurePrice.complete(price); } catch (Exception e) { //不然就抛出异常,完成此次future操做 futurePrice.completeExceptionally(e); } }).start(); return futurePrice;*/ return CompletableFuture.supplyAsync(() -> calculatePrice(product)); }
案例:最佳价格查询器 private static List<Shop> shops = Arrays.asList(new Shop("BestPrice"), new Shop(":LetsSaveBig"), new Shop("MyFavoriteShop"), new Shop("BuyItAll")); /** * 最佳价格查询器 * * @param product 商品 * @return */ public static List<String> findprices(String product) { return shops .stream() .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))) .collect(Collectors.toList()); } 验证findprices的正确性和执行性能 long start = System.nanoTime(); System.out.println(findprices("myPhones27s")); long duration = (System.nanoTime() - start) / 1_000_000; System.out.println("Done in " + duration+" msecs"); /* [BestPrice price is 197.76, : LetsSaveBig price is 155.39, MyFavoriteShop price is 124.21, BuyItAll price is 139.23] Done in 4071 msecs */
/** * 最佳价格查询器(并行流) * * @param product 商品 * @return */ public static List<String> parallelFindprices(String product) { return shops .parallelStream() .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))) .collect(Collectors.toList()); } /* [BestPrice price is 201.41, : LetsSaveBig price is 153.64, MyFavoriteShop price is 224.65, BuyItAll price is 211.83] Done in 1064 msecs */
至关不错,看起来这是个简单有效的主意,对4个不一样商店的查询实现了并行.全部完成操做的总耗时只有1秒多一点, 让咱们尝试使用CompletableFuture,将findprices方法中对不一样商店的同步调用替换为异步调用.
/** * 最佳价格查询器(异步调用实现) * @param product 商品 * @return */ public static List<String> asyncFindprices(String product) { //使用这种方式,你会获得一个List<CompletableFuture<String>>, //列表中的每个CompletableFuture对象在计算完成后都包含商店的String类型的名称. //可是,因为你用CompletableFuture实现了asyncFindprices方法要求返回一个List<String>. //你须要等待全部的future执行完毕,将其包含的值抽取出来,填充到列表中才能返回 List<CompletableFuture<String>> priceFuture = shops .stream() .map(shop ->CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))) .collect(Collectors.toList()); //为了实现这个效果,我门能够向最初的List<CompletableFuture<String>> //施加第二个map操做,对list中的每个future对象执行join操做,一个接一个地等待他们容许结束,join和get方法 //有相同的含义,不一样的在于join不会抛出任何检测到的异常 return priceFuture .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } [BestPrice price is 187.24, : LetsSaveBig price is 158.26, MyFavoriteShop price is 169.78, BuyItAll price is 170.59] Done in 1061 msecs
结果让咱们失望了.咱们采用异步调用新版方法,和并行差很少
通过我增长商店数量,而后使用三种方式反复的测试,发现了一个问题,并行流和异步调用的性能不分伯仲,究其缘由都同样, 它们内部采用的是一样的通用线程池,默认都使用固定数目的线程, 具体线程数取决于Runtime.getRuntime.availableProcessors()反回值,然而, .CompletableFuture具备必定的优点,由于它容许你对执行器进行配置, 尤为是线程池的大小,让它以适合应用需求的方式进行配置,知足程序的要求,而这是并行流API没法提供的.
private static final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100)); /** * 最佳价格查询器(异步调用实现,自定义执行器) * * @param product 商品 * @return */ public static List<String> asyncFindpricesThread(String product) { List<CompletableFuture<String>> priceFuture = shops .stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getName() + " price is " + shop.getPrice(product), executor)) .collect(Collectors.toList()); return priceFuture .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); }
通过测试处理5个商店 是1秒多,处理9个商店也是1秒多
并行--使用流仍是CompletableFutures?
目前为止,咱们已经知道对集合进行并行计算有两种方式,要么将其转化为并行流,利用map这样的操做开展工做,要么枚举出集合中的每个元素,建立新的线程,在CompletableFuture内对其进行操做,后者提供了更多的灵活性,你能够调整线程池大小,两者能帮助你确保总体计算机不会由于线程都在等待I/O而发生阻塞 咱们使用这些API的建议以下:
上一篇讲的<总结,总结!!!!java回调,future>能够先看看