在咱们平时开发中或多或少都会遇到须要调用接口来完成一个功能的需求,这个接口能够是内部系统也能够是外部的,而后等到接口返回数据了才能继续其余的业务流程,这就是传统的同步模式。html
同步模式虽然简单但缺点也很明显,若是对方服务处理缓慢迟迟未能返回数据,或网络问题致使响应变长,就会阻塞咱们调用方的线程,致使咱们主流程的耗时latency延长,传统的解决方式是增长接口的超时timeout设置,防止无限期等待。但即便这样仍是会占用CPU资源。java
在咱们作rpc远程调用,redis,数据库访问等比较耗时的网络请求时常常要面对这样的问题,这种业务场景咱们能够引入异步的编程思想,即主流程不须要阻塞等待接口返回数据,而是继续往下执行,当真正须要这个接口返回结果时再经过回调或阻塞的方式获取,此时咱们的主流程和异步任务是并行执行的。web
Java中实现异步主要是经过Future
,CompletableFuture
,Guava ListenableFuture
以及一些异步响应式框架如RxJava实现。redis
下面咱们主要看下这几种组件适用的业务场景和须要注意的地方,避免踩坑。数据库
java.util.concurrent.Future
是JDK5引入的,用来获取一个异步计算的结果。你可使用isDone
方法检查计算是否完成,也可使用get阻塞住调用线程,直到计算完成返回结果,你也可使用cancel
方法中止任务的执行。编程
Future的api说明api
实际开发中咱们通常会结合线程池的submit配合使用,代码以下:服务器
package com.javakk; import java.util.concurrent.*; public class FutureTest { public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); // 线程池 Future<String> future = executor.submit(() ->{ Thread.sleep(200); // 模拟接口调用,耗时200ms return "hello world"; }); // 在输出下面异步结果时主线程能够不阻塞的作其余事情 // TODO 其余业务逻辑 System.out.println("异步结果:"+future.get()); //主线程获取异步结果 // 或者经过下面轮询的方式 // while(!future.isDone()); } } // 输出结果: 异步结果:hello world
简单的说我有一个任务,提交给了Future,Future替我完成这个任务,这期间我能够去作别的事情。一段时间以后,我再从Future取出结果。网络
上面的代码有2个地方须要注意,在15行不建议使用future.get()
方式,而应该使用future.get(long timeout, TimeUnit unit)
, 尤为是在生产环境必定要设置合理的超时时间,防止程序无限期等待下去。另外就是要考虑异步任务执行过程当中报错抛出异常的状况,须要捕获future的异常信息。并发
经过代码能够看出一些简单的异步场景可使用Future解决,可是对于结果的获取却不是很方便,只能经过阻塞或者轮询的方式获得任务的结果。阻塞的方式至关于把异步变成了同步,显然和异步编程的初衷相违背,轮询的方式又会浪费CPU资源。
Future没有提供通知的机制,就是回调,咱们没法知道它什么时间完成任务。
并且在复杂一点的状况下,好比多个异步任务的场景,一个异步任务依赖上一个异步任务的执行结果,异步任务合并等,Future没法知足需求。
Google并发包下的listenableFuture
对Java原生的future
作了扩展,顾名思义就是使用监听器模式实现的回调,因此叫可监听的future。
在咱们公司早期的项目里(jdk8以前的版本)都是使用listenableFuture
来实现异步编程。
要使用listenableFuture
还要结合MoreExecutor
线程池,MoreExecutor
是对Java原生线程池的封装,好比经常使用的MoreExecutors.listeningDecorator(threadPool);
修改Java原生线程池的submit
方法,封装了future返回listenableFuture
。
代码示例以下:
// ListeningExecutorService继承jdk的ExecutorService接口,重写了submit方法,修改返回值类型为ListenableFuture ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); [ListenableFuture](http://javakk.com/tag/listenablefuture "查看更多关于 ListenableFuture 的文章")<String> listenableFuture = executor.submit(() -> { Thread.sleep(200); // 模拟接口调用,耗时200ms return "hello world"; });
上面的代码是构造了一个ListenableFuture的异步任务,调用它的结果通常有两种方式:
基于addListener
:
listenableFuture.addListener(() -> { try { System.out.println("异步结果:" + listenableFuture.get()); } catch (Exception e) { e.printStackTrace(); } }, executor); // 输出结果: 异步结果:hello world
基于addCallback:
Futures.addCallback(listenableFuture, new FutureCallback<String>() { @Override public void onSuccess(String result) { System.out.println("异步结果:" + result); } @Override public void onFailure(Throwable t) { t.printStackTrace(); } }, executor); // 输出结果: 异步结果:hello world
其实两种方式都是基于回调,具体使用哪一种看业务场景。
addListener
须要本身代码里捕获处理异常状况,最好设置超时时间addCallback
把正常返回和异常状况作了分离,方便咱们针对不一样状况作处理另外Futures里还有不少其余的api,能够知足咱们负责场景,好比transform()
能够处理异步任务之间的依赖状况,allAsList()
将多个ListenableFuture合并成一个。
若是大家公司的jdk是8或以上的版本,那能够直接使用CompletableFuture
类来实现异步编程。
Java8新增的CompletableFuture
类借鉴了Google Guava的ListenableFuture
,它包含50多个方法,默认使用forkJoinPool
线程池,提供了很是强大的Future扩展功能,能够帮助咱们简化异步编程的复杂性,结合函数式编程,经过回调的方式处理计算结果,而且提供了转换和组合CompletableFuture
的多种方法,能够知足大部分异步回调场景。
CompletableFuture的api
虽然方法不少但有个特征:
下面就来看下经常使用的几种api代码示例:
转换 : thenApplyAsync
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello" ); // f2依赖f1的结果作转换 CompletableFuture<String> f2 = f1.thenApplyAsync(t -> t + " world" ); System.out.println("异步结果:" + f2.get()); // 输出结果: 异步结果:hello world
这里先说明一下,示例代码只关注核心功能,若是要实际使用须要考虑超时和异常状况,你们须要注意。
在上面的代码中异步任务f2
须要异步任务f1
的结果才能执行,但对于咱们的主线程来讲,无须等到f1
返回结果后再调用函数f2
,即不会阻塞主流程,而是告诉CompletableFuture当执行完了f1
的方法再去执行f2
,只有当须要最后的结果时再获取。
组合 : thenComposeAsync
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello" ); // f2虽然依赖f1的结果,但不会等待f1结果返回,而是再包装成一个future返回 CompletableFuture<String> f2 = f1.thenComposeAsync(t -> CompletableFuture.supplyAsync(() -> t + " world" ) ); // 等到真正调用的时候再执行f2里的逻辑 System.out.println("异步结果:" + f2.get()); // 输出结果: 异步结果:hello world
经过代码注释能看出thenCompose
至关于flatMap
,避免CompletableFuture<CompletableFuture<String>>
这种写法。
这也是thenCompose
和thenApply
的区别,经过查看api也能看出:
thenApply:
public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); }
thenCompose:
public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(screenExecutor(executor), fn); }
合并 : thenCombineAsync
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); // 模拟接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); // 模拟接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return " world"; }); CompletableFuture<String> f3 = f1.thenCombineAsync(f2, (t1, t2) -> t1 + t2 ); long time = System.currentTimeMillis(); System.out.println("异步结果:" + f3.get()); System.out.println("耗时:" + (System.currentTimeMillis() - time)); // 输出结果: 异步结果:hello world 耗时:1002
从代码输出结果能够看到两个异步任务f一、f2是并行执行,彼此无前后依赖顺序,thenCombineAsync
适合将两个并行执行的异步任务的结果合并返回成一个新的future。
还有一个相似的方法thenAcceptBoth
也是合并两个future的结果,可是不会返回新的值,内部消费掉了。
二选一 : applyToEitherAsync
Random rand = new Random(); CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + rand.nextInt(1000)); // 模拟接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + rand.nextInt(1000)); // 模拟接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "world"; }); CompletableFuture<String> f3 = f1.applyToEitherAsync(f2, t -> t); long time = System.currentTimeMillis(); System.out.println("异步结果:" + f3.get()); System.out.println("耗时:" + (System.currentTimeMillis() - time));
输出的结果有时候是hello 有时候是world,哪一个future先执行完就根据它的结果计算,取两个future最早返回的。
这里要说明一点,若是两个future是同时返回结果,那么applyToEitherAsync永远以第一个future的结果为准,你们能够把上面代码的Thread.sleep
注释掉测试下。
另外acceptEither
方法和这个相似,可是没有返回值。
allOf / anyOf
前面讲的compose
,combine
,either
都是处理两个future的方法,若是是超过2个的可使用allOf
或anyOf
allOf:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); // 模拟接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); // 模拟接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "world"; }); CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); // 模拟接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "java老k"; }); List<CompletableFuture<String>> list = new ArrayList<>(); list.add(f1); list.add(f2); list.add(f3); CompletableFuture<Void> f4 = CompletableFuture.allOf(list.toArray(new CompletableFuture[]{})); long time = System.currentTimeMillis(); f4.thenRunAsync(() -> list.forEach(f -> { try { System.out.println("异步结果:" + f.get()); } catch (Exception e) { e.printStackTrace(); } }) ); f4.get(); System.out.println("耗时:" + (System.currentTimeMillis() - time)); // 输出结果: 耗时:1004 异步结果:hello 异步结果:world 异步结果:java老k
allOf
方法是当全部的CompletableFuture都执行完后执行计算,无返回值。
anyOf:
Random rand = new Random(); // 随机数 CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + rand.nextInt(1000)); // 模拟接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + rand.nextInt(1000)); // 模拟接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "world"; }); CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + rand.nextInt(1000)); // 模拟接口调用耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return "java老k"; }); CompletableFuture<Object> f4 = CompletableFuture.anyOf(f1, f2, f3); long time = System.currentTimeMillis(); System.out.println("异步结果:" + f4.get()); System.out.println("耗时:" + (System.currentTimeMillis() - time)); // 输出结果: 异步结果:java老k 耗时:1075
屡次执行输出的结果不同,anyOf
方法当任意一个CompletableFuture执行完后就会执行计算。
虽说CompletableFuture更适合I/O场景,但使用时必定要结合具体业务,好比说有些公共方法处理异步任务时须要考虑异常状况,这时候使用CompletableFuture.handle(BiFunction<? super T, Throwable, ? extends U> fn)更合适,handle方法会处理正常计算值和异常,所以它能够屏蔽异常,避免异常继续抛出。
CompletableFuture还有一个坑须要注意:若是线上流量比较大的状况下会出现响应缓慢的问题。
由于CompletableFuture默认使用的线程池是forkJoinPool,当时对一台使用了CompletableFuture实现异步回调功能的接口作压测,经过监控系统发现有大量的ForkJoinPool.commonPool-worker-*
线程处于等待状态,进一步分析dump信息发现是forkJoinPool的makeCommonPool问题,以下图:
看到这你们应该清楚了,若是在项目里没有设置java.util.concurrent.ForkJoinPool.common.parallelism
的值,那么forkJoinPool线程池的线程数就是(cpu-1),咱们测试环境的机器是2核,这样实际执行任务的线程数只有1个,当有大量请求过来时,若是有耗时高的io操做,势必会形成更多的线程等待,进而拖累服务响应时间。
解决方案一个是设置java.util.concurrent.ForkJoinPool.common.parallelism
这个值(要在项目启动时指定),或者指定线程池不使用默认的forkJoinPool。
forkJoinPoll线程池不了解的能够看下这篇文章:线程池ForkJoinPool简介
线程数如何设置能够参考《Java并发编程实战》这本书给出的建议,以下图:
线程池设置线程数公式:
threads = N CPU U CPU (1 + W/C)
其中:
网上也有这么区分的:
若是服务是cpu密集型的,设置为电脑的核数
若是服务是io密集型的,设置为电脑的核数*2
其实我以为并不严谨,尤为是io密集型的还要参考QPS和web服务器的配置。
线程池使用不当形成的后果和分析能够在推荐阅读里了解。
今天主要讲了java实现异步编程的几种方式,你们能够结合本身的实际状况参考,下次有时间会跟你们分享下咱们另一个项目如何使用RxJava实现的全异步化服务。