服务端收到客户端的配置变动请求查询的长轮训请求以后,服务端怎么来处理这个长轮训呢?算法
上节课讲到了配置更新的整个原理及源码,咱们知道客户端会有一个长轮训的任务去检查服务器端的配置是否发生了变化,若是发生了变动,那么客户端会拿到变动的 groupKey 再根据 groupKey 去获取配置项的最新值更新到本地的缓存以及文件中,那么这种每次都靠客户端去请求,那请求的时间间隔设置多少合适呢?spring
若是间隔时间设置的太长的话有可能没法及时获取服务端的变动,若是间隔时间设置的过短的话,那么频繁的请求对于服务端来讲无疑也是一种负担,因此最好的方式是客户端每隔一段长度适中的时间去服务端请求,而在这期间若是配置发生变动,服务端可以主动将变动后的结果推送给客户端,这样既能保证客户端可以实时感知到配置的变化,也下降了服务端的压力。 咱们来看看nacos设置的间隔时间是多久apache
那么在讲解原理以前,先给你们解释一下什么叫长轮训json
客户端发起一个请求到服务端,服务端收到客户端的请求后,并不会马上响应给客户端,而是先把这个请求hold住,而后服务端会在hold住的这段时间检查数据是否有更新,若是有,则响应给客户端,若是一直没有数据变动,则达到必定的时间(长轮训时间间隔)才返回。api
长轮训典型的场景有: 扫码登陆、扫码支付。缓存
回到咱们昨天上课讲到的代码,在ClientWorker这个类里面,找到checkUpdateConfigStr
这个方法,这里面就是去服务器端查询发生变化的groupKey。bash
123456789101112131415161718192021222324252627282930313233343536复制代码 |
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException { List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); List<String> headers = new ArrayList<String>(2); headers.add("Long-Pulling-Timeout"); headers.add("" + timeout); // told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.add("Long-Pulling-Timeout-No-Hangup"); headers.add("true"); } if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); } try { HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout); if (HttpURLConnection.HTTP_OK == result.code) { setHealthServer(true); return parseUpdateDataIdResponse(result.content); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code); } } catch (IOException e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }复制代码 |
这个方法最终会发起http请求,注意这里面有一个timeout
的属性,服务器
12复制代码 |
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout);复制代码 |
timeout是在init这个方法中赋值的,默认状况下是30秒,能够经过configLongPollTimeout进行修改mvc
12345复制代码 |
private void init(Properties properties) { this.timeout = (long)Math.max(NumberUtils.toInt(properties.getProperty("configLongPollTimeout"), 30000), 10000); this.taskPenaltyTime = NumberUtils.toInt(properties.getProperty("configRetryTime"), 2000); this.enableRemoteSyncConfig = Boolean.parseBoolean(properties.getProperty("enableRemoteSyncConfig")); }复制代码 |
因此从这里得出的一个基本结论是app
客户端发起一个轮询请求,超时时间是30s。 那么客户端为何要等待30s才超时呢?不是越快越好吗?
咱们能够在nacos的日志目录下$NACOS_HOME/nacos/logs/config-client-request.log
文件
123456复制代码 |
2019-08-04 13:22:19,736|0|nohangup|127.0.0.1|polling|1|55|02019-08-04 13:22:49,443|29504|timeout|127.0.0.1|polling|1|552019-08-04 13:23:18,983|29535|timeout|127.0.0.1|polling|1|552019-08-04 13:23:48,493|29501|timeout|127.0.0.1|polling|1|552019-08-04 13:24:18,003|29500|timeout|127.0.0.1|polling|1|552019-08-04 13:24:47,509|29501|timeout|127.0.0.1|polling|1|55复制代码 |
能够看到一个现象,在配置没有发生变化的状况下,客户端会等29.5s以上,才请求到服务器端的结果。而后客户端拿到服务器端的结果以后,在作后续的操做。
若是在配置变动的状况下,因为客户端基于长轮训的链接保持,因此返回的时间会很是的短,咱们能够作个小实验,在nacos console中频繁修改数据而后再观察一下
config-client-request.log
的变化
12345复制代码 |
2019-08-04 13:30:17,016|0|in-advance|127.0.0.1|polling|1|55|example+DEFAULT_GROUP2019-08-04 13:30:17,022|3|null|127.0.0.1|get|example|DEFAULT_GROUP||e10e4d5973c497e490a8d7a9e4e9be64|unknown2019-08-04 13:30:20,807|10|true|0:0:0:0:0:0:0:1|publish|example|DEFAULT_GROUP||81360b7e732a5dbb37d62d81cebb85d2|null2019-08-04 13:30:20,843|0|in-advance|127.0.0.1|polling|1|55|example+DEFAULT_GROUP2019-08-04 13:30:20,848|1|null|127.0.0.1|get|example|DEFAULT_GROUP||81360b7e732a5dbb37d62d81cebb85d2|unknown复制代码 |
分析完客户端以后,随着好奇心的驱使,服务端是如何处理客户端的请求的?那么一样,咱们须要思考几个问题
客户端发送的请求地址是:/v1/cs/configs/listener
找到服务端对应的方法
nacos是使用spring mvc提供的rest api。这里面会调用inner.doPollingConfig进行处理
123456789101112131415161718192021复制代码 |
@RequestMapping(value = "/listener", method = RequestMethod.POST) public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true); String probeModify = request.getParameter("Listening-Configs"); if (StringUtils.isBlank(probeModify)) { throw new IllegalArgumentException("invalid probeModify"); } probeModify = URLDecoder.decode(probeModify, Constants.ENCODE); Map<String, String> clientMd5Map; try { clientMd5Map = MD5Util.getClientMd5Map(probeModify); } catch (Throwable e) { throw new IllegalArgumentException("invalid probeModify"); } // do long-polling inner.doPollingConfig(request, response, clientMd5Map, probeModify.length()); }复制代码 |
这个方法中,兼容了长轮训和短轮询的逻辑,咱们只须要关注长轮训的部分。再次进入到longPollingService.addLongPollingClient
12345678910111213141516171819202122232425262728293031323334353637383940复制代码 |
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException, ServletException { // 长轮询 if (LongPollingService.isSupportLongPolling(request)) { longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); return HttpServletResponse.SC_OK + ""; } // else 兼容短轮询逻辑 List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map); // 兼容短轮询result String oldResult = MD5Util.compareMd5OldResult(changedGroups); String newResult = MD5Util.compareMd5ResultString(changedGroups); String version = request.getHeader(Constants.CLIENT_VERSION_HEADER); if (version == null) { version = "2.0.0"; } int versionNum = Protocol.getVersionNumber(version); /** * 2.0.4版本之前, 返回值放入header中 */ if (versionNum < START_LONGPOLLING_VERSION_NUM) { response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult); response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult); } else { request.setAttribute("content", newResult); } // 禁用缓存 response.setHeader("Pragma", "no-cache"); response.setDateHeader("Expires", 0); response.setHeader("Cache-Control", "no-cache,no-store"); response.setStatus(HttpServletResponse.SC_OK); return HttpServletResponse.SC_OK + ""; }复制代码 |
从方法名字上能够推测出,这个方法应该是把客户端的长轮训请求添加到某个任务中去。
isFixedPolling=true
的状况下,不会提早返回响应generateResponse
将结果返回scheduler.execute
启动了一个定时任务,将客户端的长轮询请求封装成一个叫 ClientLongPolling 的任务,交给 scheduler 去执行12345678910111213141516171819202122232425262728293031323334353637383940复制代码 |
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) { //str表示超时时间,也就是timeout String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String tag = req.getHeader("Vipserver-Tag"); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); /** * 提早500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动 add delay time for LoadBalance */ long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) { timeout = Math.max(10000, getFixedPollingInterval()); // do nothing but set fix polling timeout } else { long start = System.currentTimeMillis(); List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map); if (changedGroups.size() > 0) { generateResponse(req, rsp, changedGroups); LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } } String ip = RequestUtil.getRemoteIp(req); // 必定要由HTTP线程调用,不然离开后容器会当即发送响应 final AsyncContext asyncContext = req.startAsync(); // AsyncContext.setTimeout()的超时时间不许,因此只能本身控制 asyncContext.setTimeout(0L); scheduler.execute( new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); }复制代码 |
接下来咱们来分析一下,clientLongPolling到底作了什么操做。或者说咱们能够先猜想一下应该会作什么事情
基于这些猜测,咱们能够看看它的实现过程
从代码粗粒度来看,它的实现彷佛和咱们的猜测一致,在run方法中,经过scheduler.schedule实现了一个定时任务,它的delay时间正好是前面计算的29.5s。在这个任务中,会经过MD5Util.compareMd5来进行计算
那另一个,当数据发生变化之后,确定不能等到29.5s以后才通知呀,那怎么办呢?咱们发现有一个allSubs
的东西,它彷佛和发布订阅有关系。那是否是有可能当前的clientLongPolling订阅了数据变化的事件呢?
12345678910111213141516171819202122232425262728293031323334353637383940414243复制代码 |
public void run() { asyncTimeoutFuture = scheduler.schedule(new Runnable() { @Override public void run() { try { getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMil·1·s()); /** * 删除订阅关系 */ allSubs.remove(ClientLongPolling.this); if (isFixedPolling()) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); List<String> changedGroups = MD5Util.compareMd5( (HttpServletRequest)asyncContext.getRequest(), (HttpServletResponse)asyncContext.getResponse(), clientMd5Map); if (changedGroups.size() > 0) { sendResponse(changedGroups); } else { sendResponse(null); } } else { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); sendResponse(null); } } catch (Throwable t) { LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause()); } } }, timeoutTime, TimeUnit.MILLISECONDS); allSubs.add(this);}复制代码 |
allSubs是一个队列,队列里面放了ClientLongPolling这个对象。这个队列彷佛和配置变动有某种关联关系
123456复制代码 |
/** * 长轮询订阅关系 */final Queue<ClientLongPolling> allSubs;allSubs.add(this);复制代码 |
那这个时候,个人第一想法是,先去看一下当前这个类的类图,发现LongPollingService集成了AbstractEventListener,事件监听?果真没猜错。
这里面有一个抽象的onEvent方法,明显是用来处理事件的方法,而抽象方法必须由子类实现,因此意味着LongPollingService里面确定实现了onEvent方法
1234567891011121314151617181920212223复制代码 |
static public abstract class AbstractEventListener { public AbstractEventListener() { /** * automatic register */ EventDispatcher.addEventListener(this); } /** * 感兴趣的事件列表 * * @return event list */ abstract public List<Class<? extends Event>> interest(); /** * 处理事件 * * @param event event */ abstract public void onEvent(Event event); }复制代码 |
这个事件的实现方法中
1234567891011复制代码 |
@Override public void onEvent(Event event) { if (isFixedPolling()) { // ignore } else { if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent evt = (LocalDataChangeEvent)event; scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); } } }复制代码 |
从名字能够看出来,这个是数据变化的任务,最让人兴奋的应该是,它里面有一个循环迭代器,从allSubs里面得到ClientLongPolling
最后经过clientSub.sendResponse把数据返回到客户端。因此,这也就可以理解为什么数据变化可以实时触发更新了。
12345678910111213141516171819202122232425262728293031复制代码 |
public void run() { try { ConfigService.getContentBetaMd5(groupKey); for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) { ClientLongPolling clientSub = iter.next(); if (clientSub.clientMd5Map.containsKey(groupKey)) { // 若是beta发布且不在beta列表直接跳过 if (isBeta && !betaIps.contains(clientSub.ip)) { continue; } // 若是tag发布且不在tag列表直接跳过 if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) { continue; } getRetainIps().put(clientSub.ip, System.currentTimeMillis()); iter.remove(); // 删除订阅关系 LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance", RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()), "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey); clientSub.sendResponse(Arrays.asList(groupKey)); } } } catch (Throwable t) { LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause()); }}复制代码 |
那么接下来还有一个疑问是,数据变化以后是如何触发事件的呢? 因此咱们定位到数据变化的请求类中,在ConfigController这个类中,找到POST请求的方法
找到配置变动的位置, 发现数据持久化以后,会经过EventDispatcher进行事件发布EventDispatcher.fireEvent
可是这个事件彷佛不是咱们所关心的时间,缘由是这里发布的事件是ConfigDataChangeEvent
, 而LongPollingService感兴趣的事件是LocalDataChangeEvent
1234567891011121314151617181920212223复制代码 |
@RequestMapping(method = RequestMethod.POST) @ResponseBody public Boolean publishConfig(...) throws NacosException { //省略部分代码 ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content); if (StringUtils.isBlank(betaIps)) { if (StringUtils.isBlank(tag)) { persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())); } else { persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); } } else { // beta publish persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime())); } ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content); return true; }复制代码 |
后来我发现,在Nacos中有一个DumpService,它会定时把变动后的数据dump到磁盘上,DumpService在spring启动以后,会调用init方法启动几个dump任务。而后在任务执行结束以后,会触发一个LocalDataChangeEvent 的事件
1234567复制代码 |
@PostConstruct public void init() { LogUtil.defaultLog.warn("DumpService start"); DumpProcessor processor = new DumpProcessor(this); DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this); DumpAllBetaProcessor dumpAllBetaProcessor = new DumpAllBetaProcessor(this); DumpAllTagProcessor dumpAllTagProcessor = new DumpAllTagProcessor(this);复制代码 |
简单总结一下刚刚分析的整个过程。
因此总的来讲,Nacos采用推+拉的形式,来解决最开始关于长轮训时间间隔的问题。固然,30s这个时间是能够设置的,而之因此定30s,应该是一个经验值。
Nacos支持集群模式,很显然。
而一旦涉及到集群,就涉及到主从,那么nacos是一种什么样的机制来实现的集群呢?
nacos的集群相似于zookeeper, 它分为leader角色和follower角色, 那么从这个角色的名字能够看出来,这个集群存在选举的机制。 由于若是本身不具有选举功能,角色的命名可能就是master/slave了,固然这只是我基于这么多组件的命名的一个猜想
Nacos集群采用raft算法来实现,它是相对zookeeper的选举算法较为简单的一种。
选举算法的核心在RaftCore
中,包括数据的处理和数据同步
在Raft中,节点有三种角色:
选举分为两个节点
全部节点启动的时候,都是follower状态。 若是在一段时间内若是没有收到leader的心跳(多是没有leader,也多是leader挂了),那么follower会变成Candidate。而后发起选举,选举以前,会增长term,这个term和zookeeper中的epoch的道理是同样的。
选举的几种状况
对于事务操做,请求会转发给leader
非事务操做上,能够任意一个节点来处理
下面这段代码摘自 RaftCore , 在发布内容的时候,作了两个事情
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970复制代码 |
public void signalPublish(String key, Record value) throws Exception { if (!isLeader()) { JSONObject params = new JSONObject(); params.put("key", key); params.put("value", value); Map<String, String> parameters = new HashMap<>(1); parameters.put("key", key); raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters); return; } try { OPERATE_LOCK.lock(); long start = System.currentTimeMillis(); final Datum datum = new Datum(); datum.key = key; datum.value = value; if (getDatum(key) == null) { datum.timestamp.set(1L); } else { datum.timestamp.set(getDatum(key).timestamp.incrementAndGet()); } JSONObject json = new JSONObject(); json.put("datum", datum); json.put("source", peers.local()); onPublish(datum, peers.local()); final String content = JSON.toJSONString(json); final CountDownLatch latch = new CountDownLatch(peers.majorityCount()); for (final String server : peers.allServersIncludeMyself()) { if (isLeader(server)) { latch.countDown(); continue; } final String url = buildURL(server, API_ON_PUB); HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}", datum.key, server, response.getStatusCode()); return 1; } latch.countDown(); return 0; } @Override public STATE onContentWriteCompleted() { return STATE.CONTINUE; } }); } if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) { // only majority servers return success can we consider this update success Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key); throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key); } long end = System.currentTimeMillis(); Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key); } finally { OPERATE_LOCK.unlock(); }复制代码 |
#
(^U^)ノ~YO同窗,你已经看到告终尾吗?
看到这里,爱学习的你是否是在上班、学习和读文章的时候有什么问题想要大牛和大神解答呢?
因此,快来点击这里�(☄⊙ω⊙)☄
gper.club/answers/7e7…提出问题让大牛大神们给你解惑吧~