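In the previous article we covered the initialization work NodeImpl performs in its init method. Elections are also kicked off from that method, so this article starts from there and walks through the election process in detail.
Since this article is about how elections are implemented, please read up on the underlying principles first: SOFAJRaft Election Mechanism Analysis | SOFAJRaft Implementation Principles
The article is fairly long; it took me about half a month to write, bit by bit~
Below I only list the election-related code and skip everything else. NodeImpl#init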
```java
public boolean init(final NodeOptions opts) {
    ....
    // Init timers
    // vote timer
    this.voteTimer = new RepeatedTimer("JRaft-VoteTimer", this.options.getElectionTimeoutMs()) {
        @Override
        protected void onTrigger() {
            // handle a vote timeout
            handleVoteTimeout();
        }

        @Override
        protected int adjustTimeout(final int timeoutMs) {
            // return a random timeout within a certain range
            return randomTimeout(timeoutMs);
        }
    };
    // election timer; when it fires, the pre-vote is started:
    // if the leader has not communicated with a follower within a certain period,
    // the follower may conclude that the leader can no longer do its job and try to take over.
    // This communication timeout is called the Election Timeout.
    // Before a candidate starts a real vote, it first runs a pre-vote.
    this.electionTimer = new RepeatedTimer("JRaft-ElectionTimer", this.options.getElectionTimeoutMs()) {
        @Override
        protected void onTrigger() {
            handleElectionTimeout();
        }

        @Override
        protected int adjustTimeout(final int timeoutMs) {
            // return a random timeout within a certain range,
            // so that nodes do not all start an election at the same time and split the vote
            return randomTimeout(timeoutMs);
        }
    };
    // step-down timer for the leader (fires every electionTimeoutMs / 2):
    // periodically checks whether the leader should step down and a new leader be elected
    this.stepDownTimer = new RepeatedTimer("JRaft-StepDownTimer", this.options.getElectionTimeoutMs() >> 1) {
        @Override
        protected void onTrigger() {
            handleStepDownTimeout();
        }
    };
    ....
    if (!this.conf.isEmpty()) {
        // a freshly started node has to go through an election
        stepDown(this.currTerm, false, new Status());
    }
    ....
}
```
The init method initializes three election-related timers: voteTimer, electionTimer and stepDownTimer.
I have already analyzed RepeatedTimer in a separate article: 2. SOFAJRaft Source Code Analysis: How Does JRaft Implement Its Scheduled Task Scheduler?
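Both voteTimer and electionTimer pass their timeout through randomTimeout. The goal is to keep nodes from timing out together and splitting the vote. The sketch below illustrates the idea. It assumes the timeout is drawn uniformly from [timeoutMs, timeoutMs + maxDelayMs). `ElectionTimeouts` and `maxDelayMs` are hypothetical names, not JRaft's API.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper: randomized election timeout (a sketch, not JRaft's code).
final class ElectionTimeouts {
    private ElectionTimeouts() {
    }

    // Returns a value in [timeoutMs, timeoutMs + maxDelayMs); maxDelayMs must be positive.
    static int randomTimeout(int timeoutMs, int maxDelayMs) {
        if (maxDelayMs <= 0) {
            throw new IllegalArgumentException("maxDelayMs must be positive");
        }
        return ThreadLocalRandom.current().nextInt(timeoutMs, timeoutMs + maxDelayMs);
    }

    public static void main(String[] args) {
        // Three nodes configured with the same 1000 ms base timeout usually fire at different times.
        for (int node = 1; node <= 3; node++) {
            System.out.println("node " + node + " -> " + randomTimeout(1000, 1000) + " ms");
        }
    }
}
```

Because each node draws a different value, one node usually times out first. It can then finish its pre-vote before the others start theirs.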
Let's follow the flow of init. Normally this.conf holds the node information for the whole cluster and is not empty, so stepDown gets called. That is where we start.
```java
private void stepDown(final long term, final boolean wakeupCandidate, final Status status) {
    LOG.debug("Node {} stepDown, term={}, newTerm={}, wakeupCandidate={}.", getNodeId(), this.currTerm, term,
        wakeupCandidate);
    // bail out if the node is in an abnormal state or is shutting down
    if (!this.state.isActive()) {
        return;
    }
    // if this node is a candidate, stop the ongoing election
    if (this.state == State.STATE_CANDIDATE) {
        // calls voteTimer.stop()
        stopVoteTimer();
    // if this node is the leader or is transferring leadership
    } else if (this.state.compareTo(State.STATE_TRANSFERRING) <= 0) {
        // stop the running stepDownTimer
        stopStepDownTimer();
        // clear the pending tasks in the ballot box
        this.ballotBox.clearPendingTasks();
        // signal fsm leader stop immediately
        if (this.state == State.STATE_LEADER) {
            // notify the local state machine that this node is no longer the leader
            onLeaderStop(status);
        }
    }
    // reset leader_id: this node no longer knows a leader
    resetLeaderId(PeerId.emptyPeer(), status);
    // soft state in memory
    this.state = State.STATE_FOLLOWER;
    // reset the configuration context
    this.confCtx.reset();
    updateLastLeaderTimestamp(Utils.monotonicMs());
    if (this.snapshotExecutor != null) {
        // interrupt any snapshot that is currently being downloaded
        this.snapshotExecutor.interruptDownloadingSnapshots(term);
    }
    // meta state: adopt the larger of the two terms
    if (term > this.currTerm) {
        this.currTerm = term;
        this.votedId = PeerId.emptyPeer();
        // persist the new term and votedFor to the meta storage file
        this.metaStorage.setTermAndVotedFor(term, this.votedId);
    }
    if (wakeupCandidate) {
        // stop all replicators, pick the next candidate and send it a TimeoutNow request
        this.wakingCandidate = this.replicatorGroup.stopAllAndFindTheNextCandidate(this.conf);
        if (this.wakingCandidate != null) {
            Replicator.sendTimeoutNowAndStop(this.wakingCandidate, this.options.getElectionTimeoutMs());
        }
    } else {
        // mark every replicator in replicatorGroup as stopped
        this.replicatorGroup.stopAll();
    }
    // used during leadership transfer
    if (this.stopTransferArg != null) {
        if (this.transferTimer != null) {
            this.transferTimer.cancel(true);
        }
        // There is at most one StopTransferTimer at the same term, it's safe to
        // mark stopTransferArg to NULL
        this.stopTransferArg = null;
    }
    // (re)start the election timer
    this.electionTimer.start();
}
```
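Stepping down involves quite a bit of handover work. In order, stepDown does the following:
- If the node is a candidate, it stops the vote timer. If it is the leader or is transferring leadership, it stops stepDownTimer, clears the pending tasks in the ballot box and, for a leader, calls onLeaderStop.
- It resets the leader ID, switches to the follower state, resets the configuration context and updates lastLeaderTimestamp.
- It interrupts any snapshot download.
- If the given term is higher, it adopts that term and clears votedId, persisting both.
- It stops all replicators. When wakeupCandidate is set, it instead sends TimeoutNow to the next candidate.
- It cancels any pending leadership transfer and finally starts electionTimer.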
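The core bookkeeping of stepDown fits in a tiny model. This is a simplified sketch. `MiniNode`, its `Role` enum and its fields are hypothetical, and the model leaves out timers, replicators and the state machine. It only shows three things: stepping down always makes the node a follower, it always clears the leader, and a higher term is adopted while the old vote is forgotten.

```java
// Hypothetical, simplified model of stepDown's state changes (not JRaft's NodeImpl).
final class MiniNode {
    enum Role { LEADER, CANDIDATE, FOLLOWER }

    Role role = Role.LEADER;
    long currTerm;
    String votedFor;   // null means "has not voted in currTerm"
    String leaderId;   // null means "no known leader"

    MiniNode(long term, String self) {
        this.currTerm = term;
        this.votedFor = self;
        this.leaderId = self;
    }

    void stepDown(long term) {
        // whatever the previous role, the node becomes a follower without a known leader
        role = Role.FOLLOWER;
        leaderId = null;
        // adopt a higher term and forget the vote cast in the old term
        if (term > currTerm) {
            currTerm = term;
            votedFor = null;
        }
        // (the real code also persists term/votedFor and restarts the election timer)
    }

    public static void main(String[] args) {
        MiniNode n = new MiniNode(3, "A");
        n.stepDown(5);
        System.out.println(n.role + " term=" + n.currTerm + " votedFor=" + n.votedFor);
    }
}
```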
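stopVoteTimer and stopStepDownTimer mainly call stop on the corresponding RepeatedTimer. stop sets stopped to true, cancels the pending timeout (the cancellation puts the timeout into the cancelledTimeouts collection) and clears the reference. If you have read 2. SOFAJRaft Source Code Analysis: How Does JRaft Implement Its Scheduled Task Scheduler?, the following code should be self-explanatory.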
```java
public void stop() {
    this.lock.lock();
    try {
        // stop() is idempotent
        if (this.stopped) {
            return;
        }
        this.stopped = true;
        if (this.timeout != null) {
            // cancel the scheduled timeout so onTrigger will not fire again
            this.timeout.cancel();
            this.running = false;
            this.timeout = null;
        }
    } finally {
        this.lock.unlock();
    }
}
```
NodeImpl's onLeaderStop actually delegates to FSMCallerImpl's onLeaderStop. NodeImpl#onLeaderStop
```java
private void onLeaderStop(final Status status) {
    this.replicatorGroup.clearFailureReplicators();
    this.fsmCaller.onLeaderStop(status);
}
```
FSMCallerImpl#onLeaderStop
```java
public boolean onLeaderStop(final Status status) {
    return enqueueTask((task, sequence) -> {
        // mark this task as LEADER_STOP
        task.type = TaskType.LEADER_STOP;
        task.status = new Status(status);
    });
}

private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) {
    if (this.shutdownLatch != null) {
        // Shutting down
        LOG.warn("FSMCaller is stopped, can not apply new task.");
        return false;
    }
    // publish the event through the Disruptor
    this.taskQueue.publishEvent(tpl);
    return true;
}
```
This method publishes a LEADER_STOP event to taskQueue. taskQueue is initialized in FSMCallerImpl's init method:
```java
public boolean init(final FSMCallerOptions opts) {
    .....
    this.disruptor = DisruptorBuilder.<ApplyTask>newInstance() //
        .setEventFactory(new ApplyTaskFactory()) //
        .setRingBufferSize(opts.getDisruptorBufferSize()) //
        .setThreadFactory(new NamedThreadFactory("JRaft-FSMCaller-Disruptor-", true)) //
        .setProducerType(ProducerType.MULTI) //
        .setWaitStrategy(new BlockingWaitStrategy()) //
        .build();
    this.disruptor.handleEventsWith(new ApplyTaskHandler());
    this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
    this.taskQueue = this.disruptor.start();
    .....
}
```
Once a task has been published to taskQueue, it is processed by ApplyTaskHandler.
ApplyTaskHandler
```java
private class ApplyTaskHandler implements EventHandler<ApplyTask> {
    // max committed index in current batch, reset to -1 every batch
    private long maxCommittedIndex = -1;

    @Override
    public void onEvent(final ApplyTask event, final long sequence, final boolean endOfBatch) throws Exception {
        this.maxCommittedIndex = runApplyTask(event, this.maxCommittedIndex, endOfBatch);
    }
}
```
Every time a task arrives in taskQueue, ApplyTaskHandler's onEvent method is called. The actual logic is carried out by runApplyTask.
FSMCallerImpl#runApplyTask
```java
private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final boolean endOfBatch) {
    CountDownLatch shutdown = null;
    ...
    switch (task.type) {
        ...
        case LEADER_STOP:
            this.currTask = TaskType.LEADER_STOP;
            doLeaderStop(task.status);
            break;
        ...
    }
    ....
}
```
runApplyTask handles many kinds of events; here we only look at how LEADER_STOP is handled.
In the switch, doLeaderStop is called, which invokes the onLeaderStop method of the StateMachine wrapped by FSMCallerImpl:
```java
private void doLeaderStop(final Status status) {
    this.fsm.onLeaderStop(status);
}
```
This is the hook that lets the user's state machine run custom logic when the node stops being the leader.
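The FSMCaller path above funnels every event through a single Disruptor consumer. As a result, state machine callbacks run one at a time and in publish order. The sketch below shows the same pattern with a plain single-thread ExecutorService instead of the Disruptor; this is a deliberate swap to keep it dependency-free. `MiniFsmCaller`, its `TaskType` and `StateMachineHooks` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the user's state machine callbacks.
interface StateMachineHooks {
    void onLeaderStop(String status);

    void onStopFollowing(String status);
}

// Sketch: a single consumer thread serializes all events, like the Disruptor-based FSMCaller.
final class MiniFsmCaller {
    enum TaskType { LEADER_STOP, STOP_FOLLOWING }

    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final StateMachineHooks fsm;
    private volatile boolean shutdown;

    MiniFsmCaller(StateMachineHooks fsm) {
        this.fsm = fsm;
    }

    // Like enqueueTask: refuse new work once shutting down, otherwise hand the event to the consumer.
    boolean enqueue(TaskType type, String status) {
        if (shutdown) {
            return false;
        }
        try {
            worker.execute(() -> runApplyTask(type, status));
            return true;
        } catch (RejectedExecutionException e) {
            return false; // lost a race with shutdownAndWait()
        }
    }

    // Like runApplyTask: dispatch on the task type, always on the single consumer thread.
    private void runApplyTask(TaskType type, String status) {
        switch (type) {
            case LEADER_STOP:
                fsm.onLeaderStop(status);
                break;
            case STOP_FOLLOWING:
                fsm.onStopFollowing(status);
                break;
        }
    }

    // Stop accepting new tasks and wait for the already queued ones to finish.
    void shutdownAndWait() {
        shutdown = true;
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The Disruptor adds a preallocated ring buffer and batching (endOfBatch) on top of this. The ordering guarantee for the state machine comes from having exactly one consumer.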
Next, stepDown calls resetLeaderId(PeerId.emptyPeer(), status) to reset the leader:
```java
private void resetLeaderId(final PeerId newLeaderId, final Status status) {
    if (newLeaderId.isEmpty()) {
        // true if this node is not the leader/transferring (e.g. a candidate or follower) and has a leader
        if (!this.leaderId.isEmpty() && this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
            // tell the state machine that this node stops following that leader
            this.fsmCaller.onStopFollowing(new LeaderChangeContext(this.leaderId.copy(), this.currTerm, status));
        }
        // set the current leader to an empty value
        this.leaderId = PeerId.emptyPeer();
    } else {
        // if this node has no leader yet
        if (this.leaderId == null || this.leaderId.isEmpty()) {
            // tell the state machine that this node starts following the new leader
            this.fsmCaller.onStartFollowing(new LeaderChangeContext(newLeaderId, this.currTerm, status));
        }
        this.leaderId = newLeaderId.copy();
    }
}
```
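This method does two things, depending on newLeaderId:
- If newLeaderId is not empty, it sets the new leader and, if there was no leader before, sends a START_FOLLOWING event to the state machine.
- If newLeaderId is empty, it sends a STOP_FOLLOWING event (when a non-leader node had a leader) and clears the current leader.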
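The two branches of resetLeaderId can be captured in a small sketch that records events instead of calling a real FSMCaller. `LeaderTracker` and the event strings are hypothetical. As in the code above, STOP_FOLLOWING is only emitted when a non-leader node had a leader, and START_FOLLOWING only when there was no leader before.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of resetLeaderId's two branches; events are recorded as strings.
final class LeaderTracker {
    String leaderId;                  // null = no known leader
    boolean leaderOrTransferring;     // true if this node is the leader or is transferring leadership
    final List<String> events = new ArrayList<>();

    void resetLeaderId(String newLeaderId) {
        if (newLeaderId == null) {
            // clearing the leader: a non-leader node that had a leader stops following it
            if (leaderId != null && !leaderOrTransferring) {
                events.add("STOP_FOLLOWING " + leaderId);
            }
            leaderId = null;
        } else {
            // setting a leader: only announce it if there was no leader before
            if (leaderId == null) {
                events.add("START_FOLLOWING " + newLeaderId);
            }
            leaderId = newLeaderId;
        }
    }

    public static void main(String[] args) {
        LeaderTracker t = new LeaderTracker();
        t.resetLeaderId("B");   // a leader is discovered
        t.resetLeaderId(null);  // the leader is lost
        System.out.println(t.events);
    }
}
```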
electionTimer is an instance of an anonymous RepeatedTimer subclass. I won't repeat the details here, since the previous article already covered them.
Let's look at how electionTimer's onTrigger handles an election timeout. It calls NodeImpl's handleElectionTimeout, so we go straight to that method:
NodeImpl#handleElectionTimeout
```java
private void handleElectionTimeout() {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        if (this.state != State.STATE_FOLLOWER) {
            return;
        }
        // if the current leader is still valid (its lease has not timed out), no election is needed
        if (isCurrentLeaderValid()) {
            return;
        }
        resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.",
            this.leaderId));
        // preVote() releases the write lock itself
        doUnlock = false;
        // pre-vote phase:
        // before starting a real vote, the candidate runs a pre-vote;
        // if it does not win the approval of a majority, it quietly gives up
        preVote();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
```
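This method first takes the write lock, then runs its checks, and finally starts a pre-vote.
The checks verify two things. First, the node must be a follower. Second, the time since the leader last communicated with this follower must exceed ElectionTimeoutMs. If it does not, the leader is alive and there is no need for an election. If it does, the leader is cleared and a pre-vote is started.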
NodeImpl#isCurrentLeaderValid
```java
private boolean isCurrentLeaderValid() {
    return Utils.monotonicMs() - this.lastLeaderTimestamp < this.options.getElectionTimeoutMs();
}
```
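It subtracts the time of the last contact with the leader from the current time. If the difference is less than ElectionTimeoutMs (1 s by default), the lease has not expired and the leader is still valid.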
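isCurrentLeaderValid is a simple lease check on a monotonic clock. Below is a sketch that takes the clock value as a parameter so it can be tested deterministically. `LeaderLease` and its methods are hypothetical. In real code the timestamp should come from a monotonic source such as System.nanoTime(), not from wall-clock time.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a leader lease check.
final class LeaderLease {
    private final long electionTimeoutMs;
    private long lastLeaderTimestampMs;

    LeaderLease(long electionTimeoutMs, long nowMs) {
        this.electionTimeoutMs = electionTimeoutMs;
        this.lastLeaderTimestampMs = nowMs;
    }

    // call whenever a message from the leader arrives (cf. updateLastLeaderTimestamp)
    void touch(long nowMs) {
        this.lastLeaderTimestampMs = nowMs;
    }

    // valid while less than electionTimeoutMs has passed since the leader was last heard from
    boolean isLeaderValid(long nowMs) {
        return nowMs - lastLeaderTimestampMs < electionTimeoutMs;
    }

    // a monotonic millisecond clock; unlike System.currentTimeMillis() it never jumps backwards
    static long monotonicMs() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    }
}
```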
handleElectionTimeout ends by calling preVote, so let's focus on that method next.
I'll split preVote into several parts: NodeImpl#preVote part1
```java
private void preVote() {
    long oldTerm;
    try {
        LOG.info("Node {} term {} start preVote.", getNodeId(), this.currTerm);
        if (this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
            LOG.warn(
                "Node {} term {} doesn't do preVote when installing snapshot as the configuration may be out of date.",
                getNodeId(), this.currTerm);
            return;
        }
        // conf holds the cluster membership; if this node is not in it, something is wrong
        if (!this.conf.contains(this.serverId)) {
            LOG.warn("Node {} can't do preVote as it is not in conf <{}>.", getNodeId(), this.conf);
            return;
        }
        // remember the current term
        oldTerm = this.currTerm;
    } finally {
        // release the write lock taken in handleElectionTimeout
        this.writeLock.unlock();
    }
    ....
}
```
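On entering preVote, the method first runs a few checks. The node must not start a pre-vote while it is installing a snapshot. The node must also be part of its own conf, which contains all nodes of the cluster. Finally it records the current term and releases the lock.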
NodeImpl#preVote part2
```java
private void preVote() {
    ....
    // get the id of the last log entry (done without holding the lock)
    final LogId lastLogId = this.logManager.getLastLogId(true);
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // pre_vote need defense ABA after unlock&writeLock
        // another thread may have changed the term while the lock was released, so check again
        if (oldTerm != this.currTerm) {
            LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
            return;
        }
        // initialize the pre-vote ballot
        this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
        for (final PeerId peer : this.conf.listPeers()) {
            // skip this node itself
            if (peer.equals(this.serverId)) {
                continue;
            }
            // skip peers we cannot connect to
            if (!this.rpcService.connect(peer.getEndpoint())) {
                LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                continue;
            }
            // callback invoked when the RPC completes
            final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
            // build a pre-vote request for this peer
            done.request = RequestVoteRequest.newBuilder() //
                .setPreVote(true) // it's a pre-vote request.
                .setGroupId(this.groupId) //
                .setServerId(this.serverId.toString()) //
                .setPeerId(peer.toString()) //
                .setTerm(this.currTerm + 1) // next term: note that the term sent is currTerm + 1
                .setLastLogIndex(lastLogId.getIndex()) //
                .setLastLogTerm(lastLogId.getTerm()) //
                .build();
            this.rpcService.preVote(peer.getEndpoint(), done.request, done);
        }
        // the node also votes for itself
        this.prevVoteCtx.grant(this.serverId);
        if (this.prevVoteCtx.isGranted()) {
            // electSelf() releases the write lock itself
            doUnlock = false;
            electSelf();
        }
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
```
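This part of the code works in four steps:
1. After re-acquiring the lock, it re-checks the term (ABA defense).
2. It initializes the pre-vote ballot.
3. It sends a pre-vote request to every other reachable peer.
4. It grants its own vote and calls electSelf if that alone already forms a majority.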
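The pre-vote ballot is initialized by Ballot's init method. It receives the new cluster configuration and the old one, which is null when the configuration is stable: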
```java
public boolean init(Configuration conf, Configuration oldConf) {
    this.peers.clear();
    this.oldPeers.clear();
    quorum = oldQuorum = 0;
    int index = 0;
    // add the peers of the new configuration
    if (conf != null) {
        for (PeerId peer : conf) {
            this.peers.add(new UnfoundPeerId(peer, index++, false));
        }
    }
    // the number of votes needed to win: a strict majority
    this.quorum = this.peers.size() / 2 + 1;
    ....
    return true;
}
```
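To keep the logic clear, I assume there is no oldConf and leave out the related code. The method iterates over all peers, wraps each one as an UnfoundPeerId and adds it to the peers list. It then sets the quorum field. quorum is decremented by one for every vote received. Once it reaches 0 (isGranted checks quorum <= 0), enough votes have been collected and the pre-vote succeeds.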
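The counting scheme of Ballot can be sketched in a few lines, assuming a single configuration (no oldConf) as in the explanation above. `SimpleBallot` is hypothetical. It keeps a set of peers that have already voted so that a duplicate grant is not counted twice; this set plays the role of the found flag.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical single-configuration ballot (a sketch of the idea, not JRaft's Ballot).
final class SimpleBallot {
    private final Set<String> peers;
    private final Set<String> granted = new HashSet<>();
    private int quorum;

    SimpleBallot(List<String> conf) {
        this.peers = new HashSet<>(conf);
        // a strict majority: 3 peers -> 2 votes, 4 peers -> 3 votes, 5 peers -> 3 votes
        this.quorum = peers.size() / 2 + 1;
    }

    void grant(String peerId) {
        // ignore unknown peers and repeated votes from the same peer
        if (peers.contains(peerId) && granted.add(peerId)) {
            quorum--;
        }
    }

    boolean isGranted() {
        return quorum <= 0;
    }
}
```

In a 5-node cluster, quorum starts at 3. The candidate's own vote plus grants from two peers are therefore enough.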
The pre-vote request itself is built like this:
```java
// callback invoked when the RPC completes
final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
// build a pre-vote request for this peer
done.request = RequestVoteRequest.newBuilder() //
    .setPreVote(true) // it's a pre-vote request.
    .setGroupId(this.groupId) //
    .setServerId(this.serverId.toString()) //
    .setPeerId(peer.toString()) //
    .setTerm(this.currTerm + 1) // next term: note that the term sent is currTerm + 1
    .setLastLogIndex(lastLogId.getIndex()) //
    .setLastLogTerm(lastLogId.getTerm()) //
    .build();
this.rpcService.preVote(peer.getEndpoint(), done.request, done);
```
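When building the RequestVoteRequest, the code sets three fields:
- PreVote is set to true to mark the request as a pre-vote.
- ServerId is set to the current node.
- The term sent to the peer is the current term plus one.

When the RPC completes, OnPreVoteRpcDone's run method is called back.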
OnPreVoteRpcDone#run
```java
public void run(final Status status) {
    NodeImpl.this.metrics.recordLatency("pre-vote", Utils.monotonicMs() - this.startMs);
    if (!status.isOk()) {
        LOG.warn("Node {} PreVote to {} error: {}.", getNodeId(), this.peer, status);
    } else {
        handlePreVoteResponse(this.peer, this.term, getResponse());
    }
}
```
If the RPC succeeded, run passes the response to handlePreVoteResponse:
NodeImpl#handlePreVoteResponse
```java
public void handlePreVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // only a follower may try to start an election
        if (this.state != State.STATE_FOLLOWER) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, state not in STATE_FOLLOWER but {}.",
                getNodeId(), peerId, this.state);
            return;
        }
        // the response belongs to an older pre-vote round
        if (term != this.currTerm) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
                peerId, term, this.currTerm);
            return;
        }
        // if the responder's term is higher than ours, this request is invalid and we step down
        if (response.getTerm() > this.currTerm) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.", getNodeId(), peerId,
                response.getTerm(), this.currTerm);
            stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                "Raft node receives higher term pre_vote_response."));
            return;
        }
        LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId,
            response.getTerm(), response.getGranted());
        // check granted quorum?
        if (response.getGranted()) {
            this.prevVoteCtx.grant(peerId);
            // a majority has granted the pre-vote
            if (this.prevVoteCtx.isGranted()) {
                doUnlock = false;
                // start the real election
                electSelf();
            }
        }
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
```
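Three checks are performed here:
1. The node must still be a follower, because only a follower may try to start an election.
2. The term the request was sent with must equal the current term; otherwise the response belongs to a stale round.
3. The term in the response must not be greater than the current term. If it is, the node steps down to that higher term via stepDown.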
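The three checks can be written as a pure decision function, which makes their order explicit. This is a hypothetical sketch (`PreVoteResponseCheck`, `Action`). The real method also takes the write lock and logs each case.

```java
// Hypothetical sketch of the checks in handlePreVoteResponse.
final class PreVoteResponseCheck {
    enum Action { IGNORE, STEP_DOWN, COUNT_VOTE, REJECTED }

    private PreVoteResponseCheck() {
    }

    static Action decide(boolean isFollower, long requestTerm, long currTerm,
                         long responseTerm, boolean granted) {
        // 1. only a follower may still be running a pre-vote
        if (!isFollower) {
            return Action.IGNORE;
        }
        // 2. the response must belong to the current round
        if (requestTerm != currTerm) {
            return Action.IGNORE;
        }
        // 3. a higher term in the response means this node is behind: step down
        if (responseTerm > currTerm) {
            return Action.STEP_DOWN;
        }
        // otherwise count the vote if the peer granted it
        return granted ? Action.COUNT_VOTE : Action.REJECTED;
    }
}
```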
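Once the checks pass, the response carries a granted flag. If the vote is granted, Ballot's grant method is called to record one vote for the current node.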
Ballot#grant
```java
public void grant(PeerId peerId) {
    this.grant(peerId, new PosHint());
}

public PosHint grant(PeerId peerId, PosHint hint) {
    UnfoundPeerId peer = findPeer(peerId, peers, hint.pos0);
    if (peer != null) {
        if (!peer.found) {
            peer.found = true;
            this.quorum--;
        }
        hint.pos0 = peer.index;
    } else {
        hint.pos0 = -1;
    }
    ....
    return hint;
}
```
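grant looks up the wrapped UnfoundPeerId instance for the given peerId in the peers list. If that peer has not been counted yet, grant decrements quorum by one to record the vote. It also sets found to true, so the same peer can never be counted twice.
The lookup of the UnfoundPeerId instance uses an interesting trick. First, the peers are stored in the list like this: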
```java
int index = 0;
for (PeerId peer : conf) {
    this.peers.add(new UnfoundPeerId(peer, index++, false));
}
```
The loop iterates over conf and stores an index for each peer, starting from zero. The lookup then receives the peerId, the posHint and the peers list:
```java
private UnfoundPeerId findPeer(PeerId peerId, List<UnfoundPeerId> peers, int posHint) {
    if (posHint < 0 || posHint >= peers.size() || !peers.get(posHint).peerId.equals(peerId)) {
        for (UnfoundPeerId ufp : peers) {
            if (ufp.peerId.equals(peerId)) {
                return ufp;
            }
        }
        return null;
    }
    return peers.get(posHint);
}
```
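The posHint passed in is -1 by default. On the first call, findPeer therefore scans the whole peers list and compares the entries one by one.
After each call, grant writes the found peer's index back into the PosHint's pos0. A caller that passes the same PosHint into a later grant call lets findPeer fetch the entry directly with get, without scanning the list again. Note that the single-argument grant(PeerId) creates a fresh PosHint every time. Only callers that keep and reuse the returned hint get this benefit.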
This trick is useful in everyday code as well.
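Stripped of the Raft details, the posHint trick is an indexed list plus a remembered position. The lookup tries the hinted slot first and falls back to a linear scan only if the hint is missing or stale. `HintedLookup` is a hypothetical sketch. It returns the found index so the caller can reuse it as the next hint, just like PosHint.pos0.

```java
import java.util.List;

// Hypothetical sketch of a lookup with a position hint.
final class HintedLookup {
    private HintedLookup() {
    }

    // Returns the index of target in items, or -1 if absent.
    // If hint points at the right element, no scan is needed.
    static int find(List<String> items, String target, int hint) {
        if (hint >= 0 && hint < items.size() && items.get(hint).equals(target)) {
            return hint;   // fast path: the hint is still valid
        }
        for (int i = 0; i < items.size(); i++) {   // slow path: linear scan
            if (items.get(i).equals(target)) {
                return i;
            }
        }
        return -1;
    }
}
```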
After grant, Ballot's isGranted is called to check whether a majority has been reached. Ballot#isGranted
```java
public boolean isGranted() {
    return this.quorum <= 0 && oldQuorum <= 0;
}
```
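That is, it checks whether the remaining vote counts have dropped to 0: quorum, and also oldQuorum during a configuration change. If so, electSelf is called to start the election.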
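isGranted also checks oldQuorum. During a membership change (joint consensus), a candidate must win a majority in both the old and the new configuration. The sketch below illustrates that rule. `JointQuorum` and `hasJointMajority` are hypothetical names. A stable configuration is modelled as oldConf == null, matching the init call above, which passes null when conf.isStable().

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch: a majority in the new configuration AND in the old one (if any).
final class JointQuorum {
    private JointQuorum() {
    }

    static boolean hasMajority(List<String> conf, Set<String> votes) {
        long got = conf.stream().filter(votes::contains).count();
        return got >= conf.size() / 2 + 1;
    }

    // oldConf == null means the configuration is stable (no membership change in progress)
    static boolean hasJointMajority(List<String> newConf, List<String> oldConf, Set<String> votes) {
        return hasMajority(newConf, votes) && (oldConf == null || hasMajority(oldConf, votes));
    }
}
```

For example, moving from {A, B, C} to {C, D, E}, the votes of C and D form a majority of the new configuration but not of the old one. The candidate also needs A or B.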
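Let's leave the election method aside for now and first see how a pre-vote request is handled.
The processor for RequestVoteRequest is registered in RaftRpcServerFactory's addRaftRequestProcessors method; the handler is RequestVoteRequestProcessor.
The actual handling happens in its processRequest0 method.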
RequestVoteRequestProcessor#processRequest0
```java
public Message processRequest0(RaftServerService service, RequestVoteRequest request, RpcRequestClosure done) {
    // a pre-vote request
    if (request.getPreVote()) {
        return service.handlePreVoteRequest(request);
    } else {
        return service.handleRequestVoteRequest(request);
    }
}
```
Because this processor handles both votes and pre-votes, it branches on the request type. Pre-votes are implemented by NodeImpl's handlePreVoteRequest.
NodeImpl#handlePreVoteRequest
```java
public Message handlePreVoteRequest(final RequestVoteRequest request) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // check that this node is in an active state
        if (!this.state.isActive()) {
            LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
                getNodeId(), this.state.name());
        }
        final PeerId candidateId = new PeerId();
        // the ServerId carried by the request must be well-formed
        if (!candidateId.parse(request.getServerId())) {
            LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(),
                request.getServerId());
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse candidateId failed: %s.",
                request.getServerId());
        }
        boolean granted = false;
        // noinspection ConstantConditions
        do {
            // this node already has a leader whose lease is still valid
            if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) {
                LOG.info(
                    "Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.",
                    getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId);
                break;
            }
            // the requested term is lower than the current term
            if (request.getTerm() < this.currTerm) {
                LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getServerId(), request.getTerm(), this.currTerm);
                // A follower replicator may not be started when this node become leader, so we must check it.
                // this node may be the leader, so check whether the requester's replicator
                // has to be added back to replicatorGroup
                checkReplicator(candidateId);
                break;
            } else if (request.getTerm() == this.currTerm + 1) {
                // A follower replicator may not be started when this node become leader, so we must check it.
                // check replicator state
                // the requested term is exactly currTerm + 1, so this node may be the leader;
                // check whether the requester's replicator has to be added back to replicatorGroup
                checkReplicator(candidateId);
            }
            doUnlock = false;
            this.writeLock.unlock();
            // get the id of the last local log entry without holding the lock
            final LogId lastLogId = this.logManager.getLastLogId(true);
            doUnlock = true;
            this.writeLock.lock();
            final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm());
            // grant only if the requester's log is at least as up to date as the local one
            granted = requestLastLogId.compareTo(lastLogId) >= 0;
            LOG.info(
                "Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.",
                getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId,
                lastLogId);
        } while (false); // a single-pass loop, used only so that break can serve as an early exit
        return RequestVoteResponse.newBuilder() //
            .setTerm(this.currTerm) //
            .setGranted(granted) //
            .build();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
```
This method is quite interesting too: it is long, but its logic is clear.
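The grant decision hinges on requestLastLogId.compareTo(lastLogId) >= 0. This is Raft's "at least as up to date" rule: the log whose last entry has the higher term wins, and for equal terms the longer log wins. Below is a hypothetical `SimpleLogId` sketching that ordering. It is not JRaft's LogId class.

```java
// Hypothetical sketch of an (index, term) log position ordered by term, then by index.
final class SimpleLogId implements Comparable<SimpleLogId> {
    final long index;
    final long term;

    SimpleLogId(long index, long term) {
        this.index = index;
        this.term = term;
    }

    @Override
    public int compareTo(SimpleLogId o) {
        // a higher last term always wins; the index only breaks ties
        int c = Long.compare(term, o.term);
        return c != 0 ? c : Long.compare(index, o.index);
    }

    // a voter grants only if the candidate's log is at least as up to date as its own
    static boolean candidateLogIsOk(SimpleLogId candidateLast, SimpleLogId localLast) {
        return candidateLast.compareTo(localLast) >= 0;
    }
}
```

For example, a candidate whose log ends at (index 5, term 3) beats a voter whose log ends at (index 10, term 2). The shorter log is more up to date because its last entry comes from a newer term.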
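One interesting detail is the loop. Java has no goto, and an unlabeled break only works inside a loop or a switch. The code therefore wraps its body in a single-pass do-while(false) loop, which makes break usable inside the do block as an early exit.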
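Here are the two idioms side by side, as a hypothetical sketch. The first is the do-while(false) form used in handlePreVoteRequest. The second is a labeled block, which Java also accepts as a break target. Both run a sequence of early-exit checks and then fall through to a common return.

```java
// Hypothetical example comparing two ways of writing "early exits, then a common tail".
final class EarlyExit {
    private EarlyExit() {
    }

    // do-while(false): the body runs exactly once, and break jumps past it
    static String withDoWhile(boolean leaderAlive, boolean termTooOld) {
        boolean granted = false;
        do {
            if (leaderAlive) {
                break;
            }
            if (termTooOld) {
                break;
            }
            granted = true;
        } while (false);
        return "granted=" + granted;
    }

    // a labeled block: "break check" leaves the block without any loop
    static String withLabeledBlock(boolean leaderAlive, boolean termTooOld) {
        boolean granted = false;
        check:
        {
            if (leaderAlive) {
                break check;
            }
            if (termTooOld) {
                break check;
            }
            granted = true;
        }
        return "granted=" + granted;
    }
}
```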
Next, a quick look at checkReplicator: NodeImpl#checkReplicator
```java
private void checkReplicator(final PeerId candidateId) {
    if (this.state == State.STATE_LEADER) {
        this.replicatorGroup.checkReplicator(candidateId, false);
    }
}
```
It checks whether this node is the leader and, if so, calls ReplicatorGroupImpl's checkReplicator:
ReplicatorGroupImpl#checkReplicator
```java
private final ConcurrentMap<PeerId, ThreadId> replicatorMap = new ConcurrentHashMap<>();
private final Set<PeerId> failureReplicators = new ConcurrentHashSet<>();

public void checkReplicator(final PeerId peer, final boolean lockNode) {
    // look up the ThreadId of the replicator for this peer
    final ThreadId rid = this.replicatorMap.get(peer);
    // noinspection StatementWithEmptyBody
    if (rid == null) {
        // Create replicator if it's not found for leader.
        final NodeImpl node = this.commonOptions.getNode();
        if (lockNode) {
            node.writeLock.lock();
        }
        try {
            // if this node is the leader and the peer is in failureReplicators, add it to replicatorMap again
            if (node.isLeader() && this.failureReplicators.contains(peer) && addReplicator(peer)) {
                this.failureReplicators.remove(peer);
            }
        } finally {
            if (lockNode) {
                node.writeLock.unlock();
            }
        }
    } else { // NOPMD
        // Unblock it right now.
        // Replicator.unBlockAndSendNow(rid);
    }
}
```
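checkReplicator looks up the replicator for the given peer in replicatorMap, which holds a replicator for each of the leader's followers. If none is found, it gets the current Node through ReplicatorGroupImpl's commonOptions. If that node is the leader and the peer is in the failureReplicators set, it tries to add the replicator again. On success, the peer is removed from failureReplicators.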
ReplicatorGroupImpl#addReplicator
```java
public boolean addReplicator(final PeerId peer) {
    // the term must already have been set
    Requires.requireTrue(this.commonOptions.getTerm() != 0);
    // if replicatorMap already contains this peer, just remove it from failureReplicators
    if (this.replicatorMap.containsKey(peer)) {
        this.failureReplicators.remove(peer);
        return true;
    }
    // copy the common ReplicatorOptions
    final ReplicatorOptions opts = this.commonOptions == null ? new ReplicatorOptions() : this.commonOptions.copy();
    // set this peer on the new ReplicatorOptions
    opts.setPeerId(peer);
    final ThreadId rid = Replicator.start(opts, this.raftOptions);
    if (rid == null) {
        LOG.error("Fail to start replicator to peer={}.", peer);
        // remember the failure so that checkReplicator can retry later
        this.failureReplicators.add(peer);
        return false;
    }
    return this.replicatorMap.put(peer, rid) == null;
}
```
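addReplicator does two main things:
1. If the peer already has a replicator, it removes the peer from the failureReplicators set.
2. Otherwise, it starts a new replicator and puts it into replicatorMap. If the start fails, the peer is added to failureReplicators instead.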
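The bookkeeping between replicatorMap and failureReplicators can be sketched without any networking. A failed start parks the peer in the failure set, and a later retry by the leader moves it into the map. `ReplicatorRegistry` is hypothetical, and so is its `starter` function. The starter stands in for Replicator.start, which returns null on failure.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of the replicatorMap / failureReplicators bookkeeping.
final class ReplicatorRegistry {
    final Map<String, Long> replicatorMap = new ConcurrentHashMap<>();
    final Set<String> failureReplicators = ConcurrentHashMap.newKeySet();
    private final Function<String, Long> starter;   // returns a replicator id, or null on failure

    ReplicatorRegistry(Function<String, Long> starter) {
        this.starter = starter;
    }

    boolean addReplicator(String peer) {
        if (replicatorMap.containsKey(peer)) {
            failureReplicators.remove(peer);   // already running: no longer a failure
            return true;
        }
        Long rid = starter.apply(peer);
        if (rid == null) {
            failureReplicators.add(peer);      // remember it so it can be retried later
            return false;
        }
        return replicatorMap.put(peer, rid) == null;
    }

    // like checkReplicator: only a leader retries peers that previously failed to start
    void checkReplicator(String peer, boolean isLeader) {
        if (!replicatorMap.containsKey(peer) && isLeader
            && failureReplicators.contains(peer) && addReplicator(peer)) {
            failureReplicators.remove(peer);
        }
    }
}
```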
Now let's return to the election method itself. NodeImpl#electSelf
```java
private void electSelf() {
    long oldTerm;
    try {
        LOG.info("Node {} start vote and grant vote self, term={}.", getNodeId(), this.currTerm);
        // 1. if this node is not in the cluster configuration, do not run for election
        if (!this.conf.contains(this.serverId)) {
            LOG.warn("Node {} can't do electSelf as it is not in {}.", getNodeId(), this.conf);
            return;
        }
        // 2. the real election is starting, so stop the election timer that drives the pre-vote
        if (this.state == State.STATE_FOLLOWER) {
            LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
            this.electionTimer.stop();
        }
        // 3. clear the leader, become a candidate in a new term and vote for itself
        resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT,
            "A follower's leader_id is reset to NULL as it begins to request_vote."));
        this.state = State.STATE_CANDIDATE;
        this.currTerm++;
        this.votedId = this.serverId.copy();
        LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);
        // 4. start the vote timer, since the vote may fail and have to be retried
        this.voteTimer.start();
        // 5. initialize the ballot
        this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
        oldTerm = this.currTerm;
    } finally {
        this.writeLock.unlock();
    }
    final LogId lastLogId = this.logManager.getLastLogId(true);
    this.writeLock.lock();
    try {
        // vote need defense ABA after unlock&writeLock
        if (oldTerm != this.currTerm) {
            LOG.warn("Node {} raise term {} when getLastLogId.", getNodeId(), this.currTerm);
            return;
        }
        // 6. send a vote request to every other reachable peer
        for (final PeerId peer : this.conf.listPeers()) {
            if (peer.equals(this.serverId)) {
                continue;
            }
            if (!this.rpcService.connect(peer.getEndpoint())) {
                LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                continue;
            }
            final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
            done.request = RequestVoteRequest.newBuilder() //
                .setPreVote(false) // It's not a pre-vote request.
                .setGroupId(this.groupId) //
                .setServerId(this.serverId.toString()) //
                .setPeerId(peer.toString()) //
                .setTerm(this.currTerm) //
                .setLastLogIndex(lastLogId.getIndex()) //
                .setLastLogTerm(lastLogId.getTerm()) //
                .build();
            this.rpcService.requestVote(peer.getEndpoint(), done.request, done);
        }
        // persist the new term and the vote for itself, then count that vote
        this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
        this.voteCtx.grant(this.serverId);
        if (this.voteCtx.isGranted()) {
            // 7. the vote is won, so become the leader
            becomeLeader();
        }
    } finally {
        this.writeLock.unlock();
    }
}
```
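Don't be put off by the length: much of this method mirrors preVote. Because it is long, I numbered the steps, and we go through them in order:
1. A node that is not in the cluster configuration does not run for election.
2. A follower stops electionTimer, since the real election is starting.
3. The node clears its leader ID, becomes a candidate, increments currTerm and votes for itself through votedId.
4. It starts voteTimer so the vote is retried if it fails.
5. It initializes the ballot voteCtx.
6. After re-checking the term, it sends a RequestVoteRequest to every other reachable peer. The request has PreVote set to false and carries the new currTerm itself (not currTerm + 1). The node then persists its term and its own vote and grants itself a vote.
7. If that already forms a majority, it calls becomeLeader.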
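The state changes in electSelf follow the standard Raft candidate transition. Below is a hypothetical, lock-free sketch (`Candidate`, `startElection`, `onVote`); the step numbers in its comments refer to the numbered comments in electSelf. It shows, for example, why a single-node cluster becomes leader immediately: its own vote is already a majority.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the candidate transition in electSelf (no timers, RPCs or locks).
final class Candidate {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    final String self;
    final List<String> conf;
    Role role = Role.FOLLOWER;
    long currTerm;
    String votedFor;
    private final Set<String> votes = new HashSet<>();

    Candidate(String self, List<String> conf, long term) {
        this.self = self;
        this.conf = conf;
        this.currTerm = term;
    }

    void startElection() {
        if (!conf.contains(self)) {
            return;                 // step 1: not a member, do not run
        }
        role = Role.CANDIDATE;      // step 3: candidate in a new term, voting for itself
        currTerm++;
        votedFor = self;
        votes.clear();              // step 5: a fresh ballot
        onVote(self, currTerm);     // the node grants itself a vote
    }

    // count a granted vote from peer for the given term
    void onVote(String peer, long term) {
        if (role != Role.CANDIDATE || term != currTerm || !conf.contains(peer)) {
            return;
        }
        votes.add(peer);
        if (votes.size() >= conf.size() / 2 + 1) {
            role = Role.LEADER;     // step 7: a majority has been reached
        }
    }
}
```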
Let's first see how RequestVoteRequestProcessor handles a real vote request. Its processRequest0 calls NodeImpl's handleRequestVoteRequest to do the actual work.
NodeImpl#handleRequestVoteRequest
```java
public Message handleRequestVoteRequest(final RequestVoteRequest request) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // is this node alive?
        if (!this.state.isActive()) {
            LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
                getNodeId(), this.state.name());
        }
        final PeerId candidateId = new PeerId();
        if (!candidateId.parse(request.getServerId())) {
            LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(),
                request.getServerId());
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse candidateId failed: %s.",
                request.getServerId());
        }
        // noinspection ConstantConditions
        do {
            // check term
            if (request.getTerm() >= this.currTerm) {
                LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getServerId(), request.getTerm(), this.currTerm);
                // 1. if the requested term is higher than the current term, step down to it
                // increase current term, change state to follower
                if (request.getTerm() > this.currTerm) {
                    stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                        "Raft node receives higher term RequestVoteRequest."));
                }
            } else {
                // ignore older term
                LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getServerId(), request.getTerm(), this.currTerm);
                break;
            }
            doUnlock = false;
            this.writeLock.unlock();
            final LogId lastLogId = this.logManager.getLastLogId(true);
            doUnlock = true;
            this.writeLock.lock();
            // vote need ABA check after unlock&writeLock
            if (request.getTerm() != this.currTerm) {
                LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
                break;
            }
            // 2. check that the candidate's log is at least as up to date as ours
            final boolean logIsOk = new LogId(request.getLastLogIndex(), request.getLastLogTerm())
                .compareTo(lastLogId) >= 0;
            // 3. vote only if this node has not voted yet in this term
            if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) {
                stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE,
                    "Raft node votes for some candidate, step down to restart election_timer."));
                this.votedId = candidateId.copy();
                this.metaStorage.setVotedFor(candidateId);
            }
        } while (false);
        return RequestVoteResponse.newBuilder() //
            .setTerm(this.currTerm) //
            // 4. the vote is granted only if the request term equals the current term
            //    and votedId has been set to the requesting node
            .setGranted(request.getTerm() == this.currTerm && candidateId.equals(this.votedId)) //
            .build();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
```
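This method is broadly similar to handlePreVoteRequest, so I'll only go over the numbered steps:
1. A request with a higher term makes this node step down to that term, which also clears votedId. A request with a lower term is ignored.
2. The candidate's last log position must be at least as up to date as the local one.
3. If the log is OK and the node has not voted in this term, it calls stepDown to restart its election timer, records the candidate in votedId and persists the vote through metaStorage.
4. The response grants the vote only if the request term equals the current term and votedId is the candidate.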
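The receiving side's voting rule fits in a hypothetical `Voter` sketch. The voter adopts a higher term and clears the old vote, rejects lower terms, and grants at most one vote per term. It grants that vote only to a candidate whose log is at least as up to date as its own. Logs are represented as (lastTerm, lastIndex) pairs compared term first. Persistence, locking and timers are left out.

```java
// Hypothetical sketch of the vote-granting rule in handleRequestVoteRequest.
final class Voter {
    long currTerm;
    String votedFor;          // null = has not voted in currTerm
    final long lastLogTerm;
    final long lastLogIndex;

    Voter(long currTerm, long lastLogTerm, long lastLogIndex) {
        this.currTerm = currTerm;
        this.lastLogTerm = lastLogTerm;
        this.lastLogIndex = lastLogIndex;
    }

    boolean handleRequestVote(String candidate, long term, long candLastTerm, long candLastIndex) {
        if (term < currTerm) {
            return false;                       // ignore older terms
        }
        if (term > currTerm) {
            currTerm = term;                    // 1. step down to the higher term
            votedFor = null;
        }
        // 2. the candidate's log must be at least as up to date (term first, then index)
        boolean logIsOk = candLastTerm > lastLogTerm
            || (candLastTerm == lastLogTerm && candLastIndex >= lastLogIndex);
        // 3. vote at most once per term
        if (logIsOk && votedFor == null) {
            votedFor = candidate;
        }
        // 4. granted only if the terms match and this node voted for the candidate
        return term == currTerm && candidate.equals(votedFor);
    }
}
```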
Once the candidate has received votes from more than half of the nodes, it becomes the leader by calling becomeLeader. NodeImpl#becomeLeader
```java
private void becomeLeader() {
    Requires.requireTrue(this.state == State.STATE_CANDIDATE, "Illegal state: " + this.state);
    LOG.info("Node {} become leader of group, term={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,
        this.conf.getConf(), this.conf.getOldConf());
    // cancel candidate vote timer
    // once this node is the leader, the vote timer is no longer needed
    stopVoteTimer();
    // set the state to leader
    this.state = State.STATE_LEADER;
    this.leaderId = this.serverId.copy();
    // set the new term on the replicator group
    this.replicatorGroup.resetTerm(this.currTerm);
    // iterate over all cluster nodes
    for (final PeerId peer : this.conf.listPeers()) {
        if (peer.equals(this.serverId)) {
            continue;
        }
        LOG.debug("Node {} add replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
        // as the leader, this node must replicate its log to every other node
        if (!this.replicatorGroup.addReplicator(peer)) {
            LOG.error("Fail to add replicator, peer={}.", peer);
        }
    }
    // init commit manager
    this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 1);
    // Register _conf_ctx to reject configuration changing before the first log
    // is committed.
    if (this.confCtx.isBusy()) {
        throw new IllegalStateException();
    }
    this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf());
    // as the leader, periodically check whether it is still qualified to lead
    this.stepDownTimer.start();
}
```
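becomeLeader proceeds in a few steps. It stops the vote timer, sets the state to leader and sets the new term on the replicator group. It then adds every other node to the replicator group. Finally, it starts stepDownTimer, which periodically checks whether more than half of the nodes still respond to the current leader.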
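The periodic check that stepDownTimer triggers (handleStepDownTimeout) is not shown in this article. Below is a hypothetical sketch of such a majority check. It assumes the leader tracks each follower's last successful response time and counts itself as alive. `LeaderLiveness` and its parameters are illustrative names, not JRaft's API.

```java
import java.util.Map;

// Hypothetical sketch of a leader's periodic "do I still have a majority?" check.
final class LeaderLiveness {
    private LeaderLiveness() {
    }

    /**
     * @param clusterSize       number of voting members, including the leader
     * @param lastResponseMs    follower id -> time of its last successful response
     * @param nowMs             current monotonic time
     * @param electionTimeoutMs a follower silent for this long is considered dead
     * @return true if the leader should step down
     */
    static boolean shouldStepDown(int clusterSize, Map<String, Long> lastResponseMs,
                                  long nowMs, long electionTimeoutMs) {
        int alive = 1; // the leader itself
        for (long t : lastResponseMs.values()) {
            if (nowMs - t < electionTimeoutMs) {
                alive++;
            }
        }
        return alive < clusterSize / 2 + 1;
    }
}
```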
That's all for this article. Hope to see you again next time~