本文将按照《RocketMQ 多副本前置篇:初探raft协议》的思路来学习RocketMQ DLedger 多副本即主从切换选主逻辑。首先先回顾一下关于Leader的一些思考:java
> 舒适提示:本文是从源码的角度分析 DLedger 选主实现原理,可能比较鼓噪,文末给出了选主流程图。服务器
多副本模块相关的配置信息,例如集群节点信息。网络
节点状态机,即raft协议中的follower、candidate、leader三种状态的状态机实现。并发
DLedger客户端协议,主要定义以下三个方法,在后面的日志复制部分会重点阐述。app
DLedger服务端协议,主要定义以下三个方法。dom
DLedgerClientProtocolHandler、DLedgerProtocolHander协议处理器。异步
DLedger Server(节点)之间的网络通讯,默认基于Netty实现,其实现类为:DLedgerRpcNettyService。async
Leader选举实现器。源码分析
Dledger Server,Dledger节点的封装类。学习
接下来将从DLedgerLeaderElector开始剖析DLedger是如何实现Leader选举的。(基于raft协议)。
咱们先一一来介绍其属性的含义:
经过 DLedgerLeaderElector 的 startup 方法启动状态管理机,代码以下: DLedgerLeaderElector#startup
public void startup() { stateMaintainer.start(); // [@1](https://my.oschina.net/u/1198) for (RoleChangeHandler roleChangeHandler : roleChangeHandlers) { // @2 roleChangeHandler.startup(); } }
代码@1:启动状态维护管理器。
代码@2:遍历状态改变监听器并启动它,可经过DLedgerLeaderElector 的 addRoleChangeHandler 方法增长状态变化监听器。
其中的是启动状态管理器线程,其run方法实现:
public void run() { while (running.get()) { try { doWork(); } catch (Throwable t) { if (logger != null) { logger.error("Unexpected Error in running {} ", getName(), t); } } } latch.countDown(); }
从上面来看,主要是循环调用doWork方法,接下来重点看其doWork的实现:
public void doWork() { try { if (DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()) { // [@1](https://my.oschina.net/u/1198) DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig); // @2 DLedgerLeaderElector.this.maintainState(); // @3 } sleep(10); // @4 } catch (Throwable t) { DLedgerLeaderElector.logger.error("Error in heartbeat", t); } }
代码@1:若是该节点参与Leader选举,则首先调用@2重置定时器,而后驱动状态机(@3),是接下来重点须要剖析的。
代码@4:没执行一次选主,休息10ms。
DLedgerLeaderElector#maintainState
private void maintainState() throws Exception { if (memberState.isLeader()) { maintainAsLeader(); } else if (memberState.isFollower()) { maintainAsFollower(); } else { maintainAsCandidate(); } }
根据当前的状态机状态,执行对应的操做,从raft协议中可知,总共存在3种状态:
咱们在继续往下看以前,须要知道 memberState 的初始值是什么?咱们追溯到建立 MemberState 的地方,发现其初始状态为 CANDIDATE。那咱们接下从 maintainAsCandidate 方法开始跟进。
> 舒适提示:在raft协议中,节点的状态默认为follower,DLedger的实现从candidate开始,一开始,集群内的全部节点都会尝试发起投票,这样第一轮要达成选举几乎不太可能。
整个状态机的驱动,由线程反复执行maintainState方法。下面重点来分析其状态的驱动。
DLedgerLeaderElector#maintainAsCandidate
if (System.currentTimeMillis() < nextTimeToRequestVote && !needIncreaseTermImmediately) { return; } long term; long ledgerEndTerm; long ledgerEndIndex;
Step1:首先先介绍几个变量的含义:
DLedgerLeaderElector#maintainAsCandidate
synchronized (memberState) { if (!memberState.isCandidate()) { return; } if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) { long prevTerm = memberState.currTerm(); term = memberState.nextTerm(); logger.info("{}_[INCREASE_TERM] from {} to {}", memberState.getSelfId(), prevTerm, term); lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE; } else { term = memberState.currTerm(); } ledgerEndIndex = memberState.getLedgerEndIndex(); ledgerEndTerm = memberState.getLedgerEndTerm(); }
Step2:初始化team、ledgerEndIndex 、ledgerEndTerm 属性,其实现关键点以下:
DLedgerLeaderElector#maintainAsCandidate
if (needIncreaseTermImmediately) { nextTimeToRequestVote = getNextTimeToRequestVote(); needIncreaseTermImmediately = false; return; }
Step3:若是needIncreaseTermImmediately为true,则重置该标记位为false,并从新设置下一次投票超时时间,其实现代码以下:
private long getNextTimeToRequestVote() { return System.currentTimeMillis() + lastVoteCost + minVoteIntervalMs + random.nextInt(maxVoteIntervalMs - minVoteIntervalMs); }
下一次倒计时:当前时间戳 + 上次投票的开销 + 最小投票间隔(300ms) + (1000- 300 )之间的随机值。
final List<completablefuture<voteresponse>> quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);
Step4:向集群内的其余节点发起投票请,并返回投票结果列表,稍后会重点分析其投票过程。能够预见,接下来就是根据各投票结果进行仲裁。
final AtomicLong knownMaxTermInGroup = new AtomicLong(-1); final AtomicInteger allNum = new AtomicInteger(0); final AtomicInteger validNum = new AtomicInteger(0); final AtomicInteger acceptedNum = new AtomicInteger(0); final AtomicInteger notReadyTermNum = new AtomicInteger(0); final AtomicInteger biggerLedgerNum = new AtomicInteger(0); final AtomicBoolean alreadyHasLeader = new AtomicBoolean(false);
Step5:在进行投票结果仲裁以前,先来介绍几个局部变量的含义:
for (CompletableFuture<voteresponse> future : quorumVoteResponses) { // 省略部分代码 }
Step5:遍历投票结果,收集投票结果,接下来重点看其内部实现。
if (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) { validNum.incrementAndGet(); }
Step6:若是投票结果不是UNKNOW,则有效投票数量增1。
synchronized (knownMaxTermInGroup) { switch (x.getVoteResult()) { case ACCEPT: acceptedNum.incrementAndGet(); break; case REJECT_ALREADY_VOTED: break; case REJECT_ALREADY_HAS_LEADER: alreadyHasLeader.compareAndSet(false, true); break; case REJECT_TERM_SMALL_THAN_LEDGER: case REJECT_EXPIRED_VOTE_TERM: if (x.getTerm() > knownMaxTermInGroup.get()) { knownMaxTermInGroup.set(x.getTerm()); } break; case REJECT_EXPIRED_LEDGER_TERM: case REJECT_SMALL_LEDGER_END_INDEX: biggerLedgerNum.incrementAndGet(); break; case REJECT_TERM_NOT_READY: notReadyTermNum.incrementAndGet(); break; default: break; } }
Step7:统计投票结构,几个关键点以下:
try { voteLatch.await(3000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS); } catch (Throwable ignore) { }
Step8:等待收集投票结果,并设置超时时间。
lastVoteCost = DLedgerUtils.elapsed(startVoteTimeMs); VoteResponse.ParseResult parseResult; if (knownMaxTermInGroup.get() > term) { parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT; nextTimeToRequestVote = getNextTimeToRequestVote(); changeRoleToCandidate(knownMaxTermInGroup.get()); } else if (alreadyHasLeader.get()) { parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT; nextTimeToRequestVote = getNextTimeToRequestVote() + heartBeatTimeIntervalMs * maxHeartBeatLeak; } else if (!memberState.isQuorum(validNum.get())) { parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE; nextTimeToRequestVote = getNextTimeToRequestVote(); } else if (memberState.isQuorum(acceptedNum.get())) { parseResult = VoteResponse.ParseResult.PASSED; } else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) { parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY; } else if (memberState.isQuorum(acceptedNum.get() + biggerLedgerNum.get())) { parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE; nextTimeToRequestVote = getNextTimeToRequestVote(); } else { parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT; nextTimeToRequestVote = getNextTimeToRequestVote(); }
Step9:根据收集的投票结果判断是否能成为Leader。 > 舒适提示:在讲解关键点以前,咱们先定义先将(当前时间戳 + 上次投票的开销 + 最小投票间隔(300ms) + (1000- 300 )之间的随机值)定义为“ 1个常规计时器”。
其关键点以下:
if (parseResult == VoteResponse.ParseResult.PASSED) { logger.info("[{}] [VOTE_RESULT] has been elected to be the leader in term {}", memberState.getSelfId(), term); changeRoleToLeader(term); }
Step10:若是投票成功,则状态机状态设置为Leader,而后状态管理在驱动状态时会调用DLedgerLeaderElector#maintainState时,将进入到maintainAsLeader方法。
通过maintainAsCandidate 投票选举后,被其余节点选举成为领导后,会执行该方法,其余节点的状态仍是Candidate,并在计时器过时后,又尝试去发起选举。接下来重点分析成为Leader节点后,该节点会作些什么?
DLedgerLeaderElector#maintainAsLeader
private void maintainAsLeader() throws Exception { if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) { // @1 long term; String leaderId; synchronized (memberState) { if (!memberState.isLeader()) { // @2 //stop sending return; } term = memberState.currTerm(); leaderId = memberState.getLeaderId(); lastSendHeartBeatTime = System.currentTimeMillis(); // @3 } sendHeartbeats(term, leaderId); // @4 } }
代码@1:首先判断上一次发送心跳的时间与当前时间的差值是否大于心跳包发送间隔,若是超过,则说明须要发送心跳包。
代码@2:若是当前不是leader节点,则直接返回,主要是为了二次判断。
代码@3:重置心跳包发送计时器。
代码@4:向集群内的全部节点发送心跳包,稍后会详细介绍心跳包的发送。
当 Candidate 状态的节点在收到主节点发送的心跳包后,会将状态变动为follower,那咱们先来看一下在follower状态下,节点会作些什么事情?
private void maintainAsFollower() { if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2 * heartBeatTimeIntervalMs) { synchronized (memberState) { if (memberState.isFollower() && (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs)) { logger.info("[{}][HeartBeatTimeOut] lastLeaderHeartBeatTime: {} heartBeatTimeIntervalMs: {} lastLeader={}", memberState.getSelfId(), new Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId()); changeRoleToCandidate(memberState.currTerm()); } } } }
若是maxHeartBeatLeak (默认为3)个心跳包周期内未收到心跳,则将状态变动为Candidate。
状态机的驱动就介绍到这里,在上面的流程中,其实咱们忽略了两个重要的过程,一个是发起投票请求与投票请求响应、发送心跳包与心跳包响应,那咱们接下来将重点介绍这两个过程。
节点的状态为 Candidate 时会向集群内的其余节点发起投票请求(我的以为理解为拉票更好),向对方询问是否愿意选举我为Leader,对端节点会根据本身的状况对其投同意票、拒绝票,若是是拒绝票,还会给出拒绝缘由,具体由voteForQuorumResponses、handleVote 这两个方法来实现,接下来咱们分别对这两个方法进行详细分析。
发起投票请求。
private List<completablefuture<voteresponse>> voteForQuorumResponses(long term, long ledgerEndTerm, long ledgerEndIndex) throws Exception { // @1 List<completablefuture<voteresponse>> responses = new ArrayList<>(); for (String id : memberState.getPeerMap().keySet()) { // @2 VoteRequest voteRequest = new VoteRequest(); // @3 start voteRequest.setGroup(memberState.getGroup()); voteRequest.setLedgerEndIndex(ledgerEndIndex); voteRequest.setLedgerEndTerm(ledgerEndTerm); voteRequest.setLeaderId(memberState.getSelfId()); voteRequest.setTerm(term); voteRequest.setRemoteId(id); CompletableFuture<voteresponse> voteResponse; // @3 end if (memberState.getSelfId().equals(id)) { // @4 voteResponse = handleVote(voteRequest, true); } else { //async voteResponse = dLedgerRpcService.vote(voteRequest); // @5 } responses.add(voteResponse); } return responses; }
代码@1:首先先解释一下参数的含义:
代码@2:遍历集群内的节点集合,准备异步发起投票请求。这个集合在启动的时候指定,不能修改。
代码@3:构建投票请求。
代码@4:若是是发送给本身的,则直接调用handleVote进行投票请求响应,若是是发送给集群内的其余节点,则经过网络发送投票请求,对端节点调用各自的handleVote对集群进行响应。
接下来重点关注 handleVote 方法,重点探讨其投票处理逻辑。
因为handleVote 方法会并发被调用,由于可能同时收到多个节点的投票请求,故本方法都被synchronized方法包含,锁定的对象为状态机 memberState 对象。
if (!memberState.isPeerMember(request.getLeaderId())) { logger.warn("[BUG] [HandleVote] remoteId={} is an unknown member", request.getLeaderId()); return CompletableFuture.completedFuture(newVoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNKNOWN_LEADER)); } if (!self && memberState.getSelfId().equals(request.getLeaderId())) { logger.warn("[BUG] [HandleVote] selfId={} but remoteId={}", memberState.getSelfId(), request.getLeaderId()); return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNEXPECTED_LEADER)); }
Step1:为了逻辑的完整性对其请求进行检验,除非有BUG存在,不然是不会出现上述问题的。
if (request.getTerm() < memberState.currTerm()) { // @1 return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM)); } else if (request.getTerm() == memberState.currTerm()) { // @2 if (memberState.currVoteFor() == null) { //let it go } else if (memberState.currVoteFor().equals(request.getLeaderId())) { //repeat just let it go } else { if (memberState.getLeaderId() != null) { return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY__HAS_LEADER)); } else { return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED)); } } } else { // @3 //stepped down by larger term changeRoleToCandidate(request.getTerm()); needIncreaseTermImmediately = true; //only can handleVote when the term is consistent return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_NOT_READY)); }
Step2:判断发起节点、响应节点维护的team进行投票“仲裁”,分以下3种状况讨论:
if (request.getLedgerEndTerm() < memberState.getLedgerEndTerm()) { return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM)); } else if (request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && request.getLedgerEndIndex() < memberState.getLedgerEndIndex()) { return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_SMALL_LEDGER_END_INDEX)); } if (request.getTerm() < memberState.getLedgerEndTerm()) { return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.getLedgerEndTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_SMALL_THAN_LEDGER)); }
Step3:判断请求节点的 ledgerEndTerm 与当前节点的 ledgerEndTerm,这里主要是判断日志的复制进度。
memberState.setCurrVoteFor(request.getLeaderId()); return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.ACCEPT));
Step4:通过层层条件帅选,将宝贵的同意票投给请求节点。
通过几轮投票,最终一个节点能成功被推举出来,选为主节点。主节点为了维持其领导地位,须要定时向从节点发送心跳包,接下来咱们重点看一下心跳包的发送与响应。
Step1:遍历集群中的节点,异步发送心跳包。
CompletableFuture<heartbeatresponse> future = dLedgerRpcService.heartBeat(heartBeatRequest); future.whenComplete((HeartBeatResponse x, Throwable ex) -> { try { if (ex != null) { throw ex; } switch (DLedgerResponseCode.valueOf(x.getCode())) { case SUCCESS: succNum.incrementAndGet(); break; case EXPIRED_TERM: maxTerm.set(x.getTerm()); break; case INCONSISTENT_LEADER: inconsistLeader.compareAndSet(false, true); break; case TERM_NOT_READY: notReadyNum.incrementAndGet(); break; default: break; } if (memberState.isQuorum(succNum.get()) || memberState.isQuorum(succNum.get() + notReadyNum.get())) { beatLatch.countDown(); } } catch (Throwable t) { logger.error("Parse heartbeat response failed", t); } finally { allNum.incrementAndGet(); if (allNum.get() == memberState.peerSize()) { beatLatch.countDown(); } } }); }
Step2:统计心跳包发送响应结果,关键点以下:
这些响应值,咱们在处理心跳包时重点探讨。
beatLatch.await(heartBeatTimeIntervalMs, TimeUnit.MILLISECONDS); if (memberState.isQuorum(succNum.get())) { // @1 lastSuccHeartBeatTime = System.currentTimeMillis(); } else { logger.info("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}", memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), new Timestamp(lastSuccHeartBeatTime)); if (memberState.isQuorum(succNum.get() + notReadyNum.get())) { // @2 lastSendHeartBeatTime = -1; } else if (maxTerm.get() > term) { // @3 changeRoleToCandidate(maxTerm.get()); } else if (inconsistLeader.get()) { // @4 changeRoleToCandidate(term); } else if (DLedgerUtils.elapsed(lastSuccHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs) { changeRoleToCandidate(term); } }
对收集的响应结果作仲裁,其实现关键点:
接下来咱们重点看一下心跳包的处理逻辑。
if (request.getTerm() < memberState.currTerm()) { return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode())); } else if (request.getTerm() == memberState.currTerm()) { if (request.getLeaderId().equals(memberState.getLeaderId())) { lastLeaderHeartBeatTime = System.currentTimeMillis(); return CompletableFuture.completedFuture(new HeartBeatResponse()); } }
Step1:若是主节点的 term 小于 从节点的term,发送反馈给主节点,告知主节点的 term 已过期;若是投票轮次相同,而且发送心跳包的节点是该节点的主节点,则返回成功。
下面重点讨论主节点的 term 大于从节点的状况。
synchronized (memberState) { if (request.getTerm() < memberState.currTerm()) { // @1 return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode())); } else if (request.getTerm() == memberState.currTerm()) { // @2 if (memberState.getLeaderId() == null) { changeRoleToFollower(request.getTerm(), request.getLeaderId()); return CompletableFuture.completedFuture(new HeartBeatResponse()); } else if (request.getLeaderId().equals(memberState.getLeaderId())) { lastLeaderHeartBeatTime = System.currentTimeMillis(); return CompletableFuture.completedFuture(new HeartBeatResponse()); } else { //this should not happen, but if happened logger.error("[{}][BUG] currTerm {} has leader {}, but received leader {}", memberState.getSelfId(), memberState.currTerm(), memberState.getLeaderId(), request.getLeaderId()); return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.INCONSISTENT_LEADER.getCode())); } } else { //To make it simple, for larger term, do not change to follower immediately //first change to candidate, and notify the state-maintainer thread changeRoleToCandidate(request.getTerm()); needIncreaseTermImmediately = true; //TOOD notify return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.TERM_NOT_READY.getCode())); } }
Step2:加锁来处理(这里更多的是从节点第一次收到主节点的心跳包)
代码@1:若是主节的投票轮次小于当前投票轮次,则返回主节点投票轮次过时。
代码@2:若是投票轮次相同。
代码@3:若是主节点的投票轮次大于从节点的投票轮次,则认为从节点并为准备好,则从节点进入Candidate 状态,并当即发起一次投票。
心跳包的处理就介绍到这里。
RocketMQ 多副本之 Leader 选举的源码分析就介绍到这里了,为了增强对源码的理解,先梳理流程图以下:
本文就介绍到这里了,若是对您有必定的帮助,麻烦帮忙点个赞,谢谢。
做者简介:《RocketMQ技术内幕》做者,RocketMQ 社区布道师,维护公众号:中间件兴趣圈,可扫描以下二维码与做者进行互动。