Alibaba Nacos配置中心服务端处理源码

服务端收到客户端的配置变动请求查询的长轮训请求以后,服务端怎么来处理这个长轮训呢?算法

长轮训的时间间隔

上节课讲到了配置更新的整个原理及源码,咱们知道客户端会有一个长轮训的任务去检查服务器端的配置是否发生了变化,若是发生了变动,那么客户端会拿到变动的 groupKey 再根据 groupKey 去获取配置项的最新值更新到本地的缓存以及文件中,那么这种每次都靠客户端去请求,那请求的时间间隔设置多少合适呢?spring

若是间隔时间设置的太长的话有可能没法及时获取服务端的变动,若是间隔时间设置的过短的话,那么频繁的请求对于服务端来讲无疑也是一种负担,因此最好的方式是客户端每隔一段长度适中的时间去服务端请求,而在这期间若是配置发生变动,服务端可以主动将变动后的结果推送给客户端,这样既能保证客户端可以实时感知到配置的变化,也下降了服务端的压力。 咱们来看看nacos设置的间隔时间是多久apache

长轮训的概念

那么在讲解原理以前,先给你们解释一下什么叫长轮训json

客户端发起一个请求到服务端,服务端收到客户端的请求后,并不会马上响应给客户端,而是先把这个请求hold住,而后服务端会在hold住的这段时间检查数据是否有更新,若是有,则响应给客户端,若是一直没有数据变动,则达到必定的时间(长轮训时间间隔)才返回。api

长轮训典型的场景有: 扫码登陆、扫码支付。缓存

1564895144314

客户端长轮训

回到咱们昨天上课讲到的代码,在ClientWorker这个类里面,找到checkUpdateConfigStr这个方法,这里面就是去服务器端查询发生变化的groupKey。bash

123456789101112131415161718192021222324252627282930313233343536复制代码
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {        List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);        List<String> headers = new ArrayList<String>(2);        headers.add("Long-Pulling-Timeout");        headers.add("" + timeout);        // told server do not hang me up if new initializing cacheData added in        if (isInitializingCacheList) {            headers.add("Long-Pulling-Timeout-No-Hangup");            headers.add("true");        }        if (StringUtils.isBlank(probeUpdateString)) {            return Collections.emptyList();        }        try {            HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,                agent.getEncode(), timeout);            if (HttpURLConnection.HTTP_OK == result.code) {                setHealthServer(true);                return parseUpdateDataIdResponse(result.content);            } else {                setHealthServer(false);                LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);            }        } catch (IOException e) {            setHealthServer(false);            LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);            throw e;        }        return Collections.emptyList();    }复制代码

这个方法最终会发起http请求,注意这里面有一个timeout的属性,服务器

12复制代码
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,                agent.getEncode(), timeout);复制代码

timeout是在init这个方法中赋值的,默认状况下是30秒,能够经过configLongPollTimeout进行修改mvc

12345复制代码
private void init(Properties properties) {        this.timeout = (long)Math.max(NumberUtils.toInt(properties.getProperty("configLongPollTimeout"), 30000), 10000);        this.taskPenaltyTime = NumberUtils.toInt(properties.getProperty("configRetryTime"), 2000);        this.enableRemoteSyncConfig = Boolean.parseBoolean(properties.getProperty("enableRemoteSyncConfig"));    }复制代码

因此从这里得出的一个基本结论是app

客户端发起一个轮询请求,超时时间是30s。 那么客户端为何要等待30s才超时呢?不是越快越好吗?

客户端长轮训的时间间隔

咱们能够在nacos的日志目录下$NACOS_HOME/nacos/logs/config-client-request.log文件

123456复制代码
2019-08-04 13:22:19,736|0|nohangup|127.0.0.1|polling|1|55|02019-08-04 13:22:49,443|29504|timeout|127.0.0.1|polling|1|552019-08-04 13:23:18,983|29535|timeout|127.0.0.1|polling|1|552019-08-04 13:23:48,493|29501|timeout|127.0.0.1|polling|1|552019-08-04 13:24:18,003|29500|timeout|127.0.0.1|polling|1|552019-08-04 13:24:47,509|29501|timeout|127.0.0.1|polling|1|55复制代码

能够看到一个现象,在配置没有发生变化的状况下,客户端会等29.5s以上,才请求到服务器端的结果。而后客户端拿到服务器端的结果以后,在作后续的操做。

若是在配置变动的状况下,因为客户端基于长轮训的链接保持,因此返回的时间会很是的短,咱们能够作个小实验,在nacos console中频繁修改数据而后再观察一下

config-client-request.log的变化

12345复制代码
2019-08-04 13:30:17,016|0|in-advance|127.0.0.1|polling|1|55|example+DEFAULT_GROUP2019-08-04 13:30:17,022|3|null|127.0.0.1|get|example|DEFAULT_GROUP||e10e4d5973c497e490a8d7a9e4e9be64|unknown2019-08-04 13:30:20,807|10|true|0:0:0:0:0:0:0:1|publish|example|DEFAULT_GROUP||81360b7e732a5dbb37d62d81cebb85d2|null2019-08-04 13:30:20,843|0|in-advance|127.0.0.1|polling|1|55|example+DEFAULT_GROUP2019-08-04 13:30:20,848|1|null|127.0.0.1|get|example|DEFAULT_GROUP||81360b7e732a5dbb37d62d81cebb85d2|unknown复制代码

1564896925676

服务端的处理

分析完客户端以后,随着好奇心的驱使,服务端是如何处理客户端的请求的?那么一样,咱们须要思考几个问题

  • 客户端的长轮训响应时间受到哪些因素的影响
  • 客户端的超时时间为何要设置30s

客户端发送的请求地址是:/v1/cs/configs/listener 找到服务端对应的方法

ConfigController

nacos是使用spring mvc提供的rest api。这里面会调用inner.doPollingConfig进行处理

123456789101112131415161718192021复制代码
@RequestMapping(value = "/listener", method = RequestMethod.POST)    public void listener(HttpServletRequest request, HttpServletResponse response)        throws ServletException, IOException {        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);        String probeModify = request.getParameter("Listening-Configs");        if (StringUtils.isBlank(probeModify)) {            throw new IllegalArgumentException("invalid probeModify");        }        probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);        Map<String, String> clientMd5Map;        try {            clientMd5Map = MD5Util.getClientMd5Map(probeModify);        } catch (Throwable e) {            throw new IllegalArgumentException("invalid probeModify");        }        // do long-polling        inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());    }复制代码

doPollingConfig

这个方法中,兼容了长轮训和短轮询的逻辑,咱们只须要关注长轮训的部分。再次进入到longPollingService.addLongPollingClient

12345678910111213141516171819202122232425262728293031323334353637383940复制代码
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,                                  Map<String, String> clientMd5Map, int probeRequestSize)        throws IOException, ServletException {        // 长轮询        if (LongPollingService.isSupportLongPolling(request)) {            longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);            return HttpServletResponse.SC_OK + "";        }        // else 兼容短轮询逻辑        List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);        // 兼容短轮询result        String oldResult = MD5Util.compareMd5OldResult(changedGroups);        String newResult = MD5Util.compareMd5ResultString(changedGroups);        String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);        if (version == null) {            version = "2.0.0";        }        int versionNum = Protocol.getVersionNumber(version);        /**         * 2.0.4版本之前, 返回值放入header中         */        if (versionNum < START_LONGPOLLING_VERSION_NUM) {            response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);            response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);        } else {            request.setAttribute("content", newResult);        }        // 禁用缓存        response.setHeader("Pragma", "no-cache");        response.setDateHeader("Expires", 0);        response.setHeader("Cache-Control", "no-cache,no-store");        response.setStatus(HttpServletResponse.SC_OK);        return HttpServletResponse.SC_OK + "";    }复制代码

longPollingService.addLongPollingClient

从方法名字上能够推测出,这个方法应该是把客户端的长轮训请求添加到某个任务中去。

  • 得到客户端传递过来的超时时间,而且进行本地计算,提早500ms返回响应,这就能解释为何客户端响应超时时间是29.5+了。固然若是isFixedPolling=true的状况下,不会提早返回响应
  • 根据客户端请求过来的md5和服务器端对应的group下对应内容的md5进行比较,若是不一致,则经过generateResponse将结果返回
  • 若是配置文件没有发生变化,则经过scheduler.execute 启动了一个定时任务,将客户端的长轮询请求封装成一个叫 ClientLongPolling 的任务,交给 scheduler 去执行
12345678910111213141516171819202122232425262728293031323334353637383940复制代码
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,                                     int probeRequestSize) {        //str表示超时时间,也就是timeout        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);        String tag = req.getHeader("Vipserver-Tag");        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);        /**         * 提早500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动  add delay time for LoadBalance         */        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);        if (isFixedPolling()) {            timeout = Math.max(10000, getFixedPollingInterval());            // do nothing but set fix polling timeout        } else {            long start = System.currentTimeMillis();            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);            if (changedGroups.size() > 0) {                generateResponse(req, rsp, changedGroups);                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",                    System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",                    clientMd5Map.size(), probeRequestSize, changedGroups.size());                return;            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,                    changedGroups.size());                return;            }        }        String ip = RequestUtil.getRemoteIp(req);        // 必定要由HTTP线程调用,不然离开后容器会当即发送响应        final AsyncContext asyncContext = req.startAsync();        // AsyncContext.setTimeout()的超时时间不许,因此只能本身控制        asyncContext.setTimeout(0L);        scheduler.execute(            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));    }复制代码

ClientLongPolling

接下来咱们来分析一下,clientLongPolling到底作了什么操做。或者说咱们能够先猜想一下应该会作什么事情

  • 这个任务要阻塞29.5s才能执行,由于立马执行没有任何意义,毕竟前面已经执行过一次了
  • 若是在29.5s+以内,数据发生变化,须要提早通知。须要有一种监控机制

基于这些猜测,咱们能够看看它的实现过程

从代码粗粒度来看,它的实现彷佛和咱们的猜测一致,在run方法中,经过scheduler.schedule实现了一个定时任务,它的delay时间正好是前面计算的29.5s。在这个任务中,会经过MD5Util.compareMd5来进行计算

那另一个,当数据发生变化之后,确定不能等到29.5s以后才通知呀,那怎么办呢?咱们发现有一个allSubs的东西,它彷佛和发布订阅有关系。那是否是有可能当前的clientLongPolling订阅了数据变化的事件呢?

12345678910111213141516171819202122232425262728293031323334353637383940414243复制代码
public void run() {    asyncTimeoutFuture = scheduler.schedule(new Runnable() {        @Override        public void run() {            try {                getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMil·1·s());                /**                 * 删除订阅关系                 */                allSubs.remove(ClientLongPolling.this);                if (isFixedPolling()) {                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",                                           (System.currentTimeMillis() - createTime),                                           "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),                                           "polling",                                           clientMd5Map.size(), probeRequestSize);                    List<String> changedGroups = MD5Util.compareMd5(                        (HttpServletRequest)asyncContext.getRequest(),                        (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);                    if (changedGroups.size() > 0) {                        sendResponse(changedGroups);                    } else {                        sendResponse(null);                    }                } else {                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",                                           (System.currentTimeMillis() - createTime),                                           "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),                                           "polling",                                           clientMd5Map.size(), probeRequestSize);                    sendResponse(null);                }            } catch (Throwable t) {                LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());            }        }    }, timeoutTime, TimeUnit.MILLISECONDS);    allSubs.add(this);}复制代码

allSubs

allSubs是一个队列,队列里面放了ClientLongPolling这个对象。这个队列彷佛和配置变动有某种关联关系

123456复制代码
/** * 长轮询订阅关系 */final Queue<ClientLongPolling> allSubs;allSubs.add(this);复制代码

那这个时候,个人第一想法是,先去看一下当前这个类的类图,发现LongPollingService集成了AbstractEventListener,事件监听?果真没猜错。

1564902541390

AbstractEventListener

这里面有一个抽象的onEvent方法,明显是用来处理事件的方法,而抽象方法必须由子类实现,因此意味着LongPollingService里面确定实现了onEvent方法

1234567891011121314151617181920212223复制代码
static public abstract class AbstractEventListener {        public AbstractEventListener() {            /**             * automatic register             */            EventDispatcher.addEventListener(this);        }        /**         * 感兴趣的事件列表         *         * @return event list         */        abstract public List<Class<? extends Event>> interest();        /**         * 处理事件         *         * @param event event         */        abstract public void onEvent(Event event);    }复制代码

LongPollingService.onEvent

这个事件的实现方法中

  • 判断事件类型是否为LocalDataChangeEvent
  • 经过scheduler.execute执行DataChangeTask这个任务
1234567891011复制代码
@Override    public void onEvent(Event event) {        if (isFixedPolling()) {            // ignore        } else {            if (event instanceof LocalDataChangeEvent) {                LocalDataChangeEvent evt = (LocalDataChangeEvent)event;                scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));            }        }    }复制代码

DataChangeTask.run

从名字能够看出来,这个是数据变化的任务,最让人兴奋的应该是,它里面有一个循环迭代器,从allSubs里面得到ClientLongPolling

最后经过clientSub.sendResponse把数据返回到客户端。因此,这也就可以理解为什么数据变化可以实时触发更新了。

12345678910111213141516171819202122232425262728293031复制代码
public void run() {    try {        ConfigService.getContentBetaMd5(groupKey);        for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {            ClientLongPolling clientSub = iter.next();            if (clientSub.clientMd5Map.containsKey(groupKey)) {                // 若是beta发布且不在beta列表直接跳过                if (isBeta && !betaIps.contains(clientSub.ip)) {                    continue;                }                // 若是tag发布且不在tag列表直接跳过                if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {                    continue;                }                getRetainIps().put(clientSub.ip, System.currentTimeMillis());                iter.remove(); // 删除订阅关系                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",                                       (System.currentTimeMillis() - changeTime),                                       "in-advance",                                       RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),                                       "polling",                                       clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);                clientSub.sendResponse(Arrays.asList(groupKey));            }        }    } catch (Throwable t) {        LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());    }}复制代码

那么接下来还有一个疑问是,数据变化以后是如何触发事件的呢? 因此咱们定位到数据变化的请求类中,在ConfigController这个类中,找到POST请求的方法

找到配置变动的位置, 发现数据持久化以后,会经过EventDispatcher进行事件发布EventDispatcher.fireEvent 可是这个事件彷佛不是咱们所关心的时间,缘由是这里发布的事件是ConfigDataChangeEvent, 而LongPollingService感兴趣的事件是LocalDataChangeEvent

1234567891011121314151617181920212223复制代码
@RequestMapping(method = RequestMethod.POST)    @ResponseBody    public Boolean publishConfig(...)        throws NacosException {        //省略部分代码        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);        if (StringUtils.isBlank(betaIps)) {            if (StringUtils.isBlank(tag)) {                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));            } else {                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));            }        } else { // beta publish            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);            EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));        }        ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(),            LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);        return true;    }复制代码

后来我发现,在Nacos中有一个DumpService,它会定时把变动后的数据dump到磁盘上,DumpService在spring启动以后,会调用init方法启动几个dump任务。而后在任务执行结束以后,会触发一个LocalDataChangeEvent 的事件

1234567复制代码
@PostConstruct    public void init() {        LogUtil.defaultLog.warn("DumpService start");        DumpProcessor processor = new DumpProcessor(this);        DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);        DumpAllBetaProcessor dumpAllBetaProcessor = new DumpAllBetaProcessor(this);        DumpAllTagProcessor dumpAllTagProcessor = new DumpAllTagProcessor(this);复制代码

简单总结

简单总结一下刚刚分析的整个过程。

  • 客户端发起长轮训请求
  • 服务端收到请求之后,先比较服务端缓存中的数据是否相同,若是不通,则直接返回
  • 若是相同,则经过schedule延迟29.5s以后再执行比较
  • 为了保证当服务端在29.5s以内发生数据变化可以及时通知给客户端,服务端采用事件订阅的方式来监听服务端本地数据变化的事件,一旦收到事件,则触发DataChangeTask的通知,而且遍历allStubs队列中的ClientLongPolling,把结果写回到客户端,就完成了一次数据的推送
  • 若是 DataChangeTask 任务完成了数据的 “推送” 以后,ClientLongPolling 中的调度任务又开始执行了怎么办呢?
    很简单,只要在进行 “推送” 操做以前,先将原来等待执行的调度任务取消掉就能够了,这样就防止了推送操做写完响应数据以后,调度任务又去写响应数据,这时确定会报错的。因此,在ClientLongPolling方法中,最开始的一个步骤就是删除订阅事件

因此总的来讲,Nacos采用推+拉的形式,来解决最开始关于长轮训时间间隔的问题。固然,30s这个时间是能够设置的,而之因此定30s,应该是一个经验值。

集群选举问题

Nacos支持集群模式,很显然。

而一旦涉及到集群,就涉及到主从,那么nacos是一种什么样的机制来实现的集群呢?

nacos的集群相似于zookeeper, 它分为leader角色和follower角色, 那么从这个角色的名字能够看出来,这个集群存在选举的机制。 由于若是本身不具有选举功能,角色的命名可能就是master/slave了,固然这只是我基于这么多组件的命名的一个猜想

选举算法

Nacos集群采用raft算法来实现,它是相对zookeeper的选举算法较为简单的一种。

选举算法的核心在RaftCore 中,包括数据的处理和数据同步

raft算法演示地址

在Raft中,节点有三种角色:

  • Leader:负责接收客户端的请求
  • Candidate:用于选举Leader的一种角色
  • Follower:负责响应来自Leader或者Candidate的请求

选举分为两个节点

  • 服务启动的时候
  • leader挂了的时候

全部节点启动的时候,都是follower状态。 若是在一段时间内若是没有收到leader的心跳(多是没有leader,也多是leader挂了),那么follower会变成Candidate。而后发起选举,选举以前,会增长term,这个term和zookeeper中的epoch的道理是同样的。

  • follower会投本身一票,而且给其余节点发送票据vote,等到其余节点回复
  • 在这个过程当中,可能出现几种状况
    • 收到过半的票数经过,则成为leader
    • 被告知其余节点已经成为leader,则本身切换为follower
    • 一段时间内没有收到过半的投票,则从新发起选举
  • 约束条件在任一term中,单个节点最多只能投一票

选举的几种状况

  • 第一种状况,赢得选举以后,leader会给全部节点发送消息,避免其余节点触发新的选举
  • 第二种状况,好比有三个节点A B C。A B同时发起选举,而A的选举消息先到达C,C给A投了一票,当B的消息到达C时,已经不能知足上面提到的第一个约束,即C不会给B投票,而A和B显然都不会给对方投票。A胜出以后,会给B,C发心跳消息,节点B发现节点A的term不低于本身的term,知道有已经有Leader了,因而转换成follower
  • 第三种状况, 没有任何节点得到majority投票,多是平票的状况。加入总共有四个节点(A/B/C/D),Node C、Node D同时成为了candidate,但Node A投了NodeD一票,NodeB投了Node C一票,这就出现了平票 split vote的状况。这个时候你们都在等啊等,直到超时后从新发起选举。若是出现平票的状况,那么就延长了系统不可用的时间,所以raft引入了randomized election timeouts来尽可能避免平票状况

数据的处理

对于事务操做,请求会转发给leader

非事务操做上,能够任意一个节点来处理

下面这段代码摘自 RaftCore , 在发布内容的时候,作了两个事情

  • 若是当前的节点不是leader,则转发给leader节点处理
  • 若是是,则向全部节点发送onPublish
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970复制代码
public void signalPublish(String key, Record value) throws Exception {        if (!isLeader()) {            JSONObject params = new JSONObject();            params.put("key", key);            params.put("value", value);            Map<String, String> parameters = new HashMap<>(1);            parameters.put("key", key);            raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);            return;        }        try {            OPERATE_LOCK.lock();            long start = System.currentTimeMillis();            final Datum datum = new Datum();            datum.key = key;            datum.value = value;            if (getDatum(key) == null) {                datum.timestamp.set(1L);            } else {                datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());            }            JSONObject json = new JSONObject();            json.put("datum", datum);            json.put("source", peers.local());            onPublish(datum, peers.local());            final String content = JSON.toJSONString(json);            final CountDownLatch latch = new CountDownLatch(peers.majorityCount());            for (final String server : peers.allServersIncludeMyself()) {                if (isLeader(server)) {                    latch.countDown();                    continue;                }                final String url = buildURL(server, API_ON_PUB);                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {                    @Override                    public Integer onCompleted(Response response) throws Exception {                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {                            Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",                                datum.key, server, response.getStatusCode());                            return 1;                        }                        latch.countDown();                        return 0;                    }                    @Override                    public STATE onContentWriteCompleted() {                        return STATE.CONTINUE;                    }                });            }            if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {                // only majority servers return success can we consider this update success                Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);                throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);            }            long end = System.currentTimeMillis();            Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);        } finally {            OPERATE_LOCK.unlock();        }复制代码

#

(^U^)ノ~YO同窗,你已经看到告终尾吗?
看到这里,爱学习的你是否是在上班、学习和读文章的时候有什么问题想要大牛和大神解答呢?
因此,快来点击这里�(☄⊙ω⊙)☄
gper.club/answers/7e7…提出问题让大牛大神们给你解惑吧~

相关文章
相关标签/搜索