eureka-server
是通过提供rest
接口来跟eureka-client
交互,我们重点关注跟eureka-client
相关的几个接口。
注册的uri
是GET /apps
。入口在ApplicationsResource
。
@Path("/{version}/apps") @Produces({"application/xml", "application/json"}) public class ApplicationsResource { private final ResponseCache responseCache; @GET public Response getContainers(@PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader, @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) { boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty(); String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment(); } // Check if the server allows the access to the registry. The server can // restrict access if it is not // ready to serve traffic depending on various reasons. if (!registry.shouldAllowAccess(isRemoteRegionRequested)) { return Response.status(Status.FORBIDDEN).build(); } CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON; String returnMediaType = MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; } Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); Response response; if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { response = Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { response = Response.ok(responseCache.get(cacheKey)) .build(); } return response; } }
version
为eureka
版本号,最新API默认为V2
,Spring Cloud Netflix Eureka 跟 Spring Boot 适配以后,提供的 REST API 与原始的 REST API 有一点点不一样,其路径中的 version
值固定为eureka
。服务数据从responseCache
里获取,从key
的创建参数上可以看到,缓存里根据key
分红了不少个类别。
/**
 * Excerpt of the response cache: read path (get / getValue) and the payload
 * generator that serializes registry data for a cache key.
 */
public class ResponseCacheImpl implements ResponseCache {
    // Cache consulted first when the read-only cache is enabled.
    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
    // Loading cache that the read-only map is filled from.
    private final LoadingCache<Key, Value> readWriteCacheMap;
    // Registry that payloads are generated from.
    private final AbstractInstanceRegistry registry;

    /**
     * Returns the cached payload for {@code key}, using the read-only cache
     * according to {@code shouldUseReadOnlyResponseCache}.
     */
    public String get(final Key key) {
        return get(key, shouldUseReadOnlyResponseCache);
    }

    /**
     * Returns the payload string for {@code key}, or {@code null} when no value
     * could be obtained or the payload equals {@code EMPTY_PAYLOAD}.
     */
    @VisibleForTesting
    String get(final Key key, boolean useReadOnlyCache) {
        Value payload = getValue(key, useReadOnlyCache);
        if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
            return null;
        } else {
            return payload.getPayload();
        }
    }

    /**
     * Looks up the value for {@code key}.
     *
     * With {@code useReadOnlyCache} the read-only map is tried first; on a miss
     * the value is taken from {@code readWriteCacheMap} and copied into the
     * read-only map. Without it, {@code readWriteCacheMap} is read directly.
     *
     * @return the value, or {@code null} if any Throwable was raised (it is logged).
     */
    @VisibleForTesting
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            if (useReadOnlyCache) {
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    // Miss in the read-only map: read through and remember the result.
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key : {}", key, t);
        }
        return payload;
    }

    /**
     * Builds the payload for {@code key} from the registry.
     *
     * The entity type selects the data source: for {@code Application} keys the
     * name distinguishes all apps, the delta, or a single application, and
     * {@code key.hasRegions()} selects the multi-region variant. {@code VIP} and
     * {@code SVIP} keys use {@code getApplicationsForVip}. Unknown types are
     * logged and yield an empty string payload. The matching timer is started
     * before serialization and stopped in {@code finally}.
     */
    private Value generatePayload(Key key) {
        Stopwatch tracer = null;
        try {
            String payload;
            switch (key.getEntityType()) {
                case Application:
                    boolean isRemoteRegionRequested = key.hasRegions();

                    if (ALL_APPS.equals(key.getName())) {
                        if (isRemoteRegionRequested) {
                            tracer = serializeAllAppsWithRemoteRegionTimer.start();
                            payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeAllAppsTimer.start();
                            payload = getPayLoad(key, registry.getApplications());
                        }
                    } else if (ALL_APPS_DELTA.equals(key.getName())) {
                        // Delta counters are incremented each time a delta payload is generated.
                        if (isRemoteRegionRequested) {
                            tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                            versionDeltaWithRegions.incrementAndGet();
                            versionDeltaWithRegionsLegacy.incrementAndGet();
                            payload = getPayLoad(key,
                                    registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeDeltaAppsTimer.start();
                            versionDelta.incrementAndGet();
                            versionDeltaLegacy.incrementAndGet();
                            payload = getPayLoad(key, registry.getApplicationDeltas());
                        }
                    } else {
                        // Any other name is treated as a single application name.
                        tracer = serializeOneApptimer.start();
                        payload = getPayLoad(key, registry.getApplication(key.getName()));
                    }
                    break;
                case VIP:
                case SVIP:
                    tracer = serializeViptimer.start();
                    payload = getPayLoad(key, getApplicationsForVip(key, registry));
                    break;
                default:
                    logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
                    payload = "";
                    break;
            }
            return new Value(payload);
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
    }
}
eureka
为了保证性能,引入了三级缓存机制:
readOnlyCacheMap
,是ConcurrentHashMap
,springcloud项目可以通过配置eureka.server.use-read-only-response-cache
来指定是否开启,默认为true
。readWriteCacheMap
,是guava cache
。registry
存储在线服务信息,ConcurrentHashMap<String,ConcurrentHashMap<String, Lease<InstanceInfo>>>
类型,存储格式为ConcurrentHashMap<服务名,ConcurrentHashMap<机器ID(默认为“IP:服务名:配置的端口”), 服务信息(Lease)>>
,例如<SERVICE-TEST,ConcurrentHashMap<localhost:SERVICE-TEST:6666, Lease<InstanceInfo>>>
。整体流程:
1. 如果开启了useReadOnlyCache
,则从readOnlyCacheMap
获取数据。2. 如果没有开启useReadOnlyCache
,或者readOnlyCacheMap
获取不到数据,则从readWriteCacheMap
获取;获取到数据后,如果开启了useReadOnlyCache
,则将结果设置到readOnlyCacheMap
中。3. 如果readWriteCacheMap
获取不到数据,则从registry
中获取数据,获取到数据后,将结果设置到readWriteCacheMap
中。
/**
 * Excerpt of the response cache: construction of the loading cache and the
 * periodic task that refreshes the read-only map from it.
 */
public class ResponseCacheImpl implements ResponseCache {

    /**
     * Wires up the cache.
     *
     * Builds {@code readWriteCacheMap} with the configured initial capacity and
     * an expire-after-write timeout of
     * {@code serverConfig.getResponseCacheAutoExpirationInSeconds()} seconds.
     * When the read-only cache is enabled, schedules the refresh task every
     * {@code serverConfig.getResponseCacheUpdateIntervalMs()} milliseconds.
     * Finally registers this object with {@code Monitors}; a failure there is
     * only logged.
     */
    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
        this.registry = registry;

        long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
        this.readWriteCacheMap =
                CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                        .removalListener(new RemovalListener<Key, Value>() {
                            @Override
                            public void onRemoval(RemovalNotification<Key, Value> notification) {
                                // Drop the region-less -> region-specific association for the removed key.
                                Key removedKey = notification.getKey();
                                if (removedKey.hasRegions()) {
                                    Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                    regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                                }
                            }
                        })
                        .build(new CacheLoader<Key, Value>() {
                            @Override
                            public Value load(Key key) throws Exception {
                                // Record region-specific keys under their region-less clone
                                // before generating the payload.
                                if (key.hasRegions()) {
                                    Key cloneWithNoRegions = key.cloneWithoutRegions();
                                    regionSpecificKeys.put(cloneWithNoRegions, key);
                                }
                                Value value = generatePayload(key);
                                return value;
                            }
                        });

        if (shouldUseReadOnlyResponseCache) {
            // First run is at the next whole multiple of the update interval.
            timer.schedule(getCacheUpdateTask(),
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),
                    responseCacheUpdateIntervalMs);
        }

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
        }
    }

    /**
     * Creates the task that, for every key currently in {@code readOnlyCacheMap},
     * reads the value from {@code readWriteCacheMap} and replaces the read-only
     * entry when the two are not the same object. Errors for one key are logged
     * and do not stop the loop.
     */
    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                logger.debug("Updating the client cache from response cache");
                for (Key key : readOnlyCacheMap.keySet()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                    }
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        // Reference comparison: only replace when a different Value object was loaded.
                        if (cacheValue != currentCacheValue) {
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwable th) {
                        logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                    }
                }
            }
        };
    }
}
readOnlyCacheMap
,是通过定时任务,定时从readWriteCacheMap
读取数据刷新值,定时任务默认30秒
间隔。readWriteCacheMap
,通过guava cache
自身的机制来实现过期并刷新,默认是从写入时间起算,180秒以后过期
。注册的uri
是GET /apps/delta
。入口在ApplicationsResource
。
@Path("/{version}/apps") @Produces({"application/xml", "application/json"}) public class ApplicationsResource { @Path("delta") @GET public Response getContainerDifferential( @PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader, @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) { boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty(); // If the delta flag is disabled in discovery or if the lease expiration // has been disabled, redirect clients to get all instances if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) { return Response.status(Status.FORBIDDEN).build(); } String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL_DELTA.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment(); } CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON; String returnMediaType = MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; } Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS_DELTA, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { return Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { return Response.ok(responseCache.get(cacheKey)) .build(); } } }
增量的逻辑基本上跟全量一样,差别在于cacheKey
的名称(name)
为ResponseCacheImpl.ALL_APPS_DELTA
。ResponseCache
里,全量获取是通过registry.getApplications()
遍历registry
的Map
获取所有的服务;而增量的registry.getApplicationDeltas()
方法,则是有专门的queue
存储最近变化的信息。
public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    /**
     * Builds the delta view of the registry from {@code recentlyChangedQueue}.
     *
     * Every item in the queue is grouped under its application name. Unless
     * transparent fallback to other regions is disabled, applications reported
     * by remote registries' deltas are added when the local registry has no
     * application of that name. The result carries the reconcile hash code of
     * the full application list so callers can compare against it.
     *
     * The whole computation runs while holding the {@code write} lock.
     *
     * @return the delta applications with version and hash code set.
     */
    public Applications getApplicationDeltas() {
        GET_ALL_CACHE_MISS_DELTA.increment();

        Applications apps = new Applications();
        apps.setVersion(responseCache.getVersionDelta().get());
        Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();

        // Acquire before entering try: if lock() itself fails, finally must not
        // call unlock() on a lock this thread never obtained.
        write.lock();
        try {
            Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
            logger.debug("The number of elements in the delta queue is : {}",
                    this.recentlyChangedQueue.size());
            while (iter.hasNext()) {
                Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
                InstanceInfo instanceInfo = lease.getHolder();
                logger.debug(
                        "The instance id {} is found with status {} and actiontype {}",
                        instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
                Application app = applicationInstancesMap.get(instanceInfo.getAppName());
                if (app == null) {
                    app = new Application(instanceInfo.getAppName());
                    applicationInstancesMap.put(instanceInfo.getAppName(), app);
                    apps.addApplication(app);
                }
                // A copy of the decorated instance is added, not the registry's own object.
                app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
            }

            boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();

            if (!disableTransparentFallback) {
                Applications allAppsInLocalRegion = getApplications(false);

                for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
                    Applications applications = remoteRegistry.getApplicationDeltas();
                    for (Application application : applications.getRegisteredApplications()) {
                        Application appInLocalRegistry =
                                allAppsInLocalRegion.getRegisteredApplications(application.getName());
                        // Only take remote applications that do not exist locally.
                        if (appInLocalRegistry == null) {
                            apps.addApplication(application);
                        }
                    }
                }
            }

            Applications allApps = getApplications(!disableTransparentFallback);
            apps.setAppsHashCode(allApps.getReconcileHashCode());
            return apps;
        } finally {
            write.unlock();
        }
    }
}
recentlyChangedQueue
由定时任务定时清理过期数据,定时任务30
秒执行一次,清理180
秒之前的数据。
public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    /**
     * Stores the collaborators, creates the recent-activity queues and the
     * renew-rate meter, and schedules the delta retention task with the
     * configured interval as both initial delay and period.
     */
    protected AbstractInstanceRegistry(EurekaServerConfig serverConfig,
                                       EurekaClientConfig clientConfig,
                                       ServerCodecs serverCodecs) {
        this.serverConfig = serverConfig;
        this.clientConfig = clientConfig;
        this.serverCodecs = serverCodecs;

        this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
        this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);
        this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);

        this.deltaRetentionTimer.schedule(
                getDeltaRetentionTask(),
                serverConfig.getDeltaRetentionTimerIntervalInMs(),
                serverConfig.getDeltaRetentionTimerIntervalInMs());
    }

    /**
     * Creates the task that trims {@code recentlyChangedQueue}: starting at the
     * head, items older than the configured retention time are removed, and the
     * scan stops at the first item that is still within the retention window.
     */
    private TimerTask getDeltaRetentionTask() {
        return new TimerTask() {
            @Override
            public void run() {
                final Iterator<RecentlyChangedItem> cursor = recentlyChangedQueue.iterator();
                while (cursor.hasNext()) {
                    final long changedAt = cursor.next().getLastUpdateTime();
                    final long oldestKept =
                            System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue();
                    if (changedAt >= oldestKept) {
                        // First retained item found; the remainder is not examined.
                        break;
                    }
                    cursor.remove();
                }
            }
        };
    }
}
获取服务整体流程如下:
注册的uri
是POST /apps/{serviceId}
。入口在ApplicationResource
。
@Produces({"application/xml", "application/json"}) public class ApplicationResource { private static final Logger logger = LoggerFactory.getLogger(ApplicationResource.class); private final PeerAwareInstanceRegistry registry; /** * Registers information about a particular instance for an * {@link com.netflix.discovery.shared.Application}. * * @param info * {@link InstanceInfo} information of the instance. * @param isReplication * a header parameter containing information whether this is * replicated from other nodes. */ @POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // validate that the instanceinfo contains all the necessary required fields if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getIPAddr())) { return Response.status(400).entity("Missing ip address").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) 
{ boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible } }
InstanceInfo info
是服务节点信息,String isReplication
用来标识是否是集群内其他eureka-server
节点发送过来的复制信息。接口主要代码在PeerAwareInstanceRegistry.register
。
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    /**
     * Registers the instance locally, then replicates the registration to peers.
     *
     * The lease duration comes from the instance's lease info when that is
     * present and positive; otherwise {@code Lease.DEFAULT_DURATION_IN_SECS}.
     *
     * @param info          the instance to register.
     * @param isReplication whether this call originated from another node.
     */
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        final boolean clientSuppliedDuration =
                info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0;
        final int leaseDuration = clientSuppliedDuration
                ? info.getLeaseInfo().getDurationInSecs()
                : Lease.DEFAULT_DURATION_IN_SECS;

        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
}
PeerAwareInstanceRegistry.register
主要做两件事,一是调用父类的register
,再就是向集群内其他节点传播注册信息。先看下父类的注册方法。
public abstract class AbstractInstanceRegistry implements InstanceRegistry { private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>(); protected volatile ResponseCache responseCache; public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { read.lock(); Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); if (gMap == null) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // Retain the last dirty timestamp without overwriting it, if there is already a lease if (existingLease != null && (existingLease.getHolder() != null)) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted // InstanceInfo instead of the server local copy. 
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } } else { // The lease does not exist and hence it is a new registration synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; updateRenewsPerMinThreshold(); } } logger.debug("No previous lease information found; it is new registration"); } Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } gMap.put(registrant.getId(), lease); synchronized (recentRegisteredQueue) { recentRegisteredQueue.add(new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); } // This is where the initial state transfer of overridden status happens if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. 
Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // Set the status based on the overridden status rules InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // If the lease is registered with UP status, set lease service up timestamp if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } registrant.setActionType(ActionType.ADDED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } } }
方法用lock
保证线程安全,大概流程如下:
1. 将服务信息保存到registry
,registry
存储在线服务信息,ConcurrentHashMap<String,ConcurrentHashMap<String, Lease<InstanceInfo>>>
类型,存储格式为ConcurrentHashMap<服务名,ConcurrentHashMap<机器ID(默认为“IP:服务名:配置的端口”), 服务信息(Lease)>>
,例如<SERVICE-TEST,ConcurrentHashMap<localhost:SERVICE-TEST:6666, Lease<InstanceInfo>>>
。2. 将注册记录添加到最近注册队列-recentRegisteredQueue
中。3. 将变更记录添加到最近变更队列-recentlyChangedQueue
中。4. 清理缓存。eureka
为了保证性能,引入了三级缓存机制(后续会讲到)。进入清理缓存看下:
// Response cache whose entries are invalidated by invalidateCache below.
protected volatile ResponseCache responseCache;

/**
 * Invalidates the cached responses for the given application and its VIP
 * addresses by delegating to {@code responseCache.invalidate}.
 *
 * @param appName          application name.
 * @param vipAddress       VIP address, may be {@code null}.
 * @param secureVipAddress secure VIP address, may be {@code null}.
 */
private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
    // invalidate cache
    responseCache.invalidate(appName, vipAddress, secureVipAddress);
}
实际清理代码在ResponseCache
内。
public class ResponseCacheImpl implements ResponseCache { private final LoadingCache<Key, Value> readWriteCacheMap; @Override public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) { for (Key.KeyType type : Key.KeyType.values()) { for (Version v : Version.values()) { invalidate( new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact), new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact), new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact) ); if (null != vipAddress) { invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full)); } if (null != secureVipAddress) { invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full)); } } } } public void invalidate(Key... keys) { for (Key key : keys) { logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(key); Collection<Key> keysWithRegions = regionSpecificKeys.get(key); if (null != keysWithRegions && !keysWithRegions.isEmpty()) { for (Key keysWithRegion : keysWithRegions) { logger.debug("Invalidating the response cache key : {} {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(keysWithRegion); } } } } ,}
再回到PeerAwareInstanceRegistry.replicateToPeers
看怎么传播信息到集群其他节点。
private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } } private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: node.cancel(appName, id); break; case Heartbeat: InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: node.register(info); break; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } }
Action
为Register
,进入到PeerEurekaNode.register
方法
public class PeerEurekaNode {

    /**
     * Queues a registration of {@code info} for replication to this peer.
     *
     * The task is handed to {@code batchingDispatcher} with an expiry of
     * now plus the value of {@code getLeaseRenewalOf(info)}; when executed it
     * calls {@code replicationClient.register(info)}.
     *
     * @param info the instance to register on the peer.
     */
    public void register(final InstanceInfo info) throws Exception {
        final long leaseRenewal = getLeaseRenewalOf(info);
        final long expiryTime = System.currentTimeMillis() + leaseRenewal;

        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime);
    }
}
传播节点是通过TaskExecutors
异步执行(也就是即使eureka-client
成功调用注册接口,也并不代表所有的eureka-server
都已经有这个client
的信息),最终执行方法是HttpReplicationClient.register
,HttpReplicationClient
默认是JerseyReplicationClient
。
/**
 * Replication client; every request it sends carries the replication header.
 */
public class JerseyReplicationClient extends AbstractJerseyEurekaHttpClient implements HttpReplicationClient {

    /**
     * Adds {@code PeerEurekaNode.HEADER_REPLICATION} with value {@code "true"}
     * to the outgoing request.
     */
    @Override
    protected void addExtraHeaders(Builder webResource) {
        webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
    }
}
JerseyReplicationClient
继承自AbstractJerseyEurekaHttpClient
,所以传播实际上也是调用其他节点的注册接口,只是额外在头部添加了x-netflix-discovery-replication
为true
,而通过前面的代码@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication
,我们知道如果isReplication
为true
时,表示这是一个集群其他节点的复制信息
,就不会再做传播这一步。
由于传播是异步进行的,异步有失败的风险,这个在续约(renew)
时会做补充。
注册的uri
是PUT apps/{serviceId}/{instanceId}
。入口在InstanceResource
。
public class InstanceResource { public Response renewLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, @QueryParam("status") String status, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { boolean isFromReplicaNode = "true".equals(isReplication); boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode); // Not found in the registry, immediately ask for a register if (!isSuccess) { logger.warn("Not Found (Renew): {} - {}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } // Check if we need to sync based on dirty time stamp, the client // instance might have changed some value Response response; if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) { response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode); // Store the overridden status since the validation found out the node that replicates wins if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode() && (overriddenStatus != null) && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus)) && isFromReplicaNode) { registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus)); } } else { response = Response.ok().build(); } logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus()); return response; } }
renewLease
方法主要做两件事:
1. 调用registry.renew
执行续约。2. 校验lastDirtyTimestamp
。续约失败(注册表中找不到该实例)时返回404
。
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    /**
     * Renews the lease locally and, when that succeeds, replicates the
     * heartbeat to peers.
     *
     * @return {@code true} if the local renewal succeeded, {@code false} otherwise.
     */
    public boolean renew(final String appName, final String id, final boolean isReplication) {
        final boolean renewedLocally = super.renew(appName, id, isReplication);
        if (!renewedLocally) {
            return false;
        }
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
}
跟注册一样,续约也是调用父类的方法,然后传播到集群其他节点。
public abstract class AbstractInstanceRegistry implements InstanceRegistry { public boolean renew(String appName, String id, boolean isReplication) { RENEW.increment(isReplication); Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToRenew = null; if (gMap != null) { leaseToRenew = gMap.get(id); } if (leaseToRenew == null) { RENEW_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id); return false; } else { InstanceInfo instanceInfo = leaseToRenew.getHolder(); if (instanceInfo != null) { // touchASGCache(instanceInfo.getASGName()); InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus( instanceInfo, leaseToRenew, isReplication); if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}" + "; re-register required", instanceInfo.getId()); RENEW_NOT_FOUND.increment(isReplication); return false; } if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { logger.info( "The instance status {} is different from overridden instance status {} for instance {}. " + "Hence setting the status to overridden status", instanceInfo.getStatus().name(), instanceInfo.getOverriddenStatus().name(), instanceInfo.getId()); instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus); } } renewsLastMin.increment(); leaseToRenew.renew(); return true; } } } public class Lease<T> { private volatile long lastUpdateTimestamp; public void renew() { lastUpdateTimestamp = System.currentTimeMillis() + duration; } }
先获取Lease
,如果没有,就返回失败。如果当前状态与最终状态
不一致,则用最终状态
覆盖掉当前状态。传播续租
给集群其他节点,跟注册也一样:异步调用其他节点的续租
接口,@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication
参数为true
。
注册的uri
是DELETE apps/{serviceId}/{instanceId}
。入口在InstanceResource
。
public class InstanceResource { @DELETE public Response cancelLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { try { boolean isSuccess = registry.cancel(app.getName(), id, "true".equals(isReplication)); if (isSuccess) { logger.debug("Found (Cancel): {} - {}", app.getName(), id); return Response.ok().build(); } else { logger.info("Not Found (Cancel): {} - {}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } } catch (Throwable e) { logger.error("Error (cancel): {} - {}", app.getName(), id, e); return Response.serverError().build(); } } }
同样的,registry
是PeerAwareInstanceRegistryImpl
。
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { @Override public boolean cancel(final String appName, final String id, final boolean isReplication) { if (super.cancel(appName, id, isReplication)) { replicateToPeers(Action.Cancel, appName, id, null, null, isReplication); synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to cancel it, reduce the number of clients to send renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1; updateRenewsPerMinThreshold(); } } return true; } return false; } protected void updateRenewsPerMinThreshold() { this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) * serverConfig.getRenewalPercentThreshold()); } }
PeerAwareInstanceRegistryImpl
除了调用父类的cancel
、传播信息给集群其他节点以外,还会重新计算numberOfRenewsPerMinThreshold
,numberOfRenewsPerMinThreshold
在自我保护机制里会用到。
public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    /**
     * Cancels the lease of instance {@code id} registered under {@code appName}.
     * Delegates to {@link #internalCancel(String, String, boolean)}.
     *
     * @return {@code true} if a lease was found and cancelled, {@code false} otherwise
     */
    @Override
    public boolean cancel(String appName, String id, boolean isReplication) {
        return internalCancel(appName, id, isReplication);
    }

    /**
     * Removes the instance's lease from the registry while holding the {@code read} lock.
     *
     * <p>Regardless of whether a lease is found, the attempt is appended to
     * {@code recentCanceledQueue} and any entry for {@code id} is removed from
     * {@code overriddenInstanceStatusMap}. When a lease is found it is cancelled, its holder
     * (if any) is marked {@code ActionType.DELETED} and queued in {@code recentlyChangedQueue},
     * and the cache is invalidated for the app name and the holder's VIP addresses.
     *
     * @param appName       key of the application's lease map in {@code registry}
     * @param id            key of the instance's lease inside that map
     * @param isReplication flag forwarded to the metrics counters and included in the log line
     * @return {@code true} if a lease was found and cancelled; {@code false} if no lease was
     *         registered for the instance
     */
    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        // Acquire the lock before entering the try block: if acquisition itself failed inside
        // the try, the finally clause would call unlock() on a lock that is not held and mask
        // the original failure with an IllegalMonitorStateException.
        read.lock();
        try {
            CANCEL.increment(isReplication);

            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                leaseToCancel = gMap.remove(id);
            }

            synchronized (recentCanceledQueue) {
                recentCanceledQueue.add(
                        new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }

            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}",
                        id, instanceStatus.name());
            }

            if (leaseToCancel == null) {
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}",
                        appName, id);
                return false;
            }

            leaseToCancel.cancel();

            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            return true;
        } finally {
            read.unlock();
        }
    }
}
super.cancel
主要做:清理数据(registry)、添加事件、清理缓存(清理readWriteCacheMap,但不清理readOnlyCacheMap,readOnlyCacheMap只通过定时任务来刷新)。
再回到PeerAwareInstanceRegistryImpl.replicateToPeers
方法,跟注册/续租同样,是异步调其余eureka-server
节点的接口,@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication
参数为true
。
正常状况下,应用实例下线时候会主动向eureka-Server
发起下线请求。但实际情况下,应用实例可能异常崩溃,又或者是网络异常等原因,导致下线请求无法被成功提交。
public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    /**
     * Starts the renews-per-minute counter and (re)schedules the eviction task.
     *
     * <p>Any eviction task already held in {@code evictionTaskRef} is cancelled first. A new
     * {@code EvictionTask} is then stored and scheduled on {@code evictionTimer}, using the
     * configured eviction interval as both the initial delay and the period.
     */
    protected void postInit() {
        renewsLastMin.start();

        final boolean hasPreviousTask = evictionTaskRef.get() != null;
        if (hasPreviousTask) {
            evictionTaskRef.get().cancel();
        }

        evictionTaskRef.set(new EvictionTask());
        evictionTimer.schedule(
                evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }
}
AbstractInstanceRegistry
里会启动定时任务EvictionTask
,定期下线/清理失效的服务信息,默认是60秒执行一次清理任务,清理超过90秒
未续约服务,但实际上计算时,还会算上补偿时间(补偿时间 = 当前时间 - 最后任务执行时间 - 任务执行频率),至于为什么要算上补偿时间,可以看Lease.isExpired(long additionalLeaseMs)
方法的注释。
public abstract class AbstractInstanceRegistry implements InstanceRegistry { class EvictionTask extends TimerTask { private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l); @Override public void run() { try { long compensationTimeMs = getCompensationTimeMs(); logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs); evict(compensationTimeMs); } catch (Throwable e) { logger.error("Could not run the evict task", e); } } long getCompensationTimeMs() { long currNanos = getCurrentTimeNano(); long lastNanos = lastExecutionNanosRef.getAndSet(currNanos); if (lastNanos == 0l) { return 0l; } long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos); long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs(); return compensationTime <= 0l ? 0l : compensationTime; } long getCurrentTimeNano() { // for testing return System.nanoTime(); } } public void evict(long additionalLeaseMs) { logger.debug("Running the evict task"); if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; } // We collect first all expired items, to evict them in random order. For large eviction sets, // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it, // the impact should be evenly distributed across all applications. List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) { Lease<InstanceInfo> lease = leaseEntry.getValue(); if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); } } } } // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for // triggering self-preservation. 
Without that we would wipe out full registry. int registrySize = (int) getLocalRegistrySize(); int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); int evictionLimit = registrySize - registrySizeThreshold; int toEvict = Math.min(expiredLeases.size(), evictionLimit); if (toEvict > 0) { logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit); Random random = new Random(System.currentTimeMillis()); for (int i = 0; i < toEvict; i++) { // Pick a random item (Knuth shuffle algorithm) int next = i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); Lease<InstanceInfo> lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName(); String id = lease.getHolder().getId(); EXPIRED.increment(); logger.warn("DS: Registry: expired lease for {}/{}", appName, id); internalCancel(appName, id, false); } } } } public class Lease<T> { /** * Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not. * 关于补偿时间的说明: * Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than * what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect * instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will * not be fixed. * * @param additionalLeaseMs any additional lease time to add to the lease evaluation in ms. */ public boolean isExpired(long additionalLeaseMs) { return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs)); } }
isLeaseExpirationEnabled
是判断是否开启了自我保护机制
,当前是否符合自我保护机制
条件。
清理的时候:
分批清理例子:
// 假设 20 个租约,其中有 10 个租约过期。

// 第一轮执行开始
int registrySize = 20;
int registrySizeThreshold = (int) (20 * 0.85) = 17;
int evictionLimit = 20 - 17 = 3;
int toEvict = Math.min(10, 3) = 3;
// 第一轮执行结束,剩余 17 个租约,其中有 7 个租约过期。

// 第二轮执行开始
int registrySize = 17;
int registrySizeThreshold = (int) (17 * 0.85) = 14;
int evictionLimit = 17 - 14 = 3;
int toEvict = Math.min(7, 3) = 3;
// 第二轮执行结束,剩余 14 个租约,其中有 4 个租约过期。

// 第三轮执行开始
int registrySize = 14;
int registrySizeThreshold = (int) (14 * 0.85) = 11;
int evictionLimit = 14 - 11 = 3;
int toEvict = Math.min(4, 3) = 3;
// 第三轮执行结束,剩余 11 个租约,其中有 1 个租约过期。

// 第四轮执行开始
int registrySize = 11;
int registrySizeThreshold = (int) (11 * 0.85) = 9;
int evictionLimit = 11 - 9 = 2;
int toEvict = Math.min(1, 2) = 1;
// 第四轮执行结束,剩余 10 个租约,其中有 0 个租约过期。结束。
下线
的代码internalCancel
跟前面的下线是同一个方法。
参考:
《 Eureka 源码解析 —— 应用实例注册发现(五)之过期》