zkServer.sh 中的main函数: org.apache.zookeeper.server.quorum.QuorumPeerMain;java
zkCli.sh 中的main函数: org.apache.zookeeper.ZooKeeperMain;算法
key:推送过来的服务实例 myid,表明哪一个服务实例的选票; apache
Value: Vote对象对应Notification, 标识该服务器推举的选票信息:
版本默认为 version = 0x0
leader的myid: leader,
leader的zxid: zxid,
leader的epoch: peerEpoch,
发送者的logicalclock: electionEpoch服务器
选票来自OBSERVING,抛弃;ide
选票来自FOLLOWING/LEADING:函数
断定 外票的选举轮次electionEpoch与本身自己的logicalclockthis
当一个新启动的节点加入集群时,它对集群内其余节点发出投票请求,而其余节点已不处于LOOKING状态,此时其余节点回应选举结果,该节点收集这些结果到outofelection中,最终在收到合法LEADER消息且这些选票也构成选举结束条件时,该节点就结束本身的选举行为。注意到代码中会logicalclock = n.electionEpoch;更新选举轮数。spa
选票来自LOOKING:
比较 选票发送方的选举序列electionEpoch 与 自己时序logicalclockdebug
将外票保存在选票集合<recvset>中,若是更新以后的本地推荐选票知足集群配置的有效策略(过半有效) 则 从队列recvqueue中循环获取poll,比较:code
处理完以后recvqueue没有了对象, 说明接收选票完结了,当前更新以后的本地推荐选票就是leader。肯定本身的角色,清空接收队列,返回该选票。
switch (n.state) { case LOOKING: // If notification > current, replace and send messages out if (n.electionEpoch > logicalclock) { logicalclock = n.electionEpoch; recvset.clear(); if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); } else if (n.electionEpoch < logicalclock) { if(LOG.isDebugEnabled()){ LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch) + ", logicalclock=0x" + Long.toHexString(logicalclock)); } break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } if(LOG.isDebugEnabled()){ LOG.debug("Adding vote: from=" + n.sid + ", proposed leader=" + n.leader + ", proposed zxid=0x" + Long.toHexString(n.zxid) + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch)); } recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) { // Verify if there is any change in the proposed leader while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ recvqueue.put(n); break; } } /* * This predicate is true once we don't read any new * relevant message from the reception queue */ if (n == null) { self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch); leaveInstance(endVote); return endVote; } } break; case OBSERVING: LOG.debug("Notification from observer: " + n.sid); break; case FOLLOWING: case LEADING: /* * Consider all notifications from the same epoch * together. */ if(n.electionEpoch == logicalclock){ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if(ooePredicate(recvset, outofelection, n)) { self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } } /* * Before joining an established ensemble, verify * a majority is following the same leader. */ outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); if(ooePredicate(outofelection, outofelection, n)) { synchronized(this){ logicalclock = n.electionEpoch; self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); } Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } break; default: LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)", n.state, n.sid); break; }