本系列示例与胶水代码地址: https://github.com/HashZhang/spring-cloud-scaffoldjava
负载均衡Ribbon替换成Spring Cloud Load Balancer
Spring Cloud Load Balancer
并非一个独立的项目,而是spring-cloud-commons
其中的一个模块。 项目中用了Eureka
以及相关的 starter,想彻底剔除Ribbon
的相关依赖基本是不可能的,Spring 社区的人也是看到了这一点,经过配置去关闭Ribbon
启用Spring-Cloud-LoadBalancer
。react
spring.cloud.loadbalancer.ribbon.enabled=false
关闭ribbon以后,Spring Cloud LoadBalancer就会加载成为默认的负载均衡器。git
Spring Cloud LoadBalancer 结构以下所示:github
其中:算法
- 全局只有一个
BlockingLoadBalancerClient
,负责执行全部的负载均衡请求。 BlockingLoadBalancerClient
从LoadBalancerClientFactory
里面加载对应微服务的负载均衡配置。- 每一个微服务下有独自的
LoadBalancer
,LoadBalancer
里面包含负载均衡的算法,例如RoundRobin
.根据算法,从ServiceInstanceListSupplier
返回的实例列表中选择一个实例返回。
1. 实现zone
隔离
要想实现zone
隔离,应该从ServiceInstanceListSupplier
里面作手脚。默认的实现里面有关于zone
隔离的ServiceInstanceListSupplier
-> org.springframework.cloud.loadbalancer.core.ZonePreferenceServiceInstanceListSupplier
:spring
private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) { if (zone == null) { zone = zoneConfig.getZone(); } //若是zone不为null,而且该zone下有存活实例,则返回这个实例列表 //不然,返回全部的实例 if (zone != null) { List<ServiceInstance> filteredInstances = new ArrayList<>(); for (ServiceInstance serviceInstance : serviceInstances) { String instanceZone = getZone(serviceInstance); if (zone.equalsIgnoreCase(instanceZone)) { filteredInstances.add(serviceInstance); } } if (filteredInstances.size() > 0) { return filteredInstances; } } // If the zone is not set or there are no zone-specific instances available, // we return all instances retrieved for given service id. return serviceInstances; }
这里对于没指定zone
或者该zone
下没有存活实例的状况下,会返回全部查到的实例,不区分zone
。这个不符合咱们的要求,因此咱们修改并实现下咱们本身的com.github.hashjang.hoxton.service.consumer.config.SameZoneOnlyServiceInstanceListSupplier:缓存
private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) { if (zone == null) { zone = zoneConfig.getZone(); } if (zone != null) { List<ServiceInstance> filteredInstances = new ArrayList<>(); for (ServiceInstance serviceInstance : serviceInstances) { String instanceZone = getZone(serviceInstance); if (zone.equalsIgnoreCase(instanceZone)) { filteredInstances.add(serviceInstance); } } if (filteredInstances.size() > 0) { return filteredInstances; } } //若是没找到就返回空列表,毫不返回其余集群的实例 return List.of(); }
而后咱们来看一下默认的 Spring Cloud LoadBalancer
提供的 LoadBalancer
,它是带缓存的:负载均衡
org.springframework.cloud.loadbalancer.annotation.LoadBalancerClientConfiguration
框架
@Bean @ConditionalOnBean(ReactiveDiscoveryClient.class) @ConditionalOnMissingBean public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier( ReactiveDiscoveryClient discoveryClient, Environment env, ApplicationContext context) { DiscoveryClientServiceInstanceListSupplier delegate = new DiscoveryClientServiceInstanceListSupplier( discoveryClient, env); ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context .getBeanProvider(LoadBalancerCacheManager.class); if (cacheManagerProvider.getIfAvailable() != null) { return new CachingServiceInstanceListSupplier(delegate, cacheManagerProvider.getIfAvailable()); } return delegate; }
DiscoveryClientServiceInstanceListSupplier
每次从Eureka
上面拉取实例列表,CachingServiceInstanceListSupplier
提供了缓存,这样没必要每次从Eureka
上面拉取。能够看出CachingServiceInstanceListSupplier
是一种代理模式的实现,和SameZoneOnlyServiceInstanceListSupplier
的模式是同样的。dom
咱们来组装咱们的ServiceInstanceListSupplier
,因为咱们是同步的环境,只用实现同步的ServiceInstanceListSupplier
就好了。
public class CommonLoadBalancerConfig { /** * 同步环境下的ServiceInstanceListSupplier * SameZoneOnlyServiceInstanceListSupplier限制仅返回同一个zone下的实例(注意) * CachingServiceInstanceListSupplier启用缓存,不每次访问eureka请求实例列表 * * @param discoveryClient * @param env * @param zoneConfig * @param context * @return */ @Bean @Order(Integer.MIN_VALUE) public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier( DiscoveryClient discoveryClient, Environment env, LoadBalancerZoneConfig zoneConfig, ApplicationContext context) { ServiceInstanceListSupplier delegate = new SameZoneOnlyServiceInstanceListSupplier( new DiscoveryClientServiceInstanceListSupplier(discoveryClient, env), zoneConfig ); ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context .getBeanProvider(LoadBalancerCacheManager.class); if (cacheManagerProvider.getIfAvailable() != null) { return new CachingServiceInstanceListSupplier( delegate, cacheManagerProvider.getIfAvailable() ); } return delegate; } }
2. 实现下一次重试的时候,若是存在其余实例,则必定会重试与本次不一样的其余实例
默认的RoundRobinLoadBalancer
,其中的轮询position
,是一个Atomic
类型的,在某个微服务的调用请求下,全部线程,全部请求共用(调用其余的每一个微服务会建立一个RoundRobinLoadBalancer
)。在使用的时候,会有这样的一个问题:
- 假设某个微服务有两个实例,实例 A 和实例 B
- 某次请求 X 发往实例 A,position = position + 1
- 在请求没有返回时,请求 Y 到达,发往实例 B,position = position + 1
- 请求 A 失败,重试,重试的实例仍是实例 A
这样在重试的状况下,某个请求的重试可能会发送到上一次的实例进行重试,这不是咱们想要的。针对这个,我提了个Issue:Enhance RoundRoubinLoadBalancer position。我修改的思路是,咱们须要一个单次请求隔离的position
,这个position
对于实例个数取余得出请求要发往的实例。那么如何进行请求隔离呢?
首先想到的是线程隔离,可是这个是不行的。Spring Cloud LoadBalancer 底层运用了 reactor 框架,致使实际承载选择实例的线程,不是业务线程,而是 reactor 里面的线程池,如图所示: 因此,不能用
ThreadLocal
的方式实现position
。
因为咱们用到了sleuth
,通常请求的context
会传递其中的traceId
,咱们根据这个traceId
区分不一样的请求,实现咱们的 LoadBalancer
:
RoundRobinBaseOnTraceIdLoadBalancer
//这个超时时间,须要设置的比你的请求的 connectTimeout + readTimeout 长 private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(3, TimeUnit.SECONDS).build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000))); private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } //若是没有 traceId,就生成一个新的,可是最好检查下为啥会没有 //是否是 MQ 消费这种没有主动生成 traceId 的状况,最好主动生成下。 Span currentSpan = tracer.currentSpan(); if (currentSpan == null) { currentSpan = tracer.newTrace(); } long l = currentSpan.context().traceId(); int seed = positionCache.get(l).getAndIncrement(); return new DefaultResponse(serviceInstances.get(seed % serviceInstances.size())); }
3. 替换默认的负载均衡相关 Bean 实现
咱们要用上面的两个类替换默认的实现,先编写一个配置类:
public class CommonLoadBalancerConfig { private volatile boolean isValid = false; /** * 同步环境下的ServiceInstanceListSupplier * SameZoneOnlyServiceInstanceListSupplier限制仅返回同一个zone下的实例(注意) * CachingServiceInstanceListSupplier启用缓存,不每次访问eureka请求实例列表 * * @param discoveryClient * @param env * @param zoneConfig * @param context * @return */ @Bean @Order(Integer.MIN_VALUE) public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier( DiscoveryClient discoveryClient, Environment env, LoadBalancerZoneConfig zoneConfig, ApplicationContext context) { isValid = true; ServiceInstanceListSupplier delegate = new SameZoneOnlyServiceInstanceListSupplier( new DiscoveryClientServiceInstanceListSupplier(discoveryClient, env), zoneConfig ); ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context .getBeanProvider(LoadBalancerCacheManager.class); if (cacheManagerProvider.getIfAvailable() != null) { return new CachingServiceInstanceListSupplier( delegate, cacheManagerProvider.getIfAvailable() ); } return delegate; } @Bean public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer( Environment environment, ServiceInstanceListSupplier serviceInstanceListSupplier, Tracer tracer) { if (!isValid) { throw new IllegalStateException("should use the ServiceInstanceListSupplier in this configuration, please check config"); } String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME); return new RoundRobinBaseOnTraceIdLoadBalancer( name, serviceInstanceListSupplier, tracer ); } }
而后,指定默认的负载均衡配置采起这个配置, 经过注解:
@LoadBalancerClients(defaultConfiguration = {CommonLoadBalancerConfig.class})