Ribbon - 初始化中提到了,@LoadBalanced
注解的RestTemplate
会注入拦截器LoadBalancerInterceptor
,咱们看看LoadBalancerInterceptor
是怎么作的。git
这里主要是经过URL把serviceId取出来,而后调用LoadBalancerClient
的execute
方法。github
@Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); // 这个就是咱们注册到注册中心的服务名称 String serviceName = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution)); }
主要是获取ILoadBalancer
和Server
,经过Server进行远程调用。这个server已是有对应的IP和端口了。spring
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException { // 获取ILoadBalancer ILoadBalancer loadBalancer = getLoadBalancer(serviceId); // 经过ILoadBalancer获取Server Server server = getServer(loadBalancer, hint); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } // 远程调用 RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); return execute(serviceId, ribbonServer, request); }
RibbonLoadBalancerClient#getLoadBalancer会调用SpringClientFactory#getLoadBalancer,SpringClientFactory的注入能够看上一章。SpringClientFactory#getLoadBalancer调用父类的NamedContextFactory#getInstance方法,NamedContextFactory#getInstance会判断以前是否建立了serviceId对应的AnnotationConfigApplicationContext,若是没有则建立。segmentfault
这里关注是createContext方法,他注册了RibbonEurekaAutoConfiguration和RibbonClientConfiguration,注册后就调用refresh方法。这两个值是怎么来的,能够参考上一章SpringClientFactory的建立。这两个有前后顺序,因此会先加载RibbonEurekaAutoConfiguration后加载RibbonEurekaAutoConfiguration,而IPing、ServerList都有@ConditionalOnMissingBean
注解,因此优先实例化RibbonEurekaAutoConfiguration的IPing、ServerList。负载均衡
protected AnnotationConfigApplicationContext getContext(String name) { // 若是没有,则调用createContext建立 if (!this.contexts.containsKey(name)) { synchronized (this.contexts) { if (!this.contexts.containsKey(name)) { this.contexts.put(name, createContext(name)); } } } //返回 return this.contexts.get(name); } protected AnnotationConfigApplicationContext createContext(String name) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); if (this.configurations.containsKey(name)) { for (Class<?> configuration : this.configurations.get(name) .getConfiguration()) { context.register(configuration); } } // 注册org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration for (Map.Entry<String, C> entry : this.configurations.entrySet()) { if (entry.getKey().startsWith("default.")) { for (Class<?> configuration : entry.getValue().getConfiguration()) { context.register(configuration); } } } // 注册org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration context.register(PropertyPlaceholderAutoConfiguration.class, this.defaultConfigType); context.getEnvironment().getPropertySources().addFirst(new MapPropertySource( this.propertySourceName, Collections.<String, Object>singletonMap(this.propertyName, name))); if (this.parent != null) { // Uses Environment from parent as well as beans context.setParent(this.parent); // jdk11 issue // https://github.com/spring-cloud/spring-cloud-netflix/issues/3101 context.setClassLoader(this.parent.getClassLoader()); } context.setDisplayName(generateDisplayName(name)); context.refresh(); return context; }
refresh加载比较重要的有如下几个,代码就不贴了,这几个类就是在上面注册的RibbonEurekaAutoConfiguration和RibbonClientConfiguration里面。ide
这几个bean加载的时候,都有相似如下的判断,这个主要是判断是否有自定义配置,若是没有,则取默认,有就取自定义的。函数
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) { return this.propertiesFactory.get(ILoadBalancer.class, config, name); }
下面主要看ZoneAwareLoadBalancer的构造函数,他会调用父类的BaseLoadBalancer#initWithConfig方法和DynamicServerListLoadBalancer#restOfInit方法。this
这个方法主要是开启定时任务ping,也就是检查其余服务是不是存活的。spa
void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) { // 其余略 // 设置ping的时间,默认30秒,也就是说服务挂了最多30秒就发现了 setPingInterval(pingIntervalTime); // 设置最大次数 setMaxTotalPingTime(maxTotalPingTime); // 设置IRule,这里会把当前的ILoadBalancer赋值给IRule setRule(rule); // 开启定时任务ping setPing(ping); // 其余略 }
定时任务的调用过程简略为:PingTask#run-->BaseLoadBalance.Pinger#runPinger-->BaseLoadBalance.SerialPingStrategy#pingServers-->NIWSDiscoveryPing#isAlive,NIWSDiscoveryPing#isAlive是判断。.net
public void runPinger() throws Exception { // 其余略 // 经过BaseLoadBalance.SerialPingStrategy#pingServers调用NIWSDiscoveryPing#isAlive, // 主要是判断服务的状态是否是UP,若是是UP,就是存活。 results = pingerStrategy.pingServers(ping, allServers); final List<Server> newUpList = new ArrayList<Server>(); final List<Server> changedServers = new ArrayList<Server>(); // 后面代码主要是把存活的存入到newUpList再到upServerList,把变化的存入changedServers,并调用监听通知 // 其余略 }
这个方法主要有两个功能,一个是调用DynamicServerListLoadBalancer#enableAndInitLearnNewServersFeature开启定时任务获取Eureka的注册表,一个是调用DynamicServerListLoadBalancer#updateAllServerList方法获取Eureka的注册表。因此主要看看DynamicServerListLoadBalancer#updateListOfServers。
public void updateListOfServers() { List<T> servers = new ArrayList<T>(); if (serverListImpl != null) { // 获取Eureka的注册表 servers = serverListImpl.getUpdatedListOfServers(); if (filter != null) { // 过滤zone为defaultZone servers = filter.getFilteredListOfServers(servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); } } // 赋值allServerList updateAllServerList(servers); }
咱们回到RibbonLoadBalancerClient#execute来,getLoadBalancer方法已经把该加载的加载了,获取到ILoadBalancer后,咱们就已经获取到被调用的服务列表了。如今就要获取某一个服务来作远程调用了,因为注入的是ZoneAvoidanceRule,因此默认的就是轮询来获取Server。获取到Server后,就能够进行远程调用了。
private int incrementAndGetModulo(int modulo) { for (;;) { int current = nextIndex.get(); int next = (current + 1) % modulo; if (nextIndex.compareAndSet(current, next) && current < modulo) return current; } }
ribbon在调用以前,会获取到Eureka的注册信息,并开启定时任务去更新Eureka的注册信息,以及检测是否存活,默认都是30秒。调用的时候,经过serviceId,获取服务列表(此时已转为IP+端口),再经过负载均衡策略,获取某个服务进行远程调用。