消费者在启动以后,会经过ReferenceConfig#get()
来生成远程调用代理类。在get
方法中,会启动一系列调用函数,咱们来一个个解析。java
配置一样包含2种:spring
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <dubbo:application name="first-consumer-xml"/> <dubbo:registry address="zookeeper://127.0.0.1:2181/"/> <dubbo:reference proxy="javassist" scope="remote" id="demoService" generic="false" check="false" async="false" group="dubbo-sxzhongf-group" version="1.0.0" interface="com.sxzhongf.deep.in.dubbo.api.service.IGreetingService"> <dubbo:method name="sayHello" retries="3" timeout="5000" mock="false" /> <dubbo:method name="testGeneric" retries="3" timeout="5000" mock="false" /> </dubbo:reference> </beans>
public class ApiConsumerApplication { public static void main(String[] args) { // 1. 建立服务引用对象实例 ReferenceConfig<IGreetingService> referenceConfig = new ReferenceConfig<IGreetingService>(); // 2. 设置应用程序信息 referenceConfig.setApplication(new ApplicationConfig("deep-in-dubbo-first-consumer")); // 3. 设置注册中心 referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181/")); // 4. 设置服务接口和超时时间 referenceConfig.setInterface(IGreetingService.class); // 默认重试3次 referenceConfig.setTimeout(5000); // 5 设置服务分组和版本 referenceConfig.setGroup("dubbo-sxzhongf-group"); referenceConfig.setVersion("1.0.0"); // 6. 引用服务 IGreetingService greetingService = referenceConfig.get(); // 7. 设置隐式参数 RpcContext.getContext().setAttachment("company", "sxzhongf"); // 获取provider端传递的隐式参数(FIXME: 须要后续追踪) // System.out.println("年龄是:" + RpcContext.getContext().getAttachment("age")); //8. 调用服务 System.out.println(greetingService.sayHello("pan")); } }
new ReferenceConfig
在此阶段,会初始化org.apache.dubbo.config.AbstractConfig
& org.apache.dubbo.config.ReferenceConfig
的静态变量以及静态代码块。apache
ReferenceConfig#get
ReferenceConfig#init
json
DubboBootstrap
启动dubbo。URL.buildKey(interfaceName, group, version)
这段用来生成惟一服务的key,因此咱们以前说dubbo的惟一标识是经过全路径
和group以及version来决定的。org.apache.dubbo.config.utils.ConfigValidationUtils#checkMock
来检查咱们mock是否设置正确。dubbo
的注册地址,默认为当前主机IPReferenceConfig#createProxy
建立调用代理开始ReferenceConfig#shouldJvmRefer
首先判断是否为Injvm
调用injvm
,判断是否为peer to peer
端对端设置,若是为p2p,那么就直连url配置转换URL
api
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=deep-in-dubbo-first-consumer&dubbo=2.0.2&pid=9780&refer=application%3Ddeep-in-dubbo-first-consumer%26dubbo%3D2.0.2%26group%3Ddubbo-sxzhongf-group%26interface%3Dcom.sxzhongf.deep.in.dubbo.api.service.IGreetingService%26methods%3DsayHello%2CtestGeneric%26pid%3D9780%26register.ip%3D192.168.14.99%26release%3D2.7.5%26revision%3D1.0.0%26side%3Dconsumer%26sticky%3Dfalse%26timeout%3D5000%26timestamp%3D1582959441066%26version%3D1.0.0®istry=zookeeper&release=2.7.5×tamp=1582961922459
REF_PROTOCOL.refer(interfaceClass, urls.get(0));
来将URL
转为Invoker
对象,由于private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
是扩展是Adaptive
,所以在这里会执行Protocol$Adaptive#refer
方法,又因为protocol
参数值为registry
,所以会只是RegistryProtocol#refer
,又因为被Wrapper
类装配,所以会先执行三个Wrapper类,最后才能执行到RegistryProtocol#refer -> RegistryProtocol#doRefer
,在doRefer
方法中会订阅服务提供者地址,最后返回Invoker
对象。那么到底是如何生成的Invoker
对象呢?咱们来看下具体代码:服务器
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // 1.能够查寻RegistryDirectory & StaticDirectory 区别 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); //2. 封装订阅所用URL URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); registry.register(directory.getRegisteredConsumerUrl()); } //3.build路由规则链 directory.buildRouterChain(subscribeUrl); //4.订阅服务提供者地址 directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); //5.封装集群策略到虚拟invoker Invoker invoker = cluster.join(directory); return invoker; }
上述代码中,步骤1根据URL生成了一个RegistryDirectory
(关于Directory接口的做用,能够自行查询一些,直白一些就是将一堆Invoker对象封装成一个List<Invoker>,只有2种实现RegistryDirectory & StaticDirectory
,从命名可看出一个是动态可变,一个不可变),代码2 封装了订阅所要使用的参数信息,代码3则是封装绑定路由规则链,代码4订阅。代码5调用 Cluster$Adaptive#join
方法生成Invoker
对象。app
在代码2种从zk获取服务提供者信息:
一旦zk返回服务提供者列表以后,就会调用RegistryDirectory#notify
,以下:负载均衡
在org.apache.dubbo.common.utils.UrlUtils#isMatch
中对provider和consumer的api进行匹配校验。继续跟踪:RegistryDirectory#notify -> RegistryDirectory#refreshOverrideAndInvoker -> RegistryDirectory#refreshInvoker -> RegistryDirectory#toInvokers
在toInvokers
正式将URL转换为Invoker
,经过invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
在这里protocol#refer
一样执行顺序如:jvm
(dubbo 2.7.5) protocol#refer -> protocol$Adaptive#refer -> QosProtocolWrapper#refer -> ProtocolListenerWrapper#refer -> ProtocolFilterWrapper#refer ->AbstractProtocol#refer->DubboProtocol#protocolBindingRefer
,调用代码以下:async
@Override public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); // create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
关注getClients
,其中执行了DubboProtocol#getSharedClient -> DubboProtocol#initClient
建立netty client进行链接。
由于这里使用的是明确的DubboInvoker
,在回调的时候,Wrapper会对该Invoker进行包装,执行效果以下:
所以,会执行到ProtocolFilterWrapper#buildInvokerChain
,该函数会对服务进行调用链跟踪:
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; // 获取全部在MATA-INF文件中配置的激活的责任连接口 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { @Override public Class<T> getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); } @Override public boolean isAvailable() { return invoker.isAvailable(); } @Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { asyncResult = filter.invoke(next, invocation); } catch (Exception e) { if (filter instanceof ListenableFilter) {// Deprecated! Filter.Listener listener = ((ListenableFilter) filter).listener(); if (listener != null) { listener.onError(e, invoker, invocation); } } else if (filter instanceof Filter.Listener) { Filter.Listener listener = (Filter.Listener) filter; listener.onError(e, invoker, invocation); } throw e; } finally { } return asyncResult.whenCompleteWithContext((r, t) -> { if (filter instanceof ListenableFilter) {// Deprecated! Filter.Listener listener = ((ListenableFilter) filter).listener(); if (listener != null) { if (t == null) { listener.onMessage(r, invoker, invocation); } else { listener.onError(t, invoker, invocation); } } } else if (filter instanceof Filter.Listener) { Filter.Listener listener = (Filter.Listener) filter; if (t == null) { listener.onMessage(r, invoker, invocation); } else { listener.onError(t, invoker, invocation); } } else {// Deprecated! filter.onResponse(r, invoker, invocation); } }); } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; }
全部的负载均衡、容错策略等都是在这里绑定的。
7.若是有多个注册中心,将会循环执行步骤6,将URL转换为Invoker
对象,而后添加到一个List,分别进行注册以后,而后判断最后一个注册中心url
是否有效,针对多订阅的场景,URL中添加cluster
参数,默认使用org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster
策略,使用org.apache.dubbo.rpc.cluster.Cluster#join
将多个Invoker
对象封装一个虚拟的Invoker
中,不然若是最后一个注册中心是无效的,直接封装Invoker
对象。
8.建立服务代理ProxyFactory#getProxy(org.apache.dubbo.rpc.Invoker<T>)
,由于ProxyFactory
是一个适配类。那么一样这里会调用ProxyFactory$Adaptive#getProxy
,这里最终就是返回一个代理服务的Invoker对象。
至此,咱们的消费端的绑定远程zk的服务就已经结束了。那么,咱们在调用服务器方法的时候服务器端和客户端都是如何处理的呢?下节咱们将继续分析。