Spring Cloud Ribbon 源码解析

专栏目录

  1. Spring Cloud OpenFeign 源码解析
  2. Spring Cloud Ribbon 源码解析
  3. Spring Cloud Alibaba Sentinel 源码解析
  4. Spring Cloud Gatway 源码解析
  5. Spring Cloud Alibaba Nacos 源码解析

代码准备

依赖关系

+------------+              +------------+
|            |              |            |
|            |              |            |
|            |              |            |
|            |              |            |
|  consumer  +------------> |   provider |
|            | RestTemplate |            |
|            |              |            |
|            |              |            |
|            |              |            |
+------------+              +------------+

pom 依赖

加入nacos 服务发现便可,内部引用了 spring-cloud-ribbon 相关依赖java

<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

调用客户端

咱们这里以最简单的 RestTemplate 调用开始使用Ribbongit

@Bean
@LoadBalanced
public RestTemplate restTemplate() {
    return new RestTemplate();
}

// Controller 使用restTemplate 调用服务提供方接口
ResponseEntity<String> forEntity = restTemplate.getForEntity("http://provider/req", String.class);

源码解析

建立调用拦截器

1. 获取所有 @LoadBalanced标记的RestTemplate

public class LoadBalancerAutoConfiguration {
    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();
}

2. 增长 LoadBalancerInterceptor 处理逻辑

  • 没有引入 spring-retry使用的是
@Bean
public LoadBalancerInterceptor ribbonInterceptor() {
    return new LoadBalancerInterceptor();
}
  • 引入 spring-retry 使用的是
@Bean
@ConditionalOnMissingBean
public RetryLoadBalancerInterceptor ribbonInterceptor() {
    return new RetryLoadBalancerInterceptor();
}
  • LoadBalancerInterceptor 业务逻辑
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    @Override
    public ClientHttpResponse intercept() {
        final URI originalUri = request.getURI();
        // http://demo-provider/req 截取 demo-provider 服务名称
        String serviceName = originalUri.getHost();

        // 默认注入的 RibbonAutoConfiguration.RibbonLoadBalancerClient
        return this.loadBalancer.execute(serviceName,
                // 建立请求对象
                this.requestFactory.createRequest(request, body, execution));
    }
}

执行拦截器

3. RibbonLoadBalancerClient执行

//RibbonAutoConfiguration默认注入的RibbonLoadBalancerClient
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
  return new RibbonLoadBalancerClient(springClientFactory());
}

4.execute执行

public class RibbonLoadBalancerClient implements LoadBalancerClient {
    public <T> T execute(){
        //获取具体的ILoadBalancer实现
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);

        // 调用ILoadBalancer 实现获取Server
        Server server = getServer(loadBalancer, hint);
        RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                isSecure(server, serviceId),
                serverIntrospector(serviceId).getMetadata(server));

        //获取状态记录器,保存这次选取的server
        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);
        RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
        T returnVal = request.apply(serviceInstance);
        statsRecorder.recordStats(returnVal);
        return returnVal;
    }
}

获取ILoadBalancer

5 SpringClientFactory

// bean 工厂生成LoadBalancer 的实现
protected ILoadBalancer getLoadBalancer(String serviceId) {
    return this.springClientFactory.getLoadBalancer(serviceId);
}

// 具体生成逻辑看 RibbonClientConfiguration,这个Bean 只有工厂调用的时候才会建立
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    return new ZoneAwareLoadBalancer<>();
}

6.建立LoadBalancer 的依赖要素

名称 默认实现 做用
IClientConfig DefaultClientConfigImpl ribbon 客户端配置参数,例如: 超时设置、压缩设置等
ServerList NacosServerList 目标服务的实例实例表,具体服务发现客户端实现
ServerListFilter ZonePreferenceServerListFilter 针对ServerList 实例列表的过滤逻辑处理
IRule ZoneAvoidanceRule 负载均衡选择Server 的规则
IPing DummyPing 检验服务是否可用的方法实现
ServerListUpdater PollingServerListUpdater 针对ServerList 更新的操做实现

以上默认实现参考 RibbonClientConfiguration. ZoneAwareLoadBalancergithub

获取服务实例

//Server server = getServer(loadBalancer, hint);  4. excute 方法
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
    return loadBalancer.chooseServer(hint != null ? hint : "default");
}

7. ZoneAwareLoadBalancer

public class ZoneAwareLoadBalancer{
    public ZoneAwareLoadBalancer() {
        // 调用父类初始化方法。 这里会开启实例维护的定时任务等 (具体解析参考 扩展部分)
        super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
    }
    @Override
    public Server chooseServer(Object key) {
        // 如果使用的 Nacos 服务发现,则没有 Zone 的概念,直接调用父类的实现
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            return super.chooseServer(key);
        }
        // 如下为有 Zone 的概念 例如 Eureka  (具体)
        ...
        return server;
    }
}
  • 父类调用IRule实现选择Server
public Server chooseServer(Object key) {
    return rule.choose(key);
}

8.PredicateBasedRule 选择规则

public abstract class PredicateBasedRule {
    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        // 获取断言配置
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }
    }
}

9. ZoneAvoidancePredicate服务列表断言

public class ZoneAvoidancePredicate {
    @Override
    public boolean apply(@Nullable PredicateKey input) {
        if (!ENABLED.get()) {
            return true;
        }
        // 仍是获取区域配置,如是使用的 Nacos 直接返回true
        String serverZone = input.getServer().getZone();
        if (serverZone == null) {
            // there is no zone information from the server, we do not want to filter
            // out this server
            return true;
        }
        // 区域高可用判断
        ...
    }
}

扩展: ServerList 维护

初始化ServerList

在上文 6.建立LoadBalancer 的依赖要素,中 ServerList 目标服务的实例实例表,具体服务发现客户端实现。咱们来看下 Nacos 的实现spring

public class NacosServerList extends AbstractServerList<NacosServer> {
    @Override
    public List<NacosServer> getInitialListOfServers() {
        return getServers();
    }

    @Override
    public List<NacosServer> getUpdatedListOfServers() {
        return getServers();
    }

    private List<NacosServer> getServers() {
        String group = discoveryProperties.getGroup();
        //调用nacos-sdk 查询实例列表
        List<Instance> instances = discoveryProperties.namingServiceInstance()
                .selectInstances(serviceId, group, true);
        // 类型转换
        return instancesToServerList(instances);

    }
}

更新ServerListUpdater

  • ServerList 初始化后更新操做经过 PollingServerListUpdater
public class PollingServerListUpdater implements ServerListUpdater {
    @Override
    public synchronized void start(final UpdateAction updateAction) {
        // 更新任务 交给updateAction 具体实现
        final Runnable wrapperRunnable = () -> {
            updateAction.doUpdate();
            lastUpdated = System.currentTimeMillis();
        };

        // 开启后台线程定时执行  updateAction
        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    }
}
  • updateAction 实现
public void doUpdate() {
    DynamicServerListLoadBalancer.this.updateListOfServers();
}
public class PollingServerListUpdater implements ServerListUpdater {
    public void updateListOfServers() {
        List<T> servers = new ArrayList();
        
        // 调用NacosServiceList 获取所有服务列表
        servers = this.serverListImpl.getUpdatedListOfServers();
        
        // 若是配置实例过滤器在执行过滤
        if (this.filter != null) {
            servers = this.filter.getFilteredListOfServers((List)servers);
        }
        
        // 更新LoadBalancer 服务列表
        this.updateAllServerList((List)servers);
    }
}

扩展: Server 状态维护

  • LoadBalancer 初始构造时会触发 setupPingTask()
public BaseLoadBalancer() {
  this.name = DEFAULT_NAME;
  this.ping = null;
  setRule(DEFAULT_RULE);
  // 开启ping 检查任务
  setupPingTask();
  lbStats = new LoadBalancerStats(DEFAULT_NAME);
}
  • setupPingTask
void setupPingTask() {
  // 是否能够ping, 默认的DummyPing 直接 跳过不执行
  if (canSkipPing()) {
    return;
  }
  // 执行PingTask
  lbTimer.schedule(new BaseLoadBalancer.PingTask(), 0, pingIntervalSeconds * 1000);
  // 开启任务
  new BaseLoadBalancer.Pinger(pingStrategy).runPinger();
}
  • SerialPingStrategy 串行执行逻辑
// 串行调度执行 Iping 逻辑
private static class SerialPingStrategy implements IPingStrategy {
  @Override
  public boolean[] pingServers(IPing ping, Server[] servers) {
    int numCandidates = servers.length;
    boolean[] results = new boolean[numCandidates];

    for (int i = 0; i < numCandidates; i++) {
      results[i] = false; /* Default answer is DEAD. */
      if (ping != null) {
        results[i] = ping.isAlive(servers[i]);
      }

    }
    return results;
  }
}
  • 调用url 判断可用性
public class PingUrl implements IPing {
    public boolean isAlive(Server server) {
        urlStr = urlStr + server.getId();
        urlStr = urlStr + this.getPingAppendString();
        boolean isAlive = false;
        HttpClient httpClient = new DefaultHttpClient();
        HttpUriRequest getRequest = new HttpGet(urlStr);
        String content = null;

        HttpResponse response = httpClient.execute(getRequest);
        content = EntityUtils.toString(response.getEntity());
        isAlive = response.getStatusLine().getStatusCode() == 200;
        return isAlive;
    }
}

扩展: RibbonClient 懒加载处理

由上文可知,默认状况下 Ribbon 在第一次请求才会去建立 LoadBalancer ,这种懒加载机制会致使服务启动后,第一次调用服务延迟问题,甚至在整合 断路器(hystrix)等出现超时熔断 。segmentfault

为了解决这个问题,咱们会配置 Ribbon 的饥饿加载架构

ribbon:
  eager-load:
    clients:
      - provider
  • RibbonApplicationContextInitializer 服务启动后自动调用 工厂提早建立须要的ribbon clients
public class RibbonApplicationContextInitializer
        implements ApplicationListener<ApplicationReadyEvent> {
    private final List<String> clientNames;

    protected void initialize() {
        if (clientNames != null) {
            for (String clientName : clientNames) {
                this.springClientFactory.getContext(clientName);
            }
        }
    }
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        initialize();
    }

}

后续计划

欢迎关注我,后边更新 RibbonHystrixSentinelNacos 等组件源码图文解析。app

另注: 以上图片素材 (omnigraffle & 亿图) 能够在公众号 JAVA架构日记 获取负载均衡

『★★★★★』 基于Spring Boot 2.二、 Spring Cloud Hoxton & Alibaba、 OAuth2 的RBAC 权限管理系统
imageide

相关文章
相关标签/搜索