微服务-(ribbon负载均衡)

它实现的是客户端的负载均衡java

问题1:它是怎么实现的负载均衡算法?算法

问题2:它是怎么经过实例名获取到的ip地址?spring

咱们能够开始尝试跟踪一下:缓存

咱们对RestTemplate已经比较了解了,它自己只提供了Http调用的功能,并不具有负载均衡的能力,那么咱们能够猜想可能起到做用的就是@LoadBalanced这个注解。咱们进入这个注解,会发现注解上存在一句注释:服务器

Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient.

这句注释说明了,当前这个注解是代表RestTemplate 将要使用 LoadBalancerClient ,咱们把这个bean给记下来。app

接下来咱们进入到LoadBalancerClient ` 发现它是一个接口,接口中提供了三个方法:负载均衡

//使用从LoadBalancer中选择出来的实例执行
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
//使用从LoadBalancer中选择出来的实例执行,指定哪一个实例来执行
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
//为系统构建一个合适的host:port形式的Url
URI reconstructURI(ServiceInstance instance, URI original);

追踪它的实现类,咱们会看到RibbonLoadBalancerClient这个类,它里面实现了这些方法。dom

咱们经过RestTemplate 的追踪,咱们会发现RestTemplate 中调用postForObject() 方法时会触发LoadBalancerInterceptor的方法,而后会发现它最后执行了一个this.loadBalancer.execute()方法ide

而这个execute方法就是咱们在LoadBalancerClient 接口中看到的方法!工具

咱们进入RibbonLoadBalancerClient中去查看它的实现方法--

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
       //经过seviceid获取到咱们的loadBalancer
       ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
       //获取到咱们的sever对象
       Server server = getServer(loadBalancer, hint);
       if (server == null) {
              throw new IllegalStateException("No instances available for " + serviceId);
       }
       RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
       serviceId), serverIntrospector(serviceId).getMetadata(server));
​
       return execute(serviceId, ribbonServer, request);
}

追踪getServer()方法 发现有这一行代码

return loadBalancer.chooseServer(hint != null ? hint : "default");

咱们能够获得一个结论,这行代码的chooseServer方法实现了负载均衡的策略,同时给咱们返回了一个实例回来。

那么它是如何找到的host:port的呢?

进入到RibbonLoadBalancerClient中找到reconstructURI()方法,能够看到有一个RibbonLoadBalancerContext类,进入这个类,会发现它在构造器中传入了一个ILoadBalancer

ILoadBalancer这个接口中存在addServers(List<Server> newServers)在内部咱们能够获得一个结论,

咱们全部的host:port形式的东西都存放在RibbonLoadBalancerContext中,它会去经过一个实例去获取到咱们的

host:port形式的一个uri地址。也就是一个server对象。

追踪RibbonLoadBalancerContext咱们会发现里面实现了不少方法

public class RibbonLoadBalancerContext extends LoadBalancerContext {
   public RibbonLoadBalancerContext(ILoadBalancer lb) {
      super(lb);
   }
    
   //构造器2,初始化了一个Client的配置项
   public RibbonLoadBalancerContext(ILoadBalancer lb, IClientConfig clientConfig) {
      super(lb, clientConfig);
   }
  
   //构造器3,初始化时带入了重试机制,进入RetryHandler,
   public RibbonLoadBalancerContext(ILoadBalancer lb, IClientConfig clientConfig,RetryHandler, handler) {
      super(lb, clientConfig, handler);
   }
   
   /**记录活跃数,这里注意Serverstats类,里面定义了大量的状态(响应时间,错误数量,活跃数量等),       *note         表明笔记本的意思,
   *它实际上就是记录一些状态数据 noteOpenConnection 方法里面 实际上就给咱们提供一次加一方法
   */
   @Override
   public void noteOpenConnection(ServerStats serverStats) {
      super.noteOpenConnection(serverStats);
   }
​
   @Override
   public Timer getExecuteTracer() {
      return super.getExecuteTracer();
   }
   
   /***
   * 好比这个,就是记录请求响应结束,这时候的异常会被记录。
   *
   */
   @Override
   public void noteRequestCompletion(ServerStats stats, Object response, Throwable e,long responseTime) {
      super.noteRequestCompletion(stats, response, e, responseTime);
   }
​
   @Override
   public void noteRequestCompletion(ServerStats stats, Object response, Throwable e,long responseTime, RetryHandler errorHandler) {
      super.noteRequestCompletion(stats, response, e, responseTime, errorHandler);
   }

上面代码能够看到一个很明显的RetryHandler,跟踪进去看

public interface RetryHandler {
​
   public static final RetryHandler DEFAULT = new DefaultLoadBalancerRetryHandler();
   
   /**
    * Test if an exception is retriable for the load balancer
    * 
    * @param e the original exception
    * @param sameServer if true, the method is trying to determine if retry can be 
    *       done on the same server. Otherwise, it is testing whether retry can be
    *       done on a different server
    */
   public boolean isRetriableException(Throwable e, boolean sameServer);
​
   /**
    * Test if an exception should be treated as circuit failure. For example, 
    * a {@link ConnectException} is a circuit failure. This is used to determine
    * whether successive exceptions of such should trip the circuit breaker to a particular
    * host by the load balancer. If false but a server response is absent, 
    * load balancer will also close the circuit upon getting such exception.
    */
   public boolean isCircuitTrippingException(Throwable e);
       
   /**
    * @return Number of maximal retries to be done on one server
    */
   //返回在同一台服务器最大的重试次数
   public int getMaxRetriesOnSameServer();
​
   /**
    * @return Number of maximal different servers to retry
    */
   //返回在下一个服务器最大重试次数
   public int getMaxRetriesOnNextServer();
}

这些东西只须要了解一下

问题3:它的负载均衡器有哪些?

追踪咱们的RibbonLoadBalancerClient中的execute中的loadBalancer.chooseServer()会发现它调用了接口ILoadBalancer中的chooseServer()方法.

追踪ILoadBalancer咱们能够看到它内部其实实现了这几种方法:

   //添加服务实例
public void addServers(List<Server> newServers);
//选择服务实例
public Server chooseServer(Object key);
   //由负载均衡器的客户端调用,以通知服务器宕机,不然,LB会认为它还活着,直到下一个Ping周期——有可能
public void markServerDown(Server server); 

@Deprecated //不推荐使用
public List<Server> getServerList(boolean availableOnly);

   //返回一个启动而且正常的服务
   public List<Server> getReachableServers();
   
​
   //返回全部服务
public List<Server> getAllServers();

结论:ILoadBalancer接口实际上给咱们提供了三种结果

  • 添加服务实例
  • 返回服务实例
  • 让服务下线

它的默认实现就是ZoneAwareLoadBalancer 

咱们能够在RibbonClientConfiguration#ribbonLoadBalancer()中看到它若是没有设定负载均衡器就返回默认的。

  • ZoneAwareLoadBalancer 是对DynamicServerListLoadBalancer的扩展,它重写了setServerListForZones()方法,这个方法在父类的做用是根据按区域zone的分组实例列表,再给每个Zone对应一个zoneStats来存储一些状态和统计信息的。

重写以后咱们能够看到,在该实现中建立了一个ConcurrentHashMap类型的balance对象,用来存储每一个Zone区域对应的负载均衡器,负载均衡器的建立就是经过getLoadBalancer(zone).setServersList(entry.getValue());来完成的。 这是ILoadBalancer的默认实现

查看这个方法的实现类,咱们会发现这里有几个类实现了它的方法:

  • BaseLoadBalaner 是实现Ribbon负载均衡的基础实现类,在该类中定义了不少关于负载均衡的基础内容
  1. 它维护了两个存储服务实例Server对象的列表,一个存储全部服务实例清单,一个存储正常服务实例清单
  2. 它定义了检查服务器里是否正常的Iping 对象,须要在构造时注入它的实现
  3. 定义了检查服务实例操做的执行策略对象IPingStrategy
  4. 它定义了负载均衡处理规则IRule
  5. 它定义了用来存储负载均衡器的各个服务实例属性和统计信息的LoadBalancerStats

BaseLoadBalaner基础上,还有两个子类

DynamicServerListLoadBalancer 是对BaseLoadBalaner 作的一个扩展,在父类的基础上,实现了服务实例清单在运行期的动态更新能力;同时还具有了对服务实例清单过滤的功能,也就是说,使用它的时候,能够经过过滤器来选择性的获取一批服务实例的清单。

1.新增了一个ServerList<T> serverListImpl的接口,去跟踪ServerList 会发现内部定义了两个抽象方法

public interface ServerList<T extends Server> {
  
   //获取初始化的服务实例清单
   public List<T> getInitialListOfServers();
   
  //获取更新的服务实例清单
   public List<T> getUpdatedListOfServers();   
​
}

继续追踪,会发现它的实现有五个,咱们须要去判断一下,在这里它用的是什么方式来作的实现?

咱们作一个猜想,既然在负载均衡器中须要实现服务实例的动态更新,那么它就势必须要有去访问Eureka来获取服务实例的能力。咱们能够去查看一下EurekaRibbonClientConfiguration

@Bean
   @ConditionalOnMissingBean
   public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
       if (this.propertiesFactory.isSet(ServerList.class, this.serviceId)) {
           return (ServerList)this.propertiesFactory.get(ServerList.class, config, this.serviceId);
      } else {
           /**经过DiscoveryEnabledNIWSServerList内部的obtainServersViaDiscovery()
           从注册中心经过serviceId获取到服务实例列表,将状态为UP的服务放入list返回**/
           DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(config, eurekaClientProvider);
           //经过discoveryServerList 获取到两个集合--初始化的服务清单,更新的服务清单
           DomainExtractingServerList serverList = new DomainExtractingServerList(discoveryServerList, config, this.approximateZoneFromHostname);
           return serverList;
      }
  }

在这里咱们可以看到里面是采用了DomainExtractingServerList来进行实现的。咱们开始追踪DomainExtractingServerList会发现它在构造器内部,经过传入的List<DiscoveryEnabledServer>

在实现这两个方法的时候经过setZones方法得到了两个集合。

内部的实现方法过于复杂,感兴趣的同窗能够本身追踪。只要知道流程就OK了。

问题4:刚才看到了IRule这个接口,也知道了这是ribbon的负载均衡策略,那么具体它包含了哪些策略?

默认策略:ZoneAvoidanceRule ===> RibbonClientConfiguration#ribbonRule()

  • ZoneAvoidanceRule 规避区域策略
  • AbstractLoadBalancerRule 策略的抽象类,它在内部定义了ILoadBalancer对象,这个对象主要是用来在具体选择哪一种策略的时候,获取到负载均衡器中维护的信息的。
  • AvailabilityFilteringRule该策略继承自抽象策略PredicateBasedRule因此也继承了"先过滤清单,再轮询选择"的基本处理逻辑,该策略经过线性抽样的方式直接尝试可用且较空闲的实例来使用,优化了父类每次都要遍历全部实例的开销。
  • BestAvailableRule继承自ClientConfigEnabledRoundRobinRule该策略的特性是可选出最空闲的实例
  • ClientConfigEnabledRoundRobinRule该策略较为特殊,咱们通常不直接使用它。由于它自己并无实现什么特殊的处理逻辑。经过继承该策略,默认的choose就实现了线性轮询机制,在子类中作一些高级策略时一般可能存在。一些没法实施的状况,就能够用父类的实现做为备选
  • PredicateBasedRule抽象策略,继承自ClientConfigEnabledRoundRobinRule,基于Predicate的策略 Predicateshi Google Guava Collection工具对集合进行过滤的条件接口
  • RandomRule 随机数策略,它就是经过一个随机数来获取uplist的某一个下标,再返回。
  • RetryRule 带重试机制策略,它在内部还定义了一个IRule,默认使用了RoundRobinRule,在内部实现了反复重试的机制,若是重试可以获得一个服务,就返回,若是不能就会根据以前设置的时间来决定,时间一到就返回null.
  • RoundRobinRule 一个轮询策略,经过一个count计数变量,每次循环都会累加,注意,若是一直没有server可供选择达到了10次,就会打印一个警告信息。
  • WeightedResponseTimeRule 这个策略是对轮询策略的扩展,增长了根据实例的运行状况来计算权重,并根据权重来挑选实例,用以达到更好的分配结果。

这个策略比较复杂,请注意:

首先类里面定义了一个定时任务DynamicServerWeightTask,默认30秒执行一次

class DynamicServerWeightTask extends TimerTask {
      public void run() {
          ServerWeight serverWeight = new ServerWeight();
          try {
              //每隔30秒计算一次权重
              serverWeight.maintainWeights();
          } catch (Exception e) {
              logger.error("Error running DynamicServerWeightTask for {}", name, e);
          }
      }
  }
public void maintainWeights() {
           ILoadBalancer lb = getLoadBalancer();
           if (lb == null) {
               return;
          }
           
           if (!serverWeightAssignmentInProgress.compareAndSet(false,  true)) {
               return; 
          }
           
           try {
               logger.info("Weight adjusting job started");
               AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
               LoadBalancerStats stats = nlb.getLoadBalancerStats();
               if (stats == null) {
                   // no statistics, nothing to do
                   return;
              }
               //计算全部实例的想赢时间的总和
               double totalResponseTime = 0;
               // find maximal 95% response time
               for (Server server : nlb.getAllServers()) {
                  //若是服务不在缓存中,就自动加载。
                   ServerStats ss = stats.getSingleServerStat(server);
                   totalResponseTime += ss.getResponseTimeAvg();
              }
               //遍历计算每一个实例的权重,公式以下:weightSoFar+totalResponseTime - 平均响应时间
               Double weightSoFar = 0.0;
               
               // create new list and hot swap the reference
               List<Double> finalWeights = new ArrayList<Double>();
               for (Server server : nlb.getAllServers()) {
                   ServerStats ss = stats.getSingleServerStat(server);
                   double weight = totalResponseTime - ss.getResponseTimeAvg();
                   weightSoFar += weight;
                   finalWeights.add(weightSoFar);   
              }
               setWeights(finalWeights);
          } catch (Exception e) {
               logger.error("Error calculating server weights", e);
          } finally {
               serverWeightAssignmentInProgress.set(false);
          }
​
      }
  }

这段代码利用了一个公式,weightSoFar+totalResponseTime - 平均响应时间

咱们能够举个例子 假设有,A,B,C三个实例能够选择,他们的平均响应时间为10,40,80,那么咱们能够获得它的一个总响应时长为10+40+80 = 130

那么根据这个公式能够获得的权重 

A:0+130 -10 =120;

B120+(130-40) = 210;

C: 210 +130-80 = 260

这里其实是一个数字轴,它会本身生成一个随机数落在这个数字轴上,在哪一个区间就会去选择哪台服务。

问题5:Ribbonping策略

ribbonIPing这个对象定义了它的ping策略,咱们都知道,须要判断服务是否存活的方式一般都是用心跳,在计算机中心跳一般都是ping这样标识,咱们会每隔一段时间去访问一次服务,一旦有正确返回状态,咱们就认为当前服务存活。

一样的咱们能够在RibbonClientConfiguration下去看到它的默认策略DummyPing()

问题6:服务列表ServerList<Server> 的初始化

RibbonClientConfiguration#ribbonServerList下会初始化

ServerList主要是负责:

  • 获取初始化的服务列表
  • 获取更新的服务列表

它的默认实现比较有意思,若是Eureka关闭,它实现的就是ConfigurationBasedServerList

若是咱们整合了Eureka,会发现它默认实现的就是DiscoveryEnabledNIWSServerList -->咱们能够经过EurekaRibbonClientConfiguration#ribbonServerList中去看到这个服务列表

问题7Ribbon的自动装配

  • RibbonAutoConfiguration 内部初始化了比较重要的两个东西:

LoadBalancerClient 这个咱们在以前解释过

PropertiesFactory 经过一些配置化的方式进行组装。

  • RibbonClientConfiguration 这个里面实现了很是多的东西

RibbonLoadBalancerContext 

IRule 规则

IPING 心跳

ServerList 服务列表

ILoadBalancer

IClientConfig

问题8:重试机制

在咱们使用ribbon中,一旦某个服务实例宕机或者掉线,而咱们的eureka没有及时清理,会发生返回错误的状况,那么针对这种状况下,咱们有没有什么机制能够去解决问题的?

咱们能够继续去找LoadBalancerAutoConfiguration 会看到这样一段代码

@Configuration
@ConditionalOnClass(RetryTemplate.class)
public static class RetryInterceptorAutoConfiguration {

   @Bean
   @ConditionalOnMissingBean
   public RetryLoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties,LoadBalancerRequestFactory requestFactory,LoadBalancedRetryFactory loadBalancedRetryFactory) {
   return new RetryLoadBalancerInterceptor(loadBalancerClient, properties, requestFactory, loadBalancedRetryFactory);
   }
​
   @Bean
   @ConditionalOnMissingBean
   public RestTemplateCustomizer restTemplateCustomizer(final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
   return restTemplate -> {
               List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                       restTemplate.getInterceptors());
               list.add(loadBalancerInterceptor);
               restTemplate.setInterceptors(list);
          };
   }
}

它在这里面定义了负载均衡的重试机制,咱们能够来看看这个怎么用的。

导入pom.xml

<!--引入重试机制,让重试生效-->
<dependency>
  <groupId>org.springframework.retry</groupId>
  <artifactId>spring-retry</artifactId>
</dependency>

而后,咱们再来尝试,咱们会发现重试机制生效了。

相关文章
相关标签/搜索