文章异常啰嗦且绕弯。java
Dubbo 版本 : dubbo 3.0apache
Dubbo LoadBalance 是 Dubbo Consumer 中用于负载均衡的组件,位于 Cluster 层中。数组
LoadBalance 的组件遵循 Dubbo 的通常设计规律,接口在 dubbo-cluster 模块中:缓存
package org.apache.dubbo.rpc.cluster; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.extension.Adaptive; import org.apache.dubbo.common.extension.SPI; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance; import java.util.List; @SPI(RandomLoadBalance.NAME) // RandomLoadBalance.NAME = random public interface LoadBalance { /** * 能够在 url 里传入 loadbalance 参数来切换负载均衡策略,默认根据 spi 机制,会使用 random */ @Adaptive("loadbalance") <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException; }
package org.apache.dubbo.rpc.cluster.loadbalance; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.cluster.LoadBalance; import java.util.List; import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_SERVICE_REFERENCE_PATH; import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WARMUP; import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WEIGHT; import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY; import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY; /** * 负载均衡组件模板 */ public abstract class AbstractLoadBalance implements LoadBalance { static int calculateWarmupWeight(int uptime, int warmup, int weight) { // ww = (当前时间 - 启动时间) / 预热时间 * 权重 // 取 ww 和 权重 中的最小值 // 若是 当前时间 还在 预热时间 内,那么此处 ww 必然小于 权重 // 若是 当前时间 和 启动时间 相差很是近,或者 预热时间 很长,那么此处 ww 有可能会小于 1,此处会返回 1 // 若是 当前时间 小于 启动时间,那么是服务的时间问题,ww 就会小于 0,此处会返回 1 // 从 getWeight(...) 方法可知,此处 ww 必然小于 weight int ww = (int) ( uptime / ((float) warmup / weight)); return ww < 1 ? 1 : (Math.min(ww, weight)); } /** * 接口抽象方法 select 的实现,也是模版的核心方法 * * @param invokers 全部的服务提供者信息的封装 * @param url 当前调用者的 url * @param invocation 要发送的信息 */ @Override public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) { // 若是没有服务提供者,此处返回 null if (CollectionUtils.isEmpty(invokers)) { return null; } // 服务的提供者只有一个,直接返回就能够了,没有负载均衡的必要 if (invokers.size() == 1) { return invokers.get(0); } // 有多个,那么此处须要不一样的策略自行完成具体逻辑 return doSelect(invokers, url, invocation); } /** * 模板方法的具体实现,从列表中选择一个 invoker * * @param invokers 全部的服务提供者信息的封装 * @param url 当前调用者的 url * @param invocation 要发送的信息 */ protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation); /** * 获取 invoker 权重的方法,在 random 和 robin * 中很重要的方法 */ int getWeight(Invoker<?> invoker, Invocation invocation) { int weight; // 获取 url URL url = invoker.getUrl(); // REGISTRY_SERVICE_REFERENCE_PATH = org.apache.dubbo.registry.RegistryService // REGISTRY_KEY = registry // WEIGHT_KEY = weight // DEFAULT_WEIGHT = 100 // TIMESTAMP_KEY = timestamp // WARMUP_KEY = warmup // DEFAULT_WARMUP = 600000 if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) { // 入参 registry.weight 和 100 weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT); } else { // provider 的权重 weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT); if (weight > 0) { // 权重大于 0 // provider 的启动的时间戳 long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L); if (timestamp > 0L) { long uptime = System.currentTimeMillis() - timestamp; if (uptime < 0) { // 启动的时间戳小于当前时间戳,这种状况多是存在服务器时间问题 // 此处为什么返回 1 ? return 1; } // warmup 是预热时间,若是当前时间内,这个 provider 还处于预热当中 // 那么就会调用到 calculateWarmupWeight(...) 方法 int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP); if (uptime > 0 && uptime < warmup) { weight = calculateWarmupWeight((int)uptime, warmup, weight); } } } } // 权重不能低于 0 return Math.max(weight, 0); } }
在 Dubbo 3.0 中,负载均衡策略存在如下几种:服务器
笔者这里暂时只列举前三种,后面两种有缘补充 (其实是由于还没看完)。负载均衡
默认策略,其实是考虑了权重以后的随机选择,若是每一个服务提供者的权重都一致,那么就使用 java 的随机函数去选择一个。dom
package org.apache.dubbo.rpc.cluster.loadbalance; import org.apache.dubbo.common.URL; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import java.util.List; import java.util.concurrent.ThreadLocalRandom; /** * 考虑权重值以后的随机负载均衡 */ public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // 获取服务提供者的数量 int length = invokers.size(); // 默认全部的服务提供者是相同的权重 boolean sameWeight = true; // 权重数组 int[] weights = new int[length]; // 获取第一个服务提供者的权重 int firstWeight = getWeight(invokers.get(0), invocation); // 存入数组 weights[0] = firstWeight; // 权重的和 int totalWeight = firstWeight; // 轮询全部的提供者的权重并记录下来 for (int i = 1; i < length; i++) { // 此处和上方代码相似 int weight = getWeight(invokers.get(i), invocation); weights[i] = weight; totalWeight += weight; // 若是遇到不同的就把标识改为 false if (sameWeight && weight != firstWeight) { sameWeight = false; } } // 不一样权重模式下的随机计算 // 大概思路是 row 一个随机值,并按照顺序进行相减,观察落在哪一个区间内 if (totalWeight > 0 && !sameWeight) { int offset = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0; i < length; i++) { offset -= weights[i]; if (offset < 0) { return invokers.get(i); } } } // 相同权重下的随机计算 return invokers.get(ThreadLocalRandom.current().nextInt(length)); } }
轮询负载均衡策略,本质上也是考虑了权重以后的轮循。若是 A 服务提供者的权重是 B 服务提供者的两倍,那么理论上 A 被轮循到的次数就会是 B 的两倍。ide
package org.apache.dubbo.rpc.cluster.loadbalance; import org.apache.dubbo.common.URL; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; /** * Round robin load balance. * * 轮询负载均衡策略 */ public class RoundRobinLoadBalance extends AbstractLoadBalance { public static final String NAME = "roundrobin"; private static final int RECYCLE_PERIOD = 60000; /** * 权重的封装 */ protected static class WeightedRoundRobin { private int weight; private AtomicLong current = new AtomicLong(0); private long lastUpdate; public int getWeight() { return weight; } public void setWeight(int weight) { this.weight = weight; current.set(0); } public long increaseCurrent() { return current.addAndGet(weight); } public void sel(int total) { current.addAndGet(-1 * total); } public long getLastUpdate() { return lastUpdate; } public void setLastUpdate(long lastUpdate) { this.lastUpdate = lastUpdate; } } private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>(); /** * get invoker addr list cached for specified invocation * <p> * <b>for unit test only</b> * * @param invokers * @param invocation * @return */ protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); Map<String, WeightedRoundRobin> map = methodWeightMap.get(key); if (map != null) { return map.keySet(); } return null; } @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // key = serviceKey + methodName // 这个 key 表明一个 provider 接口 String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); // 获取权重记录,若是没有的话会建立一个空 map ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>()); int totalWeight = 0; long maxCurrent = Long.MIN_VALUE; long now = System.currentTimeMillis(); Invoker<T> selectedInvoker = null; // 被选中的 provider WeightedRoundRobin selectedWRR = null; // 被选中的 provider 的权重 entity for (Invoker<T> invoker : invokers) { // 此处若是存在权重记录就直接返回,不存在就初始化一个 // identifyString 是缓存的 key String identifyString = invoker.getUrl().toIdentityString(); /* 获取权重的封装对象,若是没有的话会建立一个 WeightedRoundRobin 维护两个重要的参数, 一个数 current,表明该 provider 当前的调用权重 一个是 weight,表明该 provider 恒定的配置权重 */ int weight = getWeight(invoker, invocation); WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> { WeightedRoundRobin wrr = new WeightedRoundRobin(); wrr.setWeight(weight); return wrr; }); // 改权重数据 if (weight != weightedRoundRobin.getWeight()) { weightedRoundRobin.setWeight(weight); } // cur = weightedRoundRobin.current + weightedRoundRobin.weight long cur = weightedRoundRobin.increaseCurrent(); weightedRoundRobin.setLastUpdate(now); // 此处的 cur > maxCurrent,本质上选出了全部 provider 中 current 最大的一个 // 此处结合上述逻辑,至关于给每一个 provider 的 current 增长了一次 weight // 并选出了 current 最大的那一个,做为调用方 if (cur > maxCurrent) { maxCurrent = cur; selectedInvoker = invoker; selectedWRR = weightedRoundRobin; } totalWeight += weight; } // 对 map 进行自检 // 若是超过 60 秒都没有被调用,此处即认为服务已经异常,就会移除 if (invokers.size() != map.size()) { map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD); } if (selectedInvoker != null) { // weightedRoundRobin.current = weightedRoundRobin.current - totalWeight // 至关于 selectedWRR.sel(totalWeight); return selectedInvoker; } /** * 上述逻辑简图 * 假设三个服务 s1,s2,s3 权重均为 10 * * 第一轮叠加权重后的 current: * 10 10 10 * 第一轮选择推送 s1,推送完成后的 current: * -20 10 10 * * 第二轮叠加权重后的 current: * -10 20 20 * 第二轮选择推送 s2,推送完成后的 current: * -10 -10 20 * * 第三轮叠加权重后的 current: * 0 0 30 * 第三轮选择推送 s3,推送完成后的 current: * 0 0 0 * * 第四轮叠加权重后的 current: * 10 10 10 * 第四轮选择推送 s1,推送完成后的 current: * -20 10 10 * * * 以此类推。 */ // 上述代码出问题的状况下默认选第一个 return invokers.get(0); } }
根据响应时间和当前服务的请求量去得到一个最优解。若是存在多个最优解,则考虑权重,若是仅有一个则权重无效。函数
package org.apache.dubbo.rpc.cluster.loadbalance; import org.apache.dubbo.common.URL; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcStatus; import java.util.List; import java.util.concurrent.ThreadLocalRandom; /** * 根据最优解选择服务提供者 */ public class ShortestResponseLoadBalance extends AbstractLoadBalance { public static final String NAME = "shortestresponse"; @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // 可调用的服务提供者的数量 int length = invokers.size(); // 初始化一个最短 response 时间 long shortestResponse = Long.MAX_VALUE; // 初始化一个最短 response 总数 int shortestCount = 0; // The index of invokers having the same estimated shortest response time int[] shortestIndexes = new int[length]; // 每一个服务提供者的权重 int[] weights = new int[length]; // 权重和 int totalWeight = 0; // 调用平均返回时间最短的服务提供者的权重 int firstWeight = 0; // 权重是否相同 boolean sameWeight = true; // 轮询全部的服务提供者 for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); // 获取服务提供者的状态 RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); // 平均服务调用成功返回时间 long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed(); // 正在活跃的请求数 int active = rpcStatus.getActive(); // 此处用平均时间乘以活跃数,得到打分 // 若是服务提供方很健壮,平均时间很短,可是请求分配的不少,这里分数也会比较高 // 分数越低,优先级越高 long estimateResponse = succeededAverageElapsed * active; // 获取权重 int afterWarmup = getWeight(invoker, invocation); weights[i] = afterWarmup; /** * 计算最短数组,shortestResponse 记录当前最短的 */ if (estimateResponse < shortestResponse) { // 若是当前服务提供者的得分低于最低的得分,则更新最低得分, // 并将最优提供者数组的首位置为当前的提供者 shortestResponse = estimateResponse; shortestCount = 1; shortestIndexes[0] = i; totalWeight = afterWarmup; firstWeight = afterWarmup; sameWeight = true; } else if (estimateResponse == shortestResponse) { // 若是相等,则可能存在多个最优解 shortestIndexes[shortestCount++] = i; totalWeight += afterWarmup; if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false; } } } // 最优解只有一个的状况,直接选最优解进行调用 if (shortestCount == 1) { return invokers.get(shortestIndexes[0]); } // 最优解不止一个,且最优解之间的权重不一样,那么此处根据权重去随机选择一个 if (!sameWeight && totalWeight > 0) { int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0; i < shortestCount; i++) { int shortestIndex = shortestIndexes[i]; offsetWeight -= weights[shortestIndex]; if (offsetWeight < 0) { return invokers.get(shortestIndex); } } } // 最优解不止一个,且权重相同,则随机选择 return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]); } }
本文仅为我的的学习笔记,可能存在错误或者表述不清的地方,有缘补充学习