Dubbo 自 2011 年 10 月 27 日开源后,已被许多非阿里系的公司使用,其中既有当当网、网易考拉等互联网公司,也不乏中国人寿、青岛海尔等大型传统企业。更多用户信息,能够访问Dubbo @GitHub,issue#1012: Wanted: who's using dubbo。java
自去年 12 月开始,Dubbo 3.0 便已正式进入开发阶段,并备受社区和广大 Dubbo 用户的关注,本文将为您详细解读 3.0 预览版的新特性和新功能。react
下面先解答一下两个有意思的与 Dubbo 相关的疑问。git
笔者曾作过 Dubbo 协议的适配兼容,Dubbo 确实存在过 1.x 版本,并且从协议设计和模型设计上都与 2.0 的开源版本协议是彻底不同的。下图是关于 Dubbo 的发展路径:github
是的,很是肯定,当前开源版本的 Dubbo 在阿里巴巴被普遍使用,而阿里的电商核心部门是用的 HSF2.2 版本,这个版本是兼容了 Dubbo 使用方式和 Remoting 协议。固然,咱们如今正在作 HSF2.2 的升级,直接依赖开源版本的 Dubbo 来作内核的统一。因此,Dubbo 是获得大规模线上系统验证的分布式服务框架,这一点毋容置疑。spring
Dubbo 3.0 在设计和功能上的新增支持和改进,主要是如下四方面:apache
这里要指出的是,3.0 中规划的异步去阻塞和 2.7 中提供的异步是两个层面的特性。2.7 中的异步是创建在传统 RPC 中 request – response 会话模型上的,而 3.0 中的异步将会从通信协议层面由下向上构建,关注的是跨进程、全链路的异步问题。经过底层协议开始支持 streaming 方式,不仅仅能够支持多种会话模型,还能够在协议层面开始支持反压、限流等特性,使得整个分布式体系更具备弹性。综上所述,2.7 关注的异步更局限在点对点的异步(一个 consumer 调用一个 provider),3.0 关注的异步化,宽度上则关注整个调用链上的异步,高度上则向上又能够包装成 Rx 的编程模型。有趣的是,Spring 5.0 发布了对 Flux 的支持,随后开始解决跨进程的异步问题。编程
最近几年, reactive programming
这个词语的热度迅速提高,Wikipedia 上的 reactive programming 解释是 reactive programming is a programming paradigm oriented around data flows and the propagation of change. Dubbo3.0会实现Reactive Stream 的 rx 接口,从而能让用户享受到RP带来的响应性提高,甚至面向 RP 的架构升级。固然,咱们但愿 reactive 不仅仅可以带来事件(event)驱动的应用集成方式的升级,也但愿在 Load Balance(选择最优的服务节点),fault tolerance(限流降级时最好作到自适应)等方面发挥其积极价值。设计模式
咱们定下的策略是进入 Envoy 社区来实现 Dubbo 融入 mesh 的理念思想,目前 Dubbo 协议已经被 Envoy 支持。固然,Dubbo Mesh 离真正可用还有很长一段距离,其在选址、负载均衡和服务治理方面的工做须要继续在数据面建设,另外,控制面板的建设在社区也没有提上日程。api
Dubbo 3.0 定下了内外融合的策略,也就是说 3.0 的核心最终会在阿里巴巴的生产系统中部署,相信经过大流量、大规模的考验,Dubbo 用户能够得到一个性能、稳定、服务治理实践各方面俱佳的核心,用户在生产系统中采用 3.0 也会更加放心。这一点也是 Dubbo 3.0 最重要的使命。网络
Dubbo 最强大的一处设计是其在 Filter 链上的抽象设计,经过其扩展机制的开放性支持,用户能够对 Dubbo 作功能加强,并容许各个扩展点被定制来是否保留。
Dubbo 的 Filter 定义以下:
@SPI public interface Filter { /** * do invoke filter. * <p> * <code> * // before filter * Result result = invoker.invoke(invocation); * // after filter * return result; * </code> * * @param invoker service * @param invocation invocation. * @return invoke result. * @throws RpcException * @see org.apache.dubbo.rpc.Invoker#invoke(Invocation) */ Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException; }
按照“调用一个远程服务的方法就像调用本地的方法同样”这种说法,这个直接返回 Result 响应的方式是很是好的,用起来是简单直接,问题是时代变换到了须要关注体验,须要走 Reactive 响应式的时代,也回到基本点:invoke一个 invocation 须要通过网络在不一样的进程处理,自然就是异步的过程,也就是发送请求(invocation)与接收响应(Result)自己是两个不一样的事件,是须要两个过程方法来在 Filter 链处理。那么如何改造这个关键的 SPI 呢?有两种方案:
第一种,把 invoke 的返回值改为 CompletableFuture, 好处是一目了然,Result 不在建议同步获取了;但基础接口的签名一改会致使代码改造量巨大,同时也会让原有的 SPI 扩展不在支持。
第二种,Result 接口直接继承 CompletationStage,是表明了响应的异步计算。这样能进避免第一种的劣势。因此,3.0.0 Preview 版本对内部调用链路实现作了一次重构:基于 CompletableFuture 实现了框架内部的全异步调用,而在外围编程上,同时支持同步、异步调用模式。
值得注意的是,这次重构仅限于框架内部实现,对使用方没有任何影响即接口上保持彻底兼容。要了解 Dubbo 异步 API 如何使用,请参考《如何基于 Dubbo 实现全异步的调用链》,这篇文章将着重对实现思路和原理作一些简单介绍。这次重构的要点有:
首先咱们来看一个通用的跨网络异步调用的线程模型:
通讯框架异步发送请求消息,请求消息发送成功后,返回表明业务结果的 CompletableFuture 给业务线程。以后对于 Future 的处理,根据调用类型会有所区别:
接下来具体看一下一次异步 Dubbo RPC 请求的调用流程:
6. 调用方在拿到表明异步业务结果的 Future 后,可选择注册回调监听器,以监听真正的业务结果返回。
同步调用和异步调用基本上是一致的,而且也是走的回调模式,只是在链路返回以前作了一次阻塞 get 调用,以确保在收到实际结果时再返回。Filter 在注册 Listener 时因为 Future 已处于 complete 状态,所以会同时触发回调 onResponse()/onError()。
关于流程图中提到的 Result,Result 在 Dubbo 的一次 RPC 调用中表明返回结果,在 3.0 中 Result 自身增长了表明状态的接口,相似 Future 如今 Result 能够表明一次未完成的调用。
要让 Result 具有表明异步返回结果的能力,有两中方式来实现:
1. Result is a Future,在 Java 8 中更合理的方式是继承 CompletionStage 接口。
public interface Result extends CompletionStage { }
2. 让 Result 实例持有 Future 实例,与 1 的区别便是设计中选用“继承”仍是“组合”。
public class AsyncRpcResult implements Result { private CompletableFuture<RpcResult> resultFuture; }
同时,为了让 Result 更直观的体现其异步结果的特性,也为了方便面向 Result 接口编程,咱们能够考虑为Result增长一些异步接口:
public interface Result extends Serializable { Result thenApplyWithContext(Function<Result, Result> fn); <U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn); Result get() throws InterruptedException, ExecutionException; }
Filter 是 Dubbo 预置的拦截器扩展 SPI,用来作请求的预处理、结果的后处理,框架自己内置了一些拦截器实现,而从用户层面,我相信这个 SPI 也应该是被扩展最多的一个。在 3.0 版本中,Filter 回归单一职责的设计模式,将回调接口单独提取到 Listener 中。
@SPI public interface Filter { Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException; interface Listener { void onResponse(Result result, Invoker<?> invoker, Invocation invocation); void onError(Throwable t, Invoker<?> invoker, Invocation invocation); } }
以上是 Filter 的 SPI 定义,Filter 的核心定义中只有一个 invoke() 方法用来传递调用请求。
同时,增长了一个新的回调接口 Listener,每一个 Filter 实现能够定义本身的 Listenr 回调器,从而实现对返回结果的异步监听,参考如下是为 MonitorFilter 增长的 Listener 回调实现:
class MonitorListener implements Listener { @Override public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) { if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) { collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false); getConcurrent(invoker, invocation).decrementAndGet(); // count down } } @Override public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) { if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) { collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true); getConcurrent(invoker, invocation).decrementAndGet(); // count down } } }
为了更直观的作异步调用,泛化接口新增了 CompletableFuture<Object>$invokeAsync(Stringmethod,String[]parameterTypes,Object[]args)
接口:
public interface GenericService { /** * Generic invocation * * @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is * required, e.g. findPerson(java.lang.String) * @param parameterTypes Parameter types * @param args Arguments * @return invocation return value * @throws GenericException potential exception thrown from the invocation */ Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException; default CompletableFuture<Object> $invokeAsync(String method, String[] parameterTypes, Object[] args) throws GenericException { Object object = $invoke(method, parameterTypes, args); if (object instanceof CompletableFuture) { return (CompletableFuture<Object>) object; } return CompletableFuture.completedFuture(object); } }
这样,当咱们想作异步调用时,就能够直接这样使用:
CompletableFuture<Object> genericService.$invokeAsync(method, parameterTypes, args);
更具体用例请参见《泛化调用示例》
组要注意的是,框架内部的异步实现自己并不能提升单次调用的性能,相反,因为线程切换和回调逻辑的存在,异步反而可能会致使单次调用性能的降低,可是异步带来的优点是能减小对资源的占用,提高整个系统的并发程度和吞吐量,这点对于 RPC 这种须要处理网络延迟的场景很是适用。更多关于异步化设计的好处,请参考其余异步化原理介绍相关文章。
响应式编程让开发者更方便地编写高性能的异步代码,很惋惜,在以前很长一段时间里,dubbo 并不支持响应式编程,简单来讲,dubbo 不支持在 rpc 调用时使用 Mono/Flux 这种流对象(reative-stream 里流的概念),给用户使用带来了不便。(关于响应式编程更详细的信息请参见这里:http://reactivex.io/)。
RSocket 是一个开源的支持 reactive-stream 语义的网络通讯协议,他将 reative 语义的复杂逻辑封装起来了,使得上层能够方便实现网络程序。(RSocket详细资料请参见这里:http://rsocket.io/)。
dubbo 在 3.0.0-SNAPSHOT 版本里基于 RSocket 对响应式编程进行了简单的支持,用户能够在请求参数和返回值里使用 Mono 和 Flux 类型的对象。下面咱们给出使用范例,(范例源码能够在这里获取:https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-rsocket)。
首先定义接口以下:
public interface DemoService { Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2); Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2); }
而后实现该 demo 接口:
public class DemoServiceImpl implements DemoService { @Override public Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2) { return m1.zipWith(m2, new BiFunction<String, String, String>() { @Override public String apply(String s, String s2) { return s+" "+s2; } }); } @Override public Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2) { return f1.zipWith(f2, new BiFunction<String, String, String>() { @Override public String apply(String s, String s2) { return s+" "+s2; } }); } }
而后配置并启动服务端,注意协议名字填写 rsocket:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <!-- provider's application name, used for tracing dependency relationship --> <dubbo:application name="demo-provider"/> <!-- use registry center to export service --> <dubbo:registry address="zookeeper://127.0.0.1:2181"/> <!-- use dubbo protocol to export service on port 20880 --> <dubbo:protocol name="rsocket" port="20890"/> <!-- service implementation, as same as regular local bean --> <bean id="demoService" class="org.apache.dubbo.samples.basic.impl.DemoServiceImpl"/> <!-- declare the service interface to be exported --> <dubbo:service interface="org.apache.dubbo.samples.basic.api.DemoService" ref="demoService"/> </beans> public class RsocketProvider { public static void main(String[] args) throws Exception { new EmbeddedZooKeeper(2181, false).start(); ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-provider.xml"}); context.start(); System.in.read(); // press any key to exit } }
而后配置并启动消费者消费者以下, 注意协议名填写 rsocket:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <!-- consumer's application name, used for tracing dependency relationship (not a matching criterion), don't set it same as provider --> <dubbo:application name="demo-consumer"/> <!-- use registry center to discover service --> <dubbo:registry address="zookeeper://127.0.0.1:2181"/> <!-- generate proxy for the remote service, then demoService can be used in the same way as the local regular interface --> <dubbo:reference id="demoService" check="true" interface="org.apache.dubbo.samples.basic.api.DemoService"/> </beans> public class RsocketConsumer { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-consumer.xml"}); context.start(); DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy while (true) { try { Mono<String> monoResult = demoService.requestMonoWithMonoArg(Mono.just("A"), Mono.just("B")); monoResult.doOnNext(new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } }).block(); Flux<String> fluxResult = demoService.requestFluxWithFluxArg(Flux.just("A","B","C"), Flux.just("1","2","3")); fluxResult.doOnNext(new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } }).blockLast(); } catch (Throwable throwable) { throwable.printStackTrace(); } } } }
能够看到配置上除了协议名使用 rsocket 之外其余并无特殊之处。
实现原理
之前用户并不能在参数或者返回值里使用 Mono/Flux 这种流对象(reative-stream 里的流的概念)。由于流对象自带异步属性,当业务把流对象做为参数或者返回值传递给框架以后,框架并不能将流对象正确的进行序列化。
dubbo 基于 RSocket 实现了 reative 支持。RSocket 将 reative 语义的复杂逻辑封装起来了,给上层提供了简洁的抽象以下:
/** * Fire and Forget interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} that completes when the passed {@code payload} is successfully * handled, otherwise errors. */ Mono<Void> fireAndForget(Payload payload); /** * Request-Response interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} containing at most a single {@code Payload} representing the * response. */ Mono<Payload> requestResponse(Payload payload); /** * Request-Stream interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} containing the stream of {@code Payload}s representing the response. */ Flux<Payload> requestStream(Payload payload); /** * Request-Channel interaction model of {@code RSocket}. * * @param payloads Stream of request payloads. * @return Stream of response payloads. */ Flux<Payload> requestChannel(Publisher<Payload> payloads);
咱们只须要在此基础上添加咱们的 rpc 逻辑便可。
通过上面的分析,咱们知道了 Dubbo 如何基于 RSocket 实现了响应式编程的支持。有了响应式编程支持,业务能够更加方便的实现异步逻辑。
当前 Dubbo 3.0 将提供具有当代特性(如响应性编程)的相关支持,同时汲取阿里内部 HSF 的设计长处来实现二者的融合,当前预览版的不少地方还在探讨中,但愿你们可以积极反馈,咱们都会虚心学习并参考。
原文连接 本文为云栖社区原创内容,未经容许不得转载。