因为咱们的两个远程服务:1.查询价格,2.查询折扣价格都是基于网络的。因此很容易出现某一个商店的数据迟迟没法返回的状况。因为这些缘由,我但愿查询器在查询时可以将拿到数据先返回过来,而不是等待全部的异步查询完成后集中返回一个List。
咱们首要须要避免的就是:等待建立一个包含全部价格的List。咱们应该直接处理CompletableFuture流,而后去响应他的completion事件,每个CompletableFuture对象完成时获取到相应的返回值。java
先将Discount的折扣服务延迟时间修改成随机值:数组
//计算折扣价格 private static Double apply(double price ,Code code){ //模拟远程操做的延迟 delay(); return (price * (100 - code.percantage)) / 100; } private static void delay(){ try { //随机延迟时间 int delay = 500 + random.nextInt(2000); Thread.sleep(delay); } catch (InterruptedException e) { e.printStackTrace(); } }
开始实现最佳价格查询器:网络
package BestPriceFinder; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.stream.Collectors; import java.util.stream.Stream; /** * 最佳价格查询器 */ public class BestFinder { List<Shop> shops = Arrays.asList( new Shop("A"), new Shop("B"), new Shop("C"), new Shop("D"), new Shop("E"), new Shop("F"), new Shop("G"), new Shop("H"), new Shop("I"), new Shop("J") ); public void findPricesContinue(String product){ long st = System.currentTimeMillis(); Stream<CompletableFuture<String>> futurePrices = shops.stream() //首先异步获取价格 .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPriceFormat(product),myExecutor)) //将获取的字符串解析成对象 .map(future -> future.thenApply(Quote::parse)) //使用另外一个异步任务有获取折扣价格 .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote),myExecutor))); //thenAccept()会在CompletableFuture完成以后使用他的返回值,这里会持续执行子线程 CompletableFuture[] futures = futurePrices.map(f -> f.thenAccept(s -> { String sout = String.format("%s done in %s mesc",s,(System.currentTimeMillis() - st)); System.out.println(sout); })) .toArray(size -> new CompletableFuture[size]); //allOf()工厂方法接受由CompletableFuture对象构成的数组,这里使用其等待全部的子线程执行完毕 CompletableFuture.allOf(futures).join(); } /** * 异步查询 * 相比并行流的话CompletableFuture更有优点:能够对执行器配置,设置线程池大小 */ @SuppressWarnings("all") private final Executor myExecutor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); //使用守护线程保证不会阻止程序的关停 t.setDaemon(true); return t; } });
thenAccept():提供了在CompletableFuture对象完成后使用他的返回值的功能。这样咱们的每个CompletableFuture完成后就会打印他的返回值,最终等待全部的子线程完毕。app
allOf():工厂方法接受由CompletableFuture对象构成的数组,数组中全部的CompletableFuture完成后它返回一个CompletableFuture<Void>对象。dom
anyOf():厂方法接受由CompletableFuture对象构成的数组,返回数组中第一个完成的CompletableFuture的返回值CompletableFuture<Object>对象。异步