Spring Cloud Ribbon是一个基于HTTP和TCP的客户端负载均衡工具,基于Netflix Ribbon实现。java
负载均衡器相关内容见上一篇文章算法
负载均衡策略的抽象类,在该抽象类中定义了负载均衡器ILoadBalancer对象,该对象可以在具体实现选择服务策略时,获取到一些负载均衡器中维护的信息做为分配依据,并以此设计一些算法来实现针对特定场景的高效策略。并发
package com.netflix.loadbalancer; import com.netflix.client.IClientConfigAware; public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware { private ILoadBalancer lb; @Override public void setLoadBalancer(ILoadBalancer lb){ this.lb = lb; } @Override public ILoadBalancer getLoadBalancer(){ return lb; } }
该策略实现了从服务实例清单中随机选择一个服务实例的功能。下面先看一下源码:app
package com.netflix.loadbalancer; import java.util.List; import java.util.Random; import com.netflix.client.config.IClientConfig; public class RandomRule extends AbstractLoadBalancerRule { Random rand; public RandomRule() { rand = new Random(); } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE") public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } Server server = null; while (server == null) { if (Thread.interrupted()) { return null; } List<Server> upList = lb.getReachableServers(); List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { /* * No servers. End regardless of pass, because subsequent passes * only get more restrictive. */ return null; } int index = rand.nextInt(serverCount); server = upList.get(index); if (server == null) { /* * The only time this should happen is if the server list were * somehow trimmed. This is a transient condition. Retry after * yielding. */ Thread.yield(); continue; } if (server.isAlive()) { return (server); } // Shouldn't actually happen.. but must be transient or a bug. server = null; Thread.yield(); } return server; } @Override public Server choose(Object key) { return choose(getLoadBalancer(), key); } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { // TODO Auto-generated method stub } }
分析源码能够看出,IRule接口中Server choose(Object key)函数的实现委托给了该类中的Server choose(ILoadBalancer lb, Object key)函数,该方法增长了一个负载均衡器参数。从具体的实现能够看出,它会使用负载均衡器来得到可用实例列表upList和全部的实例列表allList,而且使用rand.nextInt(serverCount)函数来获取一个随机数,并将该随机数做为upList的索引值来返回具体实例。同时,具体的选择逻辑在一个while (server == null)循环以内,而根据选择逻辑的实现,正常状况下每次都应该选出一个服务实例,若是出现死循环获取不到服务实例时,则颇有可能存在并发的Bug。负载均衡
该策略实现了按照线性轮询的方式依次选择每一个服务实例的功能。下面看一下源码:less
package com.netflix.loadbalancer; import com.netflix.client.config.IClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; public class RoundRobinRule extends AbstractLoadBalancerRule { private AtomicInteger nextServerCyclicCounter; public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; } Server server = null; int count = 0; while (server == null && count++ < 10) { List<Server> reachableServers = lb.getReachableServers(); List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size(); int serverCount = allServers.size(); if ((upCount == 0) || (serverCount == 0)) { log.warn("No up servers available from load balancer: " + lb); return null; } int nextServerIndex = incrementAndGetModulo(serverCount); server = allServers.get(nextServerIndex); if (server == null) { /* Transient. */ Thread.yield(); continue; } if (server.isAlive() && (server.isReadyToServe())) { return (server); } // Next. server = null; } if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; } /** * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}. * * @param modulo The modulo to bound the value of the counter. * @return The next value. */ private int incrementAndGetModulo(int modulo) { for (;;) { int current = nextServerCyclicCounter.get(); int next = (current + 1) % modulo; if (nextServerCyclicCounter.compareAndSet(current, next)) return next; } } }
RoundRobinRule具体实现和RandomRule相似,可是循环条件和从可用列表获取实例的逻辑不一样。循环条件中增长了一个count计数变量,该变量会在每次循环以后累加,若是循环10次还没获取到Server,就会结束,并打印一个警告信息No available alive servers after 10 tries from load balancer:...。dom
线性轮询的实现是经过AtomicInteger nextServerCyclicCounter对象实现,每次进行实例选择时经过调用int incrementAndGetModulo(int modulo)方法来实现。ide
该策略实现了一个具有重试机制的实例选择功能。从源码中能够看出,内部定义了一个IRule对象,默认是RoundRobinRule实例,choose方法中则实现了对内部定义的策略进行反复尝试的策略,若期间可以选择到具体的服务实例就返回,若选择不到而且超过设置的尝试结束时间(maxRetryMillis参数定义的值 + choose方法开始执行的时间戳)就返回null。函数
package com.netflix.loadbalancer; import com.netflix.client.config.IClientConfig; public class RetryRule extends AbstractLoadBalancerRule { IRule subRule = new RoundRobinRule(); long maxRetryMillis = 500; /* * Loop if necessary. Note that the time CAN be exceeded depending on the * subRule, because we're not spawning additional threads and returning * early. */ public Server choose(ILoadBalancer lb, Object key) { long requestTime = System.currentTimeMillis(); long deadline = requestTime + maxRetryMillis; Server answer = null; answer = subRule.choose(key); if (((answer == null) || (!answer.isAlive())) && (System.currentTimeMillis() < deadline)) { InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis()); while (!Thread.interrupted()) { answer = subRule.choose(key); if (((answer == null) || (!answer.isAlive())) && (System.currentTimeMillis() < deadline)) { /* pause and retry hoping it's transient */ Thread.yield(); } else { break; } } task.cancel(); } if ((answer == null) || (!answer.isAlive())) { return null; } else { return answer; } } }
该策略是对RoundRobinRule的扩展,增长了根据实例的运行状况来计算权重,并根据权重来挑选实例,以达到更优的分配效果。它的实现主要有三个核心内容。微服务
WeightedResponseTimeRule策略在初始化的时候会经过serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval)启动一个定时任务,用来为每一个服务实例计算权重,该任务默认30s执行一次。
在源码中咱们能够轻松找到用于存储权重的对象private volatile List<Double> accumulatedWeights = new ArrayList<Double>();该List中每一个权重值所处的位置对应了负载均衡器维护的服务实例清单中全部实例在清单中的位置。下面看一下权重计算函数maintainWeights的源码:
public void maintainWeights() { ILoadBalancer lb = getLoadBalancer(); if (lb == null) { return; } if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) { return; } try { logger.info("Weight adjusting job started"); AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb; LoadBalancerStats stats = nlb.getLoadBalancerStats(); if (stats == null) { // no statistics, nothing to do return; } double totalResponseTime = 0; // find maximal 95% response time for (Server server : nlb.getAllServers()) { // this will automatically load the stats if not in cache ServerStats ss = stats.getSingleServerStat(server); totalResponseTime += ss.getResponseTimeAvg(); } // weight for each server is (sum of responseTime of all servers - responseTime) // so that the longer the response time, the less the weight and the less likely to be chosen Double weightSoFar = 0.0; // create new list and hot swap the reference List<Double> finalWeights = new ArrayList<Double>(); for (Server server : nlb.getAllServers()) { ServerStats ss = stats.getSingleServerStat(server); double weight = totalResponseTime - ss.getResponseTimeAvg(); weightSoFar += weight; finalWeights.add(weightSoFar); } setWeights(finalWeights); } catch (Exception e) { logger.error("Error calculating server weights", e); } finally { serverWeightAssignmentInProgress.set(false); } }
该方法的实现主要分为两个步骤:
经过概算计算出来的权重值只是表明了各实例权重区间的上限。下面图节选自Spring Cloud 微服务实战。
下面看一下Server choose(ILoadBalancer lb, Object key)如何选择Server的
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } Server server = null; while (server == null) { // get hold of the current reference in case it is changed from the other thread List<Double> currentWeights = accumulatedWeights; if (Thread.interrupted()) { return null; } List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; } int serverIndex = 0; // last one in the list is the sum of all weights double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); // No server has been hit yet and total weight is not initialized // fallback to use round robin if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) { server = super.choose(getLoadBalancer(), key); if(server == null) { return server; } } else { // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive) double randomWeight = random.nextDouble() * maxTotalWeight; // pick the server index based on the randomIndex int n = 0; for (Double d : currentWeights) { if (d >= randomWeight) { serverIndex = n; break; } else { n++; } } server = allList.get(serverIndex); } if (server == null) { /* Transient. */ Thread.yield(); continue; } if (server.isAlive()) { return (server); } // Next. server = null; } return server; }
下面咱们看一下源码的主要步骤有:
细心的可能会发现第一个服务实例的权重区间是双闭,最后一个服务实例的权重区间是双开,其余服务实例的区间都是左开右闭。这是由于随机数的最小值能够为0,因此第一个实例下限是闭区间,同时随机数的最大值取不到最大权重值,因此最后一个实例的上限是开区间。
该策略比较特殊,通常不直接使用它。由于他自己并无实现特殊的处理逻辑,在他内部定义了一个RoundRobinRule策略,choose函数的实现其实就是采用了RoundRobinRule的线性轮询机制。
在实际开发中,咱们并不会直接使用该策略,而是基于它作高级策略扩展。
该策略继承自ClientConfigEnabledRoundRobinRule,在实现中它注入了负载均衡器的统计对象LoadBalancerStats,同时在choose方法中利用LoadBalancerStats保存的实例统计信息来选择知足要求的服务实例。
当LoadBalancerStats为空时,会使用RoundRobinRule线性轮询策略,当有LoadBalancerStats时,会经过遍历负载均衡器中维护的全部服务实例,会过滤掉故障的实例,并找出并发请求数最小的一个。
该策略的特性是能够选出最空闲的服务实例。
这是一个抽象策略,它继承了ClientConfigEnabledRoundRobinRule,从命名中能够猜出这是一个基于Predicate实现的策略,Predicate是Google Guava Collection工具对集合进行过滤的条件接口。
public Server choose(Object key) { ILoadBalancer lb = getLoadBalancer(); Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); if (server.isPresent()) { return server.get(); } else { return null; } }
在该源码中,它定义了一个抽象函数getPredicate来获取AbstractServerPredicate对象的实现,在choose方法中,经过AbstractServerPredicate的chooseRoundRobinAfterFiltering函数来选择具体的服务实例。从该方法的命名咱们能够看出大体的逻辑:首先经过子类中实现的Predicate逻辑来过滤一部分服务实例,而后再以线性轮询的方式从过滤后的实例清单中选出一个。
在上面choose函数中调用的chooseRoundRobinAfterFiltering方法先经过内部定义的getEligibleServers函数来获取备选的实例清单(实现了过滤),若是返回的清单为空,则用Optional.absent来表示不存在,反之则以线性轮询的方式从备选清单中获取一个实例。
下面看一下getEligibleServers方法的源码
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) { if (loadBalancerKey == null) { return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate())); } else { List<Server> results = Lists.newArrayList(); for (Server server: servers) { if (this.apply(new PredicateKey(loadBalancerKey, server))) { results.add(server); } } return results; } }
上述源码的大体逻辑是遍历服务清单,使用this.apply方法来判断实例是否须要保留,若是是就添加到结果列表中。
实际上,AbstractServerPredicate实现了com.google.common.base.Predicate接口,apply方法是接口中的定义,主要用来实现过滤条件的判断逻辑,它输入的参数则是过滤条件须要用到的一些信息(好比源码中的new PredicateKey(loadBalancerKey, server)),传入了关于实例的统计信息和负载均衡器的选择算法传递过来的key。
AbstractServerPredicate没有apply的实现,因此这里的chooseRoundRobinAfterFiltering方法只是定义了一个模板策略:先过滤清单,再轮询选择。
对于如何过滤,须要在AbstractServerPredicate的子类中实现apply方法来肯定具体的过滤策略。
&emsps;该类继承自PredicateBasedRule,遵循了先过滤清单,再轮询选择的基本处理逻辑,其中过滤条件使用了AvailabilityPredicate,下面看一下AvailabilityPredicate的源码:
package com.netflix.loadbalancer; import javax.annotation.Nullable; import com.netflix.client.config.IClientConfig; import com.netflix.config.ChainedDynamicProperty; import com.netflix.config.DynamicBooleanProperty; import com.netflix.config.DynamicIntProperty; import com.netflix.config.DynamicPropertyFactory; public class AvailabilityPredicate extends AbstractServerPredicate { @Override public boolean apply(@Nullable PredicateKey input) { LoadBalancerStats stats = getLBStats(); if (stats == null) { return true; } return !shouldSkipServer(stats.getSingleServerStat(input.getServer())); } private boolean shouldSkipServer(ServerStats stats) { if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) { return true; } return false; } }
从上面的源码能够看出,主要过的过滤逻辑都是在boolean shouldSkipServer(ServerStats stats)方法中实现,该方法主要判断服务实例的两项内容:
上面两项只要知足一项,apply方法就返回false,表明该服务实例可能存在故障或负载太高,都不知足就返回true。
在AvailabilityFilteringRule进行实例选择时作了小小的优化,它并无向父类同样先遍历全部的节点进行过滤,而后在过滤后的集合中选择实例。而是先以线性的方式选择一个实例,接着使用过滤条件来判断该实例是否知足要求,若知足就直接使用该实例,若不知足要求就再选择下一个实例,检查是否知足要求,这个过程循环10次若是尚未找到合适的服务实例,就采用父类的实现方案。
该策略经过线性轮询的方式直接尝试寻找可用且比较空闲的实例来用,优化了每次都要遍历全部实例的开销。
该类也是PredicateBasedRule的子类,它的实现是经过组合过滤条件CompositePredicate,以ZoneAvoidancePredicate为主过滤条件,以AvailabilityPredicate为次过滤条件。
ZoneAvoidanceRule的实现并无像AvailabilityFilteringRule重写choose函数来优化,因此它遵循了先过滤清单再轮询选择的基本逻辑。
下面看一下CompositePredicate的源码
package com.netflix.loadbalancer; import java.util.Iterator; import java.util.List; import javax.annotation.Nullable; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Lists; public class CompositePredicate extends AbstractServerPredicate { private AbstractServerPredicate delegate; private List<AbstractServerPredicate> fallbacks = Lists.newArrayList(); private int minimalFilteredServers = 1; private float minimalFilteredPercentage = 0; @Override public boolean apply(@Nullable PredicateKey input) { return delegate.apply(input); } @Override public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) { List<Server> result = super.getEligibleServers(servers, loadBalancerKey); Iterator<AbstractServerPredicate> i = fallbacks.iterator(); while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage)) && i.hasNext()) { AbstractServerPredicate predicate = i.next(); result = predicate.getEligibleServers(servers, loadBalancerKey); } return result; } }
从源码中能够看出,CompositePredicate定义了一个主过滤条件delegate和一组过滤条件列表fallbacks,次过滤条件的过滤顺序是按存储顺序执行的。
在获取结果的getEligibleServers函数中的主要逻辑是:
后面会介绍Spring Cloud Ribbon配置方式,请持续关注!!!