你们好,今天给你们分享 Dubbo 中的负载均衡。在前一个章节中咱们介绍 Dubbo延迟服务暴露,咱们例举了常见的使用场景而且进行了源码解析来分析其实现原理,同时咱们也知道 Dubbo 延迟服务暴露其核心就是经过一个 延迟的调度器指定延迟时间后开始服务的暴露。不少小伙伴可能会好奇:咱们的服务部署基本都是集群形式,那服务消费端究竟是调用哪个服务提供者呢?都有哪些常见的服务选择算法?为了揭开这些困惑,下面就让咱们快速开始吧!java
那么什么是负载均衡呢?举个简单例子:在火车站购买火车票场景,咱们知道节假日确定有很是多的人购买火车票,那么一个售票窗口卖票的话确定会排很长的队对用户来讲体验很是差。那咱们能不能多增长几个售票窗口呢?是否是并行处理的能力立刻获得提升了呢?没错!咱们在软件工程里面也是如此。好比 Nginx 经常用做软件负载均衡、F5用做硬件的负载均衡器等等。负载均衡算法能够有效提升系统的并发处理能力、容错能力、一致性访问等。以下图所示:node
随机算法比较简单就和数学中的随机值选取同样,好比:1-100中随机选择一个数字。在 Dubbo 中对随机算法增长了一个权重概念。举个例子:git
服务列表 | 权重 |
---|---|
Service01 | 10 |
Service02 | 20 |
Service03 | 20 |
Service04 | 50 |
从上表中咱们知道4个服务权重分别是十、20、20、50,参考下图描述了随机选择步骤:算法
offset
,假设 offset
值为 60。offset
- weight
结果值为 50 > 0 继续执行从新赋值offset
= 50。offset
- weight
结果值为 30 > 0 继续执行。offset
- weight
结果值为 10 > 0 继续执行。offset
- weight
结果值为 -40 > 0 找到服务终止。Tips:此算法数据量越大数据分配越均匀。
public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // 服务总数 int length = invokers.size(); // 标识这些服务是否具备相同的权重 boolean sameWeight = true; // 全部服务权重集合 int[] weights = new int[length]; // 第一个服务权重 int firstWeight = getWeight(invokers.get(0), invocation); weights[0] = firstWeight; // 权重总和 int totalWeight = firstWeight; for (int i = 1; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); // 记录权重 weights[i] = weight; // 合计权重值 totalWeight += weight; if (sameWeight && weight != firstWeight) { sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { // 从(0,totalWeight]范围选择权重值 int offset = ThreadLocalRandom.current().nextInt(totalWeight); // 基于随机值返回服务 for (int i = 0; i < length; i++) { // offset = offset - weights[i] offset -= weights[i]; if (offset < 0) { return invokers.get(i); } } } // 若是权重值相同或者totalWeight=0返回服务列表中随机服务 return invokers.get(ThreadLocalRandom.current().nextInt(length)); } }
invokers
服务列表计算出权重总和并标记出是否存在权重所有相同状况。invokers
服务列表全部服务权重是否相同,相同则随机返回服务列表中的一个服务。invokers
服务列表存在服务权重不相同时,产生一个权重总和范围内的一个大于0的随着整数赋值给 offset
,而后循环访问服务列表中的元素而且使用 offset
减去当前循环元素的权重值,若是差值大于0则赋值给当前的 offset
继续执行元素循环,当 offset
减去当前元素的权重大于零时中止循环,则当前查找的元素为选择的服务。轮询负载均衡算法顾名思义就是轮流调用服务。举个例子:假设4个请求经过 Nginx 代理转发到后端服务:编程
同时 Dubbo 中的轮询算法也是须要权重支持。轮询负载均衡算法可让 RPC 调用严格按照咱们设置的比例来分配。不论是少许的调用仍是大量的调用。可是轮询负载均衡算法也有不足的地方,存在慢的服务累积请求的问题,好比:第二台 Service2
很慢,但没挂,当请求调到第二台时就卡在那,长此以往,全部请求都卡在调到第二台上,致使整个系统变慢。如下是加权轮询算法图:后端
public class RoundRobinLoadBalance extends AbstractLoadBalance { @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { //服务惟一标识,获取这一组服务第一个服务:由于这是一组服务全部key是相同的 String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();//DemoService.method1 //经过服务惟一标识对这一组服务进行缓存 key -> Map ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key); if (map == null) { methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>()); map = methodWeightMap.get(key); } int totalWeight = 0; long maxCurrent = Long.MIN_VALUE; long now = System.currentTimeMillis(); Invoker<T> selectedInvoker = null; WeightedRoundRobin selectedWRR = null; for (Invoker<T> invoker : invokers) { String identifyString = invoker.getUrl().toIdentityString();//服务实例惟一表示:[test://127.0.0.1:1/DemoService,test://127.0.0.1:2/DemoService,test://127.0.0.1:3/DemoService] WeightedRoundRobin weightedRoundRobin = map.get(identifyString); //获取服务权重 int weight = getWeight(invoker, invocation); //WeightedRoundRobin封装实体 if (weightedRoundRobin == null) { weightedRoundRobin = new WeightedRoundRobin(); weightedRoundRobin.setWeight(weight); map.putIfAbsent(identifyString, weightedRoundRobin); } //权重发生变动 if (weight != weightedRoundRobin.getWeight()) { //从新赋值 weightedRoundRobin.setWeight(weight); } long cur = weightedRoundRobin.increaseCurrent(); //记录更新时间 weightedRoundRobin.setLastUpdate(now); if (cur > maxCurrent) {//大于当前最大权重则被选择 maxCurrent = cur; selectedInvoker = invoker; selectedWRR = weightedRoundRobin; } //计算全部权重值 totalWeight += weight; } //获取锁而且存在服务下线状况【invokers.size() != map.size()】 if (!updateLock.get() && invokers.size() != map.size()) { if (updateLock.compareAndSet(false, true)) { try { // copy -> modify -> update reference ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map); //回收周期 newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD); methodWeightMap.put(key, newMap); } finally { updateLock.set(false); } } } if (selectedInvoker != null) { //将其选择服务权重值减去总权重值 selectedWRR.sel(totalWeight); return selectedInvoker; } // should not happen here return invokers.get(0); } }
maxCurrent
为最小值 Long.MIN_VALUE
、totalWeight
为0。invokers
服务列表,每一个元素中的 current
(初始化为0)加上元素权重值大于 maxCurrent
,maxCurrent
而且赋值选中服务( selectedInvoker
)为当前元素,而且累计当前元素权重值赋值给 totalWeight
。Tips:上述算法存在必定的难度,稍做了解便可。
咱们在实际服务调度场景中通常存在多服务实例部署状况,当相同的应用部署到不一样机器极可能存在不一样服务处理的能力不一样有快有慢,那么咱们理想化的认为在服务调用过程当中处理能力慢的服务器接收较少的服务请求更为符合咱们的要求。以下示例图:数组
从图中能够看出假设咱们这里有5个请求经过 Ngingx 代理转发到后端服务,这里 Service02
只接收到一个请求(处理能力较弱)。缓存
public class LeastActiveLoadBalance extends AbstractLoadBalance { public static final String NAME = "leastactive"; @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); //初始化最小活跃值,最小活跃数,最小活跃的invoker索引(index),权重数组 int leastActive = -1; int leastCount = 0; int[] leastIndexes = new int[length]; int[] weights = new int[length]; int totalWeight = 0; int firstWeight = 0; boolean sameWeight = true; // 过滤出全部最少活跃服务 for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); // 获取这个invoker活跃数 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 获取invoker的权重 默认100. int afterWarmup = getWeight(invoker, invocation); // 权重保存到集合中 weights[i] = afterWarmup; //找到最小活跃数invoker if (leastActive == -1 || active < leastActive) { //重置最小活跃数值 leastActive = active; //重置最小活跃计数 leastCount = 1; // 记录最小活跃数对应索引 leastIndexes[0] = i; // 重置权重合计 totalWeight = afterWarmup; // 记录第一个最小活跃数对应权重值 firstWeight = afterWarmup; // 重置是否具备相同权重标志 sameWeight = true; //当前invoker活跃数等于最小活跃数 } else if (active == leastActive) { // 最小活跃数leastCount增长而且记录当前invoker对应索引 leastIndexes[leastCount++] = i; // 累计invoker权重值 totalWeight += afterWarmup; // 判断是否权重值相同 if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false; } } } //最小活跃数invoker只有一个 if (leastCount == 1) { // 直接返回 return invokers.get(leastIndexes[0]); } //具备多个相同活跃数、具备不一样权重且权重和大于0 if (!sameWeight && totalWeight > 0) { // 根据权重值进行一个(0,totalWeight]整数随机值 int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); // 基于随机值选择所在区间和随机负载均衡算法一致 for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexes[i]; offsetWeight -= weights[leastIndex]; if (offsetWeight < 0) { return invokers.get(leastIndex); } } } // 若是全部权重值相同或者totalWeight=0返回服务列表中随机一个服务 return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); } }
invoker
索引(index
),权重数组。invokers
服务列表获取 invoker
活跃数、权重且过滤出全部最少活跃服务并记录最小活跃数对应索引,同时标记服务列表是否存在不一样权重值。totalWeight
]整数随机值totalWeight=0
返回服务列表中随机一个服务。Tips:上述算法存在必定的难度,稍做了解便可。
要明白什么是一致性 Hash 首先得知道什么是 Hash 算法。咱们在学习 HashMap 的 JDK 源码时能够了解到 HashMap 的数据结构由一个数组+链表构成以下所示:服务器
上图简单画出了 HashMap 底层数据存储结构,从中能够看出当咱们调用 HashMap.put(K key, V value)
方法时经过hash(key)
计算出一个 Hash 值与数组长度求模获得一个索引即为数组下标对应位置,当这个位置存在元素时即发生 Hash 碰撞就会产生一个链。微信
一个常见的场景,好比咱们在请求某个服务时须要固定访问某一台服务器以下所示:
从图中能够看出 Client01
经过 Nginx 固定访问 Service01
、Client02
经过 Nginx 固定访问 Service02
、Client03
经过 Nginx 固定访问 Service03
。要知足这种场景就须要使用 Hash 算法。可是普通 Hash 算法存在一个问题是 Hash 出来的是实际后端服务节点,当其中某个服务宕机时 Hash 计算因子发生改变(求模的分子)若是计算 Hash 因子不调整则可能会路由到宕机那台服务就会存在问题。那一致性 Hash 就是解决这样的问题。
什么是一致性 Hash 呢?一致性哈希算法是采用的环形哈希空间的方式,它首先根据 Ip 或者其余信息为机器节点生成一个 Hash 值,它投射到一个[0,2^32-1]的圆环中。以后咱们能够认为它是一个锚点。当请求进来后,它携带的数据,都统一辈子成一个 Hash 值,这时候比对以前的机器节点信息产生的 Hash 值,当遇到第一个大于或者等于该 Hash 值的缓存节点,咱们将这个数据便归属于这个机器节点。
假设虚拟节点12个、真实服务3个分别是:Invoker1
、Invoker2
、Invoker3
。下图展现了服务节点怎样进行虚拟节点映射的过程:
Invoker1
通过 Hash 计算后映射到环上对应位置Invoker2
通过 Hash 计算后映射到环上对应位置Invoker3
通过 Hash 计算后映射到环上对应位置上图中长方形色块表明真实的服务,圆形表明虚拟节点。全部同一种颜色的圆形虚拟节点归属于同一色块的长方形色块所表明的真实服务。
public class ConsistentHashLoadBalance extends AbstractLoadBalance { public static final String NAME = "consistenthash"; /** * Hash nodes name */ public static final String HASH_NODES = "hash.nodes"; /** * Hash arguments name */ public static final String HASH_ARGUMENTS = "hash.arguments"; private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>(); @SuppressWarnings("unchecked") @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { //获取调用方法名称 String methodName = RpcUtils.getMethodName(invocation); //生成惟一标识 String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; int identityHashCode = System.identityHashCode(invokers); //对ConsistentHashSelector进行缓存 ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); //缓存中selector == null 或 当invokers中服务发生变化 服务下限/服务上线 invokers hash值发生变动须要从新构建ConsistentHashSelector if (selector == null || selector.identityHashCode != identityHashCode) { selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } //执行服务选择 return selector.select(invocation); } private static final class ConsistentHashSelector<T> { private final TreeMap<Long, Invoker<T>> virtualInvokers; private final int replicaNumber; private final int identityHashCode; private final int[] argumentIndex; ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { //虚拟Invoker(虚拟节点) this.virtualInvokers = new TreeMap<>(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); //虚拟节点数 this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160); String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));//参与hash的参数索引 argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } for (Invoker<T> invoker : invokers) { String address = invoker.getUrl().getAddress();//127.0.0.1:1 // 将虚拟节点细化为 (replicaNumber / 4) 份 for (int i = 0; i < replicaNumber / 4; i++) { //地址+序号进行摘要 byte[] digest = md5(address + i); // 每份4个点散列到treeMap中 for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } } public Invoker<T> select(Invocation invocation) { String key = toKey(invocation.getArguments()); byte[] digest = md5(key); return selectForKey(hash(digest, 0)); } private Invoker<T> selectForKey(long hash) { //获取至少大于或等于给定hash值 Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash); if (entry == null) { //获取第一个Entry entry = virtualInvokers.firstEntry(); } return entry.getValue(); } } }
hash.arguments
属性值,缺省是0参数位置。invokers
服务列表根据 ip+port+序号生成 replicaNumber
个的节点(生成虚拟节点),其中 key
就是算出来的 Hash 值,value
就是 invoker
且 key
的值从小到大排序的。hash.arguments
参数取出对应位的参数,拼接成 key
。md5
对 key
计算,使用 Hash 算法算出 key
对应的 Hash 值。key
的 Hash 值找出对应的 invoker
,这里返回键值大于或等于 key
的那部分 ,而后再取第一个。Tips:上述算法存在必定的难度,稍做了解便可。
在本小节中咱们主要学习了 Dubbo 中常见的负载均衡算法以及使用场景。
本节课程的重点以下:
我的从事金融行业,就任过易极付、思建科技、某网约车平台等重庆一流技术团队,目前就任于某银行负责统一支付系统建设。自身对金融行业有强烈的爱好。同时也实践大数据、数据存储、自动化集成和部署、分布式微服务、响应式编程、人工智能等领域。同时也热衷于技术分享创立公众号和博客站点对知识体系进行分享。关注公众号: 青年IT男 获取最新技术文章推送!
博客地址: http://youngitman.tech
微信公众号: