在集群调用失败时,Dubbo 提供了多种容错方案,缺省为 failover 重试。html
Service代理对象初始化环节,涉及到Cluster的初始化,而且调用过程也涉及到Cluster组件的集群容错,接下来将详细讲解Dubbo是如何利用Cluster进行容错处理 及Cluster的种类。java
首先,咱们看下代理对象初始化过程当中 Cluster的组装过程。git
服务引用过程与发布过程的调用链很是相似一样也是api
/**
* Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol
* =>
* Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
*/缓存
服务发布 能够参考:《Dubbo服务发布之服务暴露&心跳机制&服务注册》安全
咱们直接来看核心代码:RegistryProtocol.refer 方法服务器
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 将param中的registry属性,设置为Protocol 删除param中的registry url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); // 链接注册中心,监听reconnect状态改变 Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // 得到服务引用配置参数集合 group="a,b" or group="*" refer中引用远程服务的信息 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); String group = qs.get(Constants.GROUP_KEY); // 分组处理方式 if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } // cluster , registry 注册中心 , type 接口Class类型 , url 注册中心信息 +refer 引用信息 return doRefer(cluster, registry, type, url); } private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // 建立 RegistryDirectory 对象,并设置注册中心 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // 建立订阅 URL // all attributes of REFER_KEY URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()); if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { // 向注册中心注册本身(服务消费者) // category = comsumers , side = consumer // registry注册 /dubbo/com.alibaba.dubbo.demo.DemoService/consumers/consumer%3A%2F%2F10.8.0.49%2Fcom.alibaba.dubbo.demo.DemoService%3Fapplication%3Ddemo-consumer%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D6496%26side%3Dconsumer%26timestamp%3D1533729758117 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } // 向注册中心订阅服务提供者 // 订阅信息 category 设置为providers,configurators,routers 进行订阅 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); // 建立 Invoker 对象 基于 Directory ,建立 Invoker 对象,实现统1、透明的 Invoker 调用过程。 return cluster.join(directory); }
doRefer方法网络
接下来咱们看下,RegistryProtocol 是如何实现 cluster的自适配对象(Cluster$Adaptive)和RegistryDirectory 生成Invoker对象的过程。app
public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster { public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("cluster", "failover"); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])"); com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName); return extension.join(arg0); } }
默认获取的是failover对应的通过MockClusterWrapper装饰器装饰的 FailoverCluster对象负载均衡
public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); }
public class FailoverCluster implements Cluster { public final static String NAME = "failover"; public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailoverClusterInvoker<T>(directory); } }
实际上FailoverCluster.join方法实现了 Cluster 到 invoker的转换过程。
FailoverCluster 与 FailoverClusterInvoker 之间是对应的。 其中 FailoverCluster 做用是: 失败转移,当出现失败,重试其它服务器,一般用于读操做,但重试会带来更长延迟。
咱们来看Cluster 和 Invoker 的集成结构图:
MockClusterInvoker mock用于非业务异常时的服务降级 ,非业务异常是指 网络抖动,超时,或者没有服务提供者时等异常条件下 ,服务的临时返回方案。 业务异常并不会走mock 的替代返回。
例如:某商品详情访问接口服务端异常,并不会走咱们的mock逻辑 , 若是网络抖动或者没有服务提供者这种状况会走 mock逻辑,例如返回"商品禁止访问,请稍后重试。"
直接上调用方法
public Result invoke(Invocation invocation) throws RpcException { Result result = null; String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")) { //1. no mock result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { if (logger.isWarnEnabled()) { logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); } //force:direct mock 强制走moke逻辑 result = doMockInvoke(invocation, null); } else { //fail-mock 异常moke逻辑 try { result = this.invoker.invoke(invocation); } catch (RpcException e) { if (e.isBiz()) { throw e; } else { if (logger.isWarnEnabled()) { logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e); } result = doMockInvoke(invocation, e); } } } return result; }
mock配置方式有如下两种:
1.远程调用方配置mock参数。配置方式:
<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" check="false" mock="return zhangsan"/>
说明:配置了mock参数以后,好比在调用服务的时候出现网络断连,或者没有服务启动,那么会把这个mock设置的值返回,也就是zhangsan
经过这种方式就能够避免由于服务调用不到而带来的程序不可用问题。
2. 经过制定业务处理类来进行返回。 配置方式:
<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" check="false" mock="true"/>
当服务调用过程当中,网络异常了,它会去自定义的mock业务处理类进行业务处理。所以还须要建立自定义mock业务处理类:
规则:在接口目录下建立自定义mock业务处理类 , 同时实现Service接口 。 命名规则符合:interfaceService + Mock ,而且有无参构造方法
如:
public class DemoServiceMock implements DemoService { @Override public String sayHello(String name) { return "张三李四"; } }
配置完成后,若是出现非业务异常,则会调用自定义降级业务。
因为MockClusterInvoker 是 ClusterInvoker 的装饰器,因此接下来还要执行 ClusterInvoker 的 invoke方法。接下来以 FailoverClusterInvoker 为例进行ClusterInvoker讲解。
可经过 retries="2"
来设置重试次数(不含第一次)。
重试次数配置以下:
<dubbo:service retries="2" /> 或 <dubbo:reference retries="2" /> 或 <dubbo:reference> <dubbo:method name="findFoo" retries="2" /> </dubbo:reference>
FailoverClusterInvoker 源码以下:
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance; // 得到全部服务提供者 Invoker 集合 List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);//异步的话,须要添加id return doInvoke(invocation, invokers, loadbalance); } public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //重试时,进行从新选择,避免重试时invoker列表已发生变化. //注意:若是列表发生了变化,那么invoked判断会失效,由于invoker示例已经改变 if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); //从新检查一下 checkInvokers(copyinvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); }
咱们看到若是仅是集群容错 它的实现原理比较简单:
除了FailoverCluster 失败转移的集群容错功能外,还有其余:
快速失败,只发起一次调用,失败当即报错。一般用于非幂等性的写操做,好比新增记录。
失败安全,出现异常时,直接忽略。一般用于写入审计日志等操做。
失败自动恢复,后台记录失败请求,定时重发。一般用于消息通知操做。
并行调用多个服务器,只要一个成功即返回。一般用于实时性要求较高的读操做,但须要浪费更多服务资源。可经过 forks="2"
来设置最大并行数。
广播调用全部提供者,逐个调用,任意一台报错则报错。一般用于通知全部提供者更新缓存或日志等本地资源信息。
按照如下示例在服务提供方和消费方配置集群模式
<dubbo:service cluster="failsafe" />
或
<dubbo:reference cluster="failsafe" />