在Future中触发那些潜在耗时的操做把调用线程解放出来,让它能继续执行其余有价值的工做,再也不须要呆呆等待耗时的操做完成。打个比方,你能够把它想象成这样的场景:你拿了一袋子衣服到你中意的干洗店去洗。干洗店的员工会给你张发票,告诉你何时你的衣服会洗好(这就是一个Future事件)。衣服干洗的同时,你能够去作其余的事情。Future的另外一个优势是它比更底层的Thread更易用。要使用Future,一般你只须要将耗时的操做封装在一个Callable对象中,再将它提交给ExecutorService,就万事大吉了。java
ExecutorService executor = Executors.newCachedThreadPool(); Future<Double> future = executor.submit(new Callable<Double>() { public Double call() { return doSomeLongComputation(); }}); doSomethingElse(); //异步操做进行的同时,你能够作其余的事情 try { Double result = future.get(1, TimeUnit.SECONDS); //获取异步操做的结果,若是最终被阻塞,没法获得结 //果,那么在最多等待1秒钟以后退出 } catch (ExecutionException ee) { // 计算抛出一个异常 } catch (InterruptedException ie) { // 当前线程在等待过程当中被中断 } catch (TimeoutException te) { // 在Future对象完成以前超过已过时 }
同步API与异步API同步API其实只是对传统方法调用的另外一种称呼:你调用了某个方法,调用方在被调用方运行的过程当中会等待,被调用方运行结束返回,调用方取得被调用方的返回值并继续运行。即便调用方和被调用方在不一样的线程中运行,调用方仍是须要等待被调用方结束运行,这就是阻塞式调用这个名词的由来。数组
与此相反,异步API会直接返回,或者至少在被调用方计算完成以前,将它剩余的计算任务交给另外一个线程去作,该线程和调用方是异步的——这就是非阻塞式调用的由来。执行剩余计算任务的线程会将它的计算结果返回给调用方。返回的方式要么是经过回调函数,要么是由调用方再次执行一个“等待,直到计算完成”的方法调用。这种方式的计算在I/O系统程序设计中很是常见:你发起了一次磁盘访问,此次访问和你的其余计算操做是异步的,你完成其余的任务时,磁盘块的数据可能还没载入到内存,你只须要等待数据的载入完成。服务器
使用CompletableFuture后,getPriceAsync方法的实现网络
public Future<Double> getPriceAsync(String product) { CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread( () -> { double price = calculatePrice(product); //calculatePrice需长时间计算,任务结束并得出结果时设置 //Future的返回值 futurePrice.complete(price); }).start(); return futurePrice; //无需等待还没结束的计算,直接返回Future对象 }
使用异步API:app
Shop shop = new Shop("BestShop"); long start = System.nanoTime(); Future<Double> futurePrice = shop.getPriceAsync("my favorite product"); long invocationTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Invocation returned after " + invocationTime + " msecs"); // 执行更多任务,好比查询其余商店 doSomethingElse(); // 在计算商品价格的同时 try { double price = futurePrice.get(); //从Future对象中读取价格,若是价格未知,会发生阻塞 System.out.printf("Price is %.2f%n", price); } catch (Exception e) { throw new RuntimeException(e); } long retrievalTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Price returned after " + retrievalTime + " msecs");
Stream和CompletableFuture的设计都遵循了相似的模式:它们都使用了Lambda表达式以及流水线的思想。CompletableFuture和Future的关系就跟Stream和Collection的关系同样。
错误处理
若是计算商品价格的方法出现异常,用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会致使等待get方法返回结果的客户端永久地被阻塞。为了不这种状况,你须要使用CompletableFuture的completeExceptionally方法将致使CompletableFuture内发生问题的异常抛出。
抛出CompletableFuture内的异常:异步
public Future<Double> getPriceAsync( String product ) { CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread( () - > { try { double price = calculatePrice( product ); futurePrice.complete( price ); } catch ( Exception ex ) { futurePrice.completeExceptionally( ex ); } } ).start(); return(futurePrice); }
使用工厂方法supplyAsync建立CompletableFuture对象:函数
public Future<Double> getPriceAsync(String product) { return CompletableFuture.supplyAsync(() -> calculatePrice(product)); }
此处getPriceAsync方法返回的CompletableFuture对象和上面你手工建立和完成的CompletableFuture对象是彻底等价的,这意味着它提供了一样的错误管理机制。supplyAsync方法接受一个生产者(Supplier)做为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,可是你也可使用supplyAsync方法的重载版本,传递第二个参数指定不一样的执行线程执行生产者方法。spa
在全部店铺中找出同一商品的价格,使用CompletableFuture实现findPrices方法线程
public List<String> findPrices(String product) { List<CompletableFuture<String>> priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getName() + " price is " + shop.getPrice(product))) .collect(Collectors.toList()); return priceFutures.stream() .map(CompletableFuture::join) .collect(toList()); }
这里使用了两个不一样的Stream流水线,而不是在同一个处理流的流水线上一个接一个地放置两个map操做——这实际上是有原因的。考虑流操做之间的延迟特性,若是你在单一流水线中处理流,发向不一样商家的请求只能以同步、顺序执行的方式才会成功。所以,每一个建立CompletableFuture对象只能在前一个操做结束以后执行查询指定商家的动做、通知join方法返回计算结果。设计
CompletableFuture类中的 join方法和Future接口中的get有相同的含义,而且也声明在Future接口中,它们惟一的不一样是join不会抛出任何检测到的异常。使用它你再也不须要使用try/catch语句块让你传递给第二个map方法的Lambda表达式变得过于臃肿。
使用定制的执行器:
调整线程池的大小
Brian Goetz建议,线程池大小与处理器的利用率之比可使用下面的公式进行估算:
Nthreads = NCPU * UCPU * (1 + W/C)
其中:
❑NCPU是处理器的核的数目,能够经过 Runtime.getRuntime().availableProcessors()获得
❑UCPU是指望的CPU利用率(该值应该介于0和1之间)
❑W/C是等待时间与计算时间的比率
实际操做中,若是你建立
的线程数比商店的数目更多,反而是一种浪费,由于这样作以后,你线程池中的有些线程根本没有机会被使用。出于这种考虑,咱们建议你将执行器使用的线程数,与你须要查询的商店数目设定为同一个值,这样每一个商店都应该对应一个服务线程。不过,为了不发生因为商店的数目过多致使服务器超负荷而崩溃,你仍是须要设置一个上限,好比100个线程。代码清单以下所示。为“最优价格查询器”应用定制的执行器:
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } });
并行——使用流仍是CompletableFutures?
目前为止,你已经知道对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操做开展工做,要么枚举出集合中的每个元素,建立新的线程,在CompletableFuture内对其进行操做。后者提供了更多的灵活性,你能够调整线程池的大小,而这能帮助你确保总体的计算不会由于线程都在等待I/O而发生阻塞。
咱们对使用这些API的建议以下。
❑若是你进行的是计算密集型的操做,而且没有I/O,那么推荐使用Stream接口,由于实现简单,同时效率也多是最高的(若是全部的线程都是计算密集型的,那就没有必要建立比处理器核数更多的线程)。
❑反之,若是你并行的工做单元还涉及等待I/O的操做(包括网络链接等待),那么使用CompletableFuture灵活性更好,你能够像前文讨论的那样,依据等待/计算,或者W/C的比率设定须要使用的线程数。这种状况不使用并行流的另外一个缘由是,处理流的流水线中若是发生I/O等待,流的延迟特性会让咱们很难判断到底何时触发了等待。
使用CompletableFuture实现findPrices方法(获取商品折扣后价格):
public List<String> findPrices(String product) { List<CompletableFuture<String>> priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getPrice(product), executor))//getPrice耗时操做,获取商品的价格字符串,使用异步方式 .map(future -> future.thenApply(Quote::parse)) //将价格字符串解析成Quote对象(包装了价格,折扣率等) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync( () -> Discount.applyDiscount(quote), executor))) //异步计算商品最终价格 .collect(toList()); return priceFutures.stream() .map(CompletableFuture::join) //等待流中的全部Future执行完毕,并提取各自的返回值 .collect(toList()); }
thenapply()是返回的是非CompletableFuture类型:它的功能至关于将CompletableFuture<T>转换成CompletableFuture<U>。
thenCompose()用来链接两个CompletableFuture,返回值是新的CompletableFuture:
thenCompose方法容许你对两个异步操做进行流水线,第一个操做完成时,将其结果做为参数传递给第二个操做。
CompletableFuture类中的其余方法同样,也提供了一个以 Async后缀结尾的版本 thenComposeAsync。一般而言,名称中不带Async
的方法和它的前一个任务同样,在同一个线程中运行;而名称以Async结尾的方法会将后续的任务提交到一个线程池,因此每一个任务是由不一样的线程处理的。
thenCombine方法,它接收名为BiFunction的第二参数,这个参数定义了当两个CompletableFuture对象完成计算后,结果如何合并。同thenCompose方法同样,thenCombine方法也提供有一个Async的版本。这里,若是使用thenCombineAsync会致使BiFunction中定义的合并操做被提交到线程池中,由另外一个任务以异步的方式执行。
eg:有一家商店提供的价格是以欧元(EUR)计价的,可是你但愿以美圆的方式提供给你的客户:
Future<double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product)) .thenCombine( CompletableFuture.supplyAsync( () -> exchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate );
只要有商店返回商品价格就在第一时间显示返回值,再也不等待那些还未返回的商店(有些甚至会发生超时)。Java 8的CompletableFuture通 过thenAccept方法提供了这一功能,它接收CompletableFuture执行完毕后的返回值作参数。
重构findPrices方法返回一个由Future构成的流
public Stream<CompletableFuture<String>> findPricesStream(String product) { return shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getPrice(product), executor)) .map(future -> future.thenApply(Quote::parse)) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync( () -> Discount.applyDiscount(quote), executor))); } findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println));
由 于thenAccept方法已经定义了如何处理CompletableFuture返回的结果,一旦CompletableFuture计算获得结果,它就返回一个CompletableFuture<Void>。对这个<CompletableFuture<Void>>对象,你能作的事很是有限,只能等待其运行结束。
你还但愿能给最慢的商店一些机会,让它有机会打印输出返回的价格。为了实现这一目的,你能够把构成Stream的全部CompletableFuture<Void>对象放到一个数组中,等待全部的任务执行完成,代码以下所示:
CompletableFuture[] futures = findPricesStream("myPhone") .map(f -> f.thenAccept(System.out::println)) .toArray(size -> new CompletableFuture[size]); CompletableFuture.allOf(futures).join();
allOf工厂方法接收一个由CompletableFuture构成的数组,数组中的全部CompletableFuture对象执行完成以后,它返回一个CompletableFuture<Void>对象。这意味着,若是你须要等待最初Stream中的全部 CompletableFuture对象执行完毕,对 allOf方法返回的
CompletableFuture执行join操做是个不错的主意。
你可能但愿只要CompletableFuture对象数组中有任何一个执行完毕就再也不等待,好比,你正在查询两个汇率服务器,任何一个返回告终果都能知足你的需求。在这种状况下,你可使用一个相似的工厂方法anyOf。该方法接收一个CompletableFuture对象构成的数组,返回由第一个执行完毕的CompletableFuture对象的返回值构成的CompletableFuture<Object>。