前面的文章,已经单独对服务发现(Directory、RegistryDirectory)、路由机制(Router)、负载均衡机制( LoadBalance ),本节将重点分析集群容错机制 ( AbstractClusterInvoker), AbstractClusterInvoker 就是将上述机制融合在一块儿,整个集群容错中,上述组件扮演的角色见下图所示,本文将重点分析 AbstractClusterInvoker 是如何融合这些组件的。 AbstractClusterInvoker#invoke算法
@Override public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance = null; List<invoker<t>> invokers = list(invocation); // @1 if (invokers != null && !invokers.isEmpty()) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); // @2 } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); // @3 }
代码@1:根据调用上下文,获取服务提供者列表,服务提供者从Directory中获取。缓存
protected List<invoker<t>> list(Invocation invocation) throws RpcException { List<invoker<t>> invokers = directory.list(invocation); return invokers; }
最终会调用RegistryDirecotry的list方法,该方法的服务提供者是当该消费者订阅的服务的服务提供者列表发送变化后,会在注册中心产生事件,而后通知消费者更新服务提供者列表(本地缓存)。须要注意的是RegistryDirecotry在返回Invoker以前,已经使用Router进行了一次筛选,具体实如今RegistryDirectory#notify方法时。服务器
代码@2:根据SPI机制,获取负载均衡算法的实现类,根据< dubbo:consumer loadbalance=""/>、< dubbo:reference loadbalance=""/>等标签的配置值,默认为random,加权随机算法。架构
代码@3:根据调用上下文,服务提供者列表,负载均衡算法选择一服务提供者,具体代码由AbstractClusterInvoker的各个子类实现。并发
Dubbo目前支持的集群容错策略在中/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.cluster.Cluster定义,具体内容以下:app
mock=com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper failover=com.alibaba.dubbo.rpc.cluster.support.FailoverCluster failfast=com.alibaba.dubbo.rpc.cluster.support.FailfastCluster failsafe=com.alibaba.dubbo.rpc.cluster.support.FailsafeCluster failback=com.alibaba.dubbo.rpc.cluster.support.FailbackCluster forking=com.alibaba.dubbo.rpc.cluster.support.ForkingCluster available=com.alibaba.dubbo.rpc.cluster.support.AvailableCluster mergeable=com.alibaba.dubbo.rpc.cluster.support.MergeableCluster broadcast=com.alibaba.dubbo.rpc.cluster.support.BroadcastCluster
上述各类集群策略,对应的执行器为Cluser+Invoker,例如FailoverCluster对应的Invoker为:FailoverClusterInvoker。负载均衡
在讲解各类集群容错策略以前,咱们首先关注一下AbstractClusterInvoker具体从服务提供者中按照不一样的负载均衡算法选取服务提供者的算法。dom
AbstractClusterInvoker#select异步
protected Invoker<t> select(LoadBalance loadbalance, Invocation invocation, List<invoker<t>> invokers, List<invoker<t>> selected) throws RpcException { // @1 if (invokers == null || invokers.isEmpty()) return null; String methodName = invocation == null ? "" : invocation.getMethodName(); boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY); // @2 { //ignore overloaded method if (stickyInvoker != null && !invokers.contains(stickyInvoker)) { stickyInvoker = null; } //ignore concurrency problem if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) { if (availablecheck && stickyInvoker.isAvailable()) { return stickyInvoker; } } } Invoker<t> invoker = doSelect(loadbalance, invocation, invokers, selected); // @3 if (sticky) { stickyInvoker = invoker; } return invoker; }
代码@1:参数说明分布式
代码@3:执行doSelect选择。
private Invoker<t> doSelect(LoadBalance loadbalance, Invocation invocation, List<invoker<t>> invokers, List<invoker<t>> selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; if (invokers.size() == 1) // @1 return invokers.get(0); // If we only have two invokers, use round-robin instead. if (invokers.size() == 2 && selected != null && !selected.isEmpty()) { // @2 return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0); } if (loadbalance == null) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } Invoker<t> invoker = loadbalance.select(invokers, getUrl(), invocation); // @3 //If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect. if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { try { Invoker<t> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); // @4 if (rinvoker != null) { invoker = rinvoker; } else { //Check the index of current selected invoker, if it's not the last one, choose the one at index+1. int index = invokers.indexOf(invoker); try { //Avoid collision invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker; } catch (Exception e) { logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e); } } } catch (Throwable t) { logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t); } } return invoker; }
代码@1:若是可选Invoker只有一个的话,直接返回该Invoker。
代码@2:若是只有两个Invoker,而且其中一个已被选择,返回另一个未选择的Invoker。
代码@3:调用loadBalance负载均衡算法,选择一个服务提供者。
代码@4:若是选择的Invoker已被选择,则从新选择,这里有一个疑问,为何不在选以前,先过滤掉已被选的Invoker。
从服务提供者列表中选择一个服务提供者算法就介绍到这里,接下来将一一分析Dubbo提供的集群容错方式。
策略:失败后自动选择其余服务提供者进行重试,重试次数由retries属性设置,< dubbo:reference retries = "2"/>设置,默认为2,表明重试2次,最多执行3次。
FailoverClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, final List<invoker<t>> invokers, LoadBalance loadbalance) throws RpcException { List<invoker<t>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; // @1 if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<invoker<t>> invoked = new ArrayList<invoker<t>>(copyinvokers.size()); // invoked invokers. Set<string> providers = new HashSet<string>(len); // @2 for (int i = 0; i < len; i++) { // @3 //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if (i > 0) { // @4 checkWhetherDestroyed(); copyinvokers = list(invocation); // check again checkInvokers(copyinvokers, invocation); } Invoker<t> invoker = select(loadbalance, invocation, copyinvokers, invoked); // @5 invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { Result result = invoker.invoke(invocation); // @6 if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); // @7 } } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); }
代码@1:首先校验服务提供者列表,若是为空,则抛出RpcException,提示没有可用的服务提供者。
代码@2:构建Set< Stirng> providers,主要用来已调用服务提供者的地址,若是本次调用失败,将在日志信息中打印已调用的服务提供者信息。
代码@3,循环执行次数,等于retries + 1 次。
代码@4:若是i>0,表示服务调用,在重试,此时须要从新调用Directory#list方法,获取最小的服务提供者列表。
代码@5:根据负载均衡算法,选择Invoker,后续详细分析。
代码@6:根据负载算法,路由算法从服务提供者列表选一个服务提供者,发起RPC调用。
代码@7:将本次服务提供者的地址添加到providers集合中,若是屡次重试后,没法完成正常的调用,将在错误日志中包含这些信息。
策略:选择集群第一个可用的服务提供者。 缺点:至关于服务的主备,但同时只有一个服务提供者承载流量,并无使用集群的负载均衡机制。 AvailableClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, List<invoker<t>> invokers, LoadBalance loadbalance) throws RpcException { for (Invoker<t> invoker : invokers) { if (invoker.isAvailable()) { return invoker.invoke(invocation); } } throw new RpcException("No provider available in " + invokers); }
遍历服务提供者列表,选择第一个可用服务提供者,而后执行RPC服务调用,若是调用失败,则失败。
策略:广播调用,将调用全部服务提供者,一个服务调用者失败,并不会熔断,而且一个服务提供者调用失败,整个调用认为失败。 场景:刷新缓存。
public Result doInvoke(final Invocation invocation, List<invoker<t>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); // @1 RpcContext.getContext().setInvokers((List) invokers); RpcException exception = null; Result result = null; for (Invoker<t> invoker : invokers) { // @2 try { result = invoker.invoke(invocation); } catch (RpcException e) { exception = e; logger.warn(e.getMessage(), e); } catch (Throwable e) { exception = new RpcException(e.getMessage(), e); logger.warn(e.getMessage(), e); } } if (exception != null) { // @3 throw exception; } return result; }
代码@1:检测服务提供者列表,若是为空,则抛出没有服务提供的异常。
代码@2:遍历服务提供者列表,依次调用服务提供者的invoker,每一个服务调用用try catch语句包裹,当服务调用发生异常时,记录异常信息,但并不当即返回,广播模式,每一个服务提供者调用是异步仍是同步,取决服务调用的配置,默认是同步调用。
代码@3:只要其中一个服务调用发送一次,将抛出异常 信息,异常信息被封装为RpcException。
策略:调用失败后,返回成功,但会在后台定时重试,重试次数(反复) 场景:一般用于消息通知,但消费者重启后,重试任务丢失。
FailbackClusterInvoker#doInvoke
protected Result doInvoke(Invocation invocation, List<invoker<t>> invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); // @1 Invoker<t> invoker = select(loadbalance, invocation, invokers, null); // @2 return invoker.invoke(invocation); // @3 } catch (Throwable e) { logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e); addFailed(invocation, this); // @4 return new RpcResult(); // ignore } }
代码@1:校验服务提供者列表,若是为空,则抛出没有服务提供者错误。
代码@2:根据负载均衡机制,选择一个服务提供者。
代码@3:发起远程服务调用,若是出现异常,调用addFailed方法,添加剧试任务,而后返回给调用方成功。
接下来看一下addFailed方法。
FailbackClusterInvoker#addFailed
private void addFailed(Invocation invocation, AbstractClusterInvoker<!--?--> router) { // @1 if (retryFuture == null) { // @2 synchronized (this) { if (retryFuture == null) { retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { // @3 @Override public void run() { // collect retry statistics try { retryFailed(); } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected error occur at collect statistic", t); } } }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); } } } failed.put(invocation, router); // @4 }
代码@1:Invocation invocation:调用上下文;AbstractClusterInvoker< ?> router:调用集群策略。
代码@2:若是retryFuture(ScheduledFuture< ?> retryFuture)为空,则加锁建立一个定时调度任务,任务以每隔5s的频率调用retryFailed方法。
代码@3:添加剧试任务(ConcurrentMap< Invocation, AbstractClusterInvoker< ?>> failed)。想必retryFailed方法就是遍历failed,一个一个重复调用,若是调用成功则移除,调用不成功,继续放入。
FailbackClusterInvoker#retryFailed
void retryFailed() { if (failed.size() == 0) { return; } for (Map.Entry<invocation, abstractclusterinvoker<?>> entry : new HashMap<invocation, abstractclusterinvoker<?>>( // @1 failed).entrySet()) { Invocation invocation = entry.getKey(); Invoker<!--?--> invoker = entry.getValue(); try { invoker.invoke(invocation); // @2 failed.remove(invocation); // @3 } catch (Throwable e) { logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e); } } }
代码@1:遍历待重试列表,而后发起远程调用,若是调用成功,则从集合中移除,若是只选失败,并不会从待重试列表中移除,也就是在消费端不重启的状况下,会一直重复调用,直到成功。
策略:快速失败,服务调用失败后立马抛出异常,不进行重试。 场景:是否修改类服务(未实行幂等的服务调用)
public Result doInvoke(Invocation invocation, List<invoker<t>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); // @1 Invoker<t> invoker = select(loadbalance, invocation, invokers, null); // @2 try { return invoker.invoke(invocation); // @3 } catch (Throwable e) { if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception. throw (RpcException) e; } throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); // @4 } }
代码@1:检查服务提供者,若是服务提供者列表为空,抛出没有服务提供者错误。
代码@2:根据负载算法选择一个服务提供者。
代码@3:发起RPC服务调用。
代码@4:若是服务调用异常,抛出异常,打印服务消费者,服务提供者信息。
策略:服务调用失败后,只打印错误日志,而后返回服务调用成功。 场景:调用审计,日志类服务接口。
FailsafeClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, List<invoker<t>> invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); // @1 Invoker<t> invoker = select(loadbalance, invocation, invokers, null); // @2 return invoker.invoke(invocation); // @3 } catch (Throwable e) { logger.error("Failsafe ignore exception: " + e.getMessage(), e); return new RpcResult(); // ignore } }
代码@1:检查服务提供者,若是服务提供者列表为空,抛出没有服务提供者错误。
代码@2:根据负载算法选择一个服务提供者。
代码@3:发起RPC服务调用,若是出现异常,记录错误堆栈信息,并返回成功。
策略:并行调用多个服务提供者,当一个服务提供者返回成功,则返回成功。 场景:实时性要求比较高的场景,但浪费服务器资源,一般能够经过forks参数设置并发调用度。
ForkingClusterInvoker#doInvoke
public Result doInvoke(final Invocation invocation, List<invoker<t>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); // @1 final List<invoker<t>> selected; final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS); // @2 final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (forks <= 0 || forks >= invokers.size()) { selected = invokers; } else { selected = new ArrayList<invoker<t>>(); for (int i = 0; i < forks; i++) { // TODO. Add some comment here, refer chinese version for more details. Invoker<t> invoker = select(loadbalance, invocation, invokers, selected); if (!selected.contains(invoker)) {//Avoid add the same invoker several times. selected.add(invoker); } } } RpcContext.getContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue<object> ref = new LinkedBlockingQueue<object>(); for (final Invoker<t> invoker : selected) { // @3 executor.execute(new Runnable() { @Override public void run() { try { Result result = invoker.invoke(invocation); ref.offer(result); } catch (Throwable e) { int value = count.incrementAndGet(); if (value >= selected.size()) { ref.offer(e); } } } }); } try { Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); // @4 if (ret instanceof Throwable) { Throwable e = (Throwable) ret; throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } return (Result) ret; } catch (InterruptedException e) { throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e); } }
代码@1:检查服务提供者,若是服务提供者列表为空,抛出没有服务提供者错误。
代码@2:获取forks属性,貌似只能经过在< dubbo:reference />用< dubbo:parameter key="forks" value=""/>来设置forks,其默认值为2,若是forks值大于服务提供者的数量,则将调用全部服务提供者,若是forks值小于服务提供者的数量,则使用负载均衡算法,选择forks个服务提供者。
代码@3:依次异步向服务提供者发起RPC调用,并将结果添加到BlockingQueue< Object> ref,若是服务调用发送错误,而且发生错误的个数大于等于本次调用的个数,则将错误信息放入BlockingQueue< Object> ref,不然,将错误数增长1。
代码@4:Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS),从该队列中获取结果,若是队列未空,则会阻塞等待,直到超时,当有一个调用成功后,将返回,忽略其余调用结果。
本文重点分析了Dubbo集群容错机制,路由发现、路由算法、负载均衡等是如何共同协做完成Dubbo的服务调用,并详细分析了Dubbo各类集群策略,例如failover、failfast、failsafe、failback、forking、available等实现细节。
做者介绍:丁威,《RocketMQ技术内幕》做者,RocketMQ 社区布道师,公众号:中间件兴趣圈 维护者,目前已陆续发表源码分析Java集合、Java 并发包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源码专栏。能够点击连接:中间件知识星球,一块儿探讨高并发、分布式服务架构,交流源码。 </t></object></object></t></invoker<t></invoker<t></invoker<t></t></invoker<t></t></invoker<t></invocation,></invocation,></t></invoker<t></t></invoker<t></t></invoker<t></t></string></string></invoker<t></invoker<t></invoker<t></invoker<t></t></t></invoker<t></invoker<t></t></t></invoker<t></invoker<t></t></invoker<t></invoker<t></invoker<t>