思考的过程和设计思想以下:html
一、咱们想要进行远程服务的调用,那么确定要创建网络链接,不妨改用TCP长链接,并设计通讯协议,并封装为一个类,不妨叫作ExchangeClient。用它来进行网络通讯。算法
二、有了能够进行远程通讯的服务对象ExchangeClient后,咱们能够把远程服务封装为一个Invoker对象,这个Invoker对象内部采用自已定义的协议与远程服务器通讯,不妨叫作DubboInvoker,由于采用了dubbo协议来进行网络通讯的。spring
三、有了这个DubboInvoker 我就能够根据dubbo协议与远程服务通讯了,可是我还想在本地增长一些过滤器Filter,或者监听器Listener。不要紧,直接经过责任链模式,把这些Filter与这个DubboInvoker进行连接。返回的一个ProtocolFilterWrapper对象。apache
四、同理,若是须要一些监听器的功能怎么办,一样进行一次封装。把ProtocolFilterWraper封装到Listener类型的Invoker对象,不妨叫作ListenerInvokerWrapper。服务器
五、如今考虑远程服务提供者有不少个,那么我对每一个远程服务都须要有一个ListenerInvokerWrapper的对象。以下:
Demoservice::196.254.324.1 ListenerInvokerWrapper1
Demoservice::196.254.324.2 ListenerInvokerWrapper2
Demoservice::196.254.324.3 ListenerInvokerWrapper3
Demoservice::196.254.324.4 ListenerInvokerWrapper4
Demoservice::196.254.324.5 ListenerInvokerWrapper5
.....网络
六、服务太多了,在本地这样建立太费事了。引入了注册中心,直接把服务注册到服务中心上,而后客户端直接从注册中心拉取。咱们把拉取到的服务,统称为服务目录。而且它是从注册中心拉取到的,那么不妨名字就叫作RegistryDirectory。那么这个服务目录里确定包含了上面的远程服务调用对象ListenerInvokerWrapper。咱们把这些对象放到服务目录的成员上,名字就叫作urlInvokerMap。key: Demoservice::xxxx。value:ListenerInvokerWrapper。app
七、如今咱们能够在本地调用RegistryDirectory对象,与远程服务通讯了,想调哪一个服务就从urlInvokerMap取出一个进行调用便可。可是每次指定一个远程服务器,不只太麻烦了,并且也会形成流量不均匀,负载不平衡。那么咱们就经过经过负载均衡策略来选择一个服务调用。就取名LoadBalance吧。他有个方法select。入参就是咱们的服务目录RegistryDirectory。那么经过LoadBalance.select(RegistryDirectory) 获得一个咱们想要的通讯的远程服务便可。目前负载均衡算法有一致性Hash算法,随机算法、权重轮训算法、最短响应时间算法、最少活跃数算法。负载均衡
八、有了负载均衡算法LoadBalance后,我想要这样的功能,当服务调用失败的时候,我能够重试,或者直接直接失败。那我就把有这种能力服务调用,称为一个集群Cluster。他有一个方法叫作join。入参仍是服务目录RegistryDirectory。返回一个具备快速失败、或者重试的服务调用,不妨叫AbstractClusterInvoker。每一个不一样的策略都去实现它。而且这个对象内部经过LoadBalance来选择一个服务进行调用,失败后的策略(是否重试或失败)由我决定。异步
九、目前咱们已经有了一个XXXclusterInvoker 对象,它具备快速失败或者重试等功能,且具备负载均衡算法的远程服务调用对象。可是有时,这些远程服务提供者这的qps不达标,或者新上线的服务有问题,或者远程服务调用失败后,能够在本地模拟的调用,返回一个mock对象。那么咱们从新对XXXclusterInvoker进行封装,就命名为MockClusterInvoker,具备Mock功能,且具备集群能力。它持有咱们的服务目录RegistryDirectory和XXXclusterInvoker对象。jvm
十、目前咱们已经有了一个MockClusterInvoker对象。可是这个invoker对象和咱们像本地同样调用服务仍是有点差异,最后咱们直接经过Java的动态代理计算Proxy.newInstance()来建立一个具体的服务对象DemoService,而且在InvokeHandler内部调用咱们的MockClusterInvoker对象的invoke 方法。
十一、好比咱们的DubboInvoker是经过Java 异步线程CompletableFuture实现的话,若是须要转为同步,还能够对其封装从异步转为同步的Invoker,不妨命名为AsyncToSyncInvoker。
则最终在服务消费端呈现给咱们以下一个远程服务代理对象。
在上一章节,已经说明了getObject()对象的调用时机,内部调用的ReferenceConfig#init方法,该init()方法主要作了以下几件事情:
一、缺省的配置进行填充,好比registry,application等属性。
二、校验配置是否填写正确,好比<dubbo:reference />中的stub 和mock 是否配置,配置了是否正确。
三、经过SPI机制获取Protocol$Adaptive自适应协议,经过Protocol$Adaptive#refer()方法获得一个MockClusterInvoker对象。该方法的调用内容基本和上面的猜测设计一致。
1)和注册中心创建tcp链接。
2)把当前的订阅服务注册到注册中心上的consumer节点上。
3)从注册中心中把订阅的服务列表拉取到本地,即RegistryDirectory。
4)根据上面相似猜测建立MockClusterInvoker返回。
四、经过SPI机制获取ProxyFactory$Adaptive自适应代理工厂,而后经过这个代理工厂建立动态代理对象,并把这个代理对象赋值给ref属性。
服务的订阅核心就是这条语句,这条语句博大精深。仅仅一条语句把全部的订阅工做完成了。
一、首先根据SPI机制获取自适应的协议对象。语句以下:
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
该语句建立了Protocol$Apdative。它有个自适应refer方法以下:
@Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (type == null) throw new IllegalArgumentException("url == null"); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])"); Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName); return extension.refer(type, url); }
二、Protocol$Apdative#refer()方法内部又经过参数的url的协议头和SPI机制获取一个具体的协议。显而易见,url.getProtocol()返回的是registry。由于当前是服务订阅。因此是registry打头。那么返回的Protocol具体类型就是RegistryProtocol。可是Protocol扩展点有包裹类型:ProtocolListenerWrapper、ProtocolFilterWrapper。因此最终返回的是ProtocolListenerWrapper类型的协议。查看这个2个包裹类型的refer()方法:
类ProtocolListenerWrapper
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (UrlUtils.isRegistry(url)) { return protocol.refer(type, url); } return new ListenerInvokerWrapper<T>(protocol.refer(type, url), Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class) .getActivateExtension(url, INVOKER_LISTENER_KEY))); }
类ProtocolFilterWrapper
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (UrlUtils.isRegistry(url)) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER); }
三、因此Protocol$Apdative#refer()内部的getExtension返回的是ProtocolListenerWrapper的Protocol。又由于url是注册url,因此知足UrlUtils.isRegistry(url)==true.直接进行一次传递调用。
四、最终调到RegistryProtocol#refer()。代码以下:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { url = getRegistryUrl(url); Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); String group = qs.get(GROUP_KEY); if (group != null && group.length() > 0) { if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } return doRefer(cluster, registry, type, url); }
即获得注册中心Registry,通常是ZookeeperRegistry。获取注册中心的内容在以前的章节已见过,就不在多说了。接着会调用doRefer()方法。
在看doRefer()方法以前,咱们来看下其定义:
Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url);
出参:
返回值就是咱们须要的Invoker对象。
入参:
cluster:集群对象Cluster$Adaptive,经过Spi获取.内部getExtension获取Cluster,默认为FailoverCluster。
registry:注册中心
type:订阅的接口类型
url:服务注册连接注册中心URL。
五、Cluster的join 接口以下:
Cluster$Adaptive#join()内部实际是默认调用的是FailoverCluster#join()。
而且Cluster扩展点也有其Wrapper类,即MockClusterWrapper。因此Cluster$Adaptive#join()的方法调用
Cluster extension = ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(extName);
返回的extension是MockClusterWrapper,MockClusterWrapper#join()代码以下:
return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); }
因此Cluster$Adaptive#join()返回的Invoker类型是MockClusterInvoker。MockClusterWrapper持有的cluster是FailoverCluster,因此MockClusterInvoker内部持有invoker类型是FailoverClusterInvoker。
六、源码doRefer()
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // new 一个服务目录,订阅服务类型为type 的 RegistryDirectory RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); // 设置注册中心 directory.setRegistry(registry); //设置协议,即Protocol$Adaptive directory.setProtocol(protocol); // all attributes of REFER_KEY //获取订阅参数 Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters()); //构建订阅URL ,以consumer//打头 URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); //把该url注册到注册中心上 if (directory.isShouldRegister()) { directory.setRegisteredConsumerUrl(subscribeUrl); registry.register(directory.getRegisteredConsumerUrl()); } //设置路由链 directory.buildRouterChain(subscribeUrl); //重点,重中之重。这里订阅服务,而且会拉取远程服务invoker 到directory对象的urlInvokerMap成员中。 directory.subscribe(toSubscribeUrl(subscribeUrl)); //由上面分析,获得是MockClusterInvoker Invoker<T> invoker = cluster.join(directory); //查找注册协议监听器,没有设置为空 List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url); if (CollectionUtils.isEmpty(listeners)) { return invoker; } // 若是有其监听器进行监听器onRefer()调用,并返回RegistryInvokerWrapper包裹类型。 RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl); for (RegistryProtocolListener listener : listeners) { listener.onRefer(this, registryInvokerWrapper); } return registryInvokerWrapper; }
/** * 核心,经过配置的元信息,建立一个代理对象 * @param map * @return */ @SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) private T createProxy(Map<String, String> map) { // 首先判断本地是否有Service提供者, if (shouldJvmRefer(map)) { //若是有,导出jvm导出refer URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map); invoker = REF_PROTOCOL.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else { urls.clear(); //指定服务提供者URL。点对点好比在<dubbo:reference url="dubbo://xxxxx:12222"> if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address. String[] us = SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (StringUtils.isEmpty(url.getPath())) { url = url.setPath(interfaceName); } if (UrlUtils.isRegistry(url)) { urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // assemble URL from register center's configuration // if protocols not injvm checkRegistry //若是不是jvm 协议,通常是dubbo if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) { //检测注册中心 checkRegistry(); //根据注册中心地址,获得注册服务 //registry://106.52.187.48:2181/org.apache.dubbo.registry.RegistryService // ?application=dubbo-demo-annotation-consumer&dubbo=2.0.2&pid=9757®istry=zookeeper×tamp=1597380362736 List<URL> us = ConfigValidationUtils.loadRegistries(this, false); if (CollectionUtils.isNotEmpty(us)) { for (URL u : us) { //对每一个注册中心URL,获得监控URL URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u); if (monitorUrl != null) { map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); } } if (urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } } //若是注册中心之一一个的话,通常就一个注册中心 if (urls.size() == 1) { invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); } else { //多个注册中心时,Protocol$Adaptive List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { //把其获得的Invoker 填入invokers invokers.add(REF_PROTOCOL.refer(interfaceClass, url)); if (UrlUtils.isRegistry(url)) { registryURL = url; // use last registry url } } //多注册中心,多订阅场景 if (registryURL != null) { // registry url is available // for multi-subscription scenario, use 'zone-aware' policy by default URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME); // The invoker wrap relation would be like: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker //经过集群,返回一个invoker invoker = CLUSTER.join(new StaticDirectory(u, invokers)); } else { // not a registry url, must be direct invoke. invoker = CLUSTER.join(new StaticDirectory(invokers)); } } } if (shouldCheck() && !invoker.isAvailable()) { invoker.destroy(); throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); } if (logger.isInfoEnabled()) { logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); } /** * @since 2.7.0 * ServiceData Store */ /** * * 这里是发布元数据信息 */ String metadata = map.get(METADATA_KEY); WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata); if (metadataService != null) { URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map); metadataService.publishServiceDefinition(consumerURL); } // create service proxy //经过动态代理把invoker 转化为具体的服务类型 return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic)); }
上面核心的代码invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0))已分析,接下下来就是经过PROXY_FACTORY.getProxy()建立活动,以后服务调用上进行分析。其余元数据的注册,等以后讲解配置中心时进行讲解。
接下来,以一个图解来描述服务订阅的过程。在下一章节来描述如何具体的拉取远程服务invoker到服务目录RegistryDirectory上的urlInvokerMap。