做者:fredalxin
地址:https://fredal.xin/how-eureka...java
本文基于 spring cloud dalston,同时文章较长,请选择舒服姿式进行阅读。面试
Eureka 与 Ribbon 都是 Netflix 提供的微服务组件,分别用于服务注册与发现、负载均衡。同时,这二者均属于 spring cloud netflix 体系,和 spring cloud 无缝集成,也正因为此被你们所熟知。spring
Eureka 自己是服务注册发现组件,实现了完整的 Service Registry 和 Service Discovery。缓存
Ribbon 则是一款负载均衡组件,那它和服务发现又有什么关系呢?负载均衡在整个微服务的调用模型中是紧挨着服务发现的,而 Ribbon 这个框架它实际上是起到了开发者服务消费行为与底层服务发现组件 Eureka 之间桥梁的做用。架构
从严格概念上说 Ribbon 并非作服务发现的,可是因为 Netflix 组件的松耦合,Ribbon 须要对 Eureka 的缓存服务列表进行相似"服务发现"的行为,从而构建本身的负载均衡列表并及时更新,也就是说 Ribbon 中的"服务发现"的宾语变成了 Eureka(或其余服务发现组件)。intellij-idea
咱们会先对 Eureka 的服务发现进行描述,重点是 Eureka-client 是如何进行服务的注册与发现的,同时不会过多停留于 Eureka 的架构、Eureka-server 的实现、Zone/Region 等范畴。app
Eureka-client 的服务发现都是由 DiscoveryClient 类实现的,它主要包括的功能有:负载均衡
DiscoveryClient 全部的定时任务都是在 initScheduledTasks()方法里,咱们能够看到如下关键代码:框架
private void initScheduledTasks() { ... if (clientConfig.shouldRegisterWithEureka()) { ... // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize ... instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } }
咱们能够看到在 if 判断分支里建立了一个 instanceInfoReplicator 实例,它会经过 start 执行一个定时任务:async
public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
咱们能够在 InstanceInfoReplicator 类的 run()方法中找到这一段,同时能够一眼发现其注册关键点在于discoveryClient.register()
这段,咱们点进去看看:
boolean register() throws Throwable { logger.info(PREFIX + appPathIdentifier + ": registering service..."); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204; }
这边能够发现是经过 HTTP REST (jersey 客户端)请求的方式将 instanceInfo 实例信息注册到 Eureka-server 上。咱们简单看一下 InstanceInfo 对象,属性基本上都能见名知义:
@JsonCreator public InstanceInfo( @JsonProperty("instanceId") String instanceId, @JsonProperty("app") String appName, @JsonProperty("appGroupName") String appGroupName, @JsonProperty("ipAddr") String ipAddr, @JsonProperty("sid") String sid, @JsonProperty("port") PortWrapper port, @JsonProperty("securePort") PortWrapper securePort, @JsonProperty("homePageUrl") String homePageUrl, @JsonProperty("statusPageUrl") String statusPageUrl, @JsonProperty("healthCheckUrl") String healthCheckUrl, @JsonProperty("secureHealthCheckUrl") String secureHealthCheckUrl, @JsonProperty("vipAddress") String vipAddress, @JsonProperty("secureVipAddress") String secureVipAddress, @JsonProperty("countryId") int countryId, @JsonProperty("dataCenterInfo") DataCenterInfo dataCenterInfo, @JsonProperty("hostName") String hostName, @JsonProperty("status") InstanceStatus status, @JsonProperty("overriddenstatus") InstanceStatus overriddenstatus, @JsonProperty("leaseInfo") LeaseInfo leaseInfo, @JsonProperty("isCoordinatingDiscoveryServer") Boolean isCoordinatingDiscoveryServer, @JsonProperty("metadata") HashMap<String, String> metadata, @JsonProperty("lastUpdatedTimestamp") Long lastUpdatedTimestamp, @JsonProperty("lastDirtyTimestamp") Long lastDirtyTimestamp, @JsonProperty("actionType") ActionType actionType, @JsonProperty("asgName") String asgName) { this.instanceId = instanceId; this.sid = sid; this.appName = StringCache.intern(appName); this.appGroupName = StringCache.intern(appGroupName); this.ipAddr = ipAddr; this.port = port == null ? 0 : port.getPort(); this.isUnsecurePortEnabled = port != null && port.isEnabled(); this.securePort = securePort == null ? 0 : securePort.getPort(); this.isSecurePortEnabled = securePort != null && securePort.isEnabled(); this.homePageUrl = homePageUrl; this.statusPageUrl = statusPageUrl; this.healthCheckUrl = healthCheckUrl; this.secureHealthCheckUrl = secureHealthCheckUrl; this.vipAddress = StringCache.intern(vipAddress); this.secureVipAddress = StringCache.intern(secureVipAddress); this.countryId = countryId; this.dataCenterInfo = dataCenterInfo; this.hostName = hostName; this.status = status; this.overriddenstatus = overriddenstatus; this.leaseInfo = leaseInfo; this.isCoordinatingDiscoveryServer = isCoordinatingDiscoveryServer; this.lastUpdatedTimestamp = lastUpdatedTimestamp; this.lastDirtyTimestamp = lastDirtyTimestamp; this.actionType = actionType; this.asgName = StringCache.intern(asgName); // --------------------------------------------------------------- // for compatibility if (metadata == null) { this.metadata = Collections.emptyMap(); } else if (metadata.size() == 1) { this.metadata = removeMetadataMapLegacyValues(metadata); } else { this.metadata = metadata; } if (sid == null) { this.sid = SID_DEFAULT; } }
总结一下整个过程以下:
服务续期提及来可能比较晦涩,其实就是在 client 端定时发起调用,让 Eureka-server 知道本身还活着,在 eureka 代码中的注释解释为心跳(heart-beat)。
这里有两个比较重要的配置须要注意:
咱们直接从代码角度看一下,一样呢相关定时任务在 initScheduledTasks()方法中:
private void initScheduledTasks() { ... if (clientConfig.shouldRegisterWithEureka()) { ... // Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); ... } }
能够看到这里建立了一个 HeartbeatThread()线程执行操做:
private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } }
咱们直接看 renew()方法:
boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName()); return register(); } return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e); return false; } }
这里比较简单,能够发现和服务注册是相似的,一样使用 HTTP REST 发起一个 hearbeat 请求,底层使用 jersey 客户端。
总结一下整个过程以下:
服务注销逻辑比较简单,自己并不在定时任务中触发,而是经过对方法标记@PreDestroy,从而调用 shutdown 方法触发,最终会调用 unRegister()方法进行注销,一样的这也是一个 HTTP REST 请求,能够简单看下代码:
@PreDestroy @Override public synchronized void shutdown() { if (isShutdown.compareAndSet(false, true)) { logger.info("Shutting down DiscoveryClient ..."); if (statusChangeListener != null && applicationInfoManager != null) { applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId()); } cancelScheduledTasks(); // If APPINFO was registered if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) { applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); unregister(); } if (eurekaTransport != null) { eurekaTransport.shutdown(); } heartbeatStalenessMonitor.shutdown(); registryStalenessMonitor.shutdown(); logger.info("Completed shut down of DiscoveryClient"); } } /** * unregister w/ the eureka service. */ void unregister() { // It can be null if shouldRegisterWithEureka == false if(eurekaTransport != null && eurekaTransport.registrationClient != null) { try { logger.info("Unregistering ..."); EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId()); logger.info(PREFIX + appPathIdentifier + " - deregister status: " + httpResponse.getStatusCode()); } catch (Exception e) { logger.error(PREFIX + appPathIdentifier + " - de-registration failed" + e.getMessage(), e); } } }
咱们来看做为服务消费者的关键逻辑,即发现服务以及更新服务。
首先 consumer 会在启动时从 Eureka-server 获取全部的服务列表,并在本地缓存。同时呢,因为本地有一份缓存,因此须要按期更新,更新频率能够配置。
启动时候在 consumer 在 discoveryClient 中会调用 fetchRegistry() 方法:
private boolean fetchRegistry(boolean forceFullRegistryFetch) { ... if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { ... getAndStoreFullRegistry(); } else { getAndUpdateDelta(applications); } ... }
这里能够看到 fetchRegistry 里有 2 个判断分支,对应首次更新以及后续更新。首次更新会调用 getAndStoreFullRegistry()方法,咱们看一下:
private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } }
能够看到和以前相似,若是在没有特殊指定的状况下,咱们会发起一个 HTTP REST 请求拉取全部应用的信息并进行缓存,缓存对象为 Applications,有兴趣的能够进一步查看。
接下来,在咱们熟悉的 initScheduledTasks()方法中,咱们还会启动一个更新应用信息缓存的 task:
private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } ... }
在 CacheRefreshThread()这个 task 的 run 方法中,仍然会调用到咱们以前的 fetchRegistry()方法,同时在判断时会走到另外一个分支中,即调用到 getAndUpdateDelta()方法:
private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } if (delta == null) { logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } }
能够看到,这边是使用 HTTP REST 发起一个 getDelta 请求,同时在 updateDelta()方法中会更新本地的 Applications 缓存对象。
总结一下,整个服务发现与更新的过程以下:
接下来咱们来看看 Ribbon 是怎么基于 Eureka 进行"服务发现"的,咱们以前说过这里的"服务发现"并非严格意义上的服务发现,而是 Ribbon 如何基于 Eureka 构建本身的负载均衡列表并及时更新,同时咱们也不关注 Ribbon 其余负载均衡的具体逻辑(包括 IRule 路由,IPing 判断可用性)。
咱们能够先作一些猜测,首先 Ribbon 确定是基于 Eureka 的服务发现的。咱们上边描述了 Eureka 会拉取全部服务信息到本地缓存 Applications 中,那么 Ribbon 确定是基于这个 Applications 缓存来构建负载均衡列表的了,同时呢,负载均衡列表一样须要一个定时更新的机制来保证一致性。
首先咱们从开发者的最初使用上看,在开发者在 RestTemplate 上开启@LoadBalanced 注解就可开启 Ribbon 的逻辑了,显然这是用了相似拦截的方法。在 LoadBalancerAutoConfiguration 类中,咱们能够看到相关代码:
... @Bean public SmartInitializingSingleton loadBalancedRestTemplateInitializer( final List<RestTemplateCustomizer> customizers) { return new SmartInitializingSingleton() { @Override public void afterSingletonsInstantiated() { for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) { for (RestTemplateCustomizer customizer : customizers) { customizer.customize(restTemplate); } } } }; } @Configuration @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate") static class LoadBalancerInterceptorConfig { @Bean public LoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) { return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); } @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final LoadBalancerInterceptor loadBalancerInterceptor) { return new RestTemplateCustomizer() { @Override public void customize(RestTemplate restTemplate) { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); } }; } } ...
能够看到,在初始化的过程当中经过调用 customize()方法来给 RestTemplate 增长了拦截器 LoadBalancerInterceptor。而 LoadBalancerInterceptor 则是在拦截方法中使用了 loadBalancer(RibbonLoadBalancerClient 类) 完成请求调用:
@Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution)); }
到如今为止呢,咱们的请求调用已经被 RibbonLoadBalancerClient 所封装,而其"服务发现"也是发生在 RibbonLoadBalancerClient 中的。
咱们点到其 execute()方法中:
@Override public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { ILoadBalancer loadBalancer = getLoadBalancer(serviceId); Server server = getServer(loadBalancer); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); return execute(serviceId, ribbonServer, request); }
这里根据 serviceId 构建了一个 ILoadBalancer,同时从 loadBalancer 中获取到了最终的实例 server 信息。ILoadBalancer 是定义了负载均衡的一个接口,它的关键方法 chooseServer()便是从负载均衡列表根据路由规则中选取一个 server。固然咱们主要关心的点在于,负载均衡列表是怎么构建出来的。
经过源码跟踪咱们发现,在经过 getLoadBalancer()方法构建好 ILoadBalancer 对象后,对象中就已经包含了服务列表。因此咱们来看看 ILoadBalancer 对象是怎么建立的:
protected ILoadBalancer getLoadBalancer(String serviceId) { return this.clientFactory.getLoadBalancer(serviceId); }
那么这里实际上是 springcloud 封装的 clientFactory,它会在 applicationContext 容器中寻找对应的 bean 。
经过源码追踪,咱们能够在自动配置类 RibbonClientConfiguration 中找到对应代码:
@Bean @ConditionalOnMissingBean public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) { if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) { return this.propertiesFactory.get(ILoadBalancer.class, config, name); } return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList, serverListFilter, serverListUpdater); }
咱们看到这里最终构建了 ILoadBalancer,其实现类是 ZoneAwareLoadBalancer,咱们观察其超类的初始化:
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) { super(clientConfig, rule, ping); this.serverListImpl = serverList; this.filter = filter; this.serverListUpdater = serverListUpdater; if (filter instanceof AbstractServerListFilter) { ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats()); } restOfInit(clientConfig); }
这边最终执行了 restOfInit()方法,进一步跟踪:
void restOfInit(IClientConfig clientConfig) { boolean primeConnection = this.isEnablePrimingConnections(); // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList() this.setEnablePrimingConnections(false); enableAndInitLearnNewServersFeature(); updateListOfServers(); if (primeConnection && this.getPrimeConnections() != null) { this.getPrimeConnections() .primeConnections(getReachableServers()); } this.setEnablePrimingConnections(primeConnection); LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString()); }
updateListOfServers()方法是获取全部的 ServerList 的,最终由 serverListImpl.getUpdatedListOfServers()获取全部的服务列表,在此 serverListImpl 即实现类为 DiscoveryEnabledNIWSServerList。
其中 DiscoveryEnabledNIWSServerList 有 getInitialListOfServers()和 getUpdatedListOfServers()方法,具体代码以下
@Override public List<DiscoveryEnabledServer> getInitialListOfServers(){ return obtainServersViaDiscovery(); } @Override public List<DiscoveryEnabledServer> getUpdatedListOfServers(){ return obtainServersViaDiscovery(); }
此时咱们查看 obtainServersViaDiscovery()方法,已经基本接近于事物本质了,它建立了一个 EurekaClient 对象,在此就是 Eureka 的 DiscoveryClient 实现类,调用了其 getInstancesByVipAddress()方法,它最终从 DiscoveryClient 的 Applications 缓存中根据 serviceId 选取了对应的服务信息:
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() { List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>(); if (eurekaClientProvider == null || eurekaClientProvider.get() == null) { logger.warn("EurekaClient has not been initialized yet, returning an empty list"); return new ArrayList<DiscoveryEnabledServer>(); } EurekaClient eurekaClient = eurekaClientProvider.get(); if (vipAddresses!=null){ for (String vipAddress : vipAddresses.split(",")) { // if targetRegion is null, it will be interpreted as the same region of client List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion); for (InstanceInfo ii : listOfInstanceInfo) { if (ii.getStatus().equals(InstanceStatus.UP)) { if(shouldUseOverridePort){ if(logger.isDebugEnabled()){ logger.debug("Overriding port on client name: " + clientName + " to " + overridePort); } // copy is necessary since the InstanceInfo builder just uses the original reference, // and we don't want to corrupt the global eureka copy of the object which may be // used by other clients in our system InstanceInfo copy = new InstanceInfo(ii); if(isSecure){ ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build(); }else{ ii = new InstanceInfo.Builder(copy).setPort(overridePort).build(); } } DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr); des.setZone(DiscoveryClient.getZone(ii)); serverList.add(des); } } if (serverList.size()>0 && prioritizeVipAddressBasedServers){ break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers } } } return serverList; }
咱们已经知道初次启动时,Ribbon 是怎么结合 Eureka 完成负载均衡列表的构建了,那么与 Eureka 相似,咱们还须要及时对服务列表进行更新以保证一致性。
在 RibbonClientConfiguration 自动配置类中构建 ILoadBalancer 时咱们能够看到其构造器中有 ServerListUpdater 对象,而此对象也是在当前类中构建的:
@Bean @ConditionalOnMissingBean public ServerListUpdater ribbonServerListUpdater(IClientConfig config) { return new PollingServerListUpdater(config); }
咱们观察此对象中的 start()方法看是如何完成更新的:
@Override public synchronized void start(final UpdateAction updateAction) { if (isActive.compareAndSet(false, true)) { final Runnable wrapperRunnable = new Runnable() { @Override public void run() { if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try { updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } } }; scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS ); } else { logger.info("Already active, no-op"); } }
这里有 2 个配置,即 initialDelayMs 首次检测默认 1s,refreshIntervalMs 检测间隔默认 30s(和 Eureka 一致),建立了一个定时任务,执行 updateAction.doUpdate()方法。
咱们回到以前的 restOfInit()方法,查看其中的 enableAndInitLearnNewServersFeature()方法,能够看到是在此处触发了 ServerListUpdater 的 start 方法,同时传入了 updateAction 对象:
public void enableAndInitLearnNewServersFeature() { LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName()); serverListUpdater.start(updateAction); }
其实 updateAction 一开始就已经建立好了,它仍然是调用 以前的 updateListOfServers 方法来进行后续的更新:
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() { @Override public void doUpdate() { updateListOfServers(); } };
总结一下 Ribbon 三部分服务发现的总体流程以下:
参考资料:
近期热文推荐:
1.600+ 道 Java面试题及答案整理(2021最新版)
2.终于靠开源项目弄到 IntelliJ IDEA 激活码了,真香!
3.阿里 Mock 工具正式开源,干掉市面上全部 Mock 工具!
4.Spring Cloud 2020.0.0 正式发布,全新颠覆性版本!
以为不错,别忘了随手点赞+转发哦!