咱们从原生SDK代码中入手,能够发现最核心的两行代码:算法
ConfigService configService=NacosFactory.createConfigService(properties);
String content=configService.getConfig(dataId,groupId,3000);
首先咱们先来看 NacosFactory.createConfigService :spring
public static ConfigService createConfigService(Properties properties) throws NacosException { try { Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); Constructor constructor = driverImplClass.getConstructor(Properties.class);
//调用反射建立一个NacosConfigService实例 ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); } }
这一步的代码很简单,及经过类的全类名经过反射建立一个 NacosConfigService 实例,咱们跟进该类的构造方法:sql
public NacosConfigService(Properties properties) throws NacosException { String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); if (StringUtils.isBlank(encodeTmp)) { encode = Constants.ENCODE; } else { encode = encodeTmp.trim(); }//初始化命名空间 initNamespace(properties); agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); agent.start(); worker = new ClientWorker(agent, configFilterChainManager, properties); }
这一步主要初始化了 agent 与 worker 两个实例。这里又看到熟悉的包装器模式,将ServerHttpAgent 包装成MetricsHttpAgent,这里咱们须要知道,其中MetricsHttpAgent是对ServerHttpAgent功能的拓展,核心功能仍是由ServerHttpAgent去实现,接下去咱们来看一下 worker 的初始化,从名字上看能知道 最后真的工做的是他:apache
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; // Initialize the timeout parameter // 初始化一些参数
init(properties); //建立了一个定时任务的线程池
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); //建立了一个保持长链接的线程池
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); //建立了一个延迟任务线程池来每隔10ms来检查配置信息的线程池
executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }
这一步建立了两个线程池,第一个线程池负责与配置中心进行数据的交互,而且启动后延迟1ms,以后每隔10ms对配置信息进行定时检查,第二个线程池则是负责保持一个长链接。咱们再服务启动以后便会执行 checkConfigInfo(),跟进去看看:api
public void checkConfigInfo() { // 分任务(解决大数据量的传输问题)
int listenerSize = cacheMap.get().size(); // 向上取整为批数,分批次进行检查 // ParamUtil.getPerTaskConfigSize() =3000
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); // currentLongingTaskCount =0
if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // 要判断任务是否在执行 这块须要好好想一想。 任务列表如今是无序的。变化过程可能有问题
executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }
这里主要是先去除缓存中 Map<String, CacheData> 的数量,为避免处理过量的数据,这里对缓存数据进行了分组,最后建立 LongPollingRunnable 去执行,能够知道 这里会进入 LongPollingRunnable 的 Run 方法:缓存
public void run() { List<CacheData> cacheDatas = new ArrayList<CacheData>(); List<String> inInitializingCacheList = new ArrayList<String>(); try { // check failover config
for (CacheData cacheData : cacheMap.get().values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { //检查本地配置
checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) {
//检查缓存的MD5 cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } } //检查服务端配置
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { String content = getServerConfig(dataId, group, tenant, 3000L); //将配置设置进缓存
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(content); LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)); } catch (NacosException ioe) { String message = String.format( "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) { // If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } }
总的来讲,该方法主要流程是先检查本地缓存,再检查服务端的配置,由改变最后再回写到本地及加载到缓存。服务器
private void checkLocalConfig(CacheData cacheData) { final String dataId = cacheData.dataId; final String group = cacheData.group; final String tenant = cacheData.tenant;
//本地文件缓存 File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant); // 没有 -> 有 //不使用本地配置,可是持久化文件存在,须要读取文件加载至内存
if (!cacheData.isUseLocalConfigInfo() && path.exists()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); String md5 = MD5.getInstance().getMD5String(content); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); return; } // 有 -> 没有。不通知业务监听器,从server拿到配置后通知。 //使用本地配置,可是持久化文件不存在
if (cacheData.isUseLocalConfigInfo() && !path.exists()) { cacheData.setUseLocalConfigInfo(false); LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); return; } // 有变动 //使用本地配置,持久化文件存在,缓存跟文件最后修改时间不一致
if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path.lastModified()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); String md5 = MD5.getInstance().getMD5String(content); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); } }
本地检查主要是经过是否使用本地配置,继而寻找持久化缓存文件,再经过判断文件的最后修改事件与本地缓存的版本是否一致来判断是否由变动。本地检查完毕,若是使用本地配置会进入下列代码:mvc
if (cacheData.isUseLocalConfigInfo()) { //检查缓存的MD5
cacheData.checkListenerMd5(); }
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
//MD5由变动,说明数据变动
if (!md5.equals(wrap.lastCallMd5)) {
//执行回调
safeNotifyListener(dataId, group, content, md5, wrap);
}
}
}
本地检查完毕会进行远程服务器检查:app
//检查服务端配置
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
这里会去获取一个发生变化的GroupKeys 集合:dom
/** * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。 */ List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException { StringBuilder sb = new StringBuilder(); for (CacheData cacheData : cacheDatas) { if (!cacheData.isUseLocalConfigInfo()) { sb.append(cacheData.dataId).append(WORD_SEPARATOR); sb.append(cacheData.group).append(WORD_SEPARATOR); if (StringUtils.isBlank(cacheData.tenant)) { sb.append(cacheData.getMd5()).append(LINE_SEPARATOR); } else { sb.append(cacheData.getMd5()).append(WORD_SEPARATOR); sb.append(cacheData.getTenant()).append(LINE_SEPARATOR); } if (cacheData.isInitializing()) { // cacheData 首次出如今cacheMap中&首次check更新
inInitializingCacheList .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant)); } } } boolean isInitializingCacheList = !inInitializingCacheList.isEmpty(); return checkUpdateConfigStr(sb.toString(), isInitializingCacheList); }
这里将可能发生变化的配置信息封装成一个 StringBuilder ,继而调用 checkUpdateConfigStr:
/** * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。 */ List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException { List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); List<String> headers = new ArrayList<String>(2); headers.add("Long-Pulling-Timeout"); headers.add("" + timeout); // told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) { headers.add("Long-Pulling-Timeout-No-Hangup"); headers.add("true"); } if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); } try {//发起一个Post请求 HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout); if (HttpURLConnection.HTTP_OK == result.code) { setHealthServer(true); return parseUpdateDataIdResponse(result.content); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code); } } catch (IOException e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }
就这样从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。获取到这个列表之后就便利这个列表,去服务器端获取对应变动后的配置:
for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { String content = getServerConfig(dataId, group, tenant, 3000L); //将配置设置进缓存
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(content); LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)); } catch (NacosException ioe) { String message = String.format( "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } }
这里会发起请求从服务器端获取配置:getServerConfig:
public String getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException { if (StringUtils.isBlank(group)) { group = Constants.DEFAULT_GROUP; } HttpResult result = null; try { List<String> params = null; if (StringUtils.isBlank(tenant)) { params = Arrays.asList("dataId", dataId, "group", group); } else { params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant); } result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout); } catch (IOException e) { String message = String.format( "[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, e); throw new NacosException(NacosException.SERVER_ERROR, e); } switch (result.code) { case HttpURLConnection.HTTP_OK: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content); return result.content; case HttpURLConnection.HTTP_NOT_FOUND: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null); return null; case HttpURLConnection.HTTP_CONFLICT: { LOGGER.error( "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
+ "tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(NacosException.CONFLICT, "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } case HttpURLConnection.HTTP_FORBIDDEN: { LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(result.code, result.content); } default: { LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId, group, tenant, result.code); throw new NacosException(result.code, "http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } } }
经过初始化时候的 agent.httpGet 去发起一个Get请求,就这样变动本例的配置,当从远程服务器获取玩配置之后还有一个循环:
for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } }
这个循环主要是对有变化的配置进行监听回调。整个流程就差不都完成了,最后来一张流程图:
咱们知道客户端会有一个长轮训的任务去检查服务器端的配置是否发生了变化,若是发生了变动,那么客户端会拿到变动的 groupKey 再根据 groupKey 去获取配置项的最新值更新到本地的缓存以及文件中,那么这种每次都靠客户端去请求,那请求的时间间隔设置多少合适呢?
若是间隔时间设置的太长的话有可能没法及时获取服务端的变动,若是间隔时间设置的过短的话,那么频繁的请求对于服务端来讲无疑也是一种负担,因此最好的方式是客户端每隔一段长度适中的时间去服务端请求,而在这期间若是配置发生变动,服务端可以主动将变动后的结果推送给客户端,这样既能保证客户端可以实时感知到配置的变化,也下降了服务端的压力。 咱们来看看nacos设置的间隔时间是多久。
客户端发起一个请求到服务端,服务端收到客户端的请求后,并不会马上响应给客户端,而是先把这个请求hold住,而后服务端会在hold住的这段时间检查数据是否有更新,若是有,则响应给客户端,若是一直没有数据变动,则达到必定的时间(长轮训时间间隔)才返回。
长轮训典型的场景有: 扫码登陆、扫码支付。
在ClientWorker这个类里面,找到 checkUpdateConfigStr 这个方法,这里面就是去服务器端查询发生变化的groupKey。
/** * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。 */ List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException { List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); List<String> headers = new ArrayList<String>(2); headers.add("Long-Pulling-Timeout"); headers.add("" + timeout); // told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.add("Long-Pulling-Timeout-No-Hangup"); headers.add("true"); } if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); } try {//客户端发送的请求地址是: /v1/cs/configs/listener HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout); if (HttpURLConnection.HTTP_OK == result.code) { setHealthServer(true); return parseUpdateDataIdResponse(result.content); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code); } } catch (IOException e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }
这个方法最终会发起http请求,注意这里面有一个 timeout 的属性,
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout);
timeout是在init这个方法中赋值的,默认状况下是30秒,能够经过configLongPollTimeout进行修改
private void init(Properties properties) { // 默认长轮询的事件就是30S timeout = Math.max(NumberUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT), //public static final int CONFIG_LONG_POLL_TIMEOUT = 30000; //public static final int MIN_CONFIG_LONG_POLL_TIMEOUT = 10000; Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT); taskPenaltyTime = NumberUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME), Constants.CONFIG_RETRY_TIME); enableRemoteSyncConfig = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG)); }
因此从这里得出的一个基本结论是:客户端发起一个轮询请求,超时时间是30s。 那么客户端为何要等待30s才超时呢?不是越快越好吗? 咱们能够在nacos的日志目录下 $NACOS_HOME/nacos/logs/config-client-request.log 文件.
能够看到一个现象,在配置没有发生变化的状况下,客户端会等29.5s以上,才请求到服务器端的结果。而后客户端拿到服务器端的结果以后,在作后续的操做。当服务器端频繁的修改,那么服务器端频繁客户端进行推送.
服务端是如何处理客户端的请求的?那么一样,咱们须要思考几个问题:
nacos是使用spring mvc提供的rest api,其中有个类是 ConfigController ,咱们在其中找到了Post 请求的 listener 路径的接口方法:
/** * 比较MD5 */ @RequestMapping(value = "/listener", method = RequestMethod.POST) public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true); String probeModify = request.getParameter("Listening-Configs"); if (StringUtils.isBlank(probeModify)) { throw new IllegalArgumentException("invalid probeModify"); } probeModify = URLDecoder.decode(probeModify, Constants.ENCODE); Map<String, String> clientMd5Map; try { clientMd5Map = MD5Util.getClientMd5Map(probeModify); } catch (Throwable e) { throw new IllegalArgumentException("invalid probeModify"); } // do long-polling inner.doPollingConfig(request, response, clientMd5Map, probeModify.length()); }
先是获取了客户端的MD5集合,这里面会调用inner.doPollingConfig进行处理,这个方法中,兼容了长轮训和短轮询的逻辑,咱们只须要关注长轮训的部分:
/** * 轮询接口 */ public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException, ServletException { // 长轮询 if (LongPollingService.isSupportLongPolling(request)) { longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); return HttpServletResponse.SC_OK + ""; }
......//省略代码
}
这里咱们进入长轮询的代码块:
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) { //超时时间 String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String tag = req.getHeader("Vipserver-Tag"); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); /** * 提早500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动 add delay time for LoadBalance */ long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) { timeout = Math.max(10000, getFixedPollingInterval()); // do nothing but set fix polling timeout } else { long start = System.currentTimeMillis(); List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map); if (changedGroups.size() > 0) { generateResponse(req, rsp, changedGroups); LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } } String ip = RequestUtil.getRemoteIp(req); // 必定要由HTTP线程调用,不然离开后容器会当即发送响应 final AsyncContext asyncContext = req.startAsync(); // AsyncContext.setTimeout()的超时时间不许,因此只能本身控制 asyncContext.setTimeout(0L); scheduler.execute( new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); }
这个方法是把客户端的长轮训请求添加到任务中去。得到客户端传递过来的超时时间,而且进行本地计算,提早500ms返回响应,这就能解释为何客户端响应超时时间是29.5+了。固然若是 isFixedPolling=true 的状况下,不会提早返回响应根据客户端请求过来的md5和服务器端对应的group下对应内容的md5进行比较,若是不一致,则经过 generateResponse 将结果返回若是配置文件没有发生变化,则经过 scheduler.execute 启动了一个定时任务,将客户端的长轮询请求封装成一个叫 ClientLongPolling 的任务,交给 scheduler 去执行,那么接下去必定会进入ClientLongPolling 的Run 方法:
public void run() { asyncTimeoutFuture = scheduler.schedule(new Runnable() { @Override public void run() { try { getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis()); /** * 删除订阅关系 */ allSubs.remove(ClientLongPolling.this); if (isFixedPolling()) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); List<String> changedGroups = MD5Util.compareMd5( (HttpServletRequest)asyncContext.getRequest(), (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
//有变化当即执行返回 if (changedGroups.size() > 0) { sendResponse(changedGroups); } else { sendResponse(null); } } else { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); sendResponse(null); } } catch (Throwable t) { LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause()); } } //延迟29.5秒后执行 }, timeoutTime, TimeUnit.MILLISECONDS); allSubs.add(this); }
在run方法中,经过scheduler.schedule实现了一个定时任务,它的delay时间正好是前面计算的29.5s。在这个任务中,会经过MD5Util.compareMd5来进行计算那另一个,当数据发生变化之后,确定不能等到29.5s以后才通知呀,那怎么办呢?咱们发现有一个allSubs 的东西,它彷佛和发布订阅有关系。那是否是有可能当前的clientLongPolling订阅了数据变化的事件呢?allSubs是一个队列,队列里面放了ClientLongPolling这个对象。这个队列彷佛和配置变动有某种关联关系:
/** * 长轮询订阅关系 */ final Queue<ClientLongPolling> allSubs;
注释里写明了他是和长轮询订阅相关的,接着咱们先来看一下他所归属的类的类图:
发现LongPollingService集成了AbstractEventListener,事件监听.
AbstractEventListener:
static public abstract class AbstractEventListener { public AbstractEventListener() { /** * automatic register */ EventDispatcher.addEventListener(this); } /** * 感兴趣的事件列表 * * @return event list */ abstract public List<Class<? extends Event>> interest(); /** * 处理事件 * * @param event event */ abstract public void onEvent(Event event); }
这里面有一个抽象的onEvent方法,明显是用来处理事件的方法,而抽象方法必须由子类实现,因此意味着LongPollingService里面确定实现了onEvent方法:
public void onEvent(Event event) { if (isFixedPolling()) { // ignore } else { if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent evt = (LocalDataChangeEvent)event; scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); } } }
因此到了这里,确定是修改了配置以后会有一个触发点去出发该事件,当匹配上事件类型,那么就会去执行这个回调,这个事件的实现方法中判断事件类型是否为LocalDataChangeEvent,经过scheduler.execute执行DataChangeTask这个任务:
public void run() { try { ConfigService.getContentBetaMd5(groupKey); for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) { ClientLongPolling clientSub = iter.next(); if (clientSub.clientMd5Map.containsKey(groupKey)) { // 若是beta发布且不在beta列表直接跳过 if (isBeta && !betaIps.contains(clientSub.ip)) { continue; } // 若是tag发布且不在tag列表直接跳过 if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) { continue; } getRetainIps().put(clientSub.ip, System.currentTimeMillis()); iter.remove(); // 删除订阅关系 LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance", RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()), "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey); clientSub.sendResponse(Arrays.asList(groupKey)); } } } catch (Throwable t) { LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause()); } }
这个是数据变化的任务,最让人兴奋的应该是,它里面有一个循环迭代器,从allSubs里面得到ClientLongPolling最后经过clientSub.sendResponse把数据返回到客户端。因此,这也就可以理解为什么数据变化可以实时触发更新了。
那么接下来还有一个疑问是,数据变化以后是如何触发事件的呢? 因此咱们定位到数据变化的请求类中,在ConfigController这个类中,找到POST请求的方法找到配置变动的位置:
/** * 增长或更新非聚合数据。 * * @throws NacosException */ @RequestMapping(method = RequestMethod.POST) @ResponseBody public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response, @RequestParam("dataId") String dataId, @RequestParam("group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam("content") String content, @RequestParam(value = "tag", required = false) String tag, @RequestParam(value = "appName", required = false) String appName, @RequestParam(value = "src_user", required = false) String srcUser, @RequestParam(value = "config_tags", required = false) String configTags, @RequestParam(value = "desc", required = false) String desc, @RequestParam(value = "use", required = false) String use, @RequestParam(value = "effect", required = false) String effect, @RequestParam(value = "type", required = false) String type, @RequestParam(value = "schema", required = false) String schema) throws NacosException { final String srcIp = RequestUtil.getRemoteIp(request); String requestIpApp = RequestUtil.getAppName(request); ParamUtils.checkParam(dataId, group, "datumId", content); ParamUtils.checkParam(tag); Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10); ......//省略代码 final Timestamp time = TimeUtils.getCurrentTime(); String betaIps = request.getHeader("betaIps"); ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content); if (StringUtils.isBlank(betaIps)) { if (StringUtils.isBlank(tag)) { persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())); } else { persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); } } else { // beta publish persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime())); } ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content); return true; }
发现数据持久化以后,会经过EventDispatcher进行事件发布EventDispatcher.fireEvent 可是这个事件彷佛不是咱们所关心的时间,缘由是这里发布的事件是ConfigDataChangeEvent , 而LongPollingService感兴趣的事件是 LocalDataChangeEvent。
在Nacos中有一个DumpService,它会定时把变动后的数据dump到磁盘上,DumpService在spring启动以后,会调用init方法启动几个dump任务。而后在任务执行结束以后,会触发一个LocalDataChangeEvent 的事件:
@PostConstruct public void init() { LogUtil.defaultLog.warn("DumpService start"); DumpProcessor processor = new DumpProcessor(this); DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this); DumpAllBetaProcessor dumpAllBetaProcessor = new DumpAllBetaProcessor(this); DumpAllTagProcessor dumpAllTagProcessor = new DumpAllTagProcessor(this);
......//省略代码
}
其中在 DumpProcessor的 process方法中会调用 ConfigService 的相关API对数据进行操做,其中调用 remove 后会传播这么一个事件:
/** * 删除配置文件,删除缓存。 */ static public boolean remove(String dataId, String group, String tenant) { final String groupKey = GroupKey2.getKey(dataId, group, tenant); final int lockResult = tryWriteLock(groupKey); /** * 数据不存在 */ if (0 == lockResult) { dumpLog.info("[remove-ok] {} not exist.", groupKey); return true; } /** * 加锁失败 */ if (lockResult < 0) { dumpLog.warn("[remove-error] write lock failed. {}", groupKey); return false; } try { if (!STANDALONE_MODE || PropertyUtil.isStandaloneUseMysql()) { DiskUtil.removeConfigInfo(dataId, group, tenant); } CACHE.remove(groupKey); EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey)); return true; } finally { releaseWriteLock(groupKey); } }
简单总结一下刚刚分析的整个过程。
因此总的来讲,Nacos采用推+拉的形式,来解决最开始关于长轮训时间间隔的问题。固然,30s这个时间是能够设置的,而之因此定30s,应该是一个经验值。
Nacos支持集群模式,很显然。而一旦涉及到集群,就涉及到主从,那么nacos是一种什么样的机制来实现的集群呢?
nacos的集群相似于zookeeper, 它分为leader角色和follower角色, 那么从这个角色的名字能够看出来,这个集群存在选举的机制。 由于若是本身不具有选举功能,角色的命名可能就是master/slave了,
Nacos集群采用 raft 算法来实现,它是相对zookeeper的选举算法较为简单的一种。选举算法的核心在 RaftCore 中,包括数据的处理和数据同步.
raft算法动画演示地址:http://thesecretlivesofdata.com/raft/ 。能够很直观的看到整个算法选举的过程。
在Raft中,节点有三种角色:
选举分为两个节点:
全部节点启动的时候,都是follower状态。 若是在一段时间内若是没有收到leader的心跳(多是没有leader,也多是leader挂了),那么follower会变成Candidate。而后发起选举,选举以前,会增长term,这个term和zookeeper中的epoch的道理是同样的。
follower会投本身一票,而且给其余节点发送票据vote,等到其余节点回复在这个过程当中,可能出现几种状况
约束条件在任一term中,单个节点最多只能投一票
选举的几种状况:
在动画演示中能够看到选举超时后,即每一个小球外围都变化先消失的座位候选人,接着发出请求让其余人投票选举本身,同时修改Term:
与Zookeeper同样,对于事务操做,请求会转发给leader,非事务操做上,能够任意一个节点来处理.