1 ZAB介绍
ZAB协议全称就是ZooKeeper Atomic Broadcast protocol,是ZooKeeper用来实现一致性的算法,分红以下4个阶段。算法
先来解释下部分名词服务器
electionEpoch:每执行一次leader选举,electionEpoch就会自增,用来标记leader选举的轮次异步
peerEpoch:每次leader选举完成以后,都会选举出一个新的peerEpoch,用来标记事务请求所属的轮次spa
zxid:事务请求的惟一标记,由leader服务器负责进行分配。由2部分构成,高32位是上述的peerEpoch,低32位是请求的计数,从0开始。因此由zxid咱们就能够知道该请求是哪一个轮次的,而且是该轮次的第几个请求。线程
lastProcessedZxid:最后一次commit的事务请求的zxid日志
- Leader electionleader选举过程,electionEpoch自增,在选举的时候lastProcessedZxid越大,越有可能成为leader
- Discovery:第一:leader收集follower的lastProcessedZxid,这个主要用来经过和leader的lastProcessedZxid对比来确认follower须要同步的数据范围第二:选举出一个新的peerEpoch,主要用于防止旧的leader来进行提交操做(旧leader向follower发送命令的时候,follower发现zxid所在的peerEpoch比如今的小,则直接拒绝,防止出现不一致性)
- Synchronization:follower中的事务日志和leader保持一致的过程,就是依据follower和leader之间的lastProcessedZxid进行,follower多的话则删除掉多余部分,follower少的话则补充,一旦对应不上则follower删除掉对不上的zxid及其以后的部分而后再从leader同步该部分以后的数据
- Broadcast正常处理客户端请求的过程。leader针对客户端的事务请求,而后提出一个议案,发给全部的follower,一旦过半的follower回复OK的话,leader就能够将该议案进行提交了,向全部follower发送提交该议案的请求,leader同时返回OK响应给客户端
上面简单的描述了上述4个过程,这4个过程的详细描述在zab的paper中能够找到,可是我看了以后基本和zab的源码实现上相差有点大,这里就再也不提zab paper对上述4个过程的描述了,下面会详细的说明ZooKeeper源码中是具体怎么来实现的cdn
2 ZAB协议源码实现
先看下ZooKeeper总体的实现状况,以下图所示server

上述实现中Recovery Phase包含了ZAB协议中的Discovery和Synchronization。排序
2.1 重要的数据介绍
加上前面已经介绍的几个名词three
- long lastProcessedZxid:最后一次commit的事务请求的zxid
- LinkedList<Proposal> committedLog、long maxCommittedLog、long minCommittedLog:ZooKeeper会保存最近一段时间内执行的事务请求议案,个数限制默认为500个议案。上述committedLog就是用来保存议案的列表,上述maxCommittedLog表示最大议案的zxid,minCommittedLog表示committedLog中最小议案的zxid。
- ConcurrentMap<Long, Proposal> outstandingProposalsLeader拥有的属性,每当提出一个议案,都会将该议案存放至outstandingProposals,一旦议案被过半认同了,就要提交该议案,则从outstandingProposals中删除该议案
- ConcurrentLinkedQueue<Proposal> toBeAppliedLeader拥有的属性,每当准备提交一个议案,就会将该议案存放至该列表中,一旦议案应用到ZooKeeper的内存树中了,而后就能够将该议案从toBeApplied中删除
对于上述几个参数,整个Broadcast的处理过程能够描述为:
- leader针对客户端的事务请求(leader为该请求分配了zxid),建立出一个议案,并将zxid和该议案存放至leader的outstandingProposals中
- leader开始向全部的follower发送该议案,若是过半的follower回复OK的话,则leader认为能够提交该议案,则将该议案从outstandingProposals中删除,而后存放到toBeApplied中
- leader对该议案进行提交,会向全部的follower发送提交该议案的命令,leader本身也开始执行提交过程,会将该请求的内容应用到ZooKeeper的内存树中,而后更新lastProcessedZxid为该请求的zxid,同时将该请求的议案存放到上述committedLog,同时更新maxCommittedLog和minCommittedLog
- leader就开始向客户端进行回复,而后就会将该议案从toBeApplied中删除
2.2 Fast Leader Election
leader选举过程要关注的要点:
- 全部机器刚启动时进行leader选举过程
- 若是leader选举完成,刚启动起来的server怎么识别到leader选举已完成
投票过程有3个重要的数据:
- ServerState目前ZooKeeper机器所处的状态有4种,分别是
- LOOKING:进入leader选举状态
- FOLLOWING:leader选举结束,进入follower状态
- LEADING:leader选举结束,进入leader状态
- OBSERVING:处于观察者状态
- HashMap<Long, Vote> recvset用于收集LOOKING、FOLLOWING、LEADING状态下的server的投票
- HashMap<Long, Vote> outofelection用于收集FOLLOWING、LEADING状态下的server的投票(可以收集到这种状态下的投票,说明leader选举已经完成)
下面就来详细说明这个过程:
- 1 serverA首先将electionEpoch自增,而后为本身投票serverA会首先从快照日志和事务日志中加载数据,就能够获得本机器的内存树数据,以及lastProcessedZxid(这一部分后面再详细说明)初始投票Vote的内容:
- proposedLeader:ZooKeeper Server中的myid值,初始为本机器的id
- proposedZxid:最大事务zxid,初始为本机器的lastProcessedZxid
- proposedEpoch:peerEpoch值,由上述的lastProcessedZxid的高32获得
而后该serverA向其余全部server发送通知,通知内容就是上述投票信息和electionEpoch信息
- 2 serverB接收到上述通知,而后进行投票PK若是serverB收到的通知中的electionEpoch比本身的大,则serverB更新本身的electionEpoch为serverA的electionEpoch若是该serverB收到的通知中的electionEpoch比本身的小,则serverB向serverA发送一个通知,将serverB本身的投票以及electionEpoch发送给serverA,serverA收到后就会更新本身的electionEpoch
在electionEpoch达成一致后,就开始进行投票之间的pk,规则以下:
/* * We return true if one of the following three cases hold: * 1- New epoch is higher * 2- New epoch is the same as current epoch, but new zxid is higher * 3- New epoch is the same as current epoch, new zxid is the same * as current zxid, but server id is higher. */ return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
1 2 3 4 5 6 7 8 9 10 11 12 |
/* * We return true if one of the following three cases hold: * 1- New epoch is higher * 2- New epoch is the same as current epoch, but new zxid is higher * 3- New epoch is the same as current epoch, new zxid is the same * as current zxid, but server id is higher. */ return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); |
就是优先比较proposedEpoch,而后优先比较proposedZxid,最后优先比较proposedLeader
pk完毕后,若是本机器投票被pk掉,则更新投票信息为对方投票信息,同时从新发送该投票信息给全部的server。
若是本机器投票没有被pk掉,则看下面的过半判断过程
- 3 根据server的状态来断定leader若是当前发来的投票的server的状态是LOOKING状态,则只须要判断本机器的投票是否在recvset中过半了,若是过半了则说明leader选举就算成功了,若是当前server的id等于上述过半投票的proposedLeader,则说明本身将成为了leader,不然本身将成为了follower若是当前发来的投票的server的状态是FOLLOWING、LEADING状态,则说明leader选举过程已经完成了,则发过来的投票就是leader的信息,这里就须要判断发过来的投票是否在recvset或者outofelection中过半了
同时还要检查leader是否给本身发送过投票信息,从投票信息中确认该leader是否是LEADING状态。这个解释以下:
由于目前leader和follower都是各自检测是否进入leader选举过程。leader检测到未过半的server的ping回复,则leader会进入LOOKING状态,可是follower有本身的检测,感知这一事件,还须要必定时间,在此期间,若是其余server加入到该集群,可能会收到其余follower的过半的对以前leader的投票,可是此时该leader已经不处于LEADING状态了,因此须要这么一个检查来排除这种状况。
2.3 Recovery Phase
一旦leader选举完成,就开始进入恢复阶段,就是follower要同步leader上的数据信息
- 1 通讯初始化leader会建立一个ServerSocket,接收follower的链接,leader会为每个链接会用一个LearnerHandler线程来进行服务
- 2 从新为peerEpoch选举出一个新的peerEpochfollower会向leader发送一个Leader.FOLLOWERINFO信息,包含本身的peerEpoch信息leader的LearnerHandler会获取到上述peerEpoch信息,leader从中选出一个最大的peerEpoch,而后加1做为新的peerEpoch。
而后leader的全部LearnerHandler会向各自的follower发送一个Leader.LEADERINFO信息,包含上述新的peerEpoch
follower会使用上述peerEpoch来更新本身的peerEpoch,同时将本身的lastProcessedZxid发给leader
leader的全部LearnerHandler会记录上述各自follower的lastProcessedZxid,而后根据这个lastProcessedZxid和leader的lastProcessedZxid之间的差别进行同步
- 3 已经处理的事务议案的同步判断LearnerHandler中的lastProcessedZxid是否在minCommittedLog和maxCommittedLog之间
- LearnerHandler中的lastProcessedZxid和leader的lastProcessedZxid一致,则说明已经保持同步了
- 若是lastProcessedZxid在minCommittedLog和maxCommittedLog之间从lastProcessedZxid开始到maxCommittedLog结束的这部分议案,从新发送给该LearnerHandler对应的follower,同时发送对应议案的commit命令上述可能存在一个问题:即lastProcessedZxid虽然在他们之间,可是并无找到lastProcessedZxid对应的议案,即这个zxid是leader所没有的,此时的策略就是彻底按照leader来同步,删除该follower这一部分的事务日志,而后从新发送这一部分的议案,并提交这些议案
- 若是lastProcessedZxid大于maxCommittedLog则删除该follower大于部分的事务日志
- 若是lastProcessedZxid小于minCommittedLog则直接采用快照的方式来恢复
- 4 未处理的事务议案的同步LearnerHandler还会从leader的toBeApplied数据中将大于该LearnerHandler中的lastProcessedZxid的议案进行发送和提交(toBeApplied是已经被确认为提交的)LearnerHandler还会从leader的outstandingProposals中大于该LearnerHandler中的lastProcessedZxid的议案进行发送,可是不提交(outstandingProposals是还没被被确认为提交的)
- 5 将LearnerHandler加入到正式follower列表中意味着该LearnerHandler正式接受请求。即此时leader可能正在处理客户端请求,leader针对该请求发出一个议案,而后对该正式follower列表才会进行执行发送工做。这里有一个地方就是:上述咱们在比较lastProcessedZxid和minCommittedLog和maxCommittedLog差别的时候,必需要获取leader内存数据的读锁,即在此期间不能执行修改操做,当欠缺的数据包已经补上以后(先放置在一个队列中,异步发送),才能加入到正式的follower列表,不然就会出现顺序错乱的问题
同时也说明了,一旦一个follower在和leader进行同步的过程(这个同步过程仅仅是确认要发送的议案,先放置到队列中便可等待异步发送,并非说必需要发送过去),该leader是暂时阻塞一切写操做的。
对于快照方式的同步,则是直接同步写入的,写入期间对数据的改动会放在上述队列中的,而后当同步写入完成以后,再启动对该队列的异步写入。
上述的要理解的关键点就是:既要不能漏掉,又要保证顺序
- 6 LearnerHandler发送Leader.NEWLEADER以及Leader.UPTODATE命令该命令是在同步结束以后发的,follower收到该命令以后会执行一次版本快照等初始化操做,若是收到该命令的ACK则说明follower都已经完成同步了并完成了初始化leader开始进入心跳检测过程,不断向follower发送心跳命令,不断检是否有过半机器进行了心跳回复,若是没有过半,则执行关闭操做,开始进入leader选举状态
LearnerHandler向对应的follower发送Leader.UPTODATE,follower接收到以后,开始和leader进入Broadcast处理过程
2.4 Broadcast Phase
前面其实已经说过了,参见2.1中的内容
3 特殊状况的注意点
3.1 事务日志和快照日志的持久化和恢复
先来看看持久化过程:
再来讲说恢复:
由此咱们能够看到,在初始化恢复的时候,是会将全部最新的事务日志做为已经commit的事务来处理的
也就是说这里面可能会有部分事务日志还没真实提交,而这里所有当作已提交来处理。这个处理简单粗暴了一些,而raft对老数据的恢复则控制的更加严谨一些。
3.2 follower挂了以后又重启的恢复过程
一旦leader挂了,上述leader的2个集合
- ConcurrentMap<Long, Proposal> outstandingProposals
- ConcurrentLinkedQueue<Proposal> toBeApplied
就无效了。他们并不在leader恢复的时候起做用,而是在系统正常执行,而某个follower挂了又恢复的时候起做用。
咱们能够看到在上述2.3的恢复过程当中,会首先进行快照日志和事务日志的恢复,而后再补充leader的上述2个数据中的内容。
3.3 同步follower失败的状况
目前leader和follower之间的同步是经过BIO方式来进行的,一旦该链路出现异常则会关闭该链路,从新与leader创建链接,从新同步最新的数据
3.5 对client端是否一致
- 客户端收到OK回复,会不会丢失数据?
- 客户端没有收到OK回复,会不会多存储数据?
客户端若是收到OK回复,说明已通过半复制了,则在leader选举中确定会包含该请求对应的事务日志,则不会丢失该数据
客户端链接的leader或者follower挂了,客户端没有收到OK回复,目前是可能丢失也可能没丢失,由于服务器端的处理也很简单粗暴,对于将来leader上的事务日志都会当作提交来处理的,即都会被应用到内存树中。
同时目前ZooKeeper的原生客户端也没有进行重试,服务器端也没有对重试进行检查。这一部分到下一篇再详细探讨与raft的区别
4 未完待续
本文有不少细节,不免可能疏漏,还请指正。
4.1 问题
这里留个问题供你们思考下:
raft每次执行AppendEntries RPC的时候,都会带上当前leader的新term,来防止旧的leader的旧term来执行相关操做,而ZooKeeper的peerEpoch呢?达到防止旧leader的效果了吗?它的做用是干什么呢?