接上篇http://www.javashuo.com/article/p-poudvoae-u.htmljava
既然有集群容错,天然会有负载均衡。dubbo经过spi默认实现了4种lb策略
分别是
权重随机(random),实现类RandomLoadBalance
权重轮询(roundrobin),实现类RoundRobinLoadBalance
最少活跃(leastactive)负载策略,实现类LeastActiveLoadBalance
一致性hash(consistenthash)实现类ConsistentHashLoadBalance
类关系图:算法
4种实现都扩展了抽象类AbstractLoadBalance,
并实现了doSelect抽象方法,
这点和集群容错结构使用了一样的设计模式,这个doSelect方法在AbstractLoadBalance的select方法中被调用,select方法也是接口LoadBalance的惟一方法,是负载均衡的实现方法。设计模式
代码以下:数组
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) { if (invokers == null || invokers.size() == 0) return null; if (invokers.size() == 1) return invokers.get(0); //回调子类的doSelect实现,实现具体的lb策略 return doSelect(invokers, url, invocation); }
dubbo负载均衡,默认是随机(random)
这个可经过上篇提到的AbstractClusterInvoker的invoke方法实现看到,代码:
负载均衡
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { //从url经过key "loadbalance" 取不到值,就取默认random随机策略 loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { //取默认random随机策略 loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
可是,这篇只说,最少活跃(leastactive)负载策略。dom
首先想说的是,要理解最少活跃数负载策略,就要先弄明白这里的最少活跃数,指的是什么数
先看实现代码:ide
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // 总个数 int leastActive = -1; // 最小的活跃数 int leastCount = 0; // 相同最小活跃数的个数 int[] leastIndexs = new int[length]; // 相同最小活跃数的下标 int totalWeight = 0; // 总权重 int firstWeight = 0; // 第一个权重,用于于计算是否相同 boolean sameWeight = true; // 是否全部权重相同 for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); // 活跃数是经过RpcStatus,getStatus(invoker.getUrl(), invocation.getMethodName()).getActive()获取的。 // 能够先跳过去看下文的RpcStatus类解读 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 权重 if (leastActive == -1 || active < leastActive) { // 发现更小的活跃数,从新开始 leastActive = active; // 记录最小活跃数 leastCount = 1; // 从新统计相同最小活跃数的个数 leastIndexs[0] = i; // 从新记录最小活跃数下标 totalWeight = weight; // 从新累计总权重 firstWeight = weight; // 记录第一个权重 sameWeight = true; // 还原权重相同标识 } else if (active == leastActive) { // 累计相同最小的活跃数 leastIndexs[leastCount ++] = i; // 累计相同最小活跃数下标 totalWeight += weight; // 累计总权重 // 判断全部权重是否同样 if (sameWeight && i > 0 && weight != firstWeight) { sameWeight = false; } } } // assert(leastCount > 0) if (leastCount == 1) { // 若是只有一个最小则直接返回 return invokers.get(leastIndexs[0]); } if (! sameWeight && totalWeight > 0) { // 若是权重不相同且权重大于0则按总权重数随机 int offsetWeight = random.nextInt(totalWeight); // 并肯定随机值落在哪一个片段上 for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexs[i]; //这里getWeight获得权重,不必定就是配置的,它兼容了java的warmup问题, //大概意思是,若是warmup时间设置为10分钟,权重配置为100, //而当前服务只启动了1分钟,那么这个方法为计算出一个值为10的新权值 //这其实,这会有个小问题的,应为上面计算的totalWeight是没有按warmup降权的, //因此,按目前落在哪一个片断上的算法,有可能一个也选不到。特别是服务刚启动时。 offsetWeight -= getWeight(invokers.get(leastIndex), invocation); if (offsetWeight <= 0) return invokers.get(leastIndex); } } // 若是权重相同或权重为0则均等随机 return invokers.get(leastIndexs[random.nextInt(leastCount)]); }
这个方法,就是把invokers里,有最小活跃数的invoker(一个或多个)的下标,记录到leastIndexs数组里。
若是只有一个,就直接返回,不用选了。若是有多个,再计算这其中每一个的invoker的权重。
若是权重同样,就均等随机选一个。
若是权重不同,就再按权重随机(random策略)从中选一个。url
RpcStatus类,它是url统计类,有如下属性.net
//私有静态map,存调用统计信息用的 private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>(); private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>(); //具体表明各个调用指标统计值 private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>(); //活跃数 private final AtomicInteger active = new AtomicInteger(); private final AtomicLong total = new AtomicLong(); private final AtomicInteger failed = new AtomicInteger(); private final AtomicLong totalElapsed = new AtomicLong(); private final AtomicLong failedElapsed = new AtomicLong(); private final AtomicLong maxElapsed = new AtomicLong(); private final AtomicLong failedMaxElapsed = new AtomicLong(); private final AtomicLong succeededMaxElapsed = new AtomicLong();
上文提到的链接数是经过下面方法获得
public int getActive() {
return active.get();
}
而能改变这个active值的只有下面两个方法
private static void beginCount(RpcStatus status) {
}
private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
}设计
这两个方法,何时被调用的呢
经过源码find usages发如今ActiveLimitFilter和ExecuteLimitFilter两个过滤器中调用的。
经过注解知道,ExecuteLimitFilter是服务端过滤器,ActiveLimitFilter是客户端过滤器(之后能够写专门介绍过滤器的)
咱们这边是调用方,应该用ActiveLimitFilter。而启用这个过滤器,则须要在调用方应用上配置filter="activelimit"
因为dubbo默认调用是没有启用这个过滤器的,因此要想使用最少活跃(leastactive)负载策略,须要配置启用这个activelimit过滤器。看下过滤器,惟一一个方法:
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0); RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); if (max > 0) { long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0); long start = System.currentTimeMillis(); long remain = timeout; int active = count.getActive(); if (active >= max) { synchronized (count) { while ((active = count.getActive()) >= max) { try { count.wait(remain); } catch (InterruptedException e) { } long elapsed = System.currentTimeMillis() - start; remain = timeout - elapsed; if (remain <= 0) { throw new RpcException("Waiting concurrent invoke timeout in client-side for service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " + active + ". max concurrent invoke limit: " + max); } } } } } try { //业务方法调用前,调用beginCoun long begin = System.currentTimeMillis(); RpcStatus.beginCount(url, methodName); try { Result result = invoker.invoke(invocation); //调用成功后,返回后,调用endCount RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true); return result; } catch (RuntimeException t) { //调用失败后结束统计 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false); throw t; } } finally { if(max>0){ synchronized (count) { count.notify(); } } } }
再回头看下两个方法的具体实现:
/** * @param url */ public static void beginCount(URL url, String methodName) { //dubbo这里,把调用的url或方法名作key ,RpcStatus对象做为value是方法,静态map属性里 //经过这样把调用信息存起来。 //它能够统计一个url被调用的信息,也能够记录一个url里某个方法被调用的统计信息 beginCount(getStatus(url)); beginCount(getStatus(url, methodName)); } private static void beginCount(RpcStatus status) { status.active.incrementAndGet();//active值加1 } //beginCount的做用,能够理解某个方法调用前,它对应的active数目加1 private static void endCount(RpcStatus status, long elapsed, boolean succeeded) { status.active.decrementAndGet();//某个方法正调用结束,它对应的active减一 status.total.incrementAndGet(); status.totalElapsed.addAndGet(elapsed); if (status.maxElapsed.get() < elapsed) { status.maxElapsed.set(elapsed); } if (succeeded) { if (status.succeededMaxElapsed.get() < elapsed) { status.succeededMaxElapsed.set(elapsed); } } else { status.failed.incrementAndGet(); status.failedElapsed.addAndGet(elapsed); if (status.failedMaxElapsed.get() < elapsed) { status.failedMaxElapsed.set(elapsed); } } } //endCount的做用,能够理解某个方法调用结束后,它对应的active数目减1
因此,这个active数目就是表示,某个方法当前有多少正在执行(开始调用,但尚未返回) 也能够说最少活跃(leastactive)负载策略,就选择那些返回比较快的主机,或者本机调用较少的主机。