dubbo版本2.5.3java
咱们这里以zookeeper做为注册中心为例说明。spring
这里说的集群,能够理解为,一个接口服务对应有多个提供者。
在dubbo的调用方(reference)看来,每一个提供方(service)对应一个invoker。
关于一个调用方对应多个提供方的场景大概包括三大类:
1,者调者订阅一个注册中心,此注册中心,同一个服务有多个提供者(以不一样机器,端口,版本等发布的服务)
2,者调者订阅多个注册中心的服务,每一个注册中心都有引用的服务的提供者(一个或者多个)。
3,调用方,经过url配置,提供多个提供者地址,多个地址以分号隔开。
1,2是同一类场景,3是直连场景,这两中场景是互斥,也就是用户配置了reference的url属性,dubbo就不会再订阅注册中心。负载均衡
下面经过代码分析下,这三种场景的集群容错
客户端订阅能够看ReferenceConfig类的createProxy方法里如下代码jvm
if (isJvmRefer) {//引用本地服务,只返回一个exporter不会有集群。 URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else {//应用远程服务 if (url != null && url.length() > 0) { // 用户指定URL,指定的URL多是对点对直连地址,也多是注册中心URL String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) {//用户自定多个直连服务 for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0) { url = url.setPath(interfaceName); } if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // 经过注册中心配置拼装URL List<URL> us = loadRegistries(false); if (us != null && us.size() > 0) {//用户自定多个注册中心 for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls == null || urls.size() == 0) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } if (urls.size() == 1) {//调用方订阅一个注册中心,或者自定一个直连服务(直连的这种状况不考虑集群,只有一个提供者) //一个注册中心时,这个refprotocol自适应后是RegistryProtocol //一个直连者时,这个refprotocol自适应后是DubboProtocol(若是是duboo协议) invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else {//调用方订阅多个注册中心,或者多个直连地址 List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // 用了最后一个registry url } } if (registryURL != null) { // 有 注册中心协议的URL // 对有注册中心的Cluster 只用 AvailableCluster 容错策略 // 对于订阅多个注册中心的,这里其实有两层的容错机制,只是第一层,被强制设置为AvailableCluster 容错策略 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers));//cluster是经过spi机制注入的自适应adaptive实现,场景2执行逻辑 } else { // 不是 注册中心的URL invoker = cluster.join(new StaticDirectory(invokers));//cluster是经过spi机制注入的自适应adaptive实现,场景3执行逻辑 } } }
经过代码咱们看到,对于场景1,引用一个注册中心的场景,会执行
invoker = refprotocol.refer(interfaceClass, urls.get(0));代码ide
经过代码调试,能够发现,refprotocol.refer会调用RegistryProtocol的refer方法最终进入doRefer方法,
会执行以下代码oop
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()); if (! Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); return cluster.join(directory);//cluster是经过spi机制注入的自适应adaptive实现。此时的directory是RegistryDirectory类型 }
三种场景实际上,都执行了dubbo SPI机制生成的adaptive的Cluster实现代码
经过dubbo打印日志,能够看到adaptive的Cluster实现代码以下url
package com.alibaba.dubbo.rpc.cluster; import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster { public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("cluster", "failover");//能够看到,经过url里的cluster键值获取容错机制,url中没有指定cluster键值,dubbo默认是用failover集群容错策略 if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])"); com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName); return extension.join(arg0); } }
到此,咱们能够看到对于多注册中心的,第一层容错机制被强制设置为available,
而后第二层,就和单个注册中心多服务提供者集群容错机制同样了,即默认为failover容错机制。这里看下这两种容错机制的代码实现
1,failover容错机制
经过spi机制咱们找到Cluster failover扩展FailoverCluster类是这样实现的.net
public class FailoverCluster implements Cluster { public final static String NAME = "failover"; public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailoverClusterInvoker<T>(directory); } }
接着看FailoverClusterInvoker类,先看它的父类AbstractClusterInvoker,这个类实现了Invoker接口:调试
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance; //这里是获取负载均衡策略 List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance);//回调子类的doInvoke方法 }
而后再回到子类看doInvoke方法:日志
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class); public FailoverClusterInvoker(Directory<T> directory) { super(directory); } @SuppressWarnings({ "unchecked", "rawtypes" }) public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); //获取重试次数 +1是由于第一次调用不算重试次数 int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //重试时,进行从新选择,避免重试时invoker列表已发生变化. //注意:若是列表发生了变化,那么invoked判断会失效,由于invoker示例已经改变 if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); //从新检查一下 checkInvokers(copyinvokers, invocation); } //这里是经过负载均衡策略获取下一个服务提供者 Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List)invoked); try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception.若是是业务异常,则再也不重试,直接抛出异常 throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress());//记录调用失败信息 } } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); } }
经过代码能够看到,
failvoer集群容错机制,总的逻辑是,以方法重复次数为限制,每次调用若是失败,
就利用负责均衡策略获取下一个提供者(invoker),直到调用成功,或者最后方法超限,抛出异常,
其中中间若是有业务异常,则再也不重试,直接抛出异常。
2,available集群容错机制,咱们找到AvailableCluster类,它只有一个方法
public <T> Invoker<T> join(Directory<T> directory) throws RpcException { //它没经过扩展AbstractClusterInvoker抽象类,而是直接实现它,它没用负载均衡策略,而是简单选择一个可达的服务 return new AbstractClusterInvoker<T>(directory) { public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { for (Invoker<T> invoker : invokers) { if (invoker.isAvailable()) {//获取第一个,可达的服务提供方, return invoker.invoke(invocation); } } throw new RpcException("No provider available in " + invokers); } }; }
经过代码能够看到,
available集群容错机制,则是简单的调用第一个可到达的服务。都不可达是,抛出异常
最后
dubbo自己还有其余集群容错的扩展实现,这里http://www.javashuo.com/article/p-awxzbqol-ce.html