当RegistryDirectory#substribe()方法被RegistryProtocol#refer()方法调用时,本地服务消费端会与注册中心交互,拉取最新的服务提供者,并与这些服务提供者创建TCP链接。html
public void subscribe(URL url) { setConsumerUrl(url); CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); serviceConfigurationListener = new ReferenceConfigurationListener(this, url); registry.subscribe(url, this); }
从上面的代码块能够知道RegistryDirectory直接调用的注册中心的substribe()方法。咱们以ZookeeperRegistry为例,查看其方法doSubscribe()。缓存
public void doSubscribe(final URL url, final NotifyListener listener) { ...... } else { // 正常服务订阅 List<URL> urls = new ArrayList<>(); for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); //监听,当目录变动时,调用notify方法 ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds))); zkClient.create(path, false); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } //订阅节点后,要拉取节点的最新数据 notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
从上面代码,能够知道就是监听zookeeper上的providers,routers,configurations节点,并注册其监听器。当订阅完这些节点后,须要从新拉取最新的提供者数据,即调用其notify()方法。服务器
notify()方法最终会调用RegistryDirectory的notify()方法。该方法的主要完成以下内容:
一、获得zookeeper的configurations节点下的URLS,并转化为configurators
二、获得zookeeper的routers节点下的URLS,并转化为Routers
三、激活3.x的AddressListener特性
四、获得zookeeper的providers节点下的URLS,与其服务提供者建立TCP连接,把URL转化为 invokerapp
咱们主要来看下第四点,其方法为refreshOverrideAndInvoker()。框架
private void refreshOverrideAndInvoker(List<URL> urls) { // mock zookeeper://xxx?mock=return null overrideDirectoryUrl(); refreshInvoker(urls); }
该方法主要是2个操做,1个是若是必要的话,从新覆盖订阅的URL,由于dubbo的服务调用URL的一些配置,好比路由,mock能够在monitor中心进行动态修改。因此须要从新覆盖本地的URL一些参数。二、是经过refreshInvoker()与服务端创建TCP连接。less
/** * * 把提供者者的URL List 转化为 Invoker Map结合,转化规则以下: * Convert the invokerURL list to the Invoker Map. The rules of the conversion are as follows: * <ol> * * <li> If URL has been converted to invoker, it is no longer re-referenced and obtained directly from the cache, * and notice that any parameter changes in the URL will be re-referenced.</li> * 若是URL已经在缓存中,则不用从新引用该服务提供者(即从新创建TCP链接),若是URL的参数变动须要从新引用。 * * * <li>If the incoming invoker list is not empty, it means that it is the latest invoker list.</li> * 若是传入的调用程序列表不是空的,这意味着它是最新的调用程序列表 * * <li>If the list of incoming invokerUrl is empty, It means that the rule is only a override rule or a route * rule, which needs to be re-contrasted to decide whether to re-reference.</li> * </ol> * 若是传入的invokerUrl列表为空,则意味着该规则只是一个覆盖规则或路由规则,须要对其进行从新对比以决定是否从新引用 * * @param invokerUrls this parameter can't be null */ // TODO: 2017/8/31 FIXME The thread pool should be used to refresh the address, otherwise the task may be accumulated. private void refreshInvoker(List<URL> invokerUrls) { Assert.notNull(invokerUrls, "invokerUrls should not be null"); //若是只有一个提供者,且为空协议,则禁止连接和销毁invoker if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // Forbid to access this.invokers = Collections.emptyList(); routerChain.setInvokers(this.invokers); destroyAllInvokers(); // Close all invokers } else { this.forbidden = false; // Allow to access Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls == Collections.<URL>emptyList()) { invokerUrls = new ArrayList<>(); } if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet<>(); this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison } if (invokerUrls.isEmpty()) { return; } Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map /** * If the calculation is wrong, it is not processed. * * 1. The protocol configured by the client is inconsistent with the protocol of the server. * eg: consumer protocol = dubbo, provider only has other protocol services(rest). * 2. The registration center is not robust and pushes illegal specification data. * */ if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) { logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls .toString())); return; } List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values())); // pre-route and build cache, notice that route cache should build on original Invoker list. // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed. routerChain.setInvokers(newInvokers); this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; this.urlInvokerMap = newUrlInvokerMap; try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } }
private Map<String, Invoker<T>> toInvokers(List<URL> urls) { Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>(); if (urls == null || urls.isEmpty()) { return newUrlInvokerMap; } Set<String> keys = new HashSet<>(); //目前的协议 String queryProtocols = this.queryMap.get(PROTOCOL_KEY); //服务提供方URLproviderUrl ,看这些提供方是否支持目前协议 for (URL providerUrl : urls) { // If protocol is configured at the reference side, only the matching protocol is selected if (queryProtocols != null && queryProtocols.length() > 0) { boolean accept = false; String[] acceptProtocols = queryProtocols.split(","); for (String acceptProtocol : acceptProtocols) { if (providerUrl.getProtocol().equals(acceptProtocol)) { accept = true; break; } } if (!accept) { continue; } } //空协议过滤 if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { continue; } //没有在Spi框架找不大的扩展点,过滤 if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions())); continue; } // 合并url,通常参数配置可能配置在消费端,提供端,须要进行合并。合并规则为:override(配置中心) > -D(启动运行指定) >Consumer(消费端) > Provider(提供方) URL url = mergeUrl(providerUrl); String key = url.toFullString(); // The parameter urls are sorted if (keys.contains(key)) { // Repeated url continue; } keys.add(key); // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again /** *key:是没有合并消费端端配置参数的Url(provider端), * 缓存键是不与用户端参数合并的url,不管用户如何合并参数,若是服务器url更改,则再次引用 * * */ Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); if (invoker == null) {// 看本地缓存是否存在,若是存在// Not in the cache, refer again try { boolean enabled = true; if (url.hasParameter(DISABLED_KEY)) {//是否disable enabled = !url.getParameter(DISABLED_KEY, false); } else { enabled = url.getParameter(ENABLED_KEY, true); } if (enabled) { /** * 把rpc invoker 、mergeUrl(override > -D >Consumer > Provider 参数内容),原providerUrl * url: getProtocol()=dubbo */ invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t); } if (invoker != null) { // Put new invoker in cache newUrlInvokerMap.put(key, invoker); } } else { newUrlInvokerMap.put(key, invoker); } } keys.clear(); return newUrlInvokerMap; }
上面的注释已经很是清楚,不在详细讲解。最后因此提供者的URL,会被转化为InvokerDelegate。该类表明一个Invoker对象的委派类,里面包括真实的Invoker和相应的提供者的URL。并把这些InvokerDelegate放入到newUrlInvokerMap成员变量上。异步
来看下以下这条语句:
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
又是经过protocol.refer(serviceType, url)获取一个Invoker,此时URL的getProtocol()==dubbo,因此会调用DubboProtocol#refer()方法。而DubboProtocol的refer()仍是AbstractProtocol#refer()。ide
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { /** * 异步转同步Invoker */ return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url)); }
即返回一个异步转同步的AsyncToSyncInvoker。DubboProtocol实现模板方法protocolBindingRefer()。优化
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException { // 优化序列化内容,目前没什么内容 optimizeSerialization(url); // create rpc invoker. /** * * 建立RPC DubboInvoker */ DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
该方法主要建立了DubboInvoker对象,并放入invokers中,经过getClients()方法获得具体的TCP链接客户端ExchangeClient。ui
从上面的分析能够知道,服务订阅的过程,服务拉取的方式是经过通知这种方式来获取,而且知道了Invoker的具体一个实现DubboInvoker和TCP链接客户端ExchangeClient进行关联的,在下一章咱们将剖析ExchangeClient是如何实现的。