Future接口时java5引入的,设计初衷是对未来某个时刻会发生的结果建模。它建模了一种异步计算,返回了一个执行预算结果的引用。好比,你去干洗店洗衣服,店员会告诉你何时能够来取衣服,而不是让你一直在干洗店等待。要使用Future只须要将耗时操做封装在一个Callable对象中,再将其提交给ExecutorService就能够了。java
ExecutorService executor = Executors.newFixedThreadPool(10); Future<Double> future = executor.submit(new Callable<Double>() { @Override public Double call() throws Exception { return doSomeLongComputation(); } }); doSomethingElse(); try { //最多等待1秒 Double result = future.get(1,TimeUnit.SECONDS); } catch (InterruptedException e) { //当前线程等待过程当中被打断 e.printStackTrace(); } catch (ExecutionException e) { //计算时出现异常 e.printStackTrace(); } catch (TimeoutException e) { //完成计算前就超时 e.printStackTrace(); }
可是Future依然有一些局限性:数据库
而CompletableFuture类实现了Future接口,能够将上述的问题所有解决。CompletableFuture与Stream的设计都遵循了相似的设计模式:使用Lambda表达式以及流水线的思想,从这个角度能够说CompletableFuture与Future的关系相似于Stream与Collection的关系。编程
最佳价格查询器:查询多个线上商店对同一商品的价格。设计模式
首先构建商店对象:api
package BestPriceFinder; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; public class Shop { private String name; public Shop(String name){ this.name = name; } public String getName(){ return name; } /** * 异步api:使用建立CompletableFuture类提供的工厂方法与getPriceAsync()效果彻底一致 * 能够更轻易的完成这个流程,而且不用担忧实现细节 * @param product * @return */ public Future<Double> getPriceAsyncByFactory(String product){ return CompletableFuture.supplyAsync(() -> calculatePrice(product)); } /** * 异步api: * @param product * @return */ public Future<Double> getPriceAsync(String product){ //建立CompletableFuture对象,它将包含计算结果 CompletableFuture<Double> futurePrice = new CompletableFuture<>(); //在新线程中异步计算结果 new Thread(() -> { try { double price = calculatePrice(product); //须要长时间计算的任务结束时,设置future的返回值 futurePrice.complete(price); }catch (Exception e){ //如这里没有使用completeExceptionally,线程不会结束,调用方会永远的执行下去 futurePrice.completeExceptionally(e); } }).start(); //无需等待计算结果,直接返回future对象 return futurePrice; } /** * 同步api: * 每一个商店都须要提供的查询api:根据名称返回价格; * 模拟查询数据库等一些耗时操做:使用delay()模拟这些耗时操做。 * @param product * @return */ public double getPrice(String product){ return calculatePrice(product); } private double calculatePrice(String product){ delay(); return random.nextDouble() * product.charAt(0) + product.charAt(1); } private Random random = new Random(); /** * 模拟耗时操做:延迟一秒 */ private static void delay(){ try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
下面咱们针对Shop.java提供的同步方法与异步方法来进行测试:网络
package BestPriceFinder; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.stream.Collectors; /** * 最佳价格查询器 */ public class BestFinder { List<Shop> shops = Arrays.asList( new Shop("A"), new Shop("B"), new Shop("C"), new Shop("D"), new Shop("E"), new Shop("F"), new Shop("G"), new Shop("H"), new Shop("I"), new Shop("J") ); /** * 顺序查询 */ public List<String> findPrices(String product){ return shops.stream() .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product))) .collect(Collectors.toList()); } /** * 并行流查询 */ public List<String> findPricesParallel(String product){ return shops.parallelStream() .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product))) .collect(Collectors.toList()); } /** * 异步查询 * 相比并行流的话CompletableFuture更有优点:能够对执行器配置,设置线程池大小 */ @SuppressWarnings("all") private final Executor myExecutor = Executors.newFixedThreadPool(Math.min(shops.size(), 800), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); //使用守护线程保证不会阻止程序的关停 t.setDaemon(true); return t; } }); @SuppressWarnings("all") public List<String> findPricesAsync(String product){ List<CompletableFuture<String>> priceFuctures = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)),myExecutor)) .collect(Collectors.toList()); /** 这里须要使用新的stream来等待全部的子线程执行完, * 由于:若是在一个stream中使用两个map: * List<CompletableFuture<String>> priceFuctures = shops.parallelStream() * .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)))) * .map(c -> c.join()).collect(Collectors.toList()) * .collect(Collectors.toList()); * 考虑到流操做之间的延迟特性。若是你在单一的流水线中处理流,发向不一样商家的请求只能以同步顺序的方式执行才会成功。所以每一个建立CompletableFuture * 对象只能在前一个操做结束以后执行查询商家动做。 */ return priceFuctures.stream().map(c -> c.join()).collect(Collectors.toList()); } }
@Test public void findPrices(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPrices("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); } @Test public void findPricesParallel(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPrices("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); } @Test public void findPricesAsync(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPricesAsync("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); }
同步api测试结果:毫无疑问是10秒之上并发
并行流获取同步api测试结果:也是10秒之上,可是并行流不是很高效吗?怎么会如此凄惨呢?由于这与并行流能够调用的系统核数相关,个人计算机是8核,最多8个线程同时运行。而商店有10个,也就是说,咱们的两个线程会一直等待前面的某一个线程释放出空闲才能继续运行。dom
异步获取api测试结果:一秒左右异步
为什么差距如此大呢?
明智的选择是建立了一个配有线程池的执行器,线程池中线程的数目取决于你的应用须要处理的负担,可是你该如何选择合适的线程数目呢?ide
《Java并发编程实战》中给出以下公式:
Number = NCpu * Ucpu * ( 1 + W/C) Number : 线程数量 NCpu : 处理器核数 UCpu : 指望cpu利用率 W/C : 等待时间与计算时间比
咱们这里:99%d的时间是等待商店响应 W/C = 99 ,cpu利用率指望 100% ,NCpu = 9,推断出 number = 800。可是为了不过多的线程搞死计算机,咱们选择商店数与计算值中较小的一个。
目前,咱们对集合进行计算有两种方式:1.并行流 2.CompletableFuture;而CompletableFuture更加的灵活,咱们能够配置线程池的大小确保总体的计算不会由于等待IO而发生阻塞。
书上给出的建议以下:
如今咱们知道了如何用CompletableFuture提供异步的api,后面的文章会学习如何利用CompletableFuture高效的操做同步api。