Zookeeper源码分析(四) ----- 集群模式(replicated)运行

zookeeper源码分析系列文章:算法

原创博客,纯手敲,转载请注明出处,谢谢!数据库

如你所知,zk的运行方式有两种,独立模式和复制模式。很显然复制模式是用来搭建zk集群的,所以我把复制模式称为集群模式。在以前的文章中咱们已经对独立模式下运行zk的源码进行相关分析,接下来咱们一块儿来研究研究Zk集群模式下的源码。数组

集群模式下的调试不像独立模式那么简单,也许你可能会问,那是否须要多台物理机来搭建一个zk集群呢?其实也不须要,单台物理机也是能够模拟集群运行的。所以,下文咱们将按照如下目录开展讨论:bash

1、zk集群搭建及相关配置

zk配置集群其实很是简单,在上篇博客中讲到,zk在解析配置文件时会判断你配置文件中是否有相似server.的配置项,若是没有相似server.的配置项,则默认以独立模式运行zk。相反,集群模式下就要求你进行相应的配置了。下面将一步一步对搭建环境进行讲解:服务器

  • 一、在zk的conf目录中增长3个配置文件,名字分别为zoo1.cfgzoo2.cfgzoo3.cfg

其内容分别以下:数据结构

zoo1.cfgeclipse

tickTime=200000
initLimit=10
syncLimit=5
dataDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\1
dataLogDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\log\\1
maxClientCnxns=2
# 服务器监听客户端链接的端口
clientPort=2181 

server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
复制代码

zoo2.cfgide

tickTime=200000
initLimit=10
syncLimit=5
dataDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\2
dataLogDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\log\\2
maxClientCnxns=2
# 服务器监听客户端链接的端口
clientPort=2182

server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
复制代码

zoo3.cfg源码分析

tickTime=200000
initLimit=10
syncLimit=5
dataDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\3
dataLogDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\log\\3
maxClientCnxns=2
# 服务器监听客户端链接的端口
clientPort=2183

server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
复制代码

上面相关通用的配置项在此处我就不作一一解释,相关含义在上篇文章中都有提到。下面咱们重点关注下clientPort属性和server.x属性。post

clientPort表明服务器监听客户端链接的端口,换句话说就是客户端链接到服务器的那个端口。该属性的默认配置通常都是2181,那为何咱们这里要写成218121822183呢?其实缘由很简单,由于咱们的集群式搭建在单台物理机上面,为了防止端口冲突,咱们设置3台zk服务器分别监听不一样的端口。

至于server.x属性,用于配置参与集群的每台服务器的地址和端口号。其格式为:

server.x addressIP:port1:port2

其中x表示zk节点的惟一编号,也就是咱们常说的sid的值,下面讲到zk选举的时候将会进一步讲解。你可能会很好奇port1port2之间有什么区别,在zk中,port1表示fllowers链接到leader的端口,port2表示当前结点参与选举的端口。之因此要这么设计,其实我以为在ZAB协议中,当客户端发出的写操做在服务器端执行完毕时,leader节点必须将状态同步给全部的fllowersleaderfllowers之间须要进行通讯嘛!另一种是全部节点进行快速选举时,各个节点之间须要进行投票,投票选出完一个leader节点以后须要通知其余节点。因此说,明白端口含义便可,它们就是区别做用罢了。

  • 二、建立3个myid文件

zk在集群模式下运行时会读取位于dataDir目录下的myid文件,若是没有找到,则会报错。所以,下面咱们将分别在对应的dataDir下新建myid文件,该文件的内容填写当前服务器的编号,也就是咱们上面说到的server.x中的x值。

E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\1下建立该文件,文件内容为序号1 E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\2下建立该文件,文件内容为序号2 E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\3下建立该文件,文件内容为序号3

  • 三、分别采用不一样的配置文件运行QuorumPeerMainmain()方法便可。配置文件路径能够这样传给eclipse,以下图:

上面讲的内容彷佛和源码打不上边,嗯,别着急,下面就讲源码。

首先咱们看看zk是如何解析server.x标签的,进入QuorumPeerConfig类的parseProperties()方法,你将看到以下代码片断:

// 判断属性是否以server.开始
if (key.startsWith("server.")) {
    int dot = key.indexOf('.');
    // 获取sid的值,也就是咱们server.x中的x值
    long sid = Long.parseLong(key.substring(dot + 1));
    // 将配置值拆分为数组,格式为[addressIP,port1,port2]
    String parts[] = splitWithLeadingHostname(value);
    if ((parts.length != 2) && (parts.length != 3) && (parts.length != 4)) {
    	LOG.error(value + " does not have the form host:port or host:port:port "
    			+ " or host:port:port:type");
    }
    // 表明当前结点的类型,能够是观察者类型(不须要参与投票),也能够是PARTICIPANT(表示该节点后期可能成为follower和leader)
    LearnerType type = null;
    String hostname = parts[0];
    Integer port = Integer.parseInt(parts[1]);
    Integer electionPort = null;
    if (parts.length > 2) {
    	electionPort = Integer.parseInt(parts[2]);
    }
} 
复制代码

上面源码将会根据你的配置解析每个server配置,源码也不是很复杂,接下来咱们将看看zk如何读取dataDir目录下的myid文件,继续在QuorumPeerConfigparseProperties()方法中,找到以下代码片断:

File myIdFile = new File(dataDir, "myid");
// 必须在快照目录下建立myid文件,不然报错
if (!myIdFile.exists()) {
    throw new IllegalArgumentException(myIdFile.toString() + " file is missing");
}
// 读取myid的值
BufferedReader br = new BufferedReader(new FileReader(myIdFile));
String myIdString;
try {
    myIdString = br.readLine();
} finally {
    br.close();// 注意,优秀的人都不会丢三落四,对于打开的各类io流,不用的时候记得关闭,不要浪费资源
}
复制代码

2、zk集群模式下的初始化

在zk中,不管是独立模式运行仍是复制模式运行,其初始化的步骤均可以归为:

  • 一、解析配置文件
  • 二、初始化运行服务器

对于配置文件的解析,咱们在上一篇文章和本文上节已作出相关分析。咱们重点看下zk集群模式运行的相关源码,让咱们进入QuormPeerMain类的runFromConfig()方法,源码以下:

/**
* 加载配置运行服务器
* @param config
* @throws IOException
*/
public void runFromConfig(QuorumPeerConfig config) throws IOException {
    LOG.info("Starting quorum peer");
    try {
    	// 建立一个ServerCnxnFactory,默认为NIOServerCnxnFactory
    	ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
    
    	// 对ServerCnxnFactory进行相关配置
    	cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
    
    	// 初始化QuorumPeer,表明服务器节点server运行时的各类信息,如节点状态state,哪些服务器server参与竞选了,咱们    能够将它理解为集群模式下运行的容器
    	quorumPeer = getQuorumPeer();

    	// 设置参与竞选的全部服务器
    	quorumPeer.setQuorumPeers(config.getServers());
    	// 设置事务日志和数据快照工厂
        quorumPeer.setTxnFactory(
    	    new FileTxnSnapLog(new File(config.getDataDir()), new File(config.getDataLogDir())));

    	// 设置选举的算法
    	quorumPeer.setElectionType(config.getElectionAlg());
    	// 设置当前服务器的id,也就是在data目录下的myid文件
    	quorumPeer.setMyid(config.getServerId());
    	// 设置心跳时间
    	quorumPeer.setTickTime(config.getTickTime());
    	// 设置容许follower同步和链接到leader的时间总量,以ticket为单位
    	quorumPeer.setInitLimit(config.getInitLimit());
    	// 设置follower与leader之间同步的时间量
    	quorumPeer.setSyncLimit(config.getSyncLimit());
    	// 当设置为true时,ZooKeeper服务器将侦听来自全部可用IP地址的对等端的链接,而不只仅是在配置文件的服务器列表中配置的地址(即集群中配置的server.1,server.2。。。。)。 它会影响处理ZAB协议和Fast Leader Election协议的链接。 默认值为false
    	quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
    	// 设置工厂,默认是NIO工厂
    	quorumPeer.setCnxnFactory(cnxnFactory);
    	// 设置集群数量验证器,默认为半数原则
    	quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
    	// 设置客户端链接的服务器ip地址
    	quorumPeer.setClientPortAddress(config.getClientPortAddress());
    	// 设置最小Session过时时间,默认是2*ticket
    	quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
    	// 设置最大Session过时时间,默认是20*ticket
    	quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
    	// 设置zkDataBase
    	quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
    	quorumPeer.setLearnerType(config.getPeerType());
    	quorumPeer.setSyncEnabled(config.getSyncEnabled());

    	// 设置NIO处理连接的线程数
    	quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
    
    	quorumPeer.start();
    	quorumPeer.join();
    } catch (InterruptedException e) {
    	// warn, but generally this is ok
    	LOG.warn("Quorum Peer interrupted", e);
    }
}
复制代码

在处理客户端请求方面,集群模式和独立模式都是使用ServerCnxnFactory的相关子类实现,默认采用基于NIO的NIOServerCnxnFactory,对于QuormPeer类,你能够把它想象成一个容器或者上下文,它包含着集群模式下当前结点的全部配置信息,如哪些服务器参与选举,每一个节点的状态等等。当该方法运行至quorumPeer.join();时,当前线程将阻塞,直到其余全部线程退出为止。

让咱们进入quorumPeer.start()方法,看看它作了什么动做:

public synchronized void start() {
    // 初始化是内存数据库
    loadDataBase();
    // 用于处理程序为捕获的异常和处理客户端请求
    cnxnFactory.start();
    // 选举前相关配置
    startLeaderElection();
    // 线程调用本类的run()方法,实施选举
    super.start();
}
复制代码

zk自己运行时会在内存中维护一个目录树,也就是一个内存数据库,初始化服务器时,zk会从本地配置文件中装载数据近内存数据库,若是没有本地记录,则建立一个空的内存数据库,同时,快照数据的保存也是基于内存数据库完成的。

3、zk集群模式下如何进行选举?

小编目测了代码以后发现zk应该是采用JMX来管理选举功能,但因为小编对JMX暂时不熟悉,所以,此部分将不结合源码进行解释,直接说明zk中选举流程。

首先每一个服务器启动以后将进入LOOKING状态,开始选举一个新的群首或者查找已经存在的群首,若是群首存在,其余服务器就会通知这个新启动的服务器,告知那个服务器是群首,于此同时,新的服务器会与群首创建连接,以确保本身的状态和群首一致。

对于群首选举时发送的消息,咱们称之为通知消息。当服务器进入LOOKING状态时,会想集群中全部其余节点发送一个通知,该同志包括了本身的投票信息vote,vote的数据结构很简单,通常由sid和zxid组成,sid表示当前服务器的编号,zxid表示当前服务器最大的事务编号,投票信息的交换规则以下:

  • 一、若是voteZxid > myzxid 或者 (voteZxid = myZxid 且 voteId > mySid ) ,保留当前的投票信息
  • 二、不然修改本身的投票信息,将voteZxid赋值给myZxid,将voteId赋值给mySid

总之就是先比较事务ID,若是相等,再比较服务器编号Sid。若是一个服务器接收到的全部通知都同样时,则表示群首选举成功(zxid最大或者sid最大)

4、为何说组成zk集群的节点数最好为奇数,且建议为3个节点?

Zk集群建议服务器的数量为奇数个,其内部采用多数原则,由于这样能使得整个集群更加高可用。固然这也是由zk选举算法决定的,一个节点虽然能够为外界提供服务,但只有一个节点的zk还能算做是集群吗?很明显不是,只能说是独立模式运行zk。

假设咱们配置的机器有5台,那么咱们认为只要超过一半(即3)的服务器是可用的,那么整个集群就是可用的,至于为何必定要数量的半数,这是因为zk中采用多数原则决定的,具体能够查看QuorumMaj类,该类有个校验多数原则的方法,代码以下:

/**
* 这个类实现了对法定服务器的校验
* This class implements a validator for majority quorums. The 
* implementation is straightforward.
*/
public class QuorumMaj implements QuorumVerifier {
    
    private static final Logger LOG = LoggerFactory.getLogger(QuorumMaj.class);
    
    // 一半的服务器数量,若是是5,则half=2,只要可用的服务器数量大于2,则整个zk就是可用的
    int half;
    /**
    * Defines a majority to avoid computing it every time.
    */
    public QuorumMaj(int n) {
    this.half = n / 2;
    }
    
    /**
    * Returns weight of 1 by default.权重
    */
    public long getWeight(long id) {
    return (long) 1;
    }

    /**
    * Verifies if a set is a majority.
    */
    public boolean containsQuorum(HashSet<Long> set) {
    return (set.size() > half);//传入一组服务器id,校验必须大于半数才能正常提供服务
    }
}
复制代码

咱们再来看看QuormPeerConfig类中的parseProperties()方法中的代码片断:

// 只有2台服务器server
if (servers.size() == 2) {
    // 打印日志,警告至少须要3台服务器,但不会报错
    LOG.warn("No server failure will be tolerated. " + "You need at least 3 servers.");
    } else if (servers.size() % 2 == 0) {
    LOG.warn("Non-optimial configuration, consider an odd number of servers.");
}
复制代码

该代码片断对你配置文件中配置的服务器数量进行校验,若是是偶数或者等于2,则会发出诸如“该配置不是推荐配置”的警告,若是服务器数量等于2,则不能容忍哪怕1台服务器崩溃。

为了加深印象,咱们来看看为何zk推荐使用奇数台服务器。

  • 若是配置3台服务器,那么当一台挂了之后,3台服务器中的2台票数过半,能够选出一台leader;
  • 若是配置4台,那么容许1台挂掉,这和只有3台服务器是同样的,为节省成本,何不选择3台,可是当4台中2台挂了以后,那么4台中可用的2台票数没过半没法选择出leader
相关文章
相关标签/搜索