目标:从源码的角度分析2.7的新特性中对于异步化的改造原理。
dubbo中提供了不少类型的协议,关于协议的系列能够查看下面的文章:html
官方推荐的是使用dubbo协议,而异步调用的支持也是在dubbo协议中实现的。java
看了我以前写的2.7新特性的文章,应该对于异步化改造有个大体的印象。要弄懂异步在何时起做用,先要弄懂dubbo 的服务暴露和引用过程以及消费端发送请求过程和服务端处理请求过程。我在前四篇文章已经讲述了相关内容,异步请求只是dubbo的一种请求方式,基于 dubbo 底层的异步 NIO 实现异步调用,对于 Provider 响应时间较长的场景是必须的,它能有效利用 Consumer 端的资源,相对于 Consumer 端使用多线程来讲开销较小。可让消费者无需阻塞等待返回结果。web
通过改良后,Provider端也支持异步处理请求,引用官网的话就是如今Provider端异步执行和Consumer端异步调用是相互独立的,你能够任意正交组合两端配置。redis
如何开启和使用异步能够查看如下连接:apache
Provider异步执行: http://dubbo.apache.org/zh-cn/docs/user/demos/async-execute-on-provider.htmlConsumer异步调用:http://dubbo.apache.org/zh-cn/docs/user/demos/async-call.htmlsegmentfault
从设计上数据结构
在Filter接口新增了内部接口Listener,相关接口设计以下。多线程
interface Listener { /** * 回调正常的调用结果 * @param appResponse * @param invoker * @param invocation */ void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation); /** * 回调异常结果 * @param t * @param invoker * @param invocation */ void onError(Throwable t, Invoker<?> invoker, Invocation invocation); }
public abstract class ListenableFilter implements Filter { protected Listener listener = null; public Listener listener() { // 提供该过滤器的内部类listener return listener; } }
不变的是配置来决定调用方式,变的是在什么时候去作同步异步的不一样逻辑处理。看《dubbo源码解析(四十六)消费端发送请求过程》讲到的(十四)DubboInvoker的doInvoke,在之前的逻辑会直接在doInvoke方法中根据配置区分同步、异步、单向调用。如今只单独作了单向调用和须要返回结果的区分,统一先使用AsyncRpcResult来表示结果,也就是说一开始统一都是异步调用,而后在调用回到AsyncToSyncInvoker的invoke中时,才对同步异步作区分,这里新增了InvokeMode,InvokeMode如今有三种模式:SYNC, ASYNC, FUTURE。前两种很显而易见,后面一种是调用的返回类型是Future类型,表明调用的方法的返回类型是CompletableFuture类型,这种模式专门用来支持服务端异步的。看下面的源码。app
public static InvokeMode getInvokeMode(URL url, Invocation inv) { // 若是返回类型是future if (isReturnTypeFuture(inv)) { return InvokeMode.FUTURE; } else if (isAsync(url, inv)) { // 若是是异步调用 return InvokeMode.ASYNC; } else { // 若是是同步 return InvokeMode.SYNC; } }
参考《dubbo源码解析(四十六)消费端发送请求过程》的(十二)AsyncToSyncInvoker的invoke逻辑,若是是同步模式,就会阻塞调用get方法。直到调用成功有结果返回。若是不是同步模式,就直接返回。异步
关于ResponseFuture能够参考《dubbo源码解析(十)远程通讯——Exchange层》的(六)ResponseFuture。具体的能够看它的两个实现(七)DefaultFuture和(八)SimpleFuture。
在此次改造中,最小JDK版本从之前的1.6变成了1.8。固然也要用到1.8中新特性,其中就包括CompletableFuture。dubbo的通讯主要有两处,一处是Consumer发送请求消息给Provider,另外一处就是Provider把结果发送给Consumer。在Consumer发送请求消息给Provider的时候,Consumer不会一直处于等待,而是生成ResponseFuture会抛给下游去作其余操做,等到Provider把结果返回放入ResponseFuture,Consumer能够经过get方法得到结果,或者它也支持回调。可是这就暴露了一些问题,也就是为在新特性里提到的缺陷:
针对以上两个不足,CompletableFuture能够很好的解决它们。
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
当计算完成后,就会执行该方法中的action方法。相比于ResponseFuture,再也不须要本身去作回调注册的编码,更加易于理解。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
因为回调风格的实现,咱们没必要由于等待一个计算完成而阻塞着调用线程,而是告诉CompletableFuture当计算完成的时候请执行某个function。并且咱们还能够将这些操做串联起来,或者将CompletableFuture组合起来。这一组函数的功能是当原来的CompletableFuture计算完后,将结果传递给函数fn,将fn的结果做为新的CompletableFuture计算结果。所以它的功能至关于将CompletableFuture<T>转换成CompletableFuture<U>。
除了转化以外,还有future之间组合的支持,例如如下三个方法:
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn) public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn) public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
这一组方法接受一个Function做为参数,这个Function的输入是当前的CompletableFuture的计算值,返回结果将是一个新的CompletableFuture,这个新的CompletableFuture会组合原来的CompletableFuture和函数返回的CompletableFuture。
如今就能看出CompletableFuture的强大了,它解决了自定义ResponseFuture的许多问题,该类有几十个方法,感兴趣的能够去一一尝试。
能够看到之前的版本只能在RpcContext中进行获取。而通过改良后,首先RpcContext同样能过获取,其次在过滤器链返回的Result中也能获取,能够从最新的代码中看到,原先的RpcResult类已经被去除,而在AsyncRpcResult也继承了CompletableFuture<Result>类,也就是说有AsyncRpcResult的地方,就有CompletableFuture。而且在后续的dubbo3.0中,AsyncRpcResult将会内置CompletableFuture类型的变量,CompletableFuture的获取方式也会大大增长。
接下来我就来说解一下AsyncRpcResult类。
/** * 当相同的线程用于执行另外一个RPC调用时,而且回调发生时,原来的RpcContext可能已经被更改。 * 因此咱们应该保留当前RpcContext实例的引用,并在执行回调以前恢复它。 * 存储当前的RpcContext */ private RpcContext storedContext; /** * 存储当前的ServerContext */ private RpcContext storedServerContext; /** * 会话域 */ private Invocation invocation; public AsyncRpcResult(Invocation invocation) { // 设置会话域 this.invocation = invocation; // 得到当前线程内表明消费者端的Context this.storedContext = RpcContext.getContext(); // 得到当前线程内表明服务端的Context this.storedServerContext = RpcContext.getServerContext(); } /** * 转换成新的AsyncRpcResult * @param asyncRpcResult */ public AsyncRpcResult(AsyncRpcResult asyncRpcResult) { this.invocation = asyncRpcResult.getInvocation(); this.storedContext = asyncRpcResult.getStoredContext(); this.storedServerContext = asyncRpcResult.getStoredServerContext(); }
上面的是AsyncRpcResult核心的变量以及构造函数,storedContext和storedServerContext存储了相关的RpcContext实例的引用,为的就是防止在回调的时候因为相同的线程用于执行另外一个RPC调用致使原来的RpcContext可能已经被更改。因此存储下来后,咱们须要在执行回调以前恢复它。具体的能够看下面的thenApplyWithContext方法。
@Override public Object getValue() { // 得到计算的结果 return getAppResponse().getValue(); } @Override public void setValue(Object value) { // 建立一个AppResponse实例 AppResponse appResponse = new AppResponse(); // 把结果放入AppResponse appResponse.setValue(value); // 标志该future完成,而且把携带结果的appResponse设置为该future的结果 this.complete(appResponse); } @Override public Throwable getException() { // 得到抛出的异常信息 return getAppResponse().getException(); } @Override public void setException(Throwable t) { // 建立一个AppResponse实例 AppResponse appResponse = new AppResponse(); // 把异常放入appResponse appResponse.setException(t); // 标志该future完成,而且把携带异常的appResponse设置为该future的结果 this.complete(appResponse); } @Override public boolean hasException() { // 设置是否有抛出异常 return getAppResponse().hasException(); } public Result getAppResponse() { // 若是该结果计算完成,则直接调用get方法得到结果 try { if (this.isDone()) { return this.get(); } } catch (Exception e) { // This should never happen; logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.", e); } // 建立AppResponse return new AppResponse(); }
这些实现了Result接口的方法,能够发现其中都是调用了AppResponse的方法,AppResponse跟AsyncRpcResult同样也继承了AbstractResult,不过它是做为回调的数据结构。AppResponse我会在异步化过滤器链回调中讲述。
@Override public Object recreate() throws Throwable { // 强制类型转化 RpcInvocation rpcInvocation = (RpcInvocation) invocation; // 若是返回的是future类型 if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) { // 建立AppResponse实例 AppResponse appResponse = new AppResponse(); // 建立future CompletableFuture<Object> future = new CompletableFuture<>(); // appResponse设置future值,由于返回的就是CompletableFuture类型 appResponse.setValue(future); // 当该AsyncRpcResult完成的时候,把结果放入future中,这样返回的就是CompletableFuture包裹的结果 this.whenComplete((result, t) -> { if (t != null) { if (t instanceof CompletionException) { t = t.getCause(); } future.completeExceptionally(t); } else { if (result.hasException()) { future.completeExceptionally(result.getException()); } else { future.complete(result.getValue()); } } }); // 重置 return appResponse.recreate(); } else if (this.isDone()) { // 若是完成,则直接重置 return this.get().recreate(); } // 若是返回类型不是CompletableFuture,则调用AppResponse的重置 return (new AppResponse()).recreate(); }
该方法是重置,原本也是直接调用了AppResponse的方法,不过由于支持了以CompletableFuture为返回类型的服务方法调用,因此这里作了一些额外的逻辑,也就是把结果用CompletableFuture包裹,做为返回的结果放入AppResponse实例中。能够对标使用了CompletableFuture签名的服务。
@Override public Result thenApplyWithContext(Function<Result, Result> fn) { // 当该AsyncRpcResult完成后,结果做为参数先执行beforeContext,再执行fn,最后执行andThen this.thenApply(fn.compose(beforeContext).andThen(afterContext)); // You may need to return a new Result instance representing the next async stage, // like thenApply will return a new CompletableFuture. return this; } /** * tmp context to use when the thread switch to Dubbo thread. * 临时的RpcContext,当用户线程切换为Dubbo线程时候使用 */ /** * 临时的RpcContext */ private RpcContext tmpContext; private RpcContext tmpServerContext; private Function<Result, Result> beforeContext = (appResponse) -> { // 得到当前线程消费者端的RpcContext tmpContext = RpcContext.getContext(); // 得到当前线程服务端的RpcContext tmpServerContext = RpcContext.getServerContext(); // 从新设置消费者端的RpcContext RpcContext.restoreContext(storedContext); // 从新设置服务端的RpcContext RpcContext.restoreServerContext(storedServerContext); return appResponse; }; private Function<Result, Result> afterContext = (appResponse) -> { // 从新把临时的RpcContext设置回去 RpcContext.restoreContext(tmpContext); RpcContext.restoreServerContext(tmpServerContext); return appResponse; };
把这几部分代码放在一块儿时由于当用户线程切换为Dubbo线程时候须要用到临时的RpcContext来记录,如何使用该thenApplyWithContext方法,我也会在异步化过滤器链回调中讲到。
其余的方法比较好理解,我就不一一讲解。
若是看过前两篇关于发送请求和处理请求的过程,应该就知道在整个调用链中有许多的过滤器,而Consumer和Provider分别都有各自的过滤器来作一些功能加强。过滤器有执行链,也有回调链,若是整一个链路都是同步的,那么过滤器一旦增多,链路增加,就会带来请求响应时间的增长,这固然是最不想看到的事情。那若是把过滤器的调用链异步化,那么咱们就能够用一个future来代替结果抛给下游,让下游再也不阻塞。这样就大大下降了响应时间,节省资源,提高RPC响应性能。而这里的future就是下面要介绍的AppResponse。那我先来介绍一下如何实现异步化过滤器链回调。我就拿消费端发送请求过程来举例子说明。
参考《dubbo源码解析(四十六)消费端发送请求过程》的(六)ProtocolFilterWrapper的内部类CallbackRegistrationInvoker的invoke,能够看到当全部的过滤器执行完后,会遍历每个过滤器链,得到上面所说的内部接口Listener实现类,进行异步回调,由于请求已经在(十四)DubboInvoker的doInvoke中进行了发送,返回给下游一个AsyncRpcResult,而AsyncRpcResult内包裹的是AppResponse,能够看《dubbo源码解析(四十七)服务端处理请求过程》的(二十三)AbstractProxyInvoker的invoke,当代理类执行相关方法后,会建立一个AppResponse,把结果放入AppResponse中。因此AsyncRpcResult中包裹的是AppResponse,而后调用回调方法onResponse。而且会执行thenApplyWithContext把回调结果放入上下文中。而这个上下文如何避免相同的线程用于执行另外一个RPC调用致使原来的RpcContext可能已经被更改的状况,我也在上面已经说明。
AppResponse继承了AbstractResult,一样也是CompletableFuture<Result>类型,可是AppResponse跟AsyncRpcResult职能不同,AsyncRpcResult做为一个future,而AppResponse能够说是做为rpc调用结果的一个数据结构,它的实现很简单,就是封装了如下三个属性和对应的一些方法。
/** * 调用结果 */ private Object result; /** * rpc调用时的异常 */ private Throwable exception; /** * 附加值 */ private Map<String, String> attachments = new HashMap<String, String>();
前面我也讲了,Provider处理请求完成后,会把结果放在AppResponse内,在整个链路调用过程当中AsyncRpcResult内部必然会有一个AppResponse存在,而为上文提到的过滤器内置接口Listener的onResponse方法中的appResponse就是AppResponse类型的,它做为一个回调的数据类型。
该文章讲解了dubbo 2.7.x版本对于异步化改造的介绍,上面只是罗列了全部改动的点,没有具体讲述在哪些新增功能上的应用,若是感兴趣,能够参考前几篇的调用过程文章,来看看新增的功能点如何运用上述的设计的,好比Provider异步,有一种实现方式就用到了上述的InvokeMode。接下来一篇我会讲述元数据的改造。