title: dubbo Protocol实现剖析
date: 2018-09-09 19:10:07
tags:
---
2.6.3版本,以前读的是2.4.9版本
本篇主要阐述dubbo rpc的com.alibaba.dubbo.rpc.Protocol的实现,包括做用,用法,原理等等。html
根据类与接口关系文档能够看到Protocol实现关系以下:
264 Protocol
--264.1 QosProtocolWrapper
--264.2 RegistryProtocol
--264.3 AbstractProtocol
----264.3.1 AbstractProxyProtocol
------264.3.1.1 HessianProtocol
------264.3.1.2 HttpProtocol
------264.3.1.3 RestProtocol
------264.3.1.4 RmiProtocol
------264.3.1.5 WebServiceProtocol
----264.3.2 DubboProtocol
----264.3.3 InjvmProtocol
----264.3.4 MemcachedProtocol
----264.3.5 RedisProtocol
----264.3.6 ThriftProtocol
----264.3.7 MockProtocol
--264.4 InjvmProtocol
--264.5 ProtocolFilterWrapper
--264.6 ProtocolListenerWrapper
默认会用到 QosProtocolWrapper RegistryProtocol(若是有注册中心) DubboProtocol InjvmProtocol ProtocolFilterWrapper ProtocolListenerWrapper。
java
在META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol中作了装配
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
qos=com.alibaba.dubbo.qos.protocol.QosProtocolWrapperweb
要看懂protocol的实例化与export过程最好先了解下扩展点机制,前面有文章写过。 此处要须要注意ExtensionLoader类的下面代码:redis
private Class<?> createAdaptiveExtensionClass() { String code = createAdaptiveExtensionClassCode(); ClassLoader classLoader = findClassLoader(); com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); return compiler.compile(code, classLoader); }
再协议初始化与export时会动态生成的两个类的代码: Protocol$Adaptive,ProxyFactory$Adaptive,看上面code变量就是生成的具体代码了。缓存
在扩展点机制com.alibaba.dubbo.common.extension.ExtensionLoader.createExtension(String)中:
在建立扩展实例时,会同时对他的wrap类的实例进行建立,代码细节以下:app
T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } injectExtension(instance);// 此时是DubboProtocol示例 Set<Class<?>> wrapperClasses = cachedWrapperClasses; if (wrapperClasses != null && !wrapperClasses.isEmpty()) { for (Class<?> wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));// 此处建立wrap实例 } }
实际应用中,以dubbo协议这个扩展为例,当创上述代码建了DubboProtocol协议的instance,此时是DubboProtocol实例,接下来对cachedWrapperClasses进行for循环,建立对应的wrap类,这些wrap类都有一个以Protocol类型的参数,那么建立wrap实例的时候,会把以前产生的Protocol实例传递给他。
此例,cachedWrapperClasses里的数据来自于META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol文件中配置的是wrap的扩展,
此处是:
class com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
class com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper,
class com.alibaba.dubbo.qos.protocol.QosProtocolWrapper,
(怎么从配置文件中识别是不是wrap类?答案是 只要这个QosProtocolWrapper类有类型为Protocol的构造函数存在,就当QosProtocolWrapper是wrap类。)
那么此处便会依次构造这三个类。 这个三个类的构造顺序不固定,由于用的上面createExtension代码中cachedWrapperClasses是ConcurrentHashSet维护的。负载均衡
此处是以provider端为例
DubboProtocol.
ProtocolListenerWrapper.
级联的wrap类是怎么获取
部分
QosProtocolWrapper.
ProtocolFilterWrapper.
ProtocolListenerWrapper.
QosProtocolWrapper.
ProtocolFilterWrapper.
ProtocolFilterWrapper.export(Invoker
QosProtocolWrapper.export(Invoker
ProtocolListenerWrapper.export(Invoker
InjvmProtocol.export(Invoker
RegistryProtocol.
//
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = protocol.export(wrapperInvoker);
针对 registryURL 先建立invoker,再用这个invoker建立wrapperInvoker,再用wrapperInvoker作exporter。
ProtocolListenerWrapper.
QosProtocolWrapper.
ProtocolFilterWrapper.
----
ProtocolFilterWrapper.export(Invoker
QosProtocolWrapper.export(Invoker
ProtocolListenerWrapper.export(Invoker
RegistryProtocol.export(Invoker
ProtocolFilterWrapper.export(Invoker
String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } }
QosProtocolWrapper.export(Invoker
ProtocolListenerWrapper.export(Invoker
DubboProtocol.export(Invoker
rpc过滤器链的组织建立者。
若是是注册中心 级联export的,直接级联wrap的protocol进行export,不作其余处理。
若是是非注册中心 级联export的,好比injvm dubbo这种协议级联的,作buildInvokerChain 构建调用中过滤器链的处理。 就是前面博客介绍的rpc过滤器com.alibaba.dubbo.rpc.Filter的实现实例串起来。经过循环加内部类访问外围变量的方式组成的链。贴下关键代码:
for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { // ...... public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } // ...... }; }
next与last配合使用造成了链。
若是是非注册中心 级联export的,直接级联wrap的protocol进行export,不作其余处理
若是是注册中心 级联export的,启动QoS server。参见com.alibaba.dubbo.qos.server.Server。
用于服务export时候监听机制的插入。
若是是注册中心 级联export的,直接级联wrap的protocol进行export,不作其余处理。
若是是非注册中心 级联export的,好比injvm dubbo这种协议级联的,先级联wrap的protocol进行export,再建立ListenerExporterWrapper实例。 主要是为了让ExporterListener监听器插入进来。
对应注册中心协议
先作本地的export
而后根据配置将 registry://127.0.0.1:2174/com.alibaba.dubbo.registry.RegistryService?application=hello-...置换成具体的注册中心协议地址,如: zookeeper://127.0.0.1:2174/com.alibaba.dubbo.registry.RegistryService?application=hello-world-...
根据注册中心协议地址 或者Registry-->ZookeeperRegistry的实例。
判断是不是非延迟暴露的,就开始注册,比较简单:
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
能够经过其父类FailbackRegistry
看到,支持失败重试,由一个定时调度线程重试。
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { // Check and connect to the registry try { retry(); } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
Failback是一种容错,详细的能够参见 常见容错机制。
注册以后是对OverrideListener 的处理。
该export返回的是DestroyableExporter实例,顾名思义是能够销毁的exoprter,在其unexport里作了文章,移除注册,移除export。
先看下consumer端发起调用时的链路流程:
+---------------------------+ +---------------------------+ +---------------------------+ | helloService | | proxy | | InvokerInvocationHandler | | sayHello +----------> | sayHello +----------> | invoke | | | | | | proxy method args | +---------------------------+ +---------------------------+ +-------------+-------------+ | | +---------------------------------+ | | | | +------------v--------------+ | | | MockClusterInvoker | | | | invoke | | | | | | | +------------+--------------+ | | | | | | | | | | +---------------------------+ +---------------------------+ | +------------v--------------+ | | Router | | RegistryDirectory | | | FailoverClusterInvoker | | | route | <----------+ list | <-----------+ invoke | | | MockInVokersSelector | | INVOCATION-->List INVOKER | | | | | +------------+--------------+ +---------------------------+ | +---------------------------+ | | | | | +---------------------------------+ | cluster invoke,分布式调用容错机制也是在这作 | | | | | +-------------v-------------+ +---------------------------+ +---------------------------+ | RandomLoadBalance | |InvokerDelegate | | ListenerInvokerWrap | | select +-----------> |invoke +-----------> | invoke | | List INVOKER-->INVOKER | | | | | +---------------------------+ +---------------------------+ +---------------------------+
RegistryProtocol 注册中心协议 对接口协议接口约束,不论何种注册中心都走这个实现。
RegistryDirectory 提供由服务目录查找出真正能提供服务的全部提供方
Registry 各个注册中心的实现的接口约束,具体实现有 RedisRegistry ZookeeperRegistry MulticastRegistry DubboRegistry。 主要完成注册这个逻辑。 就是把本身的信息写入注册中心。
Cluster 多服务提供方的一个封装 。他们负责建立多服务调用者实例。 好比FailoverCluster负责建立FailoverClusterInvoker实例。Cluster的实现是以分布式容错机制Failover,Failfast,Failback等这些维度进行。 对应invoker也是按照这个维度划分,好比FailbackClusterInvoker,FailfastClusterInvoker,BroadcastClusterInvoker等等。
Router 路由机制
LoadBalance 负载均衡机制
Invoker 调用者。 按分布式容错机制的维度进行划分了多个实现,与 Cluster实现一一对应。
基本上描述了 由consumer的ref类-->proxy-->InvokerInvocationHandler-->clusterinvoker-->服务目录查找-->路由机制-->负载均衡机制-->调用委托-->调用监听器。
Invoker的实现结构:
--61.1 Invoker
----61.1.1 DelegateProviderMetaDataInvoker
----61.1.2 ConsumerInvokerWrapper
----61.1.3 ProviderInvokerWrapper
----61.1.4 AbstractClusterInvoker
------61.1.4.1 AvailableClusterInvoker
------61.1.4.2 BroadcastClusterInvoker
------61.1.4.3 FailbackClusterInvoker
------61.1.4.4 FailfastClusterInvoker
------61.1.4.5 FailoverClusterInvoker
------61.1.4.6 FailsafeClusterInvoker
------61.1.4.7 ForkingClusterInvoker
----61.1.5 MergeableClusterInvoker
----61.1.6 MockClusterInvoker
----61.1.7 ListenerInvokerWrapper
----61.1.8 AbstractInvoker
------61.1.8.1 ChannelWrappedInvoker
------61.1.8.2 DubboInvoker
------61.1.8.3 InjvmInvoker
------61.1.8.4 ThriftInvoker
----61.1.9 InvokerWrapper
----61.1.10 AbstractProxyInvoker
----61.1.11 DelegateInvoker
----61.1.12 MockInvoker
ref完成的逻辑是:
建立服务目录实例 RegistryDirectory
建立ClusterInvoker,默认是FailoverClusterInvoker..
ProtocolFilterWrapper, QosProtocolWrapper, ProtocolListenerWrapper的ref逻辑与export基本相同,再也不赘述。