上一篇说到每个shop都会提供一个价格查询的服务,可是如今咱们进行假设:java
1. 全部的价格查询是同步方式提供的 2. shop在返回价格的同时会返回一个折扣码 3. 咱们须要解析返回的字符串,而且根据折扣码区获取折扣后的价格 4. 折扣后的价格计算依然是同步执行的 5. 查询价格返回的字符串格式为shopName:price:discountCode("沃尔玛:200:15")
定义商店对象:Shop.javaapp
public class Shop { private String name; public Shop(String name){ this.name = name; } public String getName(){ return name; } public String getPriceFormat(String product){ double price = calculatePrice(product); //随机返回一个折扣码 Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)]; return String.format("%s:%.2f:%s",name,price,code); } private double calculatePrice(String product){ delay(); return random.nextDouble() * product.charAt(0) + product.charAt(1); } private Random random = new Random(); /** * 模拟耗时操做:延迟一秒 */ private static void delay(){ try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
定义折扣对象:Discount.javadom
public class Discount { public enum Code{ NONE(0),SILVER(5),GOLD(10),PLATINUM(15),DIAMOND(20); private final int percantage; Code(int percentage){ this.percantage = percentage; } } public static String applyDiscount(Quote quote){ return quote.getShopName() + "prices is " + Discount.apply(quote.getPrice(),quote.getDiscountCode()); } //计算折扣价格 private static Double apply(double price ,Code code){ //模拟远程操做的延迟 delay(); return (price * (100 - code.percantage)) / 100; } private static void delay(){ try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } }
用于封装解析getPriceFormat的字符串对象:Quote.javaiphone
public class Quote { private final String shopName; private final double price; private final Discount.Code discountCode; public Quote(String shopName,double price,Discount.Code code){ this.shopName = shopName; this.price = price; this.discountCode = code; } public static Quote parse(String s){ String[] split = s.split(":"); String shopName = split[0]; Double price = Double.valueOf(split[1]); Discount.Code code = Discount.Code.valueOf(split[2]); return new Quote(shopName,price,code); } public double getPrice() { return price; } public String getShopName() { return shopName; } public Discount.Code getDiscountCode() { return discountCode; } }
因而如今的任务就是:异步
1. 远程查询商品价格 2. 将得到的字符串解析成为Quote对象 3. 根据Quote对象远程获取折扣后的价格
如今先看看同步的方式来执行这个操做:ide
public List<String> findPrices2(String product){ return shops.stream() .map(shop -> shop.getPriceFormat(product)) .map(Quote::parse) .map(Discount::applyDiscount) .collect(Collectors.toList()); }
由于有两个耗时操做,每一个1秒,耗时毫无疑问20秒以上:this
1. 获取价格:使用CompletableFuture.supplyAsync()工厂方法便可,一旦运行结束每一个CompletableFuture对象会包含一个shop返回的字符串,这里记住使用咱们自定义的执行器。spa
2. 解析报价:通常状况下解析操做并不涉及到IO处理,所能够采用同步处理,因此这里咱们直接使用CompletableFuture对象的thenApply()方法,代表在的带运算结果后马上同步处理。线程
3. 计算折扣价格:这是一个远程操做,确定是须要异步执行的,因而咱们如今就有了两次异步处理(1.获取价格,2.计算折扣)。如今使用级联的方式将它们串联起来工做。CompletableFuture提供了thenCompose方法,代表将两个异步操做进行流水线处理。第一个异步操做的结果会成为第二个异步操做的入参。使用这样的方式,即便Future在向不一样的shop手机报价,主线程依然能够执行其余操做,好比响应UI事件。code
因而咱们有了以下代码:
/** * 异步查询 * 相比并行流的话CompletableFuture更有优点:能够对执行器配置,设置线程池大小 */ @SuppressWarnings("all") private final Executor myExecutor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } }); public List<String> findPrices2Async(String product){ List<CompletableFuture<String>> futurePrices = shops.stream() //首先异步获取价格 .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPriceFormat(product),myExecutor)) //将获取的字符串解析成对象 .map(future -> future.thenApply(Quote::parse)) //使用另外一个异步任务有获取折扣价格 .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote),myExecutor))) .collect(Collectors.toList()); //等待全部异步任务完成 return futurePrices.stream().map(CompletableFuture::join).collect(Collectors.toList());
运算结果不到3秒:
咱们刚才使用thenCompose()将两个CompletableFuture结合了起来,而且一个CompletableFuture的运算结果将做为第二个CompletableFuture的入参。可是更多的状况是两个不相干的CompletableFuture对象相互结合,而且咱们也不但愿第一个任务结束以后才开始第二个任务。这时可使用thenCombine()。
好比咱们获取价格的同时也获取汇率:
远程获取汇率方法:
/** * 获取汇率 */ public double getRate(String type){ delay(); if("$".equals(type)){ return 0.3; } if("¥".equals(type)){ return 0.7; } return 1; }
结合俩个异步操做:
@Test public void combine(){ Shop shop = new Shop("沃尔玛"); Future<Double> futurePrice = CompletableFuture.supplyAsync(() -> shop.getPrice("iphoneX")) .thenCombine(CompletableFuture.supplyAsync(() -> shop.getRate("$")), (price,rate) -> price * rate); }
thenCombine()接受两个参数:
1. CompletableFuture对象:代表第二个异步操做 2. BiFunction<? super T,? super U,? extends V>接口:两个异步操做的结果合并处理