在 zookeeper 集群中发生选举的场景有如下三种:java
本文主要针对集群启动时发生的选举实现进行分析。算法
ZK 集群中节点在启动时会调用QuorumPeer.start方法服务器
public synchronized void start() {
/** * 加载数据文件,获取 lastProcessedZxid, currentEpoch,acceptedEpoch */
loadDataBase();
/** * 启动主线程 用于处理客户端链接请求 */
cnxnFactory.start();
/** * 开始 leader 选举; 会相继建立选举算法的实现,建立当前节点与集群中其余节点选举通讯的网络IO,并启动相应工做线程 */
startLeaderElection();
/** * 启动 QuorumPeer 线程,监听当前节点服务状态 */
super.start();
}
复制代码
在 loadDataBase 方法中,ZK 会经过加载数据文件获取 lastProcessedZxid , 并经过读取 currentEpoch , acceptedEpoch 文件来获取相对应的值;若上述两文件不存在,则以 lastProcessedZxid 的高 32 位做为 currentEpoch , acceptedEpoch 值并写入对应文件中。网络
synchronized public void startLeaderElection() {
try {
// 建立投票
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
} catch(IOException e) {
}
// 从集群中节点列表,查找当前节点与其余进行信息同步的地址
for (QuorumServer p : getView().values()) {
if (p.id == myid) {
myQuorumAddr = p.addr;
break;
}
}
if (myQuorumAddr == null) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
// electionType == 3
this.electionAlg = createElectionAlgorithm(electionType);
}
复制代码
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
// 忽略其余算法的实现
case 3:
/** * 建立 QuorumCnxManager 实例,并启动 QuorumCnxManager.Listener 线程用于与集群中其余节点进行选举通讯; */
qcm = createCnxnManager();
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
/** * 建立选举算法 FastLeaderElection 实例 */
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
复制代码
初始节点的相关实例以后,执行 super.start() 方法,因 QuorumPeer 类继承 ZooKeeperThread 故会启动 QuorumPeer 线程并发
public void run() {
// 代码省略
try {
/* * Main loop */
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) {
// 只读模式下代码省略
} else {
try {
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
// 忽略其余状态下的处理逻辑
}
}
} finally {
}
}
复制代码
从上述代码能够看出 QuorumPeer 线程在运行过程当中轮询监听当前节点的状态并进行相应的逻辑处理,集群启动时节点状态为 LOOKING (也就是选举 Leader 过程),此时会调用 FastLeaderElection.lookForLeader 方法 (也是投票选举算法的核心)简化后源码以下:app
public Vote lookForLeader() throws InterruptedException {
// 忽略
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
// logicalclock 逻辑时钟加一
logicalclock.incrementAndGet();
/** * 更新提案信息,用于后续投票;集群启动节点默认选举自身为 Leader */
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
/** * 发送选举投票提案 */
sendNotifications();
/* * Loop in which we exchange notifications until we find a leader */
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/* * Remove next notification from queue, times out after 2 times * the termination time */
/** * 从 recvqueue 队列中获取外部节点的选举投票信息 */
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/* * Sends more notifications if haven't received enough. * Otherwise processes new notification. */
if(n == null){
/** * 检查上一次发送的选举投票信息是否所有发送; * 若已发送则从新在发送一遍,反之说明当前节点与集群中其余节点未链接,则执行 connectAll() 创建链接 */
if(manager.haveDelivered()){
sendNotifications();
} else {
manager.connectAll();
}
/* * Exponential backoff */
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
else if(self.getVotingView().containsKey(n.sid)) {
/** * 只处理同一集群中节点的投票请求 */
switch (n.state) {
case LOOKING:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
/** * 外部投票选举周期大于当前节点选举周期 * * step1 : 更新选举周期值 * step2 : 清空已收到的选举投票数据 * step3 : 选举投票 PK,选举规则参见 totalOrderPredicate 方法 * step4 : 变动选举投票并发送 */
logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
// 丢弃小于当前选举周期的投票
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
/** * 同一选举周期 * * step1 : 选举投票 PK,选举规则参见 totalOrderPredicate 方法 * step2 : 变动选举投票并发送 */
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
/** * 记录外部选举投票信息 */
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
/** * 统计选举投票结果,判断是否能够结束此轮选举 */
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// ......
if (n == null) {
/** * 选举结束判断当前节点状态; 若提案的 leader == myid 则 state = LEADING, 反之为 FOLLOWING */
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
// 变动当前投票信息
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:
// ......
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
return null;
} finally {
// ......
}
}
复制代码
从 lookForLeader 方法的实现能够看出,选举流程以下:socket
发送内部投票ide
内部投票发送逻辑参考后续小节oop
接收外部投票ui
接收外部投票逻辑参考后续小节
选举投票 PK
当接收到外部节点投票信息后会与内部投票信息进行 PK 已肯定投票优先权;PK 规则参见 totalOrderPredicate 方法以下
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
/* * We return true if one of the following three cases hold: * 1- New epoch is higher * 2- New epoch is the same as current epoch, but new zxid is higher * 3- New epoch is the same as current epoch, new zxid is the same * as current zxid, but server id is higher. */
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
复制代码
从其实现能够看出选举投票 PK 规则以下:
* 比较外部投票与内部投票的选举周期值,选举周期大的值优先
* 若选举周期值一致,则比较事务 ID; 事务 ID 最新的优先
* 若选举周期值一致且事务 ID 值相同,则比较投票节点的 server id; server id 最大的优先
复制代码
统计选举投票
当接收到外部投票以后,都会统计下此轮选举的投票状况并判断是否可结束选举; 参考 termPredicate 方法
protected boolean termPredicate( HashMap<Long, Vote> votes, Vote vote) {
HashSet<Long> set = new HashSet<Long>();
/** * 统计接收的投票中与当前节点所推举 leader 投票一致的个数 */
for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())){
set.add(entry.getKey());
}
}
/** * 若是超过一半的投票一致 则说明能够终止本次选举 */
return self.getQuorumVerifier().containsQuorum(set);
}
复制代码
确认节点角色
当此轮选举结束以后,经过判断所推举的 leader server id 是否与当前节点 server id 相等; 若相等则说明当前节点为 leader, 反之为 follower。
上文中主要聊了下 ZK 选举算法的核心部分,下面接着看下集群节点在选举过程当中是如何发送本身的投票和接收外部的投票及相关处理逻辑。
首先经过 FastLeaderElection.sendNotifications 方法看下发送投票逻辑:
private void sendNotifications() {
for (QuorumServer server : self.getVotingView().values()) {
long sid = server.id;
/** * 发送投票通知信息 * * leader : 被推举的服务器 myid * zxid : 被推举的服务器 zxid * electionEpoch : 当前节点选举周期 * ServerState state : 当前节点状态 * sid : 消息接收方 myid * peerEpoch : 被推举的服务器 epoch */
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch);
/** * 将消息添加到队列 sendqueue 中; * * @see Messenger.WorkerSender sendqueue 队列会被 WorkerSender 消费 */
sendqueue.offer(notmsg);
}
}
复制代码
从实现能够看出节点在启动阶段会将自身信息封装为 ToSend 实例(也就是选举自身为 leader)并添加到队列 FastLeaderElection.sendqueue 中;那么此时咱们会问到 FastLeaderElection.sendqueue 队列中的消息被谁消费处理呢 ? 让咱们回过头看下节点在启动初始化选举环境时建立 QuorumCnxManager, FastLeaderElection 实例的过程。
PS : FastLeaderElection.sendqueue 队列中消息被谁消费 ?
public QuorumCnxManager(final long mySid, Map<Long,QuorumPeer.QuorumServer> view, QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs, int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled, ConcurrentHashMap<Long, SendWorker> senderWorkerMap) {
this.senderWorkerMap = senderWorkerMap;
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
if(cnxToValue != null){
this.cnxTO = Integer.parseInt(cnxToValue);
}
this.mySid = mySid;
this.socketTimeout = socketTimeout;
this.view = view;
this.listenOnAllIPs = listenOnAllIPs;
initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
quorumSaslAuthEnabled);
listener = new Listener();
}
复制代码
在 QuorumCnxManager 实例化后,会启动一个 QuorumCnxManager.Listener 线程;同时在 QuorumCnxManager 实例中存在三个重要的集合容器变量:
下面咱们再看下 QuorumCnxManager.Listener 线程启动后,主要作了什么:
public void run() {
int numRetries = 0;
InetSocketAddress addr;
while((!shutdown) && (numRetries < 3)){
try {
ss = new ServerSocket();
ss.setReuseAddress(true);
/** * 获取当前节点的选举地址并 bind 监听等待外部节点链接 */
addr = view.get(QuorumCnxManager.this.mySid).electionAddr;
ss.bind(addr);
while (!shutdown) {
/** * 接收外部节点链接并处理 */
Socket client = ss.accept();
setSockOpts(client);
receiveConnection(client);
numRetries = 0;
}
} catch (IOException e) {
LOG.error("Exception while listening", e);
numRetries++;
ss.close();
Thread.sleep(1000);
}
}
}
复制代码
跟踪代码发现 receiveConnection 方法最终会调用方法 handleConnection 以下
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
/** * 读取外部节点的 server id * ps : 此时的 server id 是何时发送的呢 ? */
Long sid = din.readLong();
if (sid < this.mySid) {
/** * 若外部节点的 server id 小于当前节点的 server id,则关闭此链接,改成由当前节点发起链接 * ps : 该限制说明选举过程当中,zk 只容许 server id 较大的一方去主动发起链接避免重复链接 */
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
closeSocket(sock);
connectOne(sid);
} else {
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
/** * 按 server id 分组,为外部节点分配 SendWorker, RecvWorker 和一个消息发送队列 */
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
/** * 启动外部节点对应的 SendWorker, RecvWorker 线程 */
sw.start();
rw.start();
return;
}
}
复制代码
至此会发现 QuorumCnxManager.Listener 线程处理逻辑以下:
PS : 此处咱们会有个疑问外部节点的 server id 是何时发送过来的呢 ?
下面咱们在看下为每一个外部节点开启了 SendWorker, RecvWorker 线程后作了什么:
public void run() {
// 省略
try {
while (running && !shutdown && sock != null) {
ByteBuffer b = null;
try {
/** * 经过 server id 获取待发送给集群中节点的消息队列 */
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
.get(sid);
if (bq != null) {
/** * 从队列中获取待发送的消息 */
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
} else {
LOG.error("No queue of incoming messages for " +
"server " + sid);
break;
}
if(b != null){
lastMessageSent.put(sid, b);
/** * 写入 socket 的输出流完成消息的发送 */
send(b);
}
} catch (InterruptedException e) {
}
}
} catch (Exception e) {
}
}
synchronized void send(ByteBuffer b) throws IOException {
byte[] msgBytes = new byte[b.capacity()];
try {
b.position(0);
b.get(msgBytes);
} catch (BufferUnderflowException be) {
LOG.error("BufferUnderflowException ", be);
return;
}
/** * 发送的报文包括:消息体正文长度和消息体正文 */
dout.writeInt(b.capacity());
dout.write(b.array());
dout.flush();
}
复制代码
经过代码实现咱们知道 SendWorker 的职责就是从 queueSendMap 队列中获取待发送给远程节点的消息并执行发送。
PS : 此处咱们会有个疑问 QuorumCnxManager.queueSendMap 中节点对应队列中待发送的消息是谁生产的呢 ?
public void run() {
threadCnt.incrementAndGet();
try {
while (running && !shutdown && sock != null) {
/** * 读取外部节点发送的消息 * 由 SendWorker 可知前 4 字节为消息载体有效长度 */
int length = din.readInt();
if (length <= 0 || length > PACKETMAXSIZE) {
throw new IOException(
"Received packet with invalid packet: "
+ length);
}
/** * 读取消息体正文 */
byte[] msgArray = new byte[length];
din.readFully(msgArray, 0, length);
ByteBuffer message = ByteBuffer.wrap(msgArray);
/** * 将读取的消息包装为 Message 对象添加到队列 recvQueue 中 */
addToRecvQueue(new Message(message.duplicate(), sid));
}
} catch (Exception e) {
LOG.warn("Connection broken for id " + sid + ", my id = "
+ QuorumCnxManager.this.mySid + ", error = " , e);
} finally {
LOG.warn("Interrupting SendWorker");
sw.finish();
if (sock != null) {
closeSocket(sock);
}
}
}
public void addToRecvQueue(Message msg) {
synchronized(recvQLock) {
// 省略
try {
recvQueue.add(msg);
} catch (IllegalStateException ie) {
// This should never happen
LOG.error("Unable to insert element in the recvQueue " + ie);
}
}
}
复制代码
从上面能够看出 RecvWorker 线程在运行期间会接收 server id 对应的外部节点发送的消息,并将其放入 QuorumCnxManager.recvQueue 队列中。 到目前为止咱们基本完成对 QuorumCnxManager 核心功能的分析,发现其功能主要是负责集群中当前节点与外部节点进行选举通信的网络 IO 操做,譬如接收外部节点选举投票和向外部节点发送内部投票。
下面咱们在接着回头看下 FastLeaderElection 类实例的过程:
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
this.stop = false;
this.manager = manager;
starter(self, manager);
}
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
sendqueue = new LinkedBlockingQueue<ToSend>();
recvqueue = new LinkedBlockingQueue<Notification>();
this.messenger = new Messenger(manager);
}
复制代码
Messenger(QuorumCnxManager manager) {
/** * 启动 WorkerSender 线程用于发送消息 */
this.ws = new WorkerSender(manager);
Thread t = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
/** * 启动 WorkerReceiver 线程用于接收消息 */
this.wr = new WorkerReceiver(manager);
t = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
}
复制代码
从 FastLeaderElection 实例化过程咱们知道,其内部分别启动了线程 WorkerSender,WorkerReceiver ;那么接下来看下这两个线程具体作什么吧。
public void run() {
while (!stop) {
try {
/** * 从 sendqueue 队列中获取 ToSend 待发送的消息 */
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
void process(ToSend m) {
// 将 ToSend 转换为 40字节 ByteBuffer
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch);
// 交由 QuorumCnxManager 执行发送
manager.toSend(m.sid, requestBuffer);
}
复制代码
看了 WorkerSender 的实现是否是明白了什么? 还记得上文中 FastLeaderElection.sendNotifications 方法执行发送通知的时候的疑惑吗 ? FastLeaderElection.sendqueue 队列产生的消息就是被 WorkerSender 线程所消费处理, WorkerSender 会将消息转发至 QuorumCnxManager 处理
public void toSend(Long sid, ByteBuffer b) {
/* * If sending message to myself, then simply enqueue it (loopback). * 若是是发给本身的投票,则将其添加到接收队列中等待处理 */
if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
/* * Otherwise send to the corresponding thread to send. */
} else {
/* * Start a new connection if doesn't have one already. */
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
ArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq);
// 将发送的消息放入对应的队列中,若队列满了则将队列头部元素移除
if (bqExisting != null) {
addToSendQueue(bqExisting, b);
} else {
addToSendQueue(bq, b);
}
connectOne(sid);
}
}
private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buffer) {
// 省略
try {
// 将消息插入节点对应的队列中
queue.add(buffer);
} catch (IllegalStateException ie) {
}
}
复制代码
QuorumCnxManager 在收到 FastLeaderElection.WorkerSender 转发的消息时,会判断当前消息是否发给本身的投票,如果则将消息添加到接收队列中,反之会将消息添加到 queueSendMap 对应 server id 的队列中;看到这里的时候是否是就明白了在 QuorumCnxManager.SendWorker 分析时候的疑惑呢 。 这个时候投票消息未必可以发送出去,由于当前节点与外部节点的通道是否已创建还未知,因此继续执行 connectOne
synchronized public void connectOne(long sid){
/** * 判断当前服务节点是否与 sid 外部服务节点创建链接;有可能对方先发起链接 * 若已链接则等待后续处理,反之发起链接 */
if (!connectedToPeer(sid)){
InetSocketAddress electionAddr;
if (view.containsKey(sid)) {
electionAddr = view.get(sid).electionAddr;
} else {
LOG.warn("Invalid server id: " + sid);
return;
}
try {
LOG.debug("Opening channel to server " + sid);
Socket sock = new Socket();
setSockOpts(sock);
sock.connect(view.get(sid).electionAddr, cnxTO);
LOG.debug("Connected to server " + sid);
initiateConnection(sock, sid);
} catch (UnresolvedAddressException e) {
} catch (IOException e) {
}
} else {
LOG.debug("There is a connection already for server " + sid);
}
}
public boolean connectedToPeer(long peerSid) {
return senderWorkerMap.get(peerSid) != null;
}
复制代码
private boolean startConnection(Socket sock, Long sid) throws IOException {
DataOutputStream dout = null;
DataInputStream din = null;
try {
/** * 发送当前节点的 server id,需告知对方我是哪台节点 */
dout = new DataOutputStream(sock.getOutputStream());
dout.writeLong(this.mySid);
dout.flush();
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {
LOG.warn("Ignoring exception reading or writing challenge: ", e);
closeSocket(sock);
return false;
}
// 只容许 sid 值大的服务器去主动和其余服务器链接,不然断开链接
if (sid > this.mySid) {
LOG.info("Have smaller server identifier, so dropping the " +
"connection: (" + sid + ", " + this.mySid + ")");
closeSocket(sock);
// Otherwise proceed with the connection
} else {
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
sw.start();
rw.start();
return true;
}
return false;
}
复制代码
从上述代码能够看出节点在与外部节点链接后会先发送 myid 报文告知对方我是哪一个节点(这也是为何 QuorumCnxManager.Listener 线程在接收到一个链接请求时会先执行 getLong 获取 server id 了);一样在链接创建的时候也遵循一个原则(只容许 server id 较大的一方发起链接)。
public void run() {
Message response;
while (!stop) {
// Sleeps on receive
try{
/** * 从 QuorumCnxManager.recvQueue 队列中获取接收的外部投票 */
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
if(!self.getVotingView().containsKey(response.sid)){
// 忽略对方是观察者的处理
} else {
// Instantiate Notification and set its attributes
Notification n = new Notification();
// 将 message 转成 notification 对象
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
// 当前节点状态为 looking,则将外部节点投票添加到 recvqueue 队列中
recvqueue.offer(n);
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock.get())){
// 若外部节点选举周期小于当前节点选举周期则发送内部投票
Vote v = getVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch());
sendqueue.offer(notmsg);
}
} else {
// 忽略其余状态时的处理
}
}
} catch (InterruptedException e) {
}
}
LOG.info("WorkerReceiver is down");
}
复制代码
此时咱们明白 WorkerReceiver 线程在运行期间会一直从 QuorumCnxManager.recvQueue 的队列中拉取接收到的外部投票信息,若当前节点为 LOOKING 状态,则将外部投票信息添加到 FastLeaderElection.recvqueue 队列中,等待 FastLeaderElection.lookForLeader 选举算法处理投票信息。
到此咱们基本明白了 ZK 集群节点发送和接收投票的处理流程,可是这个时候您是否是又有一种懵的状态呢 笑哭,咱们会发现选举过程当中依赖了多个线程 WorkerSender, SendWorker, WorkerReceiver, RecvWorker ,多个阻塞队列 sendqueue, recvqueue,queueSendMap,recvQueue 并且名字起的很相似,更让人懵 ; 不过莫慌,咱们来经过下面的图来缕下思路
看了这么长时间的代码, 最后咱们就来个小结吧 :
QuorumCnxManager 类主要职能是负责集群中节点与外部节点进行通讯及投票信息的中转
FastLeaderElection 类是选举投票的核心实现
选举投票规则
集群中节点通讯时为了不重复创建链接,遵照一个原则:链接老是由 server id 较大的一方发起
源码看着真累 o(╯□╰)o