本篇主要对dubbo集群容错进行剖析,主要下面几个模块html
<dubbo:service cluster="failsafe" /> 服务提供方
<dubbo:reference cluster="failsafe" /> 服务消费方
接口类 com.alibaba.dubbo.rpc.cluster.Clusternode
1.AvailableCluster
获取可用的调用。遍历全部Invokers判断Invoker.isAvalible,只要一个有为true直接调用返回,无论成不成功算法
2.BroadcastCluster
广播调用。遍历全部Invokers, 逐个调用每一个调用catch住异常不影响其余invoker调用api
3.FailbackCluster
失败自动恢复, 对于invoker调用失败, 后台记录失败请求,任务定时重发, 一般用于通知数组
//FailbackClusterInvoker //记录失败的调用 private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>(); protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); Invoker<T> invoker = select(loadbalance, invocation, invokers, null); return invoker.invoke(invocation); } catch (Throwable e) { //失败后调用 addFailed addFailed(invocation, this); return new RpcResult(); // ignore } } private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) { if (retryFuture == null) { synchronized (this) { if (retryFuture == null) { retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { public void run() { // 收集统计信息 try { retryFailed(); } catch (Throwable t) { // 防护性容错 logger.error("Unexpected error occur at collect statistic", t); } } }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); } } } failed.put(invocation, router); } //失败的进行重试,重试成功后移除当前map void retryFailed() { if (failed.size() == 0) { return; } for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>( failed).entrySet()) { Invocation invocation = entry.getKey(); Invoker<?> invoker = entry.getValue(); try { invoker.invoke(invocation); failed.remove(invocation); } catch (Throwable e) { logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e); } } }
4.FailfastCluster
快速失败,只发起一次调用,失败当即保错,一般用于非幂等性操做安全
5.FailoverCluster default
失败转移,当出现失败,重试其它服务器,一般用于读操做,但重试会带来更长延迟
(1) 目录服务directory.list(invocation) 列出方法的全部可调用服务
获取重试次数,默认重试两次服务器
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
(2) 根据LoadBalance负载策略选择一个Invoker
(3) 执行invoker.invoke(invocation)调用
(4) 调用成功返回
调用失败小于重试次数,从新执行从3)步骤开始执行,调用次数大于等于重试次数抛出调用失败异常数据结构
6.FailsafeCluster
失败安全,出现异常时,直接忽略,一般用于写入审计日志等操做。app
7.ForkingCluster
并行调用,只要一个成功即返回,一般用于实时性要求较高的操做,但须要浪费更多服务资源。负载均衡
注:
还有 MergeableCluster 和 MockClusterWrapper策略,可是我的没有用过因此就不说了
静态目录服务, 它的全部Invoker经过构造函数传入, 服务消费方引用服务的时候, 服务对多注册中心的引用,将Invokers集合直接传入 StaticDirectory构造器
public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) { super(url == null && invokers != null && invokers.size() > 0 ? invokers.get(0).getUrl() : url, routers); if (invokers == null || invokers.size() == 0) throw new IllegalArgumentException("invokers == null"); this.invokers = invokers; }
StaticDirectory的list方法直接返回全部invoker集合
@Override protected List<Invoker<T>> doList(Invocation invocation) throws RpcException { return invokers; }
注册目录服务, 它的Invoker集合是从注册中心获取的, 它实现了NotifyListener接口实现了回调接口notify(List<Url>)。
好比消费方要调用某远程服务,会向注册中心订阅这个服务的全部服务提供方,订阅时和服务提供方数据有变更时回调消费方的NotifyListener服务的notify方法NotifyListener.notify(List<Url>) 回调接口传入全部服务的提供方的url地址而后将urls转化为invokers, 也就是refer应用远程服务到此时引用某个远程服务的RegistryDirectory中有对这个远程服务调用的全部invokers。
RegistryDirectory.list(invocation)就是根据服务调用方法获取全部的远程服务引用的invoker执行对象
dubbo路由功能貌似用的很少,目的主要是对已注册的服务进行过滤,好比只能调用某些配置的服务,或者禁用某些服务。
dubbo-admin 后台进行配置。
路由代码入口
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { if (invokers == null || invokers.size() == 0) { return invokers; } try { if (!matchWhen(url, invocation)) { return invokers; } List<Invoker<T>> result = new ArrayList<Invoker<T>>(); if (thenCondition == null) { logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey()); return result; } .............................
按照dubbo脚本规则进行编写,程序识别
default
随机,按权重设置随机几率。权重default=100
在一个截面上碰撞的几率高,但调用量越大分布越均匀,并且按几率使用权重后也比较均匀,有利于动态调整提供者权重。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // 总个数 int totalWeight = 0; // 总权重 boolean sameWeight = true; // 权重是否都同样 for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); totalWeight += weight; // 累计总权重 if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; // 计算全部权重是否同样 } } if (totalWeight > 0 && !sameWeight) { // 若是权重不相同且权重大于0则按总权重数随机 int offset = random.nextInt(totalWeight); // 并肯定随机值落在哪一个片段上 for (int i = 0; i < length; i++) { offset -= getWeight(invokers.get(i), invocation); if (offset < 0) { return invokers.get(i); } } } // 若是权重相同或权重为0则均等随机 return invokers.get(random.nextInt(length)); }
算法含义
若是全部的服务权重都同样,就采用总服务数进行随机。若是权重不同,则按照权重出随机数,而后用随机数减去服务权重,结果为负数则使用当前循环的服务。其实也就是一个几率性问题 每一个服务的几率就是 当前服务的权重/ 总服务权重
轮循,按公约后的权重设置轮循比率。
存在慢的提供者累积请求的问题,好比:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,长此以往,全部请求都卡在调到第二台上。
该负载算法维护着一个方法调用顺序计数
private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
以方法名做为key
轮循分为 普通轮询和加权轮询。权重同样时,采用取模运算普通轮询,反之加权轮询。
下面看下具体的实现
RoundRobinLoadBalance#doSelect
i.普通轮询
AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); } //获取本次调用的服务器序号,并+1 int currentSequence = sequence.getAndIncrement(); //当前序号和服务总数取模 return invokers.get(currentSequence % length);
ii.加权轮询
下面贴下核心实现代码。注意几个变量
weightSum
= 服务权重之和
invokerToWeightMap
= 权重>0的 invoker map
int currentSequence = sequence.getAndIncrement(); if (maxWeight > 0 && minWeight < maxWeight) { // 权重不同 // mod < weightSum,下面for循环进行weight递减,weight大的服务被调用的几率大 int mod = currentSequence % weightSum; for (int i = 0; i < maxWeight; i++) { for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { final Invoker<T> k = each.getKey(); final IntegerWrapper v = each.getValue(); if (mod == 0 && v.getValue() > 0) { return k; } if (v.getValue() > 0) { v.decrement(); mod--; } } } }
能够举个例子
两个服务 A 和 B,权重分别是1和2
那么 mod=[0,1,2],通过上面的逻辑,调用几率是 A B B A B B A B B ..... 显然B的几率更大一些
最少活跃调用数优先,活跃数指调用先后计数差。使慢的提供者收到更少请求,由于越慢的提供者的调用先后计数差会越大。
每一个服务有一个活跃计数器,咱们假若有A,B两个提供者.计数均为0.当A提供者开始处理请求,该计数+1,此时A还没处理完,当处理完后则计数-1.而B请求接收到请求处理得很快.B处理完后A还没处理完,因此此时A,B的计数为1,0.那么当有新的请求来的时候,就会选择B提供者(B的活跃计数比A小).这就是文档说的,使慢的提供者收到更少请求。
int leastCount = 0; // 相同最小活跃数的个数
int[] leastIndexs = new int[length]; // 相同最小活跃数的下标
i.最小活跃服务个数=1, 该服务优先
if (leastCount == 1) { // 若是只有一个最小则直接返回 return invokers.get(leastIndexs[0]); }
ii.最小活跃服务个数>1, 最小活跃的服务按照权重随机
if (!sameWeight && totalWeight > 0) { // 若是权重不相同且权重大于0则按总权重数随机 int offsetWeight = random.nextInt(totalWeight); // 并肯定随机值落在哪一个片段上 for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexs[i]; //权重越大,offsetWeight越快减成负数 offsetWeight -= getWeight(invokers.get(leastIndex), invocation); if (offsetWeight <= 0) return invokers.get(leastIndex); } }
iii. 最小活跃服务个数>1, 权重相同,服务个数随机
// 若是权重相同或权重为0则均等随机 return invokers.get(leastIndexs[random.nextInt(leastCount)]);
<dubbo:parameter key="hash.arguments" value="0,1" />
<dubbo:parameter key="hash.nodes" value="320" />
<dubbo:reference id="demoService" interface="com.youzan.dubbo.api.DemoService" loadbalance="consistenthash"> <!--缺省只对第一个参数 Hash--> <dubbo:parameter key="hash.arguments" value="0,1" /> <!--缺省用 160 份虚拟节点,--> <dubbo:parameter key="hash.nodes" value="160" /> </dubbo:reference>
ConsistentHashLoadBalance为使用该算法的服务维护了一个selectors
,
key=invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName()
eg: com.youzan.dubbo.api.DemoService.sayHello
#com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>(); @SuppressWarnings("unchecked") @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); int identityHashCode = System.identityHashCode(invokers); //获取该服务的ConsistentHashSelector,并跟进本次调用获取对应invoker ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); if (selector == null || selector.getIdentityHashCode() != identityHashCode) { selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } return selector.select(invocation); }
ConsistentHashSelector做为ConsistentHashLoadBalance的内部类, 就是具体的一致性hash实现。
#com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance.ConsistentHashSelector //该服务的全部hash节点 private final TreeMap<Long, Invoker<T>> virtualInvokers; //虚拟节点数量 private final int replicaNumber; //该服务的惟一hashcode,经过System.identityHashCode(invokers)获取 private final int identityHashCode;
public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { // 建立TreeMap 来保存结点 this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); // 生成调用结点HashCode this.identityHashCode = System.identityHashCode(invokers); // 获取Url //dubbo://192.168.0.4:20880/com.youzan.dubbo.api.DemoService?anyhost=true&application=consumer-of-helloworld-app&check=false&class=com.youzan.dubbo.provider.DemoServiceImpl&dubbo=2.5.4&generic=false&hash.arguments=0,1&hash.nodes=160&interface=com.youzan.dubbo.api.DemoService&loadbalance=consistenthash&methods=sayHello&pid=32710&side=consumer×tamp=1527383363936 URL url = invokers.get(0).getUrl(); // 获取所配置的结点数,如没有设置则使用默认值160 this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); // 获取须要进行hash的参数数组索引,默认对第一个参数进行hash String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i ++) { argumentIndex[i] = Integer.parseInt(index[i]); } // 建立虚拟结点 // 对每一个invoker生成replicaNumber个虚拟结点,并存放于TreeMap中 for (Invoker<T> invoker : invokers) { for (int i = 0; i < replicaNumber / 4; i++) { // 根据md5算法为每4个结点生成一个消息摘要,摘要长为16字节128位。 byte[] digest = md5(invoker.getUrl().toFullString() + i); // 随后将128位分为4部分,0-31,32-63,64-95,95-128,并生成4个32位数,存于long中,long的高32位都为0 // 并做为虚拟结点的key。 for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } }
代码若是看的不是很懂,也不用去深究了(我就没看懂,瞻仰了网上大神的文章贴了帖注释),你们能够就粗略的认为,这段代码就是尽量的构建出散列均匀的服务hash表。
// 选择invoker public Invoker<T> select(Invocation invocation) { // 根据调用参数来生成Key String key = toKey(invocation.getArguments()); // 根据这个参数生成消息摘要 byte[] digest = md5(key); //调用hash(digest, 0),将消息摘要转换为hashCode,这里仅取0-31位来生成HashCode //调用sekectForKey方法选择结点。 Invoker<T> invoker = sekectForKey(hash(digest, 0)); return invoker; } private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); // 因为hash.arguments没有进行配置,由于只取方法的第1个参数做为key for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); } //根据hashCode选择结点 private Invoker<T> sekectForKey(long hash) { Invoker<T> invoker; Long key = hash; // 若HashCode直接与某个虚拟结点的key同样,则直接返回该结点 if (!virtualInvokers.containsKey(key)) { // 若不一致,找到一个比传入的key大的第一个结点。 SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key); // 若不存在,那么选择treeMap中第一个结点 // 使用TreeMap的firstKey方法,来选择最小上界。 if (tailMap.isEmpty()) { key = virtualInvokers.firstKey(); } else { // 若存在则返回 key = tailMap.firstKey(); } } invoker = virtualInvokers.get(key); return invoker; }
ConsistentHashSelector.virtualInvokers
这个东西就是咱们的服务hash节点,单纯的从数据结构上的确看不到什么环状的存在,能够先示意下,当前的数据结构
咱们的服务节点只是一个普通的 map数据存储而已,如何造成环呢?其实所谓的环只是逻辑上的展示,ConsistentHashSelector.sekectForKey()
方法里经过 TreeMap.tailMap()、TreeMap.tailMap().firstKey、TreeMap.tailMap().firstKey() 结合case实现了环状逻辑。下面咱们画图说话。
第一步原始数据结构,咱们按照hash从小到大排列
A,B,C表示咱们提供的服务,改示意图假设服务节点散列均匀
第二步选择服务节点
i. 假设本地调用获得的key=2120, 代码逻辑(指ConsistentHashSelector.sekectForKey
)走到tailMap.firstKey()
那么读取到 3986
A服务
ii.假设本地调用获得的key=9991, tailMap为空,逻辑走到 virtualInvokers.firstKey()
回到起点
读取到 1579 A服务
上述两部状况基本已经可以描述清楚节点的选择逻辑,至于hash直接命中,那么读取对应的服务便可,无需多讲。
最后环状造成
上面两部的介绍已经描述hash算法,那么咱们所谓的环状是怎么一回事呢?其实也就是为了方便更好的理解这个逻辑,咱们将线性的hash排列做为环状,而后hash的选择按照顺时针方向选择节点(等价于上面hash比较大小)
节点选择算法与上面等价,本图主要用来示意,理想的hash环hash差距应该是等差,均匀的排列。
参考:
https://blog.csdn.net/column/details/learningdubbo.html?&page=1
https://blog.csdn.net/revivedsun/article/details/71022871
https://www.jianshu.com/p/53feb7f5f5d9