在《Eureka - Client服务启动》一文中我们看到,获取注册表的地方有两个:一个是EurekaClient的构造函数,另一个是定时器每隔30秒去获取。我们先看看定时器TimedSupervisorTask的run方法。
这个方法有三个比较重要的参数:timeoutMillis、delay、maxDelay。比如刷新频率是30s,那么maxDelay就是30*10=300s(这个10的来源参考上一篇);delay在每次超时后翻倍,但不能超过maxDelay。
整个设计思路是:如果调用在30s内超时,下一次的延迟就改为60秒;如果再次超时,就继续翻倍,但不能超过300s。如果后面的调用恢复正常,delay就恢复到30s,下次超时再重新开始翻倍。
@Override public void run() { Future<?> future = null; try { // 执行任务 future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount()); // 指定超时时间 future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout //设置delay delay.set(timeoutMillis); threadPoolLevelGauge.set((long) executor.getActiveCount()); successCounter.increment(); } catch (TimeoutException e) { logger.warn("task supervisor timed out", e); timeoutCounter.increment(); // 获取delay long currentDelay = delay.get(); // maxDelay和currentDelay的2倍中取最小值 long newDelay = Math.min(maxDelay, currentDelay * 2); // 设置为上面最小值 delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { // 其余的略 rejectedCounter.increment(); } catch (Throwable e) { // 其余的略 throwableCounter.increment(); } finally { if (future != null) { future.cancel(true); } // 把任务放入定时器,时间是delay if (!scheduler.isShutdown()) { scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } }
判断是增量获取还是全量获取。
private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { Applications applications = getApplications(); // 禁用增量、强制全量更新、没有注册信息的时候,进行全量更新 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { // 其余日志打印略 getAndStoreFullRegistry(); } else { //增量更新 getAndUpdateDelta(applications); } // 计算hash applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { // 其余日志打印略 return false; } finally { if (tracer != null) { tracer.stop(); } } // 其余略 return true; }
全量获取:调用注册中心地址 + apps/ 获取。
private void getAndStoreFullRegistry() throws Throwable { // 其余略 Applications apps = null; // 调用注册中心地址+apps/获取 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } // 其余略 }
增量更新:获取增量数据后,通过hash判断本地数据是否和服务器一致,不一致就进行全量获取。
private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; // 调用注册中心地址+apps/delta获取 EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } if (delta == null) { //获取不到数据,全力更新 getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; //加锁 if (fetchRegistryUpdateLock.tryLock()) { try { // 和本地的合并 updateDelta(delta); // 计算hash reconcileHashCode = getReconcileHashCode(applications); } finally { //解锁 fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason // 和服务器的hash对比 if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { // 不一致说明和服务器的不一致,直接全量 reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { // 其余略 } }
和本地数据合并,包括新增、修改、删除。
private void updateDelta(Applications delta) { int deltaCount = 0; // 遍历全部增量 for (Application app : delta.getRegisteredApplications()) { // 遍历全部实例 for (InstanceInfo instance : app.getInstances()) { // 本地缓存 Applications applications = getApplications(); String instanceRegion = instanceRegionChecker.getInstanceRegion(instance); // 是否为同一个region,若是不是,则取同一个region的applications if (!instanceRegionChecker.isLocalRegion(instanceRegion)) { Applications remoteApps = remoteRegionVsApps.get(instanceRegion); if (null == remoteApps) { remoteApps = new Applications(); remoteRegionVsApps.put(instanceRegion, remoteApps); } applications = remoteApps; } ++deltaCount; if (ActionType.ADDED.equals(instance.getActionType())) { // 新增处理 Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion); applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } else if (ActionType.MODIFIED.equals(instance.getActionType())) { // 修改处理 Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Modified instance {} to the existing apps ", instance.getId()); // 和新增都是调用addInstance方法,由于Application#addInstance里是先移除再新增,全部修改也能够调用 applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } else if (ActionType.DELETED.equals(instance.getActionType())) { // 删除处理 Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp != null) { logger.debug("Deleted instance {} to the existing apps ", instance.getId()); existingApp.removeInstance(instance); if (existingApp.getInstancesAsIsFromEureka().isEmpty()) { applications.removeApplication(existingApp); } } } } } logger.debug("The total number of instances fetched by the 
delta processor : {}", deltaCount); getApplications().setVersion(delta.getVersion()); getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); for (Applications applications : remoteRegionVsApps.values()) { applications.setVersion(delta.getVersion()); applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); } }