zookeeper(16)源码分析-ZAB协议

Zookeeper使用了Zookeeper Atomic Broadcast(ZAB,Zookeeper原子消息广播协议)的协议做为其数据一致性的核心算法。ZAB协议是为Zookeeper专门设计的一种支持崩溃恢复的原子广播协议。算法

ZAB理论

ZAB协议的核心是定义了对于那些会改变Zookeeper服务器数据状态的事务请求的处理方式,即:全部事务请求必须由一个全局惟一的服务器来协调处理,这样的服务器被称为Leader服务器,余下的服务器则称为Follower服务器,Leader服务器负责将一个客户端事务请求转化成一个事务Proposal(提议),并将该Proposal分发给集群中全部的Follower服务器,以后Leader服务器须要等待全部Follower服务器的反馈,一旦超过半数的Follower服务器进行了正确的反馈后,那么Leader就会再次向全部的Follower服务器分发Commit消息,要求其将前一个Proposal进行提交。数据库

ZAB一些包括两种基本的模式:崩溃恢复消息广播服务器

一、当整个服务框架启动过程当中或Leader服务器出现网络中断、崩溃退出与重启等异常状况时,ZAB协议就会进入恢复模式并选举产生新的Leader服务器。当选举产生了新的Leader服务器,同时集群中已经有过半的机器与该Leader服务器完成了状态同步以后,ZAB协议就会退出恢复模式,状态同步是指数据同步,用来保证集群在过半的机器可以和Leader服务器的数据状态保持一致。网络

二、当集群中已经有过半的Follower服务器完成了和Leader服务器的状态同步,那么整个服务框架就能够进入消息广播模式,当一台一样遵照ZAB协议的服务器启动后加入到集群中,若是此时集群中已经存在一个Leader服务器在负责进行消息广播,那么加入的服务器就会自觉地进入数据恢复模式:找到Leader所在的服务器,并与其进行数据同步,而后一块儿参与到消息广播流程中去。Zookeeper只容许惟一的一个Leader服务器来进行事务请求的处理,Leader服务器在接收到客户端的事务请求后,会生成对应的事务提议并发起一轮广播协议,而若是集群中的其余机器收到客户端的事务请求后,那么这些非Leader服务器会首先将这个事务请求转发给Leader服务器。并发

三、当Leader服务器出现崩溃或者机器重启、集群中已经不存在过半的服务器与Leader服务器保持正常通讯时,那么在从新开始新的一轮的原子广播事务操做以前,全部进程首先会使用崩溃恢复协议来使彼此到达一致状态,因而整个ZAB流程就会从消息广播模式进入到崩溃恢复模式。一个机器要成为新的Leader,必须得到过半机器的支持,同时因为每一个机器都有可能会崩溃,所以,ZAB协议运行过程当中,先后会出现多个Leader,而且每台机器也有可能会屡次成为Leader,进入崩溃恢复模式后,只要集群中存在过半的服务器可以彼此进行正常通讯,那么就能够产生一个新的Leader并再次进入消息广播模式。如一个由三台机器组成的ZAB服务,一般由一个Leader、2个Follower服务器组成,某一个时刻,加入其中一个Follower挂了,整个ZAB集群是不会中断服务的。app

① 消息广播

Zab协议消息广播有如下4个步骤组成:框架

  1. Leader发送PROPOSAL给集群中全部的节点。
  2. 节点在收到PROPOSAL以后,把PROPOSAL落盘,发送一个ACK给Leader。
  3. Leader在收到大多数节点的ACK以后,发送COMMIT给集群中全部的Follower节点。
  4. 若是存在Observer节点,Leader同时发送INFORM信息给Observer服务节点同步数据,Observer只接收Leader的INFORM消息同步数据,不参与Leader选举和事务提交。

zookeeper(16)源码分析-ZAB协议

② 崩溃恢复

在Leader服务器出现崩溃,或者因为网络缘由致使Leader服务器失去了与过半Follower的联系,那么就会进入崩溃恢复模式,在ZAB协议中,为了保证程序的正确运行,整个恢复过程结束后须要选举出一个新的Leader服务器,所以,ZAB协议须要一个高效且可靠的Leader选举算法,从而保证可以快速地选举出新的Leader,同时,Leader选举算法不只仅须要让Leader自身知道已经被选举为Leader,同时还须要让集群中的全部其余机器也可以快速地感知到选举产生的新的Leader服务器。ide

③ ZAB基本特性

ZAB协议的基本原则oop

3.一、ZAB协议须要确保那些已经在Leader服务器上提交的事务最终被全部服务器都提交

假设一个事务在Leader服务器上被提交了,而且已经获得了过半Follower服务器的Ack反馈,可是在它Commit消息发送给全部Follower机器以前,Leader服务挂了。以下图所示:源码分析

zookeeper(16)源码分析-ZAB协议

在集群正常运行过程当中的某一个时刻,Server1是Leader服务器,其前后广播了P一、P二、C一、P三、C2(C2是Commit Of Proposal2的缩写),其中,当Leader服务器发出C2后就当即崩溃退出了,针对这种状况,ZAB协议就须要确保事务Proposal2最终可以在全部的服务器上都被提交成功,不然将出现不一致。

3.二、ZAB协议须要确保丢弃那些只在Leader服务器上被提出的事务。

若是在崩溃恢复过程当中出现一个须要被丢弃的提议,那么在崩溃恢复结束后须要跳过该事务Proposal,以下图所示:

zookeeper(16)源码分析-ZAB协议

假设初始的Leader服务器Server1在提出一个事务Proposal3以后就崩溃退出了,从而致使集群中的其余服务器都没有收到这个事务Proposal,因而,当Server1恢复过来再次加入到集群中的时候,ZAB协议须要确保丢弃Proposal3这个事务。

3.三、ZAB协议必须的Leader选举算法

可以确保提交已经被Leader提交的事务的Proposal,同时丢弃已经被跳过的事务Proposal。若是让Leader选举算法可以保证新选举出来的Leader服务器拥有集群中全部机器最高编号(ZXID最大)的事务Proposal,那么就能够保证这个新选举出来的Leader必定具备全部已经提交的提议,更为重要的是若是让具备最高编号事务的Proposal机器称为Leader,就能够省去Leader服务器查询Proposal的提交和丢弃工做这一步骤了。

3.四、数据同步,一致性

完成Leader选举后,在正式开始工做前,Leader服务器首先会确认日志中的全部Proposal是否都已经被集群中的过半机器提交了,便是否完成了数据同步。Leader服务器须要确全部的Follower服务器都可以接收到每一条事务Proposal,而且可以正确地将全部已经提交了的事务Proposal应用到内存数据库中。Leader服务器会为每一个Follower服务器维护一个队列,并将那些没有被各Follower服务器同步的事务以Proposal消息的形式逐个发送给Follower服务器,并在每个Proposal消息后面紧接着再发送一个Commit消息,以表示该事务已经被提交,等到Follower服务器将全部其还没有同步的事务Proposal都从Leader服务器上同步过来并成功应用到本地数据库后,Leader服务器就会将该Follower服务器加入到真正的可用Follower列表并开始以后的其余流程。

④ ZAB总结

一、 发现,选举产生Leader,产生最新的epoch(每次选举产生新Leader的同时产生新epoch)。

二、 同步,各Follower和Leader完成数据同步。

三、广播,Leader处理客户端的写操做,并将状态变动广播至Follower,Follower多数经过以后Leader发起将状态变动落地Commit。

在正常运行过程当中,ZAB协议会一直运行于阶段三来反复进行消息广播流程,若是出现崩溃或其余缘由致使Leader缺失,那么此时ZAB协议会再次进入发现阶段,选举新的Leader。

源码分析

一、Leader发送PROPOSAL

ProposalRequestProcessor.proce***equest()方法发送PROPOSAL 给每个节点。它调用Leader.propose()方法把PROPOSAL
入队到各个follower的queuedPackets,而后直接把PROPOSAL提交给leader节点本身的SyncRequestProcessor 。

如下是大概的代码路径:

ProposalRequestProcessor.proce***equest(request)
  zks.getLeader().propose(request)
        sendPacket(pp)
            for f in forwardingFollowers
                f.queuePacket(qp) 
                    queuedPackets.add(p)
  syncProcessor.proce***equest(request)

二、Leader处理PROPOSAL

SyncRequestProcessor先处理

SyncRequestProcessor.run() 
    zks.getZKDatabase().append(si) 
    flush(toFlush)
        zks.getZKDatabase().commit() 
            while (!toFlush.isEmpty())
                Request i = toFlush.remove()
                if (nextProcessor != null)
                    nextProcessor.proce***equest(i)

而后是Leader的ACK处理器处理,返回给Leader本身ACK结果

AckRequestProcessor.proce***equest()
    proce***equest()
        leader.processAck(self.getId(), request.zxid, null)

zookeeper(16)源码分析-ZAB协议

三、Follower处理PROPOSAL

Follower. followLeader()方法处理接收到的QuorumPacket, case Leader.PROPOSAL分支处理的就是PROPOSAL。

Follower.followLeader() 
    loop
    readPacket(qp)
      leaderIs.readRecord(pp, "packet")
        processPacket(qp) 
            case Leader.PROPOSAL
                Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr)
                fzk.logRequest(hdr, txn)
                    syncProcessor.proce***equest(request) 
            case Leader.COMMIT:
                    fzk.commit(qp.getZxid())
                        commitProcessor.commit(request)

SyncRequestProcessor的处理逻辑

SyncRequestProcessor.run() 
    zks.getZKDatabase().append(si) 
    flush(toFlush)
        zks.getZKDatabase().commit()
        while (!toFlush.isEmpty())
            Request i = toFlush.remove() 
            if (nextProcessor != null)
                nextProcessor.proce***equest(i)
                    QuorumPacket qp = new QuorumPacket(Leader.ACK) 
                    learner.writePacket(qp, false)
                         leaderOs.writeRecord(pp, "packet")
         ((Flushable)nextProcessor).flush()
                learner.writePacket(null, true) 
                    bufferedOutput.flush()

zookeeper(16)源码分析-ZAB协议

四、Leader的ACK处理

Leader的processAck()处理ACK消息,若是收到大多数节点的ACK,发送COMMIT给全部的follower节点,并调用leader本身 的CommitProcessor。 processAck()有两个调用入口:1. LeaderHandler的run()方法处理来自follower的ACK。2. AckRequestProcessor的proce***equest方法处理leader本身的ACK。

Leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()) 
    Proposal p = outstandingProposals.get(zxid)
    p.addAck(sid)
    tryToCommit(p, zxid, followerAddr)
        if !p.hasAllQuorums() 
            return false;
        // Commit on all followers
        commit(zxid)
            QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null)
            sendPacket(qp)
        // Commit on Leader 
        zk.commitProcessor.commit(p.request)

五、Leader的COMMIT处理

CommitProcessor.run()
    request = queuedRequests.poll() 
    processCommitted()
        sendToNextProcessor(pending)

已经提交的请求,交给ToBeAppliedRequestProcessor准备应用到内存数据库

ToBeAppliedRequestProcessor.proce***equest()
    next.proce***equest(request)

最后交给FinalRequestProcessor,返回响应结果

zookeeper(16)源码分析-ZAB协议

六、Follower的COMMIT处理

CommitProcessor.run()
    request = queuedRequests.poll() 
    processCommitted()
        sendToNextProcessor(pending) 
//返回响应结果        
FinalRequestProcessor.proce***equest()

zookeeper(16)源码分析-ZAB协议

相关文章
相关标签/搜索