[toc]html
本篇文章,将开始分析 Dubbo 集群容错方面的源码。集群容错源码包含四个部分,分别是服务目录 Directory、服务路由 Router、集群 Cluster 和负载均衡 LoadBalance。 这四个接口都是 dubbo-cluster
工程中定义的。java
相关文档推荐:spring
public interface Directory<T> extends Node { // 1. 获取 serviceInterface Class<T> getInterface(); // 2. 获取指定 serviceInterface 对应的服务接口实例 List<Invoker<T>> list(Invocation invocation) throws RpcException; }
总结: Directory 只负责管理单个 serviceInterface 对应的实例。这里出现了 Dubbo 领域模型中的两个核心概念,Invoker
和 Invocation
,经过会话的参数 invocation 能够获取其可执行体 invokers 列表,进而发起远程调用。apache
Invoker
是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它表明一个可执行体,可向它发起 invoke 调用,它有多是一个本地的实现,也多是一个远程的实现,也可能一个集群实现。Invocation
是会话域,它持有调用过程当中的变量,好比方法名,参数等。Protocol
是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理。服务目录目前内置的实现有两个,分别为 StaticDirectory 和 RegistryDirectory,它们均是 AbstractDirectory 的子类。AbstractDirectory 实现了 Directory 接口。下面咱们来看一下他们的继承体系图。缓存
总结: 服务目录 Directory
负载管理单个服务接口对应的全部实例。它有两个实现:app
StaticDirectory
顾名思义,serviceInterface 对应的服务提供者是一成不变的,即从配置文件中读取服务列表信息。RegistryDirectory
从注册中心动态获取指定 serviceInterface 对应的服务提供者。Directory 接口设计原则分析:负载均衡
Directory
核心方法为 List<Invoker<T>> list(Invocation invocation)
,该方法只关注核心的功能,经过调用参数 invocation 获取执行实例 invokers。Directory 接口自己只有读,没有写功能。AbstractDirectory
定义了一些通用实现,增长了路由 routerChain 和订阅者 consumerUrl 的信息。StaticDirectory/RegistryDirectory
具备写的功能。StaticDirectory 是经过构造器传入的,不能动态更新。而 RegistryDirectory 进一步实现了 NotifyListener 接口,当服务接口对应的注册信息发生变化时会回调 notify(URL url, NotifyListener listener, List<URL> urls)
方法,通知 RegistryDirectory 更新 Invoker 列表,具备动态写的功能。另外,Directory 继承自 Node 接口,Node 这个接口继承者比较多,像 Registry、Monitor、Invoker 等均继承了这个接口。这个接口包含了一个获取配置信息的方法 getUrl,实现该接口的类能够向外提供配置信息。ide
StaticDirectory#getUrl 通常为 null,RegistryDirectory 对应的 URL 示例以下。老实说在我看来 Dubbo URL 本质上是一个配置类,各类配置信息都会转换成 URL,致使 URL 的理解有些困难,有时候真不知道这个 URL 到底表明什么意思。oop
## RegistryDirectory#getUrl() -> Nacos 注册中心地址 registry://192.168.139.101:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=24924&qos.port=33333&refer=application%3Ddubbo-consumer%26check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.demo.DemoService%26lazy%3Dfalse%26methods%3DsayHello%26pid%3D24924%26qos.port%3D33333%26register.ip%3D192.168.139.1%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1570940706766®istry=nacos×tamp=1570940706897
StaticDirectory 很简单,就不介绍了,下面主要介绍 RegistryDirectory 。源码分析
上面也提到了 RegistryDirectory 主要有两方面的功能,一是读功能,根据会话参数 invocation 获取 Invoker 可执行体列表;二是写功能,当注册中心服务发生变化时更新 Invoker 列表。
总结: RegistryDirectory#list 委托给 doList 方法获取服务列表,doList 通过路由规则过滤后将可用的执行体列表返回。其中 routerChain 持有所有的 invokers,当调用 notify -> refreshOverrideAndInvoker -> refreshInvoker -> routerChain.setInvokers(newInvokers)
时都会更新 routerChain 持有的 invokers。
@Override public List<Invoker<T>> doList(Invocation invocation) { if (forbidden) { // 1. No service provider 2. Service providers are disabled ... } List<Invoker<T>> invokers = null; try { // getConsumerUrl 返回服务订阅者的URL invokers = routerChain.route(getConsumerUrl(), invocation); } catch (Throwable t) { } return invokers == null ? Collections.emptyList() : invokers; }
RegistryDirectory 除了实现 Directory 接口来获取服务列表信息外,还实现了 NotifyListener 接口,动态更新服务列表。相对于服务获取,服务更新要复杂的多。
RegistryDirectory 持有 Registry 注册中心实例,须要首先订阅指定的服务 consumer url,这样当这个服务发生变化时就会调用 notify 通知 RegistryDirectory 更新服务列表。
总结: RegistryDirectory 首先会订阅 consumerUrl,这样当服务发生变化时会 notify 通知更新服务列表。
RegistryDirectory 的初始化在 DubboRegistryFactory、RegistryProtocol#doRefer 都会有建立,前者是 Dubbo 自带的注册中心,是基于内存的注册中心,在 dubbo-registry-default 工程中,后者则是整合其它已有注册中心的实现。一般,基于注册中心的服务引入都是通过 RegistryProtocol#doRefer 建立的。下面的源码分析也是对 RegistryProtocol 进行分析。
/** * RegistryProtocol:建立 type 的远程代理 @Reference * @param registry 注册中心实例 * @param type 服务接口类型 * @param url 注册中心地址 */ private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // 建立 RegistryDirectory 实例。type是订阅的服务接口类型,url是注册中心地址。 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); // 设置注册中心和协议 directory.setRegistry(registry); directory.setProtocol(protocol); // 生成服务消费者连接 Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); // 设置服务策略 directory.buildRouterChain(subscribeUrl); // 订阅 providers、configurators、routers 等节点数据 directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); ... }
总结: RegistryDirectory 构造过程最主要的任务:
registry.subscribe(url, this)
protocol.refer(serviceType, url)
routerChain.route(getConsumerUrl(), invocation)
directory.subscribe(url)
// url指的是注册中心地址,serviceType是服务接口的类型 public RegistryDirectory(Class<T> serviceType, URL url) { super(url); this.serviceType = serviceType; //订阅的服务接口类型 this.serviceKey = url.getServiceKey(); //{group/}serivceInterface{:version} this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); this.overrideDirectoryUrl = this.directoryUrl = turnRegistryUrlToConsumerUrl(url); String group = directoryUrl.getParameter(GROUP_KEY, ""); this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(",")); }
思考:RegistryDirectory 的主要属性都是经过 set 方法进行设置,构造器的参数有重合。
构造 RegistryDirectory 后会调用 subscribe 订阅服务列表,返回 url.serviceInterface 对应的服务列表。
public void subscribe(URL url) { setConsumerUrl(url); CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); serviceConfigurationListener = new ReferenceConfigurationListener(this, url); registry.subscribe(url, this); }
订阅服务后,当服务更新时通知 RegistryDirectory 更新本地的服务列表,也是 RegistryDirectory 最复杂的一部分。根据从注册中心获取的 invokerUrls 生成更新 invokers 列表。若是不存在则建立新 Invoker,若是已经存在则忽略。
@Override public synchronized void notify(List<URL> urls) { // 按 category 分类存储,服务提供者url,路由url,外部化配置url Map<String, List<URL>> categoryUrls = urls.stream() .filter(Objects::nonNull) .filter(this::isValidCategory) .filter(this::isNotCompatibleFor26x) .collect(Collectors.groupingBy(url -> { if (UrlUtils.isConfigurator(url)) { return CONFIGURATORS_CATEGORY; } else if (UrlUtils.isRoute(url)) { return ROUTERS_CATEGORY; } else if (UrlUtils.isProvider(url)) { return PROVIDERS_CATEGORY; } return ""; })); // configuratorURLs List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()); this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators); // routerURLs List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); toRouters(routerURLs).ifPresent(this::addRouters); // providerURLs List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList()); refreshOverrideAndInvoker(providerURLs); }
总结: 虽然服务列表的更新比较复杂,但这段代理的逻辑仍是很清楚的。
将订阅 serviceInterface 对应的服务列表按 category 进行分类。providers、routers、configurators。
将 configuratorURLs 转化为 Configurator。外部化配置的 Configurator 具备更高的优先级。保存在变量 configurators 中。
将 routerURLs 转化为 Router。经过 routerChain.addRouters(routers)
设置到变量 routerChain 中。
将 providerURLs 转化为 Invoker。保存在变量 invokers 中。
前三步都很简单,refreshOverrideAndInvoker 的主要逻辑都委托给了 refreshInvoker 方法。
refreshInvoker 根据从注册中心获取的 invokerUrls 生成更新 invokers 列表。若是不存在则建立新 Invoker,若是已经存在则忽略。
private void refreshInvoker(List<URL> invokerUrls) { Assert.notNull(invokerUrls, "invokerUrls should not be null"); if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { // 1. invokerUrls 仅有一个元素,且 url 协议头为 empty,此时表示禁用全部服务 // 设置 forbidden 为 true this.forbidden = true; // Forbid to access this.invokers = Collections.emptyList(); routerChain.setInvokers(this.invokers); // 销毁全部 Invoker destroyAllInvokers(); // Close all invokers } else { // 2. 有可用的url this.forbidden = false; // Allow to access // 2.1 urlInvokerMap保存上一次的invokers Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls == Collections.<URL>emptyList()) { invokerUrls = new ArrayList<>(); } // 2.2 cachedInvokerUrls保存上一次的invokerUrls if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { // 添加缓存 url 到 invokerUrls 中 invokerUrls.addAll(this.cachedInvokerUrls); } else { // 缓存 invokerUrls this.cachedInvokerUrls = new HashSet<>(); this.cachedInvokerUrls.addAll(invokerUrls); } if (invokerUrls.isEmpty()) { return; } // 2.3 核心方法:将 url 转成 Invoker Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls); // 2.4 转换出错,直接打印异常,并返回 if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) { return; } // 2.5 更新routerChain中的invokers列表 List<Invoker<T>> newInvokers = Collections.unmodifiableList( new ArrayList<>(newUrlInvokerMap.values())); routerChain.setInvokers(newInvokers); // 合并多个组的 Invoker, <methodName, Invoker> 列表映射关系 this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; this.urlInvokerMap = newUrlInvokerMap; try { // 2.6 销毁下线服务的 Invoker destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } }
总结: refreshInvoker 涉及到几个集合,简单的说明一下:urlInvokerMap
缓存上一次的服务列表;cachedInvokerUrls
缓存上一次的 URL。
invokerUrls
只有一个 empty 协议的服务时,说明此时须要禁用服务,销毁全部的服务后返回。此时 forbidden=false。将 providerUrls 转换为 Invoker 对象,返回的对象是一个 <URL#toFullString(),Invoker>
的 Map。其中最核心的代码则是 protocol.refer(serviceType, url)
根据 url 生成 Invoker 对象。另外,URL url=mergeUrl(providerUrl)
也要关心一下,主要合并外部化配置。
private Map<String, Invoker<T>> toInvokers(List<URL> urls) { Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>(); if (urls == null || urls.isEmpty()) { return newUrlInvokerMap; } Set<String> keys = new HashSet<>(); // 获取消费端配置的协议 String queryProtocols = this.queryMap.get(PROTOCOL_KEY); for (URL providerUrl : urls) { // 1.1 协议匹配,queryProtocols是消费者可接收的协议类型,可有多个, // providerUrl.getProtocol()是服务者提供的协议类型 if (queryProtocols != null && queryProtocols.length() > 0) { boolean accept = false; String[] acceptProtocols = queryProtocols.split(","); for (String acceptProtocol : acceptProtocols) { if (providerUrl.getProtocol().equals(acceptProtocol)) { accept = true; break; } } // providerUrl协议没法匹配,直接过滤掉 if (!accept) { continue; } } // 1.2 empty 协议,也直接过滤 if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { continue; } // 1.3 providerUrl.getProtocol() 不存在,也直接过滤 if (!ExtensionLoader.getExtensionLoader(Protocol.class) .hasExtension(providerUrl.getProtocol())) { continue; } // 2. 合并 url,参数配置 URL url = mergeUrl(providerUrl); // 1.4 忽略重复 url,已经处理过了 String key = url.toFullString(); // The parameter urls are sorted if (keys.contains(key)) { // Repeated url continue; } keys.add(key); // 3.1 匹配缓存中Invoker,若是命中直接添加到新集合newUrlInvokerMap中, // 未命中则生成新的Invoker后添加到新集合newUrlInvokerMap中 Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); // 3.2 缓存未命中,真正将 providerUrl -> Invoker if (invoker == null) { // Not in the cache, refer again try { boolean enabled = true; // 匹配参数:disable或enable,是否容许生成代理 if (url.hasParameter(DISABLED_KEY)) { enabled = !url.getParameter(DISABLED_KEY, false); } else { enabled = url.getParameter(ENABLED_KEY, true); } // * 核心方法:调用 refer 获取 Invoker if (enabled) { invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { } if (invoker != null) { // Put new invoker in cache // 将 invoker 存储到 newUrlInvokerMap 中 newUrlInvokerMap.put(key, invoker); } } else { // 3.2 缓存未命中,真正将 providerUrl -> Invoker // 将 invoker 存储到 newUrlInvokerMap 中 newUrlInvokerMap.put(key, invoker); } } keys.clear(); return newUrlInvokerMap; }
总结: toInvokers 代码很长,核心逻辑是 invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl)
,至于其它的逻辑主要都是判断是否须要执行这句代码,生成新的 Invoker。
上面的逻辑大部分都很简单,主要关注一下 mergeUrl(providerUrl)
方法,参数的覆盖规则。
配置文件覆盖规则:外部化配置优先,消费者优先。
private URL mergeUrl(URL providerUrl) { // 1. consumerUrl > providerUrl providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // 2. configuratorUrl > consumerUrl providerUrl = overrideWithConfigurator(providerUrl); providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); ... return providerUrl; } private URL overrideWithConfigurator(URL providerUrl) { // 1. configuratorUrl "override://" providerUrl = overrideWithConfigurators(this.configurators, providerUrl); // 2. configuratorUrl from "app-name.configurators"。针对整个应用 providerUrl = overrideWithConfigurators(CONSUMER_CONFIGURATION_LISTENER.getConfigurators(), providerUrl); // 3. configuratorUrl from "service-name.configurators"。针对应用中的某个服务接口 if (serviceConfigurationListener != null) { providerUrl = overrideWithConfigurators(serviceConfigurationListener.getConfigurators(), providerUrl); } return providerUrl; }
总结: mergeUrl 覆盖原则:外部化配置优先,消费者优先。至于 CONSUMER_CONFIGURATION_LISTENER 和 serviceConfigurationListener 主要是 dubbo-configcenter 的内容。
外部化配置示例:
override://0.0.0.0/org.apache.dubbo.DemoService?category=configurators&dynamic=false&enable=true&application=dubbo-test&timeout=1000
override
:override 协议。0.0.0.0
:表示对全部的服务生效,具体的 IP 则表示只对指定的 IP 生效。必填。org.apache.dubbo.DemoService
:表示只对具体的服务接口生效。必填。category=configurators
:表示这个参数是动态配置类型。必填。dynamic=false
:false 表示持久化数据,当注册方退出时数据仍保存在注册中心。必填。enable=true
:表示规则是否生效,默认为 true。选填。application=dubbo-test
:表示只对指定的应用生效。选填。timeout=1000&...
:若是前面的规则生效,则覆盖相应的配置信息。this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
toMergeInvokerList 方法当订阅者 group 配置有多个时 multiGroup =true,按组合并 invokers。一般状况下,咱们使用 dubbo 时不会设置组,也就是不会走这个方法,直接返回 invokers。
private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) { List<Invoker<T>> mergedInvokers = new ArrayList<>(); Map<String, List<Invoker<T>>> groupMap = new HashMap<>(); // group -> Invoker 列表 for (Invoker<T> invoker : invokers) { String group = invoker.getUrl().getParameter(GROUP_KEY, ""); groupMap.computeIfAbsent(group, k -> new ArrayList<>()); groupMap.get(group).add(invoker); } if (groupMap.size() == 1) { // 1. 只有一个组,直接添加 mergedInvokers.addAll(groupMap.values().iterator().next()); } else if (groupMap.size() > 1) { // 2. 多个组,则须要使用 CLUSTER.join 将同组的 invoker 合并 // { // "dubbo": [invoker1, invoker2, invoker3, ...], // "hello": [invoker4, invoker5, invoker6, ...] // } // 经过集群类合并每一个分组对应的 Invoker 列表 for (List<Invoker<T>> groupList : groupMap.values()) { StaticDirectory<T> staticDirectory = new StaticDirectory<>(groupList); staticDirectory.buildRouterChain(); mergedInvokers.add(CLUSTER.join(staticDirectory)); } } else { // 3. invokers.isEmpty() mergedInvokers = invokers; } return mergedInvokers; }
总结: 主要的逻辑是 CLUSTER.join(staticDirectory)
,后期再研究一下这个方法。
天天用心记录一点点。内容也许不重要,但习惯很重要!