- 硬件的极速发展,多核心CPU司空见惯;分布式的软件架构司空见惯;
- 功能API大多采用混聚的方式把基础服务的内容连接在一块儿,方便用户生活。
抛出了两个问题:java
- 如何发挥多核能力;
- 切分大型任务,让每一个子任务并行运行;
项目 | 区别1 | 实现技术 |
---|---|---|
并行 | 每一个任务跑在单独的cpu核心上 | 分支合并框架,并行流 |
并发 | 不一样任务共享cpu核心,基于时间片调度 | CompletableFuture |
java5开始引入。未来某个时刻发生的事情进行建模。
进行一个异步计算,返回一个执行运算的结果引用,当运算结束后,这个引用能够返回给调用方。
可使用Future把哪些潜在耗时的任务放到异步线程中,让主线程继续执行其余有价值的工做,不在白白等待。编程
下面是一个例子:使用Future,可让两个任务并发的运行,而后汇聚结果;api
package com.test.completable; import com.google.common.base.Stopwatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * 说明:Future应用实例 * @author carter * 建立时间: 2019年11月18日 10:53 **/ public class FutureTest { static final ExecutorService pool = Executors.newFixedThreadPool(2); public static void main(String[] args) { Stopwatch stopwatch = Stopwatch.createStarted(); Future<Long> longFuture = pool.submit(() -> doSomethingLongTime()); doSomething2(); try { final Long longValue = longFuture.get(3, TimeUnit.SECONDS); System.out.println(Thread.currentThread().getName() + " future return value :" + longValue + " : " + stopwatch.stop()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } pool.shutdown(); } private static void doSomething2() { Stopwatch stopwatch = Stopwatch.createStarted(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " doSomething2 :" + stopwatch.stop()); } private static Long doSomethingLongTime() { Stopwatch stopwatch = Stopwatch.createStarted(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " doSomethingLongTime : " + stopwatch.stop()); return 1000L; } }
无法编写简介的并发代码。描叙能力不够;好比以下场景:架构
基于这个缺陷,java8中引入了CompletableFuture 类;并发
技能点:app
类型 | 区别 | 是否堵塞 |
---|---|---|
同步API | 调用方在被调用运行的过程当中等待,被调用方运行结束后返回,调用方取得返回值后继续运行 | 堵塞 |
异步API | 调用方和被调用方是异步的,调用方不用等待被调用方返回结果 | 非堵塞 |
package com.test.completable; import com.google.common.base.Stopwatch; import com.google.common.base.Ticker; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * 说明:异步调用计算价格的方法 * @author carter * 建立时间: 2019年11月18日 13:32 **/ public class Test { public static void main(String[] args) { Shop shop = new Shop("BestShop"); Stopwatch stopwatch = Stopwatch.createStarted(); Stopwatch stopwatch2 = Stopwatch.createStarted(); Future<Double> doubleFuture = shop.getPriceFuture("pizza"); System.out.println("getPriceFuture return after: " + stopwatch.stop()); doSomethingElse(); try{ final Double price = doubleFuture.get(); System.out.println("price is " + price + " return after: " + stopwatch2.stop()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } private static void doSomethingElse() { Stopwatch stopwatch = Stopwatch.createStarted(); DelayUtil.delay(); System.out.println("doSomethingElse " + stopwatch.stop()); } }
错误处理框架
若是计算价格的方法产生了错误,提示错误的异常会被如今在试图计算商品价格的当前线程的范围内,最终计算的异步线程会被杀死,这会致使get方法返回结果的客户端永久的被等待。iphone
如何避免异常被掩盖, completeExceptionally会把CompletableFuture内发生的问题抛出去。异步
private static void test2() { Shop shop = new Shop("BestShop"); Stopwatch stopwatch = Stopwatch.createStarted(); Stopwatch stopwatch2 = Stopwatch.createStarted(); Future<Double> doubleFuture = shop.getPriceFutureException("pizza"); System.out.println("getPriceFuture return after: " + stopwatch.stop()); doSomethingElse(); try{ final Double price = doubleFuture.get(); System.out.println("price is " + price + " return after: " + stopwatch2.stop()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
方法改造:分布式
//异步方式查询产品价格,异常抛出去 public Future<Double> getPriceFutureException(String product){ final CompletableFuture<Double> doubleCompletableFuture = new CompletableFuture<>(); new Thread(()->{try { doubleCompletableFuture.complete(alculatePriceException(product)); }catch (Exception ex){ doubleCompletableFuture.completeExceptionally(ex); } }).start(); return doubleCompletableFuture; }
即让多个线程去异步并行或者并发的执行任务,计算完以后汇聚结果;
private static void test3(String productName) { Stopwatch stopwatch = Stopwatch.createStarted(); final List<String> stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")) .map(item -> String.format("商店:%s的商品:%s 售价是:%s", item.getName(), productName, item.getPrice(productName))) .collect(Collectors.toList()); System.out.println(stringList); System.out.println("test3 done in " + stopwatch.stop()); } private static void test3_parallelStream(String productName) { Stopwatch stopwatch = Stopwatch.createStarted(); final List<String> stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")) .parallel() .map(item -> String.format("商店:%s的商品:%s 售价是:%s", item.getName(), productName, item.getPrice(productName))) .collect(Collectors.toList()); System.out.println(stringList); System.out.println("test3_parallelStream done in " + stopwatch.stop()); } private static void test3_completableFuture(String productName) { Stopwatch stopwatch = Stopwatch.createStarted(); final List<String> stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")) .map(item ->CompletableFuture.supplyAsync(()-> String.format("商店:%s的商品:%s 售价是:%s", item.getName(), productName, item.getPrice(productName)))) .collect(Collectors.toList()) .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); System.out.println(stringList); System.out.println("test3_completableFuture done in " + stopwatch.stop()); } private static void test3_completableFuture_pool(String productName) { Stopwatch stopwatch = Stopwatch.createStarted(); final List<String> stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")) .map(item ->CompletableFuture.supplyAsync(()-> String.format("商店:%s的商品:%s 售价是:%s", item.getName(), productName, item.getPrice(productName)),pool)) .collect(Collectors.toList()) .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); System.out.println(stringList); System.out.println("test3_completableFuture done in " + stopwatch.stop()); }
代码中有一个简单的计算场景,我想查询4家商店的iphone11售价;
华强北,益田苹果店,香港九龙城,京东商城;
每一家的查询大概耗时1s;
任务处理方式 | 耗时 | 优缺点说明 |
---|---|---|
顺序执行 | 4秒多 | 简单,好理解 |
并行流 | 1秒多 | 没法定制流内置的线程池,使用简单,改造简单 |
CompletableFuture 默认线程池 | 2秒多 | 默认线程池 |
CompletableFuture 指定线程池 | 1秒多 | 指定了线程池,可定制性更好,相比于并行流 |
场景: 先计算价格,在拿到折扣,最后计算折扣价格;
private static void test4(String productName) { Stopwatch stopwatch = Stopwatch.createStarted(); final List<String> stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")) .map(shop->shop.getPrice_discount(productName)) .map(Quote::parse) .map(DisCount::applyDiscount) .collect(Collectors.toList()); System.out.println(stringList); System.out.println("test4 done in " + stopwatch.stop()); } private static void test4_completableFuture(String productName) { Stopwatch stopwatch = Stopwatch.createStarted(); final List<String> stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")) .map(shop->CompletableFuture.supplyAsync(()->shop.getPrice_discount(productName),pool)) .map(future->future.thenApply( Quote::parse)) .map(future->future.thenCompose(quote -> CompletableFuture.supplyAsync(()->DisCount.applyDiscount(quote),pool))) .collect(Collectors.toList()) .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); System.out.println(stringList); System.out.println("test4_completableFuture done in " + stopwatch.stop()); }
以上是有依赖关系的两个任务的聚合,即任务2,依赖任务1的结果。使用的是thenCompose方法;
接下来若是有两个任务能够异步执行,最后须要依赖着两个任务的结果计算获得最终结果,采用的是thenCombine;
//两个不一样的任务,最后须要汇聚结果,采用combine private static void test5(String productName) { Stopwatch stopwatch = Stopwatch.createStarted(); Shop shop = new Shop("香港九龙"); Double pricefinal = CompletableFuture.supplyAsync(()->shop.getPrice(productName)) .thenCombine(CompletableFuture.supplyAsync(shop::getRate),(price, rate)->price * rate).join(); System.out.println("test4 done in " + stopwatch.stop()); }
让任务尽快结束,无需等待;
有多个服务来源,你请求多个,谁先返回,就先响应;
结果依次返回:
//等待全部的任务执行完毕; CompletableFuture.allOf() public void findPriceStream(String productName){ List<Shop> shops = Arrays.asList(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")); final CompletableFuture[] completableFutureArray = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice_discount(productName), pool)) .map(future -> future.thenApply(Quote::parse)) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> DisCount.applyDiscount(quote), pool))) .map(f -> f.thenAccept(System.out::println)) .toArray(size -> new CompletableFuture[size]); CompletableFuture.allOf(completableFutureArray).join(); }
多个来源获取最快的结果:
//有两个获取天气的途径,哪一个快最后结果就取哪个 public static void getWeather(){ final Object join = CompletableFuture.anyOf(CompletableFuture.supplyAsync(() -> a_weather()), CompletableFuture.supplyAsync(() -> b_weather())).join(); System.out.println(join); } private static String b_weather() { DelayUtil.delay(3); return "bWeather"; } private static String a_weather() { DelayUtil.delay(5); return "aWeather"; }
可完备化的未来;CompletableFuture ;
先看签名:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}
实现了Futrue,CompletionStage接口;
这两个接口简单说明一下:
接口 | 关键特性 |
---|---|
Future | 直接翻译为将来,标识把一个任务异步执行,须要的的时候,经过get方法获取,也能够取消cancel,此外还提供了状态查询方法,isDone, isCancled,实现类是FutureTask |
CompletionStage | 直接翻译是完成的阶段,提供了函数式编程方法 |
能够分为以下几类方法
方法 | 说明 |
---|---|
thenApply(Function f) | 当前阶段正常完成以后,返回一个新的阶段,新的阶段把当前阶段的结果做为参数输入; |
thenConsume(Consumer c), | 当前阶段完成以后,结果做为参数输入,直接消费掉,获得不返回结果的完成阶段; |
thenRun(Runnable action), | 不接受参数,只是继续执行任务,获得一个新的完成阶段; |
thenCombine(otherCompletionStage,BiFunction), | 当两个完成阶段都完成的时候,执行BIFunction,返回一个新的阶段; |
thenAcceptBoth(OtherCompletionStage, BiConsumer) | 两个完成阶段都完成以后,对两个结果进行消费; |
runAfterBoth(OtherCompletionStage,Runable) | 两个完成阶段都完成以后,执行一个动做; |
applyToEither(OtherCompletionStage,Function) | 两个完成阶段的任何一个执行结束,进入函数操做,并返回一个新的阶段 |
acceptEither(OtherCompletionStage,Consumer) | 两个完成阶段的任何一个执行结束,消费掉,返回一个空返回值的完成阶段 |
runAfterEither(OtherCompletionStage,Runable) | 两个完成阶段的任何一个结束,执行一个动做,返回一个空返回值的完成阶段 |
thenCompose(Function) | 当前阶段完成,返回值做为参数,进行函数运算,而后结果做为一个新的完成阶段 |
exceptionally(Function) | 不管当前阶段是否正常完成,消费掉异常,而后返回值做为一个新的完成阶段 |
whenComplete | |
handle | 不管当前完成阶段是否正常结束,都执行一个BIFunction的函数,并返回一个新结果做为一个新的完成阶段 |
toCompletableFuture | 转换为ComplatableFuture |
里面的实现细节后面单独成文章再讲。
- 执行一些比较耗时的操做,尤为是依赖一个或者多个远程服务的操做,可使用异步任务改善程序的性能,加快程序的响应速度;
- 使用CompletableFuture你能够轻松的实现异步API;
- CompletableFuture提供了异常管理机制,让主线程有机会接管子任务抛出的异常;
- 把同步API封装到CompletableFuture中,能够异步获得它的结果;
- 若是异步任务之间互相独立,而他们之间的某一些结果是另一些的输入,能够把这些任务进行compose;
- 能够为CompletableFuture中的任务注册一个回调函数,当任务执行完毕以后再进行一些其它操做;
- 你能够决定何时结束程序的运行,是全部的CompletableFuture任务全部对象执行完毕,或者只要其中任何一个完成便可。
原创不易,转载请注明出处。