它实现的是客户端的负载均衡java
问题1:它是怎么实现的负载均衡算法?算法
问题2:它是怎么经过实例名获取到的ip地址?spring
咱们能够开始尝试跟踪一下:缓存
咱们对RestTemplate已经比较了解了,它自己只提供了Http调用的功能,并不具有负载均衡的能力,那么咱们能够猜想可能起到做用的就是@LoadBalanced这个注解。咱们进入这个注解,会发现注解上存在一句注释:服务器
Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient.
这句注释说明了,当前这个注解是代表RestTemplate 将要使用 LoadBalancerClient ,咱们把这个bean给记下来。app
接下来咱们进入到LoadBalancerClient ` 发现它是一个接口,接口中提供了三个方法:负载均衡
//使用从LoadBalancer中选择出来的实例执行 <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException; //使用从LoadBalancer中选择出来的实例执行,指定哪一个实例来执行 <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException; //为系统构建一个合适的host:port形式的Url URI reconstructURI(ServiceInstance instance, URI original);
追踪它的实现类,咱们会看到RibbonLoadBalancerClient这个类,它里面实现了这些方法。dom
咱们经过RestTemplate 的追踪,咱们会发现RestTemplate 中调用postForObject() 方法时会触发LoadBalancerInterceptor的方法,而后会发现它最后执行了一个this.loadBalancer.execute()方法ide
而这个execute方法就是咱们在LoadBalancerClient 接口中看到的方法!工具
咱们进入RibbonLoadBalancerClient中去查看它的实现方法--
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException { //经过seviceid获取到咱们的loadBalancer ILoadBalancer loadBalancer = getLoadBalancer(serviceId); //获取到咱们的sever对象 Server server = getServer(loadBalancer, hint); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); return execute(serviceId, ribbonServer, request); }
追踪getServer()方法 发现有这一行代码
return loadBalancer.chooseServer(hint != null ? hint : "default");
咱们能够获得一个结论,这行代码的chooseServer方法实现了负载均衡的策略,同时给咱们返回了一个实例回来。
那么它是如何找到的host:port的呢?
进入到RibbonLoadBalancerClient中找到reconstructURI()方法,能够看到有一个RibbonLoadBalancerContext类,进入这个类,会发现它在构造器中传入了一个ILoadBalancer
在ILoadBalancer这个接口中存在addServers(List<Server> newServers)在内部咱们能够获得一个结论,
咱们全部的host:port形式的东西都存放在RibbonLoadBalancerContext中,它会去经过一个实例去获取到咱们的
host:port形式的一个uri地址。也就是一个server对象。
追踪RibbonLoadBalancerContext咱们会发现里面实现了不少方法
public class RibbonLoadBalancerContext extends LoadBalancerContext { public RibbonLoadBalancerContext(ILoadBalancer lb) { super(lb); } //构造器2,初始化了一个Client的配置项 public RibbonLoadBalancerContext(ILoadBalancer lb, IClientConfig clientConfig) { super(lb, clientConfig); } //构造器3,初始化时带入了重试机制,进入RetryHandler, public RibbonLoadBalancerContext(ILoadBalancer lb, IClientConfig clientConfig,RetryHandler, handler) { super(lb, clientConfig, handler); } /**记录活跃数,这里注意Serverstats类,里面定义了大量的状态(响应时间,错误数量,活跃数量等), *note 表明笔记本的意思, *它实际上就是记录一些状态数据 noteOpenConnection 方法里面 实际上就给咱们提供一次加一方法 */ @Override public void noteOpenConnection(ServerStats serverStats) { super.noteOpenConnection(serverStats); } @Override public Timer getExecuteTracer() { return super.getExecuteTracer(); } /*** * 好比这个,就是记录请求响应结束,这时候的异常会被记录。 * */ @Override public void noteRequestCompletion(ServerStats stats, Object response, Throwable e,long responseTime) { super.noteRequestCompletion(stats, response, e, responseTime); } @Override public void noteRequestCompletion(ServerStats stats, Object response, Throwable e,long responseTime, RetryHandler errorHandler) { super.noteRequestCompletion(stats, response, e, responseTime, errorHandler); }
上面代码能够看到一个很明显的RetryHandler,跟踪进去看
public interface RetryHandler { public static final RetryHandler DEFAULT = new DefaultLoadBalancerRetryHandler(); /** * Test if an exception is retriable for the load balancer * * @param e the original exception * @param sameServer if true, the method is trying to determine if retry can be * done on the same server. Otherwise, it is testing whether retry can be * done on a different server */ public boolean isRetriableException(Throwable e, boolean sameServer); /** * Test if an exception should be treated as circuit failure. For example, * a {@link ConnectException} is a circuit failure. This is used to determine * whether successive exceptions of such should trip the circuit breaker to a particular * host by the load balancer. If false but a server response is absent, * load balancer will also close the circuit upon getting such exception. */ public boolean isCircuitTrippingException(Throwable e); /** * @return Number of maximal retries to be done on one server */ //返回在同一台服务器最大的重试次数 public int getMaxRetriesOnSameServer(); /** * @return Number of maximal different servers to retry */ //返回在下一个服务器最大重试次数 public int getMaxRetriesOnNextServer(); }
这些东西只须要了解一下
问题3:它的负载均衡器有哪些?
追踪咱们的RibbonLoadBalancerClient中的execute中的loadBalancer.chooseServer()会发现它调用了接口ILoadBalancer中的chooseServer()方法.
追踪ILoadBalancer咱们能够看到它内部其实实现了这几种方法:
//添加服务实例 public void addServers(List<Server> newServers); //选择服务实例 public Server chooseServer(Object key); //由负载均衡器的客户端调用,以通知服务器宕机,不然,LB会认为它还活着,直到下一个Ping周期——有可能 public void markServerDown(Server server); @Deprecated //不推荐使用 public List<Server> getServerList(boolean availableOnly); //返回一个启动而且正常的服务 public List<Server> getReachableServers(); //返回全部服务 public List<Server> getAllServers();
结论:ILoadBalancer接口实际上给咱们提供了三种结果
它的默认实现就是ZoneAwareLoadBalancer
咱们能够在RibbonClientConfiguration#ribbonLoadBalancer()中看到它若是没有设定负载均衡器就返回默认的。
重写以后咱们能够看到,在该实现中建立了一个ConcurrentHashMap类型的balance对象,用来存储每一个Zone区域对应的负载均衡器,负载均衡器的建立就是经过getLoadBalancer(zone).setServersList(entry.getValue());来完成的。 这是ILoadBalancer的默认实现
查看这个方法的实现类,咱们会发现这里有几个类实现了它的方法:
在BaseLoadBalaner基础上,还有两个子类
DynamicServerListLoadBalancer 是对BaseLoadBalaner 作的一个扩展,在父类的基础上,实现了服务实例清单在运行期的动态更新能力;同时还具有了对服务实例清单过滤的功能,也就是说,使用它的时候,能够经过过滤器来选择性的获取一批服务实例的清单。
1.新增了一个ServerList<T> serverListImpl的接口,去跟踪ServerList 会发现内部定义了两个抽象方法
public interface ServerList<T extends Server> { //获取初始化的服务实例清单 public List<T> getInitialListOfServers(); //获取更新的服务实例清单 public List<T> getUpdatedListOfServers(); }
继续追踪,会发现它的实现有五个,咱们须要去判断一下,在这里它用的是什么方式来作的实现?
咱们作一个猜想,既然在负载均衡器中须要实现服务实例的动态更新,那么它就势必须要有去访问Eureka来获取服务实例的能力。咱们能够去查看一下EurekaRibbonClientConfiguration
@Bean @ConditionalOnMissingBean public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) { if (this.propertiesFactory.isSet(ServerList.class, this.serviceId)) { return (ServerList)this.propertiesFactory.get(ServerList.class, config, this.serviceId); } else { /**经过DiscoveryEnabledNIWSServerList内部的obtainServersViaDiscovery() 从注册中心经过serviceId获取到服务实例列表,将状态为UP的服务放入list返回**/ DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(config, eurekaClientProvider); //经过discoveryServerList 获取到两个集合--初始化的服务清单,更新的服务清单 DomainExtractingServerList serverList = new DomainExtractingServerList(discoveryServerList, config, this.approximateZoneFromHostname); return serverList; } }
在这里咱们可以看到里面是采用了DomainExtractingServerList来进行实现的。咱们开始追踪DomainExtractingServerList会发现它在构造器内部,经过传入的List<DiscoveryEnabledServer>
在实现这两个方法的时候经过setZones方法得到了两个集合。
内部的实现方法过于复杂,感兴趣的同窗能够本身追踪。只要知道流程就OK了。
问题4:刚才看到了IRule这个接口,也知道了这是ribbon的负载均衡策略,那么具体它包含了哪些策略?
默认策略:ZoneAvoidanceRule ===> RibbonClientConfiguration#ribbonRule()
这个策略比较复杂,请注意:
首先类里面定义了一个定时任务DynamicServerWeightTask,默认30秒执行一次
class DynamicServerWeightTask extends TimerTask { public void run() { ServerWeight serverWeight = new ServerWeight(); try { //每隔30秒计算一次权重 serverWeight.maintainWeights(); } catch (Exception e) { logger.error("Error running DynamicServerWeightTask for {}", name, e); } } } public void maintainWeights() { ILoadBalancer lb = getLoadBalancer(); if (lb == null) { return; } if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) { return; } try { logger.info("Weight adjusting job started"); AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb; LoadBalancerStats stats = nlb.getLoadBalancerStats(); if (stats == null) { // no statistics, nothing to do return; } //计算全部实例的想赢时间的总和 double totalResponseTime = 0; // find maximal 95% response time for (Server server : nlb.getAllServers()) { //若是服务不在缓存中,就自动加载。 ServerStats ss = stats.getSingleServerStat(server); totalResponseTime += ss.getResponseTimeAvg(); } //遍历计算每一个实例的权重,公式以下:weightSoFar+totalResponseTime - 平均响应时间 Double weightSoFar = 0.0; // create new list and hot swap the reference List<Double> finalWeights = new ArrayList<Double>(); for (Server server : nlb.getAllServers()) { ServerStats ss = stats.getSingleServerStat(server); double weight = totalResponseTime - ss.getResponseTimeAvg(); weightSoFar += weight; finalWeights.add(weightSoFar); } setWeights(finalWeights); } catch (Exception e) { logger.error("Error calculating server weights", e); } finally { serverWeightAssignmentInProgress.set(false); } } }
这段代码利用了一个公式,weightSoFar+totalResponseTime - 平均响应时间
咱们能够举个例子 假设有,A,B,C三个实例能够选择,他们的平均响应时间为10,40,80,那么咱们能够获得它的一个总响应时长为10+40+80 = 130
那么根据这个公式能够获得的权重
A:0+130 -10 =120;
B:120+(130-40) = 210;
C: 210 +(130-80) = 260;
这里其实是一个数字轴,它会本身生成一个随机数落在这个数字轴上,在哪一个区间就会去选择哪台服务。
问题5:Ribbon的ping策略
在ribbon中IPing这个对象定义了它的ping策略,咱们都知道,须要判断服务是否存活的方式一般都是用心跳,在计算机中心跳一般都是ping这样标识,咱们会每隔一段时间去访问一次服务,一旦有正确返回状态,咱们就认为当前服务存活。
一样的咱们能够在RibbonClientConfiguration下去看到它的默认策略DummyPing()
问题6:服务列表ServerList<Server> 的初始化
RibbonClientConfiguration#ribbonServerList下会初始化
ServerList主要是负责:
它的默认实现比较有意思,若是Eureka关闭,它实现的就是ConfigurationBasedServerList
若是咱们整合了Eureka,会发现它默认实现的就是DiscoveryEnabledNIWSServerList -->咱们能够经过EurekaRibbonClientConfiguration#ribbonServerList中去看到这个服务列表
问题7:Ribbon的自动装配
LoadBalancerClient 这个咱们在以前解释过
PropertiesFactory 经过一些配置化的方式进行组装。
RibbonLoadBalancerContext
IRule 规则
IPING 心跳
ServerList 服务列表
ILoadBalancer
IClientConfig
问题8:重试机制
在咱们使用ribbon中,一旦某个服务实例宕机或者掉线,而咱们的eureka没有及时清理,会发生返回错误的状况,那么针对这种状况下,咱们有没有什么机制能够去解决问题的?
咱们能够继续去找LoadBalancerAutoConfiguration 会看到这样一段代码
@Configuration @ConditionalOnClass(RetryTemplate.class) public static class RetryInterceptorAutoConfiguration { @Bean @ConditionalOnMissingBean public RetryLoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties,LoadBalancerRequestFactory requestFactory,LoadBalancedRetryFactory loadBalancedRetryFactory) { return new RetryLoadBalancerInterceptor(loadBalancerClient, properties, requestFactory, loadBalancedRetryFactory); } @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer(final RetryLoadBalancerInterceptor loadBalancerInterceptor) { return restTemplate -> { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); }; } }
它在这里面定义了负载均衡的重试机制,咱们能够来看看这个怎么用的。
导入pom.xml
<!--引入重试机制,让重试生效--> <dependency> <groupId>org.springframework.retry</groupId> <artifactId>spring-retry</artifactId> </dependency>
而后,咱们再来尝试,咱们会发现重试机制生效了。