This article analyzes Eureka through Spring Cloud Eureka.
Spring Cloud version: Dalston.SR5
spring-cloud-starter-eureka version: 1.3.6.RELEASE
Netflix Eureka version: 1.6.2
This continues the analysis from "The complete service registration process, from the Eureka Client sending the registration request to the Eureka Server handling it (Part 1)".
To set up a Spring Cloud Eureka Server, you first annotate the application with @EnableEurekaServer. The annotation essentially is:
@EnableDiscoveryClient
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
@EnableDiscoveryClient: pulls in the service-discovery client configuration (besides acting as a Server, the node also acts as a Client during replication within a Server cluster).
EurekaServerMarkerConfiguration: activates EurekaServerAutoConfiguration.
So the @EnableEurekaServer annotation, just like the client-side startup annotation analyzed in the previous post, activates its xxAutoConfiguration class by registering a Marker bean into the Spring container: EurekaClientAutoConfiguration for the Eureka Client, and EurekaServerAutoConfiguration for the Eureka Server.
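The Marker mechanism is small enough to sketch. The following is a simplified reconstruction of the idea rather than a verbatim copy of the spring-cloud-netflix source (class names are real, bodies are trimmed): EurekaServerMarkerConfiguration only registers an empty Marker bean, and EurekaServerAutoConfiguration is guarded by @ConditionalOnBean on that Marker.

// Simplified sketch of the Marker activation pattern used by spring-cloud-netflix.
@Configuration
public class EurekaServerMarkerConfiguration {

    @Bean
    public Marker eurekaServerMarkerBean() {
        return new Marker();
    }

    // An empty marker type: its only job is to exist in the container.
    class Marker {
    }
}

@Configuration
// Only kicks in when the Marker bean above has been imported via @EnableEurekaServer.
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
public class EurekaServerAutoConfiguration {
    // ... server beans (EurekaController, PeerAwareInstanceRegistry, Jersey filter, ...)
}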
Below is a brief analysis of each auto-configured component:
Class-level annotations
@Import(EurekaServerInitializerConfiguration.class): imports the configuration class that initializes the Eureka Server. It implements the SmartLifecycle interface, and once the Spring container has essentially finished refresh() it calls EurekaServerBootstrap#contextInitialized(). This is a key point of the Eureka Server startup analysis.
@EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class })
EurekaDashboardProperties: properties for the dashboard
InstanceRegistryProperties: properties for instance registration
@ConfigurationProperties(PREFIX)
public class InstanceRegistryProperties {

    public static final String PREFIX = "eureka.instance.registry";

    /*
     * Default number of expected renews per minute, defaults to 1.
     * Setting expectedNumberOfRenewsPerMin to non-zero ensures that even an isolated
     * server can adjust its eviction policy to the number of registrations (when it is
     * zero, even a successful registration won't reset the rate threshold in
     * InstanceRegistry.register()).
     */
    @Value("${eureka.server.expectedNumberOfRenewsPerMin:1}") // for backwards compatibility
    private int expectedNumberOfRenewsPerMin = 1;

    /**
     * Value used in determining when leases are cancelled, default to 1 for standalone.
     * Should be set to 0 for peer-replicated eurekas.
     */
    @Value("${eureka.server.defaultOpenForTrafficCount:1}") // for backwards compatibility
    private int defaultOpenForTrafficCount = 1;
@PropertySource("classpath:/eureka/server.properties"): lives in spring-cloud-netflix-eureka-server-xxx.jar and contains only spring.http.encoding.force=false
EurekaServerFeature: when the /features endpoint is accessed, it reports EurekaServerAutoConfiguration as the enabled Eureka Server auto-configuration class
EurekaServerConfig: registers the Eureka Server configuration bean. EurekaServerConfig is a Netflix interface that holds the many configuration settings the Eureka server needs at runtime; Netflix's default implementation is DefaultEurekaServerConfig, while Spring Cloud's default implementation is EurekaServerConfigBean
@Configuration
protected static class EurekaServerConfigBeanConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
        // Create the EurekaServerConfigBean
        EurekaServerConfigBean server = new EurekaServerConfigBean();
        // If this Eureka Server itself also registers as a client
        // (must this be enabled in cluster mode??)
        if (clientConfig.shouldRegisterWithEureka()) {
            // Set a sensible default if we are supposed to replicate:
            // number of retries this node uses at startup when fetching the registry from its peers
            server.setRegistrySyncRetries(5);
        }
        return server;
    }
}
EurekaController: the controller backing the Eureka Server dashboard (default path: /)
PeerAwareInstanceRegistry: literally, an application instance registry that is aware of its peers, i.e. a registry that, when registering an instance, also performs the operations related to the other nodes in the cluster
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
        ServerCodecs serverCodecs) {
    // Force initialization of the eurekaClient; the same trick was used
    // earlier when looking at the RefreshScope bug.
    this.eurekaClient.getApplications(); // force initialization
    // Create an InstanceRegistry (the Spring Cloud implementation):
    // it extends PeerAwareInstanceRegistryImpl, an implementation of PeerAwareInstanceRegistry.
    return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
            serverCodecs, this.eurekaClient,
            this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
            this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
PeerEurekaNodes: a helper class for managing PeerEurekaNode instances. The peer list is refreshed every EurekaServerConfigBean.peerEurekaNodesUpdateIntervalMs = 10 * MINUTES. It is started when DefaultEurekaServerContext's @PostConstruct invokes initialize() --> peerEurekaNodes.start(); removed peers are shut down via PeerEurekaNode#shutdown(), and new peers are added via PeerEurekaNodes#createPeerEurekaNode()
EurekaServerContext: a key point of the Eureka Server startup analysis
The Eureka Server context interface: it declares the initialize() and shutdown() methods and holds the EurekaServerConfig configuration, the PeerEurekaNodes peer-management helper, the PeerAwareInstanceRegistry peer-aware instance registry, and the ApplicationInfoManager that manages the current instance's InstanceInfo (initialized from the client configuration)
Default implementation: com.netflix.eureka.DefaultEurekaServerContext
Its @PostConstruct method contains the initialization logic (i.e. initialization is triggered by @PostConstruct after DefaultEurekaServerContext has been constructed)
@PostConstruct
@Override
public void initialize() throws Exception {
    logger.info("Initializing ...");
    // Start the PeerEurekaNodes helper:
    // this launches the scheduled thread that refreshes the peer-node list.
    peerEurekaNodes.start();
    // Initialize the PeerAwareInstanceRegistry:
    // starts the numberOfReplicationsLastMin timer, then initializedResponseCache(),
    // scheduleRenewalThresholdUpdateTask(), initRemoteRegionRegistry(), and registers JMX monitors.
    registry.init(peerEurekaNodes);
    logger.info("Initialized");
}
EurekaServerBootstrap: the Eureka Server bootstrap class. Once the Spring container has essentially finished refresh(), EurekaServerInitializerConfiguration#run() actually calls eurekaServerBootstrap.contextInitialized() for initialization, which in turn runs initEurekaEnvironment() and initEurekaServerContext(). This is another key point of the Eureka Server startup analysis.
Registering the Jersey filter: every request under /eureka must pass through the Jersey filter. Its handler class is com.sun.jersey.spi.container.servlet.ServletContainer, which is both a Filter and a Servlet and contains Jersey's processing logic. At construction time the classes under the com.netflix.discovery and com.netflix.eureka packages are imported as the resources that handle requests, for example com.netflix.eureka.resources.ApplicationResource, which handles requests for a single application. A rough sketch of this registration is shown below.
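For reference, the filter registration bean in EurekaServerAutoConfiguration looks roughly like the sketch below. It is reconstructed from memory of the Dalston-era source, so treat details such as the order value as approximate; EurekaConstants.DEFAULT_PREFIX is "/eureka".

// Rough sketch of the Jersey filter registration in EurekaServerAutoConfiguration.
@Bean
public FilterRegistrationBean jerseyFilterRegistration(javax.ws.rs.core.Application eurekaJerseyApp) {
    FilterRegistrationBean bean = new FilterRegistrationBean();
    // ServletContainer bridges the servlet world and the Jersey (JAX-RS) resources.
    bean.setFilter(new ServletContainer(eurekaJerseyApp));
    bean.setOrder(Ordered.LOWEST_PRECEDENCE);
    // All /eureka/* traffic (register, renew, fetch registry, ...) goes through this filter.
    bean.setUrlPatterns(Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
    return bean;
}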
After walking through the EurekaServerAutoConfiguration auto-configuration class above, a few points stand out to me:
1. Initialization of DefaultEurekaServerContext (the Eureka Server context)
Because Netflix's EurekaServerContext interface itself aggregates many members, such as PeerEurekaNodes for managing peer nodes and PeerAwareInstanceRegistry as the peer-aware instance registry, initializing the Eureka Server context initializes these components and also starts several scheduled threads.
2. Initialization of EurekaServerBootstrap
EurekaServerBootstrap is Spring Cloud's bootstrap class for the Eureka Server; its Netflix counterpart is EurekaBootStrap. This bootstrap is triggered from the lifecycle method of EurekaServerInitializerConfiguration, a Spring SmartLifecycle bean, when refresh() is nearly complete, so it runs after the Eureka Server context has been initialized.
3. The jerseyFilter, which handles all requests to /eureka
Let's first look at how Netflix defines the EurekaServerContext interface:
public interface EurekaServerContext {

    void initialize() throws Exception;

    void shutdown() throws Exception;

    EurekaServerConfig getServerConfig();

    PeerEurekaNodes getPeerEurekaNodes();

    ServerCodecs getServerCodecs();

    PeerAwareInstanceRegistry getRegistry();

    ApplicationInfoManager getApplicationInfoManager();
}
Besides the initialize() and shutdown() methods, it exposes several components: EurekaServerConfig, PeerEurekaNodes, ServerCodecs, PeerAwareInstanceRegistry and ApplicationInfoManager. By the time the auto-configuration constructs DefaultEurekaServerContext, all of these components have already been set up:
@Inject
public DefaultEurekaServerContext(EurekaServerConfig serverConfig,
                                  ServerCodecs serverCodecs,
                                  PeerAwareInstanceRegistry registry,
                                  PeerEurekaNodes peerEurekaNodes,
                                  ApplicationInfoManager applicationInfoManager) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    this.registry = registry;
    this.peerEurekaNodes = peerEurekaNodes;
    this.applicationInfoManager = applicationInfoManager;
}
Next is the initialization method, triggered by @PostConstruct:
@PostConstruct
@Override
public void initialize() throws Exception {
    logger.info("Initializing ...");
    peerEurekaNodes.start();
    registry.init(peerEurekaNodes);
    logger.info("Initialized");
}
It mainly calls the initialization methods of two components: PeerEurekaNodes and PeerAwareInstanceRegistry.
public void start() {
    // Single-threaded scheduled executor running as a daemon in the background,
    // with the thread named "Eureka-PeerNodesUpdater"
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        // Resolve the Eureka Server URLs and update the PeerEurekaNodes list
        updatePeerEurekaNodes(resolvePeerUrls());
        // Schedule peersUpdateTask (default interval 10 min, configured by peerEurekaNodesUpdateIntervalMs)
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    // The scheduled task again resolves the Eureka Server URLs
                    // and updates the PeerEurekaNodes list
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }
            }
        };
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    // Log the peer nodes (this list should not contain the current node itself)
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL: " + node.getServiceUrl());
    }
}
Starting PeerEurekaNodes mainly does two things: resolving the peer service URLs (resolvePeerUrls()) and updating the peer-node list (updatePeerEurekaNodes()).
protected List<String> resolvePeerUrls() {
    // The current Eureka Server's own InstanceInfo
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    // The zone this Eureka Server lives in, "defaultZone" by default
    String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
    // Fetch the configured service-urls
    List<String> replicaUrls = EndpointUtils
            .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
    // Iterate over the service-urls and exclude the current node itself
    int idx = 0;
    while (idx < replicaUrls.size()) {
        if (isThisMyUrl(replicaUrls.get(idx))) {
            replicaUrls.remove(idx);
        } else {
            idx++;
        }
    }
    return replicaUrls;
}
How does isThisMyUrl() decide that a URL belongs to the current node and should therefore be excluded?
public boolean isThisMyUrl(String url) {
    return isInstanceURL(url, applicationInfoManager.getInfo());
}

public boolean isInstanceURL(String url, InstanceInfo instance) {
    // Extract the host from the configured URL
    String hostName = hostFromUrl(url);
    // Get the host name from the current Eureka Server's InstanceInfo
    String myInfoComparator = instance.getHostName();
    // If eureka.client.transport.applicationsResolverUseIp == true, i.e. URLs are resolved by IP,
    // compare against the current Eureka Server instance's IP instead
    if (clientConfig.getTransportConfig().applicationsResolverUseIp()) {
        myInfoComparator = instance.getIPAddr();
    }
    // Compare the host name from the configuration with the current Eureka Server instance's info
    return hostName != null && hostName.equals(myInfoComparator);
}
The hostName taken from the configuration is basically the part between "http://" and the port number, whereas the myInfoComparator used for the current Eureka Server instance comes from the InetUtils used when EurekaClientAutoConfiguration creates the EurekaInstanceConfigBean. InetUtils is Spring Cloud's networking utility class: it first determines the IP from the first non-loopback network interface (beware: this can be a pitfall in Docker container environments), and then resolves the hostname for that IP via InetAddress; as far as I know this comes, for example, from the /etc/hosts file on Linux or from the hostname environment variable.

// PeerEurekaNodes#updatePeerEurekaNodes()
// newPeerUrls is the list of peer Eureka URLs to apply in this update
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
    if (newPeerUrls.isEmpty()) {
        logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
        return;
    }

    // Compute (old peerEurekaNodeUrls - new newPeerUrls): obsolete nodes that can be shut down
    Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
    toShutdown.removeAll(newPeerUrls);

    // Compute (new newPeerUrls - old peerEurekaNodeUrls): nodes that need to be added
    Set<String> toAdd = new HashSet<>(newPeerUrls);
    toAdd.removeAll(peerEurekaNodeUrls);

    if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
        return;
    }

    // Remove peers no longer available
    List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);

    // Shut down the obsolete nodes
    if (!toShutdown.isEmpty()) {
        logger.info("Removing no longer available peer nodes {}", toShutdown);
        int i = 0;
        while (i < newNodeList.size()) {
            PeerEurekaNode eurekaNode = newNodeList.get(i);
            if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                newNodeList.remove(i);
                eurekaNode.shutDown();
            } else {
                i++;
            }
        }
    }

    // Add new peers via createPeerEurekaNode()
    if (!toAdd.isEmpty()) {
        logger.info("Adding new peer nodes {}", toAdd);
        for (String peerUrl : toAdd) {
            newNodeList.add(createPeerEurekaNode(peerUrl));
        }
    }

    this.peerEurekaNodes = newNodeList;
    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
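Going back to the hostname resolution mentioned above, here is a minimal sketch of reproducing that lookup. findFirstNonLoopbackHostInfo() is the spring-cloud-commons API this analysis refers to; the demo class and main method are purely illustrative.

// Illustrative only: reproduces the InetUtils lookup described above.
// InetUtils / InetUtilsProperties come from spring-cloud-commons.
import org.springframework.cloud.commons.util.InetUtils;
import org.springframework.cloud.commons.util.InetUtilsProperties;

public class HostInfoDemo {
    public static void main(String[] args) {
        InetUtils inetUtils = new InetUtils(new InetUtilsProperties());
        // Picks the first non-loopback interface, then resolves its hostname via InetAddress
        InetUtils.HostInfo hostInfo = inetUtils.findFirstNonLoopbackHostInfo();
        System.out.println("ip = " + hostInfo.getIpAddress());
        System.out.println("hostname = " + hostInfo.getHostname());
    }
}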
Using the peerEurekaNodes initialized in the previous step, PeerAwareInstanceRegistry, the instance registry that takes the peers in the cluster into account, is now initialized:
// PeerAwareInstanceRegistryImpl#init()
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    // [Important] Start the scheduled thread that counts renewals over the last interval
    this.numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;
    // [Important] Initialize the ResponseCache: caches registry queries from clients
    // (full registry, delta changes, single application); default responseCacheUpdateIntervalMs = 30s
    initializedResponseCache();
    // [Important] Schedule the task that periodically updates the renewal threshold (every 900s by default)
    // by calling PeerAwareInstanceRegistryImpl#updateRenewalThreshold()
    scheduleRenewalThresholdUpdateTask();
    // Initialize remote-region registry information (by default there is no remote region; us-east-1 is used)
    initRemoteRegionRegistry();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}
numberOfReplicationsLastMin is a com.netflix.eureka.util.MeasuredRate, used to measure the number of renewals replicated from peer nodes during the last minute.
// MeasuredRate#start()
public synchronized void start() {
    if (!isActive) {
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    // Zero out the current bucket:
                    // move the current bucket's count into lastBucket and reset the current bucket to 0
                    lastBucket.set(currentBucket.getAndSet(0));
                } catch (Throwable e) {
                    logger.error("Cannot reset the Measured Rate", e);
                }
            }
        }, sampleInterval, sampleInterval);
        isActive = true;
    }
}

/**
 * Returns the count in the last sample interval (i.e. the last minute).
 */
public long getCount() {
    return lastBucket.get();
}

/**
 * Increments the count in the current sample interval. Called from:
 * AbstractInstanceRegistry#renew() - renewal
 * PeerAwareInstanceRegistryImpl#replicateToPeers() - replication
 */
public void increment() {
    currentBucket.incrementAndGet();
}
The ResponseCache mainly caches registry (service list) information. According to its Javadoc, the cache is maintained in both compressed and uncompressed form and serves three kinds of requests: all applications, delta changes, and a single application.
// ResponseCacheImpl constructor
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();

private final LoadingCache<Key, Value> readWriteCacheMap;

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    // eureka.server.useReadOnlyResponseCache decides whether the read-only ResponseCache is used (default true).
    // The ResponseCache maintains a read-write readWriteCacheMap plus a read-only readOnlyCacheMap;
    // this flag controls whether get() reads from the read-only map or the read-write map
    // (the read-only map is refreshed periodically).
    this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
    this.registry = registry;

    // eureka.server.responseCacheUpdateIntervalMs: cache refresh interval, default 30s
    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();

    // Build the read-write map, a com.google.common.cache.LoadingCache:
    // it supports an initial capacity, expiration after write, a removal listener, etc.
    this.readWriteCacheMap =
            CacheBuilder.newBuilder().initialCapacity(1000)
                    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                    .removalListener(new RemovalListener<Key, Value>() {
                        @Override
                        public void onRemoval(RemovalNotification<Key, Value> notification) {
                            Key removedKey = notification.getKey();
                            if (removedKey.hasRegions()) {
                                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                            }
                        }
                    })
                    .build(new CacheLoader<Key, Value>() {
                        @Override
                        public Value load(Key key) throws Exception {
                            if (key.hasRegions()) {
                                Key cloneWithNoRegions = key.cloneWithoutRegions();
                                regionSpecificKeys.put(cloneWithNoRegions, key);
                            }
                            Value value = generatePayload(key);
                            return value;
                        }
                    });

    // If the read-only cache is enabled, run getCacheUpdateTask() every responseCacheUpdateIntervalMs (30s)
    if (shouldUseReadOnlyResponseCache) {
        timer.schedule(getCacheUpdateTask(),
                new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                        + responseCacheUpdateIntervalMs),
                responseCacheUpdateIntervalMs);
    }

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
    }
}
So the ResponseCache maintains two maps: a read-write readWriteCacheMap, which presumably every operation writes to, and a read-only readOnlyCacheMap, which by default appears to be refreshed every 30s. Let's look at getCacheUpdateTask() in detail:
// ResponseCacheImpl#getCacheUpdateTask()
private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            // Iterate over the read-only map
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    // If the value in the read-only map differs from the read-write map,
                    // refresh the read-only map from the read-write map
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache", th);
                }
            }
        }
    };
}
Every 30s the values in the read-only map are compared with those in the read-write map, with the read-write map being the source of truth.
/**
 * Schedule the task that updates <em>renewal threshold</em> periodically.
 * The renewal threshold would be used to determine if the renewals drop
 * dramatically because of network partition and to protect expiring too
 * many instances at a time.
 * The threshold is refreshed every eureka.server.renewalThresholdUpdateIntervalMs = 900 seconds.
 */
private void scheduleRenewalThresholdUpdateTask() {
    timer.schedule(new TimerTask() {
                       @Override
                       public void run() {
                           updateRenewalThreshold();
                       }
                   }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
            serverConfig.getRenewalThresholdUpdateIntervalMs());
}
The renewal threshold is updated in the updateRenewalThreshold() method:
// PeerAwareInstanceRegistryImpl#updateRenewalThreshold()
/**
 * Updates the <em>renewal threshold</em> based on the current number of
 * renewals. The threshold is a percentage as specified in
 * {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals
 * received per minute {@link #getNumOfRenewsInLastMin()}.
 */
private void updateRenewalThreshold() {
    try {
        Applications apps = eurekaClient.getApplications();
        int count = 0;
        // Count all registered instances
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                if (this.isRegisterable(instance)) {
                    ++count;
                }
            }
        }
        synchronized (lock) {
            // Update the threshold only if it is greater than the current expected threshold,
            // or if self-preservation is disabled.
            if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold)
                    || (!this.isSelfPreservationModeEnabled())) {
                this.expectedNumberOfRenewsPerMin = count * 2;
                this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
            }
        }
        logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwable e) {
        logger.error("Cannot update renewal threshold", e);
    }
}
Roughly speaking: it first counts all registered Instance instances; by default each instance is expected to renew twice per minute (once every 30s).
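As a quick worked example of the arithmetic, using the default renewalPercentThreshold of 85% mentioned later in openForTraffic() (the instance count below is made up):

// Hypothetical numbers, just to illustrate the threshold arithmetic described above.
int count = 10;                               // registered instances (made-up figure)
double renewalPercentThreshold = 0.85;        // default of eureka.server.renewal-percent-threshold
int expectedNumberOfRenewsPerMin = count * 2; // 20: each instance renews every 30s
int numberOfRenewsPerMinThreshold =
        (int) (expectedNumberOfRenewsPerMin * renewalPercentThreshold); // 17
// If fewer than 17 renewals arrive within a minute, self-preservation considerations kick in.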
However, the updateRenewalThreshold() code above is problematic, in both the Javadoc and the condition logic, in the current version, eureka-core-1.6.2.
It was not fixed until v1.9.3:
https://github.com/Netflix/eureka/commit/a4dd6b22ad447c706234e63fe83cb58413f7618b#diff-4aec7ea96457f5084840fc40f501c320
Two later releases then changed the calculation logic here and extracted it into a separate method:
Extract calculation of renews threshold to separate method
The auto-configuration above has already registered the Jersey filter that handles all /eureka/** requests, so all client registration, renewal and similar requests can now be served. Some remaining work is done through EurekaServerBootstrap#contextInitialized(), once the Spring container has essentially finished refresh().
EurekaServerBootstrap is the Spring Cloud implementation; Netflix's own Eureka Server bootstrap implementation is EurekaBootStrap.
// EurekaServerBootstrap#contextInitialized()
public void contextInitialized(ServletContext context) {
    try {
        initEurekaEnvironment();    // initialize the environment
        initEurekaServerContext();  // initialize the server context

        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    } catch (Throwable e) {
        log.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}
Of these two, we mainly care about the context initialization, initEurekaServerContext():
// EurekaServerBootstrap#initEurekaServerContext()
protected void initEurekaServerContext() throws Exception {
    // For backward compatibility
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);

    // Check whether we are running in an AWS environment
    if (isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry,
                this.applicationInfoManager);
        this.awsBinder.start();
    }

    // Hand the serverContext over to the holder
    EurekaServerContextHolder.initialize(this.serverContext);

    log.info("Initialized server context");

    // Copy registry from neighboring eureka node
    int registryCount = this.registry.syncUp();
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);

    // Register all monitoring statistics.
    EurekaMonitors.registerAllStats();
}
There are two important steps here: syncUp() and openForTraffic().
/**
 * Populates the registry information from a peer eureka node. This
 * operation fails over to other nodes until the list is exhausted if the
 * communication fails.
 */
@Override
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;
    // Loop, retrying at most RegistrySyncRetries times (default 5);
    // the eurekaClient logic fails over to other Eureka nodes
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); // 30s
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // Fetch the registry from the eurekaClient
        Applications apps = eurekaClient.getApplications();
        // Iterate over the applications and register each instance locally
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}
// InstanceRegistry#openForTraffic()
/**
 * If
 * {@link PeerAwareInstanceRegistryImpl#openForTraffic(ApplicationInfoManager, int)}
 * is called with a zero argument, it means that leases are not automatically
 * cancelled if the instance hasn't sent any renewals recently. This happens for a
 * standalone server. It seems like a bad default, so we set it to the smallest
 * non-zero value we can, so that any instances that subsequently register can bump up
 * the threshold.
 */
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // If count == 0, i.e. no registry was obtained from neighboring Eureka nodes
    // (e.g. standalone startup), use defaultOpenForTrafficCount = 1 instead
    super.openForTraffic(applicationInfoManager, count == 0 ? this.defaultOpenForTrafficCount : count);
}

// PeerAwareInstanceRegistryImpl#openForTraffic()
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // Expected renewals per minute (a renewal every 30s means 2 per minute)
    this.expectedNumberOfRenewsPerMin = count * 2;
    // Per-minute renewal threshold: 85% * expectedNumberOfRenewsPerMin
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
    logger.info("Got " + count + " instances from neighboring DS node");
    logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        // Since count defaults to 1, peerInstancesTransferEmptyOnStartup will never be true here;
        // it is used in PeerAwareInstanceRegistryImpl#shouldAllowAccess(boolean)
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // Kick off a new EvictionTask
    super.postInit();
}

// AbstractInstanceRegistry#postInit()
protected void postInit() {
    renewsLastMin.start(); // monitoring timer that counts renewals during the last minute
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(), // default 60s
            serverConfig.getEvictionIntervalTimerInMs());
}
After the Eureka Server auto-configuration and initialization above, the Eureka Server has started successfully and can handle all kinds of requests through Jersey. Registration requests specifically are handled by com.netflix.eureka.resources.ApplicationResource#addInstance():
// ApplicationResource#addInstance()
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // validate that the instanceinfo contains all the necessary required fields
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }

    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }

    // [Register the current instance via the PeerAwareInstanceRegistry cluster-aware registry]
    // isReplication indicates whether this call is replication between nodes; here isReplication == null
    registry.register(info, "true".equals(isReplication));
    // 204 to be backwards compatible: registration succeeds with a 204 status code
    return Response.status(204).build();
}
The key line is registry.register(info, "true".equals(isReplication)), i.e. registering the current instance with the PeerAwareInstanceRegistry cluster-aware instance registry.
// PeerAwareInstanceRegistryImpl#register()
/**
 * Registers the information about the {@link InstanceInfo} and replicates
 * this information to all peer eureka nodes. If this is replication event
 * from other replica nodes then it is not replicated.
 *
 * @param info
 *            the {@link InstanceInfo} to be registered and replicated.
 * @param isReplication
 *            true if this is a replication event from other replica nodes,
 *            false otherwise.
 */
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; // default lease duration is 90s
    // If the instance's lease info carries its own leaseDuration, use that instead
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // [Register the instance information on this Eureka Server]
    super.register(info, leaseDuration, isReplication);
    // [Replicate the registration to the other nodes in the cluster]
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
/**
 * Registers a new instance with a given duration.
 *
 * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock(); // acquire the read lock
        // registry is the map holding all registered instances:
        // ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
        // Fetch all instances of the current appName from the registry
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication); // registration counter +1
        // If there are no instances for this appName yet, create a new map
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // Get the existing Lease for this instance, if any
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        // (compare the lastDirtyTimestamp of the incoming registration and the existing lease; keep the later one)
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration:
            // add 2 to expectedNumberOfRenewsPerMin (one renewal every 30s)
            // and update numberOfRenewsPerMinThreshold (85%)
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold
                    // (1 for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease); // put the instance into the map that keeps the registrations
        // Maintain the recently-registered queue
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens:
        // if the instance already carries an OverriddenStatus, also record it
        // in this Eureka Server's overriddenInstanceStatusMap
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED); // ActionType is ADDED
        recentlyChangedQueue.add(new RecentlyChangedItem(lease)); // maintain the recently-changed queue
        registrant.setLastUpdatedTimestamp(); // update the last-updated timestamp
        // Invalidate the ResponseCache entries for this application
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock(); // release the read lock
    }
}
Maintain the Lease information for the current Instance and put it into the map in which the Eureka Server keeps its registrations: ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
If this is a new registration, increase expectedNumberOfRenewsPerMin (the expected number of renewals per minute) by 2 and update numberOfRenewsPerMinThreshold (the per-minute renewal threshold)
Maintain recentRegisteredQueue (recently registered) and recentlyChangedQueue (recently changed); these queues exist so that recent activity can be inspected
If the registering instance already carries an OverriddenStatus, maintain this server node's OverriddenStatus for the instance according to certain rules
Set the instance's last-updated timestamp
Invalidate the ResponseCache entries for the application
The responseCache caches the application instance information returned by queries.
It uses a Guava cache to maintain a read-write local LoadingCache (readWriteCacheMap), plus a read-only ConcurrentMap (readOnlyCacheMap).
On get(key, useReadOnlyCache) it first checks the read-only readOnlyCacheMap; on a miss it falls back to readWriteCacheMap, whose get() really means getOrLoad(): if the value is absent it is loaded by the CacheLoader, which reads from the map that holds the application registrations. So readWriteCacheMap is the one that interacts directly with the registration map: it loads on queries, and registering a new instance invalidates the whole application's entries.
readOnlyCacheMap is a read-only cache layered on top of readWriteCacheMap, controlled by eureka.server.useReadOnlyResponseCache (default true) and synchronized with readWriteCacheMap every eureka.server.responseCacheUpdateIntervalMs = 30s. A simplified sketch of this read path follows.
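To make the two-level read path concrete, here is a simplified, self-contained sketch of the behaviour described above. It is not the actual ResponseCacheImpl source: the String key/value types, the loader body and the expiry value are stand-ins.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

// Simplified model of the ResponseCache read path: read-only map first,
// then the loading read-write cache, which generates the payload on a miss.
public class TwoLevelCacheSketch {

    private final ConcurrentMap<String, String> readOnlyCacheMap = new ConcurrentHashMap<>();

    private final LoadingCache<String, String> readWriteCacheMap = CacheBuilder.newBuilder()
            .expireAfterWrite(180, TimeUnit.SECONDS)
            .build(new CacheLoader<String, String>() {
                @Override
                public String load(String key) {
                    // Stand-in for generatePayload(key): would read the registration map here
                    return "payload-for-" + key;
                }
            });

    public String get(String key, boolean useReadOnlyCache) throws Exception {
        if (useReadOnlyCache) {
            String cached = readOnlyCacheMap.get(key);
            if (cached != null) {
                return cached; // served from the read-only layer
            }
            String loaded = readWriteCacheMap.get(key); // get() behaves like getOrLoad()
            readOnlyCacheMap.put(key, loaded);
            return loaded;
        }
        return readWriteCacheMap.get(key);
    }

    public void invalidate(String key) {
        // Registering a new instance invalidates the read-write entry; the read-only
        // layer catches up on its next scheduled refresh (30s by default).
        readWriteCacheMap.invalidate(key);
    }
}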
// PeerAwareInstanceRegistryImpl#replicateToPeers()
/**
 * Replicates all eureka actions to peer eureka nodes except for replication
 * traffic to this node.
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        // If this call is itself a replication (false for the node that received the original request)
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        // Iterate over all cluster nodes (excluding the current node)
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Replicate this instance action to the given node
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
Below is replicateInstanceActionsToPeers(), which replicates the instance action to the other nodes:
// PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers()
/**
 * Replicates all instance changes to peer eureka nodes except for
 * replication traffic to this node.
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel: // cancellation
                node.cancel(appName, id);
                break;
            case Heartbeat: // heartbeat
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register: // registration
                node.register(info);
                break;
            case StatusUpdate: // status update
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride: // delete the override status
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
Here we only care about the Register action:
// PeerEurekaNode#register()
/**
 * Sends the registration information of {@link InstanceInfo} receiving by
 * this node to the peer node represented by this class.
 *
 * @param info
 *            the instance information {@link InstanceInfo} of any instance
 *            that is send to this instance.
 * @throws Exception
 */
public void register(final InstanceInfo info) throws Exception {
    // Expires 30s after the current time
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    // Submit the same action to the batching dispatcher for batched replication
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info,
                    null /* overriddenStatus */, true /* replicateInstanceInfo */) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}
From there on, replicationClient.register(info) is essentially the same call a Eureka Client makes when it sends its own registration request.
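In other words, replication boils down to the same REST operation a client performs. Purely as an illustration (the URL, app name and JSON fields below are made up and incomplete; a real client sends a full InstanceInfo document), the registration call against the /eureka/apps/{APP_NAME} endpoint that ApplicationResource#addInstance() serves looks conceptually like this, with 204 indicating success:

// Conceptual illustration of the register call both clients and replicating peers perform.
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class RegisterCallSketch {
    public static void main(String[] args) throws Exception {
        // Minimal, made-up instance document; real registrations carry many more fields.
        String body = "{\"instance\":{\"instanceId\":\"demo-1\",\"app\":\"DEMO-SERVICE\","
                + "\"hostName\":\"localhost\",\"ipAddr\":\"127.0.0.1\",\"status\":\"UP\","
                + "\"dataCenterInfo\":{\"@class\":\"com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo\",\"name\":\"MyOwn\"}}}";
        URL url = new URL("http://localhost:8761/eureka/apps/DEMO-SERVICE");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        // ApplicationResource#addInstance() returns 204 on success
        System.out.println("HTTP status: " + conn.getResponseCode());
    }
}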
That completes the analysis of the Spring Cloud Eureka Server's auto-configuration and initialization, of how it receives registration requests, and of how it replicates them to its peer nodes in the cluster.
Rough overall sequence flow for reference:
References:
Dive into Eureka: 宋顺