实例列表的获取主要依靠HostReactor#getServiceInfo方法。《Nacos - 启动》中namingService.subscribe注册监听的时候,也会调用这个方法。
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { // 若是发生故障转移,就从文件缓存里取 NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } // 从serviceInfoMap里取 ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); // serviceInfoMap没有 if (null == serviceObj) { serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); updatingMap.put(serviceName, new Object()); // 内存没有,从服务器取 updateServiceNow(serviceName, clusters); updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { // 若是正在更新,则wait,避免多线程同时调用服务器 if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } // 开启定时任务更新服务列表 scheduleUpdateIfAbsent(serviceName, clusters); // 从内存里取 return serviceInfoMap.get(serviceObj.getKey()); }
从服务器获取实例列表时,NamingProxy会调用NamingProxy#reqApi,它会随机取一个server,再调用NamingProxy#callServer。NamingProxy的代码这里就略过了。
private void updateServiceNow(String serviceName, String clusters) { try { updateService(serviceName, clusters); } catch (NacosException e) { NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e); } } public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { // 从服务器取 String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { // 解析返回的字符串 processServiceJson(result); } } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } } }
processServiceJson主要是判断实例是否有更新:有更新就通知serviceChanged,并写入文件缓存。
public ServiceInfo processServiceJson(String json) { ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class); ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey()); if (serviceInfo.getHosts() == null || !serviceInfo.validate()) { return oldService; } boolean changed = false; if (oldService != null) { if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) { NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime()); } serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size()); for (Instance host : oldService.getHosts()) { oldHostMap.put(host.toInetAddr(), host); } Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size()); for (Instance host : serviceInfo.getHosts()) { newHostMap.put(host.toInetAddr(), host); } Set<Instance> modHosts = new HashSet<Instance>(); Set<Instance> newHosts = new HashSet<Instance>(); Set<Instance> remvHosts = new HashSet<Instance>(); // 下面是修改、新增、移除的筛选。 List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>( newHostMap.entrySet()); for (Map.Entry<String, Instance> entry : newServiceHosts) { Instance host = entry.getValue(); String key = entry.getKey(); if (oldHostMap.containsKey(key) && !StringUtils .equals(host.toString(), oldHostMap.get(key).toString())) { modHosts.add(host); continue; } if (!oldHostMap.containsKey(key)) { newHosts.add(host); } } for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) { Instance host = entry.getValue(); String key = entry.getKey(); if (newHostMap.containsKey(key)) { continue; } if (!newHostMap.containsKey(key)) { remvHosts.add(host); } } if (newHosts.size() > 0) { changed = true; NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(newHosts)); } if 
(remvHosts.size() > 0) { changed = true; NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(remvHosts)); } if (modHosts.size() > 0) { changed = true; updateBeatInfo(modHosts); NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(modHosts)); } serviceInfo.setJsonFromServer(json); // 有更新发送给serviceChanged,并写入文件 if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) { eventDispatcher.serviceChanged(serviceInfo); DiskCache.write(serviceInfo, cacheDir); } } else { changed = true; NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts())); serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 有更新发送给serviceChanged,并写入文件 eventDispatcher.serviceChanged(serviceInfo); serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, cacheDir); } MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size()); if (changed) { NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts())); } return serviceInfo; }
把更新任务加入线程池。
public void scheduleUpdateIfAbsent(String serviceName, String clusters) { // 任务已经有了就不加了 if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } synchronized (futureMap) { // // 任务已经有了就不加了 if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } // 把任务加入线程池 ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters)); // 加入任务 futureMap.put(ServiceInfo.getKey(serviceName, clusters), future); } }
定时任务更新,默认每10秒更新一次;如果失败了,延迟时间就每次乘以2,比如第一次1秒,第二次2秒,第三次4秒,最多乘到2的6次方,最大等待60秒。
public void run() { long delayTime = DEFAULT_DELAY; try { ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); // serviceInfoMap没有就直接更新 if (serviceObj == null) { updateService(serviceName, clusters); return; } if (serviceObj.getLastRefTime() <= lastRefTime) { updateService(serviceName, clusters); serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); } else { // if serviceName already updated by push, we should not override it // since the push data may be different from pull through force push refreshOnly(serviceName, clusters); } lastRefTime = serviceObj.getLastRefTime(); if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap .containsKey(ServiceInfo.getKey(serviceName, clusters))) { // abort the update task NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters); return; } // 失败了就增长失败次数 if (CollectionUtils.isEmpty(serviceObj.getHosts())) { incFailCount(); return; } delayTime = serviceObj.getCacheMillis(); // 成功就重置为1 resetFailCount(); } catch (Throwable e) { incFailCount(); NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e); } finally { // 从新到线程池 executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); } }
可结合《Nacos - HostReactor的创建》一起阅读。