This article takes a look at EurekaHealthCheckHandler.
eureka-client-1.8.8-sources.jar!/com/netflix/appinfo/HealthCheckHandler.java
/**
 * This provides a more granular healthcheck contract than the existing {@link HealthCheckCallback}
 *
 * @author Nitesh Kant
 */
public interface HealthCheckHandler {

    InstanceInfo.InstanceStatus getStatus(InstanceInfo.InstanceStatus currentStatus);

}
Netflix's eureka-client provides the HealthCheckHandler interface, which is used to obtain the health status of a service instance.
eureka-client-1.8.8-sources.jar!/com/netflix/appinfo/HealthCheckCallbackToHandlerBridge.java
@SuppressWarnings("deprecation")
public class HealthCheckCallbackToHandlerBridge implements HealthCheckHandler {

    private final HealthCheckCallback callback;

    public HealthCheckCallbackToHandlerBridge() {
        callback = null;
    }

    public HealthCheckCallbackToHandlerBridge(HealthCheckCallback callback) {
        this.callback = callback;
    }

    @Override
    public InstanceInfo.InstanceStatus getStatus(InstanceInfo.InstanceStatus currentStatus) {
        if (null == callback || InstanceInfo.InstanceStatus.STARTING == currentStatus
                || InstanceInfo.InstanceStatus.OUT_OF_SERVICE == currentStatus) {
            // Do not go to healthcheck handler if the status is starting or OOS.
            return currentStatus;
        }

        return callback.isHealthy() ? InstanceInfo.InstanceStatus.UP : InstanceInfo.InstanceStatus.DOWN;
    }
}
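This class carries @SuppressWarnings("deprecation") because it wraps the deprecated HealthCheckCallback. If there is no callback, or the current status is STARTING or OUT_OF_SERVICE, it returns the current status unchanged; otherwise it calls the callback's isHealthy method to decide between UP and DOWN. In other words, without a callback, an instance that starts out as UP will keep reporting UP from then on.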
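The decision rule described above can be modelled without any Eureka dependency. The following is only a sketch: `BridgeSketch`, its nested `Status` enum and the `BooleanSupplier` callback are hypothetical stand-ins for `HealthCheckCallbackToHandlerBridge`, `InstanceInfo.InstanceStatus` and `HealthCheckCallback`, not the library's own types.

```java
import java.util.function.BooleanSupplier;

// Simplified model of the bridge's getStatus rule; all names here are stand-ins.
class BridgeSketch {
    // Stand-in for InstanceInfo.InstanceStatus.
    enum Status { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

    // Stand-in for HealthCheckCallback.isHealthy(); may be null.
    private final BooleanSupplier callback;

    BridgeSketch(BooleanSupplier callback) {
        this.callback = callback;
    }

    Status getStatus(Status current) {
        // No callback, or a STARTING / OUT_OF_SERVICE instance: keep the current status.
        if (callback == null || current == Status.STARTING || current == Status.OUT_OF_SERVICE) {
            return current;
        }
        // Otherwise the callback alone decides between UP and DOWN.
        return callback.getAsBoolean() ? Status.UP : Status.DOWN;
    }

    public static void main(String[] args) {
        System.out.println(new BridgeSketch(null).getStatus(Status.UP));
        System.out.println(new BridgeSketch(() -> false).getStatus(Status.UP));
        System.out.println(new BridgeSketch(() -> false).getStatus(Status.STARTING));
    }
}
```

Note that with a null callback the result never depends on the application's actual health, which is why the default setup keeps reporting the startup status.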
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java
/**
 * Register {@link HealthCheckCallback} with the eureka client.
 *
 * Once registered, the eureka client will invoke the
 * {@link HealthCheckCallback} in intervals specified by
 * {@link EurekaClientConfig#getInstanceInfoReplicationIntervalSeconds()}.
 *
 * @param callback app specific healthcheck.
 *
 * @deprecated Use
 */
@Deprecated
@Override
public void registerHealthCheckCallback(HealthCheckCallback callback) {
    if (instanceInfo == null) {
        logger.error("Cannot register a listener for instance info since it is null!");
    }
    if (callback != null) {
        healthCheckHandler = new HealthCheckCallbackToHandlerBridge(callback);
    }
}

@Override
public void registerHealthCheck(HealthCheckHandler healthCheckHandler) {
    if (instanceInfo == null) {
        logger.error("Cannot register a healthcheck handler when instance info is null!");
    }
    if (healthCheckHandler != null) {
        this.healthCheckHandler = healthCheckHandler;
        // schedule an onDemand update of the instanceInfo when a new healthcheck handler is registered
        if (instanceInfoReplicator != null) {
            instanceInfoReplicator.onDemandUpdate();
        }
    }
}
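registerHealthCheckCallback is marked as deprecated; internally it simply wraps the callback in a HealthCheckCallbackToHandlerBridge. The fallback logic is handled in the getHealthCheckHandler method shown next.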
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java
@Override
public HealthCheckHandler getHealthCheckHandler() {
    if (healthCheckHandler == null) {
        if (null != healthCheckHandlerProvider) {
            healthCheckHandler = healthCheckHandlerProvider.get();
        } else if (null != healthCheckCallbackProvider) {
            healthCheckHandler = new HealthCheckCallbackToHandlerBridge(healthCheckCallbackProvider.get());
        }

        if (null == healthCheckHandler) {
            healthCheckHandler = new HealthCheckCallbackToHandlerBridge(null);
        }
    }

    return healthCheckHandler;
}
Here, if healthCheckHandler is still null at the end, a HealthCheckCallbackToHandlerBridge with a null callback is created.
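The resolution order in getHealthCheckHandler can be traced with a small stand-alone model. This is a sketch under assumptions: `HandlerResolutionSketch` and `resolve` are hypothetical names, and handlers are represented as plain strings so the chosen branch is visible.

```java
import java.util.function.Supplier;

// Models which source ends up supplying the handler; strings stand in for handler objects.
class HandlerResolutionSketch {

    static String resolve(String registered,
                          Supplier<String> handlerProvider,
                          Supplier<String> callbackProvider) {
        String handler = registered;
        if (handler == null) {
            if (handlerProvider != null) {
                // A handler provider, when present, is consulted first.
                handler = handlerProvider.get();
            } else if (callbackProvider != null) {
                // Only without a handler provider is the callback provider used.
                handler = "bridge(" + callbackProvider.get() + ")";
            }
            if (handler == null) {
                // Last resort: a bridge around a null callback.
                handler = "bridge(null)";
            }
        }
        return handler;
    }

    public static void main(String[] args) {
        System.out.println(resolve("custom", null, null));
        System.out.println(resolve(null, null, () -> "callback"));
        System.out.println(resolve(null, null, null));
    }
}
```

One consequence of the else-if: when a handler provider exists but returns null, the callback provider is skipped and the result is still the null-callback bridge.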
eureka-client-1.8.8-sources.jar!/com/netflix/appinfo/HealthCheckCallback.java
@Deprecated
public interface HealthCheckCallback {
    /**
     * If false, the instance will be marked
     * {@link InstanceInfo.InstanceStatus#DOWN} with eureka. If the instance was
     * already marked {@link InstanceInfo.InstanceStatus#DOWN} , returning true
     * here will mark the instance back to
     * {@link InstanceInfo.InstanceStatus#UP}.
     *
     * @return true if the call back returns healthy, false otherwise.
     */
    boolean isHealthy();
}
HealthCheckCallback is now marked as deprecated, so by default the end result can be understood as healthCheckHandler = new HealthCheckCallbackToHandlerBridge(null);
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/serviceregistry/EurekaServiceRegistry.java
@Override
public void register(EurekaRegistration reg) {
    maybeInitializeClient(reg);

    if (log.isInfoEnabled()) {
        log.info("Registering application " + reg.getInstanceConfig().getAppname()
                + " with eureka with status "
                + reg.getInstanceConfig().getInitialStatus());
    }

    reg.getApplicationInfoManager()
            .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

    reg.getHealthCheckHandler().ifAvailable(healthCheckHandler ->
            reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
This method calls registerHealthCheck only when a healthCheckHandler instance is available.
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/EurekaDiscoveryClientConfiguration.java
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
public class EurekaDiscoveryClientConfiguration {

    class Marker {}

    @Bean
    public Marker eurekaDiscoverClientMarker() {
        return new Marker();
    }

    @Configuration
    @ConditionalOnClass(RefreshScopeRefreshedEvent.class)
    protected static class EurekaClientConfigurationRefresher {

        @Autowired(required = false)
        private EurekaClient eurekaClient;

        @Autowired(required = false)
        private EurekaAutoServiceRegistration autoRegistration;

        @EventListener(RefreshScopeRefreshedEvent.class)
        public void onApplicationEvent(RefreshScopeRefreshedEvent event) {
            //This will force the creation of the EurkaClient bean if not already created
            //to make sure the client will be reregistered after a refresh event
            if(eurekaClient != null) {
                eurekaClient.getApplications();
            }
            if (autoRegistration != null) {
                // register in case meta data changed
                this.autoRegistration.stop();
                this.autoRegistration.start();
            }
        }
    }

    @Configuration
    @ConditionalOnProperty(value = "eureka.client.healthcheck.enabled", matchIfMissing = false)
    protected static class EurekaHealthCheckHandlerConfiguration {

        @Autowired(required = false)
        private HealthAggregator healthAggregator = new OrderedHealthAggregator();

        @Bean
        @ConditionalOnMissingBean(HealthCheckHandler.class)
        public EurekaHealthCheckHandler eurekaHealthCheckHandler() {
            return new EurekaHealthCheckHandler(this.healthAggregator);
        }
    }
}
eureka.client.healthcheck.enabled defaults to false; setting it to true registers an EurekaHealthCheckHandler bean (unless another HealthCheckHandler bean already exists).
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/EurekaHealthCheckHandler.java
/**
 * A Eureka health checker, maps the application status into {@link InstanceStatus}
 * that will be propagated to Eureka registry.
 *
 * On each heartbeat Eureka performs the health check invoking registered {@link HealthCheckHandler}. By default this
 * implementation will perform aggregation of all registered {@link HealthIndicator}
 * through registered {@link HealthAggregator}.
 *
 * @author Jakub Narloch
 * @see HealthCheckHandler
 * @see HealthAggregator
 */
public class EurekaHealthCheckHandler implements HealthCheckHandler, ApplicationContextAware, InitializingBean {

    private static final Map<Status, InstanceInfo.InstanceStatus> STATUS_MAPPING =
            new HashMap<Status, InstanceInfo.InstanceStatus>() {{
                put(Status.UNKNOWN, InstanceStatus.UNKNOWN);
                put(Status.OUT_OF_SERVICE, InstanceStatus.OUT_OF_SERVICE);
                put(Status.DOWN, InstanceStatus.DOWN);
                put(Status.UP, InstanceStatus.UP);
            }};

    private final CompositeHealthIndicator healthIndicator;

    private ApplicationContext applicationContext;

    public EurekaHealthCheckHandler(HealthAggregator healthAggregator) {
        Assert.notNull(healthAggregator, "HealthAggregator must not be null");
        this.healthIndicator = new CompositeHealthIndicator(healthAggregator);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        final Map<String, HealthIndicator> healthIndicators = applicationContext.getBeansOfType(HealthIndicator.class);

        for (Map.Entry<String, HealthIndicator> entry : healthIndicators.entrySet()) {

            //ignore EurekaHealthIndicator and flatten the rest of the composite
            //otherwise there is a never ending cycle of down. See gh-643
            if (entry.getValue() instanceof DiscoveryCompositeHealthIndicator) {
                DiscoveryCompositeHealthIndicator indicator = (DiscoveryCompositeHealthIndicator) entry.getValue();
                for (DiscoveryCompositeHealthIndicator.Holder holder : indicator.getHealthIndicators()) {
                    if (!(holder.getDelegate() instanceof EurekaHealthIndicator)) {
                        healthIndicator.addHealthIndicator(holder.getDelegate().getName(), holder);
                    }
                }
            } else {
                healthIndicator.addHealthIndicator(entry.getKey(), entry.getValue());
            }
        }
    }

    @Override
    public InstanceStatus getStatus(InstanceStatus instanceStatus) {
        return getHealthStatus();
    }

    protected InstanceStatus getHealthStatus() {
        final Status status = getHealthIndicator().health().getStatus();
        return mapToInstanceStatus(status);
    }

    protected InstanceStatus mapToInstanceStatus(Status status) {
        if (!STATUS_MAPPING.containsKey(status)) {
            return InstanceStatus.UNKNOWN;
        }
        return STATUS_MAPPING.get(status);
    }

    protected CompositeHealthIndicator getHealthIndicator() {
        return healthIndicator;
    }
}
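As shown, afterPropertiesSet collects the Spring Boot HealthIndicator beans into a CompositeHealthIndicator (skipping EurekaHealthIndicator), and the aggregated status is then mapped to Eureka's InstanceStatus. The client's health check calls getStatus, which returns the health status of that composite indicator.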
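The mapping step can be illustrated in isolation. In the sketch below, Spring Boot's `Status` is represented only by its string code and `InstanceStatus` by a local enum; `StatusMappingSketch` and the custom code `"FATAL"` are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone model of mapToInstanceStatus; Spring's Status is reduced to its code string.
class StatusMappingSketch {
    // Stand-in for InstanceInfo.InstanceStatus.
    enum InstanceStatus { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

    private static final Map<String, InstanceStatus> STATUS_MAPPING = new HashMap<>();
    static {
        STATUS_MAPPING.put("UNKNOWN", InstanceStatus.UNKNOWN);
        STATUS_MAPPING.put("OUT_OF_SERVICE", InstanceStatus.OUT_OF_SERVICE);
        STATUS_MAPPING.put("DOWN", InstanceStatus.DOWN);
        STATUS_MAPPING.put("UP", InstanceStatus.UP);
    }

    static InstanceStatus mapToInstanceStatus(String statusCode) {
        // Any status without an explicit mapping falls back to UNKNOWN.
        return STATUS_MAPPING.getOrDefault(statusCode, InstanceStatus.UNKNOWN);
    }

    public static void main(String[] args) {
        System.out.println(mapToInstanceStatus("UP"));
        System.out.println(mapToInstanceStatus("FATAL")); // hypothetical custom status code
    }
}
```

The practical point is that an application defining its own health status codes would be reported to Eureka as UNKNOWN unless mapToInstanceStatus is overridden.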
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};
A StatusChangeListener is registered; whenever the status changes, it triggers instanceInfoReplicator.onDemandUpdate().
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/InstanceInfoReplicator.java
public boolean onDemandUpdate() {
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        if (!scheduler.isShutdown()) {
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");

                    Future latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);
                    }

                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to stopped scheduler");
            return false;
        }
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}

public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
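This submits a task to the scheduler. The task first calls discoveryClient.refreshInstanceInfo(), then checks whether the instance info is dirty; if it is, it calls discoveryClient.register() to update the data on the Eureka server and then clears the dirty flag for that timestamp. Note that onDemandUpdate() starts with a rate-limit check: the method can be triggered in a cycle (a status change notifies the listener, which calls onDemandUpdate() again), so the rate limiter guards against a runaway loop.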
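To make the rate-limiting idea concrete, here is a minimal token-bucket sketch. It is not Eureka's own RateLimiter implementation; `OnDemandRateLimiterSketch` is a hypothetical class, and the clock is passed in explicitly so the behaviour is deterministic. The parameter names `burstSize` and `allowedRatePerMinute` are borrowed from the call site above.

```java
// Minimal token bucket with an explicit clock; a simplified stand-in, not Eureka's RateLimiter.
class OnDemandRateLimiterSketch {
    private final int burstSize;             // maximum number of stored tokens
    private final long refillIntervalMillis; // time needed to earn one token
    private int tokens;
    private long lastRefillMillis;

    OnDemandRateLimiterSketch(int burstSize, int allowedRatePerMinute, long nowMillis) {
        this.burstSize = burstSize;
        this.refillIntervalMillis = 60_000L / allowedRatePerMinute;
        this.tokens = burstSize; // start with a full bucket
        this.lastRefillMillis = nowMillis;
    }

    synchronized boolean acquire(long nowMillis) {
        // Credit the tokens earned since the last refill, capped at burstSize.
        long earned = (nowMillis - lastRefillMillis) / refillIntervalMillis;
        if (earned > 0) {
            tokens = (int) Math.min(burstSize, tokens + earned);
            lastRefillMillis += earned * refillIntervalMillis;
        }
        if (tokens > 0) {
            tokens--;
            return true;  // the on-demand update may proceed
        }
        return false;     // the update is dropped, as in the "due to rate limiter" branch
    }

    public static void main(String[] args) {
        OnDemandRateLimiterSketch limiter = new OnDemandRateLimiterSketch(2, 4, 0L);
        System.out.println(limiter.acquire(0L));
        System.out.println(limiter.acquire(0L));
        System.out.println(limiter.acquire(0L));
    }
}
```

With a bucket like this, a burst of status flips can trigger at most burstSize immediate updates; further requests are rejected until enough time has passed to earn a new token.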
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java
/**
 * Refresh the current local instanceInfo. Note that after a valid refresh where changes are observed, the
 * isDirty flag on the instanceInfo is set to true
 */
void refreshInstanceInfo() {
    applicationInfoManager.refreshDataCenterInfoIfRequired();
    applicationInfoManager.refreshLeaseInfoIfRequired();

    InstanceStatus status;
    try {
        status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
    } catch (Exception e) {
        logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
        status = InstanceStatus.DOWN;
    }

    if (null != status) {
        applicationInfoManager.setInstanceStatus(status);
    }
}
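This asks the health check handler for the status (falling back to DOWN if the handler throws) and, if the result is not null, calls applicationInfoManager.setInstanceStatus(status).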
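The three possible outcomes of the handler call (a status, an exception, or null) can be sketched as a pure function. `RefreshStatusSketch` and `refresh` are hypothetical names, and a `UnaryOperator` stands in for `HealthCheckHandler.getStatus`.

```java
import java.util.function.UnaryOperator;

// Models how refreshInstanceInfo turns the handler's answer into the next status.
class RefreshStatusSketch {
    // Stand-in for InstanceInfo.InstanceStatus.
    enum InstanceStatus { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

    static InstanceStatus refresh(InstanceStatus current, UnaryOperator<InstanceStatus> handler) {
        InstanceStatus status;
        try {
            status = handler.apply(current);
        } catch (Exception e) {
            // A failing health check is treated as DOWN.
            status = InstanceStatus.DOWN;
        }
        // A null answer leaves the current status untouched.
        return status != null ? status : current;
    }

    public static void main(String[] args) {
        System.out.println(refresh(InstanceStatus.UP, s -> s));
        System.out.println(refresh(InstanceStatus.UP, s -> { throw new IllegalStateException("boom"); }));
        System.out.println(refresh(InstanceStatus.UP, s -> null));
    }
}
```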
eureka-client-1.8.8-sources.jar!/com/netflix/appinfo/ApplicationInfoManager.java
/**
 * Set the status of this instance. Application can use this to indicate
 * whether it is ready to receive traffic. Setting the status here also notifies all registered listeners
 * of a status change event.
 *
 * @param status Status of the instance
 */
public synchronized void setInstanceStatus(InstanceStatus status) {
    InstanceStatus next = instanceStatusMapper.map(status);
    if (next == null) {
        return;
    }

    InstanceStatus prev = instanceInfo.setStatus(next);
    if (prev != null) {
        for (StatusChangeListener listener : listeners.values()) {
            try {
                listener.notify(new StatusChangeEvent(prev, next));
            } catch (Exception e) {
                logger.warn("failed to notify listener: {}", listener.getId(), e);
            }
        }
    }
}
This notifies every registered listener with a StatusChangeEvent.
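The `prev != null` guard matters for the update cycle. The sketch below assumes that InstanceInfo.setStatus returns the previous status only when the value actually changes and returns null otherwise; that method is not shown in the excerpts here, so treat this as an assumption. `StatusChangeSketch` and its `events` list are hypothetical stand-ins for ApplicationInfoManager and its listeners.

```java
import java.util.ArrayList;
import java.util.List;

// Models "notify listeners only on a real status change"; names are stand-ins.
class StatusChangeSketch {
    // Stand-in for InstanceInfo.InstanceStatus.
    enum InstanceStatus { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

    private InstanceStatus status = InstanceStatus.STARTING;

    // Records each notification as "PREV->NEXT" in place of real listeners.
    final List<String> events = new ArrayList<>();

    // Assumed behaviour of InstanceInfo.setStatus: previous value on change, null otherwise.
    private InstanceStatus setStatus(InstanceStatus next) {
        if (status != next) {
            InstanceStatus prev = status;
            status = next;
            return prev;
        }
        return null;
    }

    synchronized void setInstanceStatus(InstanceStatus next) {
        if (next == null) {
            return;
        }
        InstanceStatus prev = setStatus(next);
        if (prev != null) {
            events.add(prev + "->" + next);
        }
    }

    public static void main(String[] args) {
        StatusChangeSketch manager = new StatusChangeSketch();
        manager.setInstanceStatus(InstanceStatus.UP);
        manager.setInstanceStatus(InstanceStatus.UP); // no change, no event
        manager.setInstanceStatus(InstanceStatus.DOWN);
        System.out.println(manager.events);
    }
}
```

Under that assumption, a periodic refresh that keeps reporting the same status produces no events, so the listener only re-triggers onDemandUpdate() when the status really flips.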
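The Eureka client's health check is disabled by default (eureka.client.healthcheck.enabled is false), so the handler that ends up being used is HealthCheckCallbackToHandlerBridge(null). With a null callback it always returns the current status, that is, the status registered at startup, which is normally UP. Setting eureka.client.healthcheck.enabled=true brings all of the Spring Boot Actuator health indicators into the health check.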