提升应用性能的时候很容易就会想起异步,异步去处理一些任务这样主线程能够尽快响应。
经过阅读本篇文章你将了解到:java
查询全部商店某个商品的价格并返回,而且查询商店某个商品的价格的API为同步
一个Shop类,提供一个名为getPrice的同步方法mysql
public class Shop { private Random random = new Random(); /** * 根据产品名查找价格 * */ public double getPrice(String product) { return calculatePrice(product); } /** * 计算价格 * * @param product * @return * */ private double calculatePrice(String product) { delay(); //random.nextDouble()随机返回折扣 return random.nextDouble() * product.charAt(0) + product.charAt(1); } /** * 经过睡眠模拟其余耗时操做 * */ private void delay() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
查询商品的价格为同步方法,并经过sleep方法模拟其余操做。这个场景模拟了当须要调用第三方API,但第三方提供的是同步API,在没法修改第三方API时如何设计代码调用提升应用的性能和吞吐量,这时候可使用CompletableFuture类git
Completable是Future接口的实现类,在JDK1.8中引入github
CompletableFuture的建立:sql
使用new方法数据库
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
使用CompletableFuture#completedFuture静态方法建立设计模式
public static <U> CompletableFuture<U> completedFuture(U value) { return new CompletableFuture<U>((value == null) ? NIL : value); }
参数的值为任务执行完的结果,通常该方法在实际应用中较少应用缓存
使用 CompletableFuture#supplyAsync静态方法建立
supplyAsync有两个重载方法:并发
//方法一 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } //方法二 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); }
runAsync有两个重载方法框架
//方法一 public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } //方法二 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
说明:
对于结果的获取CompltableFuture类提供了四种方式
//方式一 public T get() //方式二 public T get(long timeout, TimeUnit unit) //方式三 public T getNow(T valueIfAbsent) //方式四 public T join()
说明:
示例:
public class AcquireResultTest { public static void main(String[] args) throws ExecutionException, InterruptedException { //getNow方法测试 CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(60 * 1000 * 60 ); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }); System.out.println(cp1.getNow("hello h2t")); //join方法测试 CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((()-> 1 / 0)); System.out.println(cp2.join()); //get方法测试 CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((()-> 1 / 0)); System.out.println(cp3.get()); } }
说明:
使用静态方法建立的CompletableFuture对象无需显示处理异常,使用new建立的对象须要调用completeExceptionally方法设置捕获到的异常,举例说明:
CompletableFuture completableFuture = new CompletableFuture(); new Thread(() -> { try { //doSomething,调用complete方法将其余方法的执行结果记录在completableFuture对象中 completableFuture.complete(null); } catch (Exception e) { //异常处理 completableFuture.completeExceptionally(e); } }).start();
店铺为一个列表:
private static List<Shop> shopList = Arrays.asList( new Shop("BestPrice"), new Shop("LetsSaveBig"), new Shop("MyFavoriteShop"), new Shop("BuyItAll") );
同步方法:
private static List<String> findPriceSync(String product) { return shopList.stream() .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))) //格式转换 .collect(Collectors.toList()); }
异步方法:
private static List<String> findPriceAsync(String product) { List<CompletableFuture<String>> completableFutureList = shopList.stream() //转异步执行 .map(shop -> CompletableFuture.supplyAsync( () -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))) //格式转换 .collect(Collectors.toList()); return completableFutureList.stream() .map(CompletableFuture::join) //获取结果不会抛出异常 .collect(Collectors.toList()); }
性能测试结果:
Find Price Sync Done in 4141 Find Price Async Done in 1033
异步执行效率提升四倍
在JDK1.8之前,经过调用线程池的submit方法可让任务以异步的方式运行,该方法会返回一个Future对象,经过调用get方法获取异步执行的结果:
private static List<String> findPriceFutureAsync(String product) { ExecutorService es = Executors.newCachedThreadPool(); List<Future<String>> futureList = shopList.stream().map(shop -> es.submit(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))).collect(Collectors.toList()); return futureList.stream() .map(f -> { String result = null; try { result = f.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return result; }).collect(Collectors.toList()); }
既生瑜何生亮,为何仍须要引入CompletableFuture?
对于简单的业务场景使用Future彻底没有,可是想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果须要前一个异步任务的值等等,使用Future提供的那点API就囊中羞涩,处理起来不够优雅,这时候仍是让CompletableFuture以声明式的方式优雅的处理这些需求
对前面计算结果进行处理,没法返回新值
提供了三个方法:
//方法一 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) //方法二 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) //方法三 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
说明:
示例:
public class WhenCompleteTest { public static void main(String[] args) { CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "hello"); CompletableFuture<String> cf2 = cf1.whenComplete((v, e) -> System.out.println(String.format("value:%s, exception:%s", v, e))); System.out.println(cf2.join()); } }
将前面计算结果的的CompletableFuture传递给thenApply,返回thenApply处理后的结果。能够认为经过thenApply方法实现CompletableFuture<T>至CompletableFuture<U>的转换。白话一点就是将CompletableFuture的计算结果做为thenApply方法的参数,返回thenApply方法处理后的结果
提供了三个方法:
//方法一 public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); } //方法二 public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn) { return uniApplyStage(asyncPool, fn); } //方法三 public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn, Executor executor) { return uniApplyStage(screenExecutor(executor), fn); }
说明:
示例:
public class ThenApplyTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> result = CompletableFuture.supplyAsync(ThenApplyTest::randomInteger).thenApply((i) -> i * 8); System.out.println(result.get()); } public static Integer randomInteger() { return 10; } }
这里将前一个CompletableFuture计算出来的结果扩大八倍
thenApply也能够归类为对结果的处理,thenAccept和thenApply的区别就是没有返回值
提供了三个方法:
//方法一 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { return uniAcceptStage(null, action); } //方法二 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { return uniAcceptStage(asyncPool, action); } //方法三 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) { return uniAcceptStage(screenExecutor(executor), action); }
说明:
示例:
public class ThenAcceptTest { public static void main(String[] args) { CompletableFuture.supplyAsync(ThenAcceptTest::getList).thenAccept(strList -> strList.stream() .forEach(m -> System.out.println(m))); } public static List<String> getList() { return Arrays.asList("a", "b", "c"); } }
将前一个CompletableFuture计算出来的结果打印出来
thenCompose方法能够将两个异步操做进行流水操做
提供了三个方法:
//方法一 public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(null, fn); } //方法二 public <U> CompletableFuture<U> thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(asyncPool, fn); } //方法三 public <U> CompletableFuture<U> thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) { return uniComposeStage(screenExecutor(executor), fn); }
说明:
示例:
public class ThenComposeTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> result = CompletableFuture.supplyAsync(ThenComposeTest::getInteger) .thenCompose(i -> CompletableFuture.supplyAsync(() -> i * 10)); System.out.println(result.get()); } private static int getInteger() { return 666; } private static int expandValue(int num) { return num * 10; } }
执行流程图:
thenCombine方法将两个无关的CompletableFuture组合起来,第二个Completable并不依赖第一个Completable的结果
提供了三个方法:
//方法一 public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { return biApplyStage(null, other, fn); } //方法二 public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { return biApplyStage(asyncPool, other, fn); } //方法三 public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) { return biApplyStage(screenExecutor(executor), other, fn); }
说明:
示例:
public class ThenCombineTest { private static Random random = new Random(); public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> result = CompletableFuture.supplyAsync(ThenCombineTest::randomInteger).thenCombine( CompletableFuture.supplyAsync(ThenCombineTest::randomInteger), (i, j) -> i * j ); System.out.println(result.get()); } public static Integer randomInteger() { return random.nextInt(100); } }
将两个线程计算出来的值作一个乘法在返回
执行流程图:
方法介绍:
//allOf public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { return andTree(cfs, 0, cfs.length - 1); } //anyOf public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { return orTree(cfs, 0, cfs.length - 1); }
说明:
示例:
allOf方法测试
public class AllOfTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("hello"); return null; }); CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("world"); return null; }); CompletableFuture<Void> result = CompletableFuture.allOf(future1, future2); System.out.println(result.get()); } }
allOf方法没有返回值,适合没有返回值而且须要前面全部任务执行完毕才能执行后续任务的应用场景
anyOf方法测试
public class AnyOfTest { private static Random random = new Random(); public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("hello"); return "hello";}); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("world"); return "world"; }); CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2); System.out.println(result.get()); } private static void randomSleep() { try { Thread.sleep(random.nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } } }
两个线程都会将结果打印出来,可是get方法只会返回最早完成任务的结果。该方法比较适合只要有一个返回值就能够继续执行其余任务的应用场景
不少方法都提供了异步实现【带async后缀】,可是需当心谨慎使用这些异步方法,由于异步意味着存在上下文切换,可能性能不必定比同步好。若是须要使用异步的方法,先作测试,用测试数听说话!!!
存在IO密集型的任务能够选择CompletableFuture,IO部分交由另一个线程去执行。Logback、Log4j2异步日志记录的实现原理就是新起了一个线程去执行IO操做,这部分能够以CompletableFuture.runAsync(()->{ioOperation();})的方式去调用,有关Logback异步日志记录的原理能够参考这篇文章Logback异步日志记录
supplyAsync执行任务底层实现:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) { if (f == null) throw new NullPointerException(); CompletableFuture<U> d = new CompletableFuture<U>(); e.execute(new AsyncSupply<U>(d, f)); return d; }
底层调用的是线程池去执行任务,而CompletableFuture中默认线程池为ForkJoinPool
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
ForkJoinPool线程池的大小取决于CPU的核数。以前写的为何阿里巴巴要禁用Executors建立线程池?文章中说起过,CPU密集型任务线程池大小配置为CPU核心数就能够了,可是IO密集型,线程池的大小由CPU数量 CPU利用率 (1 + 线程等待时间/线程CPU时间)肯定。而CompletableFuture的应用场景就是IO密集型任务,所以默认的ForkJoinPool通常没法达到最佳性能,咱们需本身根据业务建立线程池
最后附:示例代码,欢迎Fork与Star
并发相关
1.为何阿里巴巴要禁用Executors建立线程池?
2.本身的事情本身作,线程异常处理
设计模式相关:
1. 单例模式,你真的写对了吗?
2. (策略模式+工厂模式+map)套餐 Kill 项目中的switch case
JAVA8相关:
1. 使用Stream API优化代码
2. 亲,建议你使用LocalDateTime而不是Date哦
数据库相关:
1. mysql数据库时间类型datetime、bigint、timestamp的查询效率比较
2. 很高兴!终于踩到了慢查询的坑
高效相关:
1. 撸一个Java脚手架,一统团队项目结构风格
日志相关:
1. 日志框架,选择Logback Or Log4j2?
2. Logback配置文件这么写,TPS提升10倍
工程相关:
1. 闲来无事,动手写一个LRU本地缓存
2. Redis实现点赞功能模块
3. JMX可视化监控线程池
4. 权限管理 【SpringSecurity篇】
5. Spring自定义注解从入门到精通
6. java模拟登录优酷
7. QPS这么高,那就来写个多级缓存吧
8. java使用phantomjs进行截图