Spring Cloud Eureka Source Code Analysis (1): Server Startup Process

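1. Preface

    When we use Spring Cloud Eureka for service discovery, all we need to do is add the Maven dependency and put the appropriate annotation (server or client) on the application's entry class.

But how are these features actually implemented?

    We analyze this below. Service discovery has a client side and a server side, and we look at them separately. Class names differ slightly between Spring Cloud versions, but the implementation logic is the same, so please keep that in mind.

    Version used: <version>Dalston.RC1</version>

    My knowledge is limited; if you disagree with anything, please leave a comment so we can discuss it and learn from each other.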

2. Eureka Registry

2.1 Startup Process Analysis

2.1.1 @EnableEurekaServer 

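            @EnableEurekaServer marks the application as a registry and switches that feature on.

            Annotations of the form @Enable* are common. They are the Spring Boot convention for enabling a feature without tedious configuration.

            The source of the annotation: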

@EnableDiscoveryClient
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}

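            Two things matter here:

             1. The @EnableDiscoveryClient annotation:

                This annotation enables the Spring Cloud service discovery client. We do not describe it as enabling the Eureka client, because that is the job of @EnableEurekaClient. Spring Cloud is not limited to Eureka as its service discovery component; it can also support others, for example Alibaba's Dubbo, and @EnableDiscoveryClient is the generic Spring Cloud annotation for enabling service discovery. If Spring Cloud were integrated with Dubbo, the corresponding annotation might be something like @EnableDubboClient.

            2. @Import(EurekaServerMarkerConfiguration.class)

                This imports the configuration class EurekaServerMarkerConfiguration, which instantiates a bean of type Marker. The purpose of this bean is not obvious at this point, but it becomes clear once we look at the condition under which EurekaServerAutoConfiguration is instantiated: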

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {...code omitted for now...}

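            Let's go through this Eureka server auto-configuration class.

            1. The class is only instantiated when an EurekaServerMarkerConfiguration.Marker bean exists in the context, which answers the question above.

            2. @EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class }) brings in two properties classes:

            1) EurekaDashboardProperties.class

            This properties class controls whether the Eureka dashboard is enabled and the path it is served on.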

Property                    Description                                                     Default
eureka.dashboard.path       Path to the Eureka dashboard (relative to the servlet path)     /
eureka.dashboard.enabled    Flag to enable the Eureka dashboard                             true

             2) InstanceRegistryProperties.class

Property                                      Description                                Default
eureka.server.expectedNumberOfRenewsPerMin    Expected number of renewals per minute     1
eureka.server.defaultOpenForTrafficCount      Default open-for-traffic count             1

                3. @Import(EurekaServerInitializerConfiguration.class)

                      This imports another configuration class. // TODO

                4. @PropertySource("classpath:/eureka/server.properties")

                      Opening this properties file shows spring.http.encoding.force=false, which presumably controls character encoding.

2.1.2 The EurekaServerAutoConfiguration class

           1. First it instantiates a bean that I have not looked into yet.    TODO

@Bean
	public HasFeatures eurekaServerFeature() {
		return HasFeatures.namedFeature("Eureka Server",
				EurekaServerAutoConfiguration.class);
	}

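          2. A static inner class conditionally instantiates the Eureka server configuration, of type EurekaServerConfig. The parameters are described in detail in a separate article.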

@Configuration
	protected static class EurekaServerConfigBeanConfiguration {
		@Bean
		@ConditionalOnMissingBean
		public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
			EurekaServerConfigBean server = new EurekaServerConfigBean();
			if (clientConfig.shouldRegisterWithEureka()) {
				// Set a sensible default if we are supposed to replicate
				server.setRegistrySyncRetries(5);
			}
			return server;
		}
	}

         3. It instantiates EurekaController, the controller that serves the Eureka dashboard:

@Bean
	@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
	public EurekaController eurekaController() {
		return new EurekaController(this.applicationInfoManager);
	}

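            Opening EurekaController shows that it is an ordinary request handler class used to render the Eureka dashboard.

          4. It instantiates the bean that keeps multiple Eureka server nodes in sync: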

@Bean
	public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
			ServerCodecs serverCodecs) {
		this.eurekaClient.getApplications(); // force initialization
		return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
				serverCodecs, this.eurekaClient,
				this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
				this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
	}

         5. Lifecycle management of the individual Eureka peer nodes:

@Bean
	@ConditionalOnMissingBean
	public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
			ServerCodecs serverCodecs) {
		return new PeerEurekaNodes(registry, this.eurekaServerConfig,
				this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
	}

        6. Maintenance of the Eureka server context; the details are not covered here.

@Bean
	public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
			PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
		return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
				registry, peerEurekaNodes, this.applicationInfoManager);
	}

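       7. The bootstrap bean that initializes and shuts down Eureka; its contextInitialized and contextDestroyed methods are called by EurekaServerInitializerConfiguration (see 2.1.3):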

@Bean
	public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
			EurekaServerContext serverContext) {
		return new EurekaServerBootstrap(this.applicationInfoManager,
				this.eurekaClientConfig, this.eurekaServerConfig, registry,
				serverContext);
	}

2.1.3 Starting the Eureka server

             1. EurekaServerBootstrap contains the initialization method:

public void contextInitialized(ServletContext context) {
		try {

			// per the source, this mainly initializes the server environment and configuration
			initEurekaEnvironment();

			// initializes the Eureka server context
			initEurekaServerContext();

			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
		}
		catch (Throwable e) {
			log.error("Cannot bootstrap eureka server :", e);
			throw new RuntimeException("Cannot bootstrap eureka server :", e);
		}
	}

         This method initializes the eureka-server environment configuration and the eureka-server context, so it must be called from somewhere. Searching for its callers turns up:

@Configuration
@CommonsLog
public class EurekaServerInitializerConfiguration
		implements ServletContextAware, SmartLifecycle, Ordered {

   @Override
	public void start() {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					//TODO: is this class even needed now?
					 
					eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
					log.info("Started Eureka Server");

					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
					EurekaServerInitializerConfiguration.this.running = true;
					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
				}
				catch (Exception ex) {
					// Help!
					log.error("Could not initialize Eureka servlet context", ex);
				}
			}
		}).start();
	}


    @Override
	public void stop() {
		this.running = false;
		eurekaServerBootstrap.contextDestroyed(this.servletContext);
	}

	// ..... some code omitted .....

}

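           Through SmartLifecycle, this class ultimately implements the org.springframework.context.Lifecycle interface, so the Spring container calls its start() and stop() methods as the application context starts and stops.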
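           The pattern used by start() above (run the bootstrap on a separate thread and only then flip the running flag) can be sketched in plain Java. This is a minimal illustration, not Spring's API: SimpleLifecycle, BackgroundStartSketch and awaitStartAttempt are hypothetical names, and the Runnable stands in for eurekaServerBootstrap.contextInitialized(...).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the Lifecycle contract; not Spring's real interface.
interface SimpleLifecycle {
    void start();
    void stop();
    boolean isRunning();
}

class BackgroundStartSketch implements SimpleLifecycle {
    // Stands in for eurekaServerBootstrap.contextInitialized(servletContext).
    private final Runnable bootstrap;
    // Added for the sketch only, so callers can wait for the background thread.
    private final CountDownLatch startAttempted = new CountDownLatch(1);
    private volatile boolean running;

    BackgroundStartSketch(Runnable bootstrap) {
        this.bootstrap = bootstrap;
    }

    @Override
    public void start() {
        // The bootstrap runs on its own thread, so start() returns immediately.
        new Thread(() -> {
            try {
                bootstrap.run();
                running = true; // only set after a successful bootstrap
            } catch (RuntimeException ex) {
                System.err.println("Could not initialize: " + ex);
            } finally {
                startAttempted.countDown();
            }
        }).start();
    }

    @Override
    public void stop() {
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    // Waits until the background start has finished, successfully or not.
    boolean awaitStartAttempt(long timeoutMillis) {
        try {
            return startAttempted.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

           One consequence of this design is that a failure inside the bootstrap is only logged: the caller of start() never sees the exception, and the running flag simply stays false.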

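          2. From the beans analyzed above, we can see that the Eureka server is started when the container calls these lifecycle methods.

              So what happens while the Eureka server starts? Let's dig into the source.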

protected void initEurekaServerContext() throws Exception {
		// For backward compatibility
		JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);
		XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);

		if (isAws(this.applicationInfoManager.getInfo())) {
			this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
					this.eurekaClientConfig, this.registry, this.applicationInfoManager);
			this.awsBinder.start();
		}

		EurekaServerContextHolder.initialize(this.serverContext);

		log.info("Initialized server context");

		// Copy registry from neighboring eureka node
		int registryCount = this.registry.syncUp();
		this.registry.openForTraffic(this.applicationInfoManager, registryCount);

		// Register all monitoring statistics.
		EurekaMonitors.registerAllStats();
	}

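           The initEurekaServerContext() method mainly initializes the server context, syncs registry information from the other nodes, and starts the scheduled task that evicts unavailable Eureka clients.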

 

 

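3. Handling Eureka Client Requests

       As we know, the Eureka registry maintains the state of service instances by handling HTTP requests from clients, such as registration and lease renewal. The registry must therefore expose endpoints for clients to call. Where are they?

3.1 Receiving Registrations

3.1.1 Registration entry point

        By restarting the registry, watching the log output, and then tracing the calls, we find the registration entry point: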

2018-08-23 11:11:36.029  INFO 202760 --- [nio-8761-exec-6] c.n.e.registry.AbstractInstanceRegistry  : Registered instance APPLICATIONCLIENT/PC-HEPENGFEI.ppmoney.com:applicationClient:9001 with status UP

       Locating where this log line is printed leads to com.netflix.eureka.resources.ApplicationResource#addInstance:

/**
     * Registers information about a particular instance for an
     * {@link com.netflix.discovery.shared.Application}.
     *
     * @param info
     *            {@link InstanceInfo} information of the instance.
     * @param isReplication
     *            a header parameter containing information whether this is
     *            replicated from other nodes.
     */
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);

        // .... some code omitted ....

        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

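            Look at the parameters first. As the Javadoc for isReplication explains, this method also receives registrations replicated from other nodes, so the entry point serves two purposes:

             1. It receives registration requests from Eureka clients and registers the service instance with the registry.

             2. It receives replication traffic from other registry nodes, which keeps the service lists of the nodes in sync.

3.1.2 Registration process

         Following the call leads to com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register: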

@Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

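        Here the lease duration is determined, that is, how long after the last renewal an instance is evicted: the value sent by the client is used if it is positive, otherwise Lease.DEFAULT_DURATION_IN_SECS. After the registration, replicateToPeers replicates the change to the other nodes; we cover it in 3.1.3.

        Next we follow com.netflix.eureka.registry.AbstractInstanceRegistry#register:

        The first step in this method is to take the read lock of a ReentrantReadWriteLock; the registration itself follows: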

read.lock();
    Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
    REGISTER.increment(isReplication);
    if (gMap == null) {
        final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap =
                new ConcurrentHashMap<String, Lease<InstanceInfo>>();
         gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
         if (gMap == null) {
             gMap = gNewMap;
         }
    }

    This code introduces the field registry, a map that stores the service instance information. In other words, all registration data lives in this map. Its declaration:

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

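   The key is clearly the appName of the service instance, that is, the spring.application.name configured on the Eureka client.

         Debugging shows:

                The value in registry is itself a map. Its key is the instance ID, which combines the spring.application.name value with the host and port; its value is a Lease<InstanceInfo>, which holds the instance information together with timestamps for registration, renewal and so on, used to maintain the instance state.

               This layout is easy to understand: the outer key is the service exposed to callers and the outer value is that service's cluster; in the inner map, each key identifies a single instance of the cluster and each value is that instance.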
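               The two-level layout can be tried out in isolation. The sketch below is a simplified model, not Eureka's class: RegistryMapSketch is a hypothetical name, a plain String stands in for Lease<InstanceInfo>, and the read lock is left out. The putIfAbsent sequence mirrors the excerpt above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RegistryMapSketch {
    // appName -> (instanceId -> lease); a String stands in for Lease<InstanceInfo>.
    private final ConcurrentHashMap<String, Map<String, String>> registry =
            new ConcurrentHashMap<>();

    void register(String appName, String instanceId, String lease) {
        Map<String, String> gMap = registry.get(appName);
        if (gMap == null) {
            // First instance of this application: create the inner map.
            Map<String, String> gNewMap = new ConcurrentHashMap<>();
            // putIfAbsent returns the previous value, or null if ours was stored.
            gMap = registry.putIfAbsent(appName, gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // The instance itself is stored in the inner map.
        gMap.put(instanceId, lease);
    }

    int applicationCount() {
        return registry.size();
    }

    int instanceCount(String appName) {
        Map<String, String> gMap = registry.get(appName);
        return gMap == null ? 0 : gMap.size();
    }
}
```

               Registering two instances of the same application yields one outer entry with two inner entries. Using putIfAbsent rather than put matters when two threads register the first instances of an application at the same time: whichever inner map lands first is the one both threads end up writing to.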

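         I noticed something odd here when a new service registers an instance:

Stepping one line further in the debugger, the problem shows up:

      Let's just raise this question for now, continue with the analysis, and come back to it later.

     After the steps above we have supposedly registered "a service instance", but in fact only a service cluster has been registered: the key is the application name shared by the cluster and the value is the cluster's map.

    Now the next step: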

Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

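               // If the incoming lastDirtyTimestamp is older than the one the registry already holds for this instance,
               // an earlier registration already succeeded, so the existing InstanceInfo replaces the incoming one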
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }

 Continuing in com.netflix.eureka.registry.AbstractInstanceRegistry#register:

Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);

       This carries over the instance's ServiceUpTimestamp and puts the lease into the map of the corresponding cluster. This is where a service instance is finally registered.
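       The two rules in this part of register (keep the newer instance data, carry over the service-up timestamp) can be sketched as follows. This is a simplified model under stated assumptions: LeaseRegistrationSketch, Instance and SimpleLease are hypothetical stand-ins for the registry, InstanceInfo and Lease, and only the fields needed for the illustration are kept.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LeaseRegistrationSketch {
    // Hypothetical stand-in for InstanceInfo.
    static final class Instance {
        final String id;
        final long lastDirtyTimestamp;

        Instance(String id, long lastDirtyTimestamp) {
            this.id = id;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    // Hypothetical stand-in for Lease<InstanceInfo>.
    static final class SimpleLease {
        final Instance holder;
        final int durationSecs;
        long serviceUpTimestamp;

        SimpleLease(Instance holder, int durationSecs) {
            this.holder = holder;
            this.durationSecs = durationSecs;
        }
    }

    // Inner map of one application: instanceId -> lease.
    private final Map<String, SimpleLease> gMap = new ConcurrentHashMap<>();

    SimpleLease register(Instance registrant, int durationSecs) {
        SimpleLease existing = gMap.get(registrant.id);
        // If the registry already holds newer data, keep it instead of the incoming data.
        if (existing != null
                && existing.holder.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            registrant = existing.holder;
        }
        // A new lease is always created, but it inherits the service-up time.
        SimpleLease lease = new SimpleLease(registrant, durationSecs);
        if (existing != null) {
            lease.serviceUpTimestamp = existing.serviceUpTimestamp;
        }
        gMap.put(registrant.id, lease);
        return lease;
    }
}
```

       Note that even a stale registration replaces the Lease object; only the holder inside it is taken from the existing entry.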

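3.1.3 Replicating to the other registry nodes

       In 3.1.2 we mentioned that registration is followed by replicateToPeers, the method that replicates the change to the other nodes. Let's look at it more closely: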

private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

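In this method, the if check inside the for-each loop skips replication when the target node's URL is this node's own. This is why the cluster setup in the article "Eureka cluster high availability --- SpringCloud (2)" configures a different hostname for each node.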


     The actual replication method:

/**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     *
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

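As we can see, Action specifies the kind of change to replicate to the other nodes, such as an instance registration or a lease renewal, and a different method is executed depending on the action. Every action is dispatched to the other nodes from here.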

Looking at the callers of this method (com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers) leads to the same conclusion.
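The guard conditions and the per-action dispatch can be modeled without any networking. The sketch below is an illustration only: ReplicationSketch is a hypothetical class, peers are plain URL strings, and instead of sending HTTP requests it records what would have been sent.

```java
import java.util.ArrayList;
import java.util.List;

class ReplicationSketch {
    enum Action { Register, Heartbeat, Cancel }

    private final String myUrl;
    private final List<String> peerUrls;
    // Records the replication calls instead of performing HTTP requests.
    final List<String> sent = new ArrayList<>();

    ReplicationSketch(String myUrl, List<String> peerUrls) {
        this.myUrl = myUrl;
        this.peerUrls = peerUrls;
    }

    void replicateToPeers(Action action, String instanceId, boolean isReplication) {
        // A change that is itself a replication is never replicated again,
        // otherwise the nodes would keep forwarding it to each other.
        if (peerUrls.isEmpty() || isReplication) {
            return;
        }
        for (String url : peerUrls) {
            // Do not replicate to yourself.
            if (url.equals(myUrl)) {
                continue;
            }
            switch (action) {
                case Register:
                    sent.add("register " + instanceId + " -> " + url);
                    break;
                case Heartbeat:
                    sent.add("heartbeat " + instanceId + " -> " + url);
                    break;
                case Cancel:
                    sent.add("cancel " + instanceId + " -> " + url);
                    break;
            }
        }
    }
}
```

With three configured URLs, one of which is the node's own, a client registration produces two recorded calls, while a change that arrived as a replication produces none.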

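Finally the task is executed through batchingDispatcher.process(). expiryTime is the expiration time; it equals the lease duration after which an instance is evicted following its last renewal.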

public interface TaskDispatcher<ID, T> {
    
    void process(ID id, T task, long expiryTime);

    void shutdown();
}

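Next, how does a node know which nodes to replicate to, that is, how does it obtain the other existing nodes?

The method com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers uses the field peerEurekaNodes. Let's see how this field is assigned by looking at the call hierarchy:

The argument of the initialize method comes from a member field, which is assigned through the constructor. Looking at where the constructor is called: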

@Bean
	public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
			PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
		return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
				registry, peerEurekaNodes, this.applicationInfoManager);
	}

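The value is assigned here, when the bean is initialized. Following the beans that this bean depends on one after another, we eventually see that the value comes from the configuration file.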

@Bean
	public ServerCodecs serverCodecs() {
		return new CloudServerCodecs(this.eurekaServerConfig);
	}

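We can therefore conclude that in a registry cluster each node replicates only to the other nodes listed in its own configuration file, and that the replication is one-way.

3.2 Lease Renewal

3.2.1 Renewal entry point

       This entry point is hard to find directly, so we use the call hierarchy instead. Recall the method that replicates instance information to the other nodes, com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers; its call hierarchy shows five actions.

Tracing those calls leads to the entry point that receives renewal requests from clients: com.netflix.eureka.resources.InstanceResource#renewLease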

/**
     * A put request for renewing lease from a client instance.
     *
     * @param isReplication
     *            a header parameter containing information whether this is
     *            replicated from other nodes.
     * @param overriddenStatus
     *            overridden status if any.
     * @param status
     *            the {@link InstanceStatus} of the instance.
     * @param lastDirtyTimestamp
     *            last timestamp when this instance information was updated.
     * @return response indicating whether the operation was a success or
     *         failure.
     */
    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response = null;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
        return response;
    }

3.2.2 What renew actually does

       We follow the code to the core method: com.netflix.eureka.registry.AbstractInstanceRegistry#renew

Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        }

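        Look at this part first. It fetches the entry for the instance from the registry, or more precisely the Lease object, which holds the instance information plus timestamps. If the lookup yields null, the method returns false; the client then receives a 404 response and registers again.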
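        The lookup and the resulting status code can be put together in a small model. This is a sketch under simplifying assumptions: RenewLookupSketch is a hypothetical class, a Long (time of the last renewal) stands in for Lease<InstanceInfo>, and renewStatusCode condenses what InstanceResource#renewLease does with the boolean result.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RenewLookupSketch {
    // appName -> (instanceId -> time of last renewal in ms);
    // a Long stands in for Lease<InstanceInfo>.
    private final Map<String, Map<String, Long>> registry = new ConcurrentHashMap<>();

    void register(String appName, String id, long nowMs) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()).put(id, nowMs);
    }

    // Returns false when no lease exists for the given application and instance.
    boolean renew(String appName, String id, long nowMs) {
        Map<String, Long> gMap = registry.get(appName);
        if (gMap == null || !gMap.containsKey(id)) {
            return false;
        }
        gMap.put(id, nowMs);
        return true;
    }

    // Condensed view of the resource method: an unknown lease becomes a 404,
    // which tells the client to register again.
    int renewStatusCode(String appName, String id, long nowMs) {
        return renew(appName, id, nowMs) ? 200 : 404;
    }
}
```

        A heartbeat for an instance the registry has never seen (for example after the registry was restarted) yields 404; once the instance has registered, the same call yields 200.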

        Next comes the else branch, which handles the normal renewal:

else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    Object[] args = {
                            instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId()
                    };
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", args);
                    instanceInfo.setStatus(overriddenInstanceStatus);
                }
            }
            renewsLastMin.increment();
            leaseToRenew.renew();
            return true;
        }

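This branch first checks the instance status. If the status is UNKNOWN, it returns false and the client registers again.

The key part is leaseToRenew.renew(), the method that performs the renewal. How does it actually renew the lease? Let's open it: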

/**
     * Renew the lease, use renewal duration if it was specified by the
     * associated {@link T} during registration, otherwise default duration is
     * {@link #DEFAULT_DURATION_IN_SECS}.
     */
    public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;

    }

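A very simple operation: it sets the last-update timestamp field to the current time plus the duration (duration being the interval after the last renewal at which the instance is evicted).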
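To see what adding the duration in renew() means for expiry, here is a small model with an explicit clock. LeaseTimingSketch is a hypothetical class, and the isExpired formula (now > lastUpdateTimestamp + duration + additionalLeaseMs) is an assumption based on my reading of Lease#isExpired, which is not quoted in this article; please check it against the version you use.

```java
class LeaseTimingSketch {
    private final long durationMs;
    private long lastUpdateTimestamp;

    LeaseTimingSketch(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs;
    }

    // Same arithmetic as renew() above, with the clock passed in explicitly.
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    // Assumed expiry check: the duration is added once more on top of lastUpdateTimestamp.
    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }

    long getLastUpdateTimestamp() {
        return lastUpdateTimestamp;
    }
}
```

Under that assumption, a lease with a 90-second duration renewed at time 0 only counts as expired after 180 seconds, because the duration is added once in renew() and once more in the expiry check.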

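3.3 Eviction and Service Removal (EvictionTask)

      Using the approach from the previous sections, it is easy to find EvictionTask, the task that evicts unavailable instances, so we go straight to the core method: com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long)

3.3.1 Registry self-preservation

The method starts with this check: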

if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

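         The log message shows that lease expiration is currently disabled, which means no instance is evicted because its lease has expired. To see which conditions this depends on, we click through: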

@Override
    public boolean isSelfPreservationModeEnabled() {
        return serverConfig.shouldEnableSelfPreservation();
    }

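        The first condition comes from the configuration file: eureka.server.enableSelfPreservation, which controls whether self-preservation mode is enabled (default true). Eureka enters self-preservation mode when there is a network partition or when it loses too many clients within a short time; in this mode it does not remove a service even if that service has not sent a heartbeat for a long time.

       If this value is set to false, the registry never enters self-preservation mode. Any network instability can then cause a large number of service instances to be evicted even though they are still available, which is dangerous. On an internal network it may be a different matter.

       The second condition is a threshold.

       If the number of renewals the Eureka Server received in the last minute is below the threshold (the expected minimum), self-preservation mode is triggered: the server assumes a network problem and does not deregister any expired instance. Once the number of renewals received rises above the threshold again, the server leaves self-preservation mode.

Computing the self-preservation threshold:

  • Expected heartbeats per minute for each instance = 60 / heartbeat interval of that instance in seconds
  • Threshold = sum of the expected heartbeats of all registered instances * self-preservation factor

All of these parameters are configurable:

  • Heartbeat interval of an instance in seconds: eureka.instance.lease-renewal-interval-in-seconds
  • Self-preservation factor: eureka.server.renewal-percent-threshold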
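The two formulas can be checked with concrete numbers. The sketch below is a worked example, not Eureka's code: SelfPreservationSketch and its method names are hypothetical, all instances are assumed to share one heartbeat interval, and the values 30 seconds and 0.85 are the commonly cited defaults, used here as assumptions.

```java
class SelfPreservationSketch {
    // Expected heartbeats per minute for all instances, assuming every
    // instance uses the same heartbeat interval.
    static int expectedRenewsPerMinute(int instanceCount, int renewalIntervalSeconds) {
        return instanceCount * (60 / renewalIntervalSeconds);
    }

    // Threshold = expected heartbeats * self-preservation factor, truncated to an int.
    static int renewalThreshold(int instanceCount, int renewalIntervalSeconds,
                                double renewalPercentThreshold) {
        return (int) (expectedRenewsPerMinute(instanceCount, renewalIntervalSeconds)
                * renewalPercentThreshold);
    }

    // Expired leases may only be evicted while the renewals actually received
    // in the last minute stay above the threshold.
    static boolean leaseExpirationEnabled(int renewsLastMinute, int threshold) {
        return renewsLastMinute > threshold;
    }
}
```

With 10 instances, a 30-second interval and a factor of 0.85, the registry expects 20 renewals per minute and the threshold is 17. If only 15 renewals arrive in a minute, eviction is suspended; with 18 it is allowed again.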

3.3.2 Evicting instances in random order

// We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

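           This code walks through the registry and checks each lease for expiry; expired leases are added to expiredLeases.

           The registry then performs another protective step. It recomputes the renewal threshold from the size of the local registry and subtracts it from the number of registered instances, which gives the maximum number of instances that may be evicted in this run. It compares that limit with the number of entries in the list of expired leases and takes the smaller of the two as the number to evict. The calculation prevents all instances on this registry node from being evicted at once for some reason.

          If the resulting number is smaller than the number of expired leases, the instances to evict are picked at random.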

// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        // recompute the number of instances to evict
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;

        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
            
            // evict in random order
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
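           The limit calculation and the partial shuffle can be run on plain strings. This is a sketch of the same arithmetic as the excerpt above, not Eureka's code: EvictionSketch and pickToEvict are hypothetical names, instance IDs are plain strings, and the Random is passed in so the behavior can be reproduced.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

class EvictionSketch {
    // Same arithmetic as the excerpt: registry size minus the truncated threshold.
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    // Picks at most evictionLimit expired entries, chosen by a partial Knuth shuffle.
    static List<String> pickToEvict(List<String> expired, int registrySize,
                                    double renewalPercentThreshold, Random random) {
        List<String> candidates = new ArrayList<>(expired); // leave the input untouched
        int toEvict = Math.min(candidates.size(),
                evictionLimit(registrySize, renewalPercentThreshold));
        List<String> evicted = new ArrayList<>();
        for (int i = 0; i < toEvict; i++) {
            // Swap a random not-yet-chosen element into position i, then take it.
            int next = i + random.nextInt(candidates.size() - i);
            Collections.swap(candidates, i, next);
            evicted.add(candidates.get(i));
        }
        return evicted;
    }
}
```

           With a registry of 10 instances and a factor of 0.85, the threshold is 8 and the limit is 2, so even if 5 leases have expired only 2 randomly chosen ones are evicted in this run; the rest have to wait for a later run.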

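           Note that evict calls internalCancel directly rather than PeerAwareInstanceRegistryImpl#cancel, so as far as the code shown here goes, an eviction does not pass through replicateToPeers; each registry node evicts expired leases through its own EvictionTask.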

 

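4. Conclusion

       That covers most of the main parts of the registry. If you have questions, or if something was left out, please leave a comment and I will add it.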
