zookeeper
源码分析系列文章:算法
原创博客,纯手敲,转载请注明出处,谢谢!数据库
如你所知,zk的运行方式有两种,独立模式和复制模式。很显然复制模式是用来搭建zk集群的,所以我把复制模式称为集群模式。在以前的文章中咱们已经对独立模式下运行zk的源码进行相关分析,接下来咱们一块儿来研究研究Zk集群模式下的源码。数组
集群模式下的调试不像独立模式那么简单,也许你可能会问,那是否须要多台物理机来搭建一个zk集群呢?其实也不须要,单台物理机也是能够模拟集群运行的。所以,下文咱们将按照如下目录开展讨论:bash
zk配置集群其实很是简单,在上篇博客中讲到,zk在解析配置文件时会判断你配置文件中是否有相似server.
的配置项,若是没有相似server.
的配置项,则默认以独立模式运行zk。相反,集群模式下就要求你进行相应的配置了。下面将一步一步对搭建环境进行讲解:服务器
zoo1.cfg
、zoo2.cfg
和zoo3.cfg
其内容分别以下:数据结构
zoo1.cfg
eclipse
tickTime=200000
initLimit=10
syncLimit=5
dataDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\1
dataLogDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\log\\1
maxClientCnxns=2
# 服务器监听客户端链接的端口
clientPort=2181
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
复制代码
zoo2.cfg
ide
tickTime=200000
initLimit=10
syncLimit=5
dataDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\2
dataLogDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\log\\2
maxClientCnxns=2
# 服务器监听客户端链接的端口
clientPort=2182
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
复制代码
zoo3.cfg
源码分析
tickTime=200000
initLimit=10
syncLimit=5
dataDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\3
dataLogDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\log\\3
maxClientCnxns=2
# 服务器监听客户端链接的端口
clientPort=2183
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
复制代码
上面相关通用的配置项在此处我就不作一一解释,相关含义在上篇文章中都有提到。下面咱们重点关注下clientPort
属性和server.x
属性。post
clientPort
表明服务器监听客户端链接的端口,换句话说就是客户端链接到服务器的那个端口。该属性的默认配置通常都是2181
,那为何咱们这里要写成2181
,2182
,2183
呢?其实缘由很简单,由于咱们的集群式搭建在单台物理机上面,为了防止端口冲突,咱们设置3台zk服务器分别监听不一样的端口。
至于server.x
属性,用于配置参与集群的每台服务器的地址和端口号。其格式为:
server.x addressIP:port1:port2
其中x
表示zk节点的惟一编号,也就是咱们常说的sid的值,下面讲到zk选举的时候将会进一步讲解。你可能会很好奇port1
和port2
之间有什么区别,在zk中,port1
表示fllowers
链接到leader
的端口,port2
表示当前结点参与选举的端口。之因此要这么设计,其实我以为在ZAB
协议中,当客户端发出的写操做在服务器端执行完毕时,leader
节点必须将状态同步给全部的fllowers
,leader
和fllowers
之间须要进行通讯嘛!另一种是全部节点进行快速选举时,各个节点之间须要进行投票,投票选出完一个leader
节点以后须要通知其余节点。因此说,明白端口含义便可,它们就是区别做用罢了。
myid
文件zk在集群模式下运行时会读取位于dataDir
目录下的myid
文件,若是没有找到,则会报错。所以,下面咱们将分别在对应的dataDir
下新建myid
文件,该文件的内容填写当前服务器的编号,也就是咱们上面说到的server.x
中的x
值。
E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\1
下建立该文件,文件内容为序号1
E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\2
下建立该文件,文件内容为序号2
E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\3
下建立该文件,文件内容为序号3
QuorumPeerMain
的main()
方法便可。配置文件路径能够这样传给eclipse
,以下图:上面讲的内容彷佛和源码打不上边,嗯,别着急,下面就讲源码。
首先咱们看看zk是如何解析server.x
标签的,进入QuorumPeerConfig
类的parseProperties()
方法,你将看到以下代码片断:
// 判断属性是否以server.开始
if (key.startsWith("server.")) {
int dot = key.indexOf('.');
// 获取sid的值,也就是咱们server.x中的x值
long sid = Long.parseLong(key.substring(dot + 1));
// 将配置值拆分为数组,格式为[addressIP,port1,port2]
String parts[] = splitWithLeadingHostname(value);
if ((parts.length != 2) && (parts.length != 3) && (parts.length != 4)) {
LOG.error(value + " does not have the form host:port or host:port:port "
+ " or host:port:port:type");
}
// 表明当前结点的类型,能够是观察者类型(不须要参与投票),也能够是PARTICIPANT(表示该节点后期可能成为follower和leader)
LearnerType type = null;
String hostname = parts[0];
Integer port = Integer.parseInt(parts[1]);
Integer electionPort = null;
if (parts.length > 2) {
electionPort = Integer.parseInt(parts[2]);
}
}
复制代码
上面源码将会根据你的配置解析每个server
配置,源码也不是很复杂,接下来咱们将看看zk如何读取dataDir
目录下的myid
文件,继续在QuorumPeerConfig
的parseProperties()
方法中,找到以下代码片断:
File myIdFile = new File(dataDir, "myid");
// 必须在快照目录下建立myid文件,不然报错
if (!myIdFile.exists()) {
throw new IllegalArgumentException(myIdFile.toString() + " file is missing");
}
// 读取myid的值
BufferedReader br = new BufferedReader(new FileReader(myIdFile));
String myIdString;
try {
myIdString = br.readLine();
} finally {
br.close();// 注意,优秀的人都不会丢三落四,对于打开的各类io流,不用的时候记得关闭,不要浪费资源
}
复制代码
在zk中,不管是独立模式运行仍是复制模式运行,其初始化的步骤均可以归为:
对于配置文件的解析,咱们在上一篇文章和本文上节已作出相关分析。咱们重点看下zk集群模式运行的相关源码,让咱们进入QuormPeerMain
类的runFromConfig()
方法,源码以下:
/**
* 加载配置运行服务器
* @param config
* @throws IOException
*/
public void runFromConfig(QuorumPeerConfig config) throws IOException {
LOG.info("Starting quorum peer");
try {
// 建立一个ServerCnxnFactory,默认为NIOServerCnxnFactory
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
// 对ServerCnxnFactory进行相关配置
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
// 初始化QuorumPeer,表明服务器节点server运行时的各类信息,如节点状态state,哪些服务器server参与竞选了,咱们 能够将它理解为集群模式下运行的容器
quorumPeer = getQuorumPeer();
// 设置参与竞选的全部服务器
quorumPeer.setQuorumPeers(config.getServers());
// 设置事务日志和数据快照工厂
quorumPeer.setTxnFactory(
new FileTxnSnapLog(new File(config.getDataDir()), new File(config.getDataLogDir())));
// 设置选举的算法
quorumPeer.setElectionType(config.getElectionAlg());
// 设置当前服务器的id,也就是在data目录下的myid文件
quorumPeer.setMyid(config.getServerId());
// 设置心跳时间
quorumPeer.setTickTime(config.getTickTime());
// 设置容许follower同步和链接到leader的时间总量,以ticket为单位
quorumPeer.setInitLimit(config.getInitLimit());
// 设置follower与leader之间同步的时间量
quorumPeer.setSyncLimit(config.getSyncLimit());
// 当设置为true时,ZooKeeper服务器将侦听来自全部可用IP地址的对等端的链接,而不只仅是在配置文件的服务器列表中配置的地址(即集群中配置的server.1,server.2。。。。)。 它会影响处理ZAB协议和Fast Leader Election协议的链接。 默认值为false
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
// 设置工厂,默认是NIO工厂
quorumPeer.setCnxnFactory(cnxnFactory);
// 设置集群数量验证器,默认为半数原则
quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
// 设置客户端链接的服务器ip地址
quorumPeer.setClientPortAddress(config.getClientPortAddress());
// 设置最小Session过时时间,默认是2*ticket
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
// 设置最大Session过时时间,默认是20*ticket
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
// 设置zkDataBase
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
// 设置NIO处理连接的线程数
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
复制代码
在处理客户端请求方面,集群模式和独立模式都是使用ServerCnxnFactory
的相关子类实现,默认采用基于NIO的NIOServerCnxnFactory
,对于QuormPeer
类,你能够把它想象成一个容器或者上下文,它包含着集群模式下当前结点的全部配置信息,如哪些服务器参与选举,每一个节点的状态等等。当该方法运行至quorumPeer.join();
时,当前线程将阻塞,直到其余全部线程退出为止。
让咱们进入quorumPeer.start()
方法,看看它作了什么动做:
public synchronized void start() {
// 初始化是内存数据库
loadDataBase();
// 用于处理程序为捕获的异常和处理客户端请求
cnxnFactory.start();
// 选举前相关配置
startLeaderElection();
// 线程调用本类的run()方法,实施选举
super.start();
}
复制代码
zk自己运行时会在内存中维护一个目录树,也就是一个内存数据库,初始化服务器时,zk会从本地配置文件中装载数据近内存数据库,若是没有本地记录,则建立一个空的内存数据库,同时,快照数据的保存也是基于内存数据库完成的。
小编目测了代码以后发现zk应该是采用JMX来管理选举功能,但因为小编对JMX暂时不熟悉,所以,此部分将不结合源码进行解释,直接说明zk中选举流程。
首先每一个服务器启动以后将进入LOOKING
状态,开始选举一个新的群首或者查找已经存在的群首,若是群首存在,其余服务器就会通知这个新启动的服务器,告知那个服务器是群首,于此同时,新的服务器会与群首创建连接,以确保本身的状态和群首一致。
对于群首选举时发送的消息,咱们称之为通知消息。当服务器进入LOOKING
状态时,会想集群中全部其余节点发送一个通知,该同志包括了本身的投票信息vote,vote的数据结构很简单,通常由sid和zxid组成,sid表示当前服务器的编号,zxid表示当前服务器最大的事务编号,投票信息的交换规则以下:
总之就是先比较事务ID,若是相等,再比较服务器编号Sid。若是一个服务器接收到的全部通知都同样时,则表示群首选举成功(zxid最大或者sid最大)
Zk集群建议服务器的数量为奇数个,其内部采用多数原则,由于这样能使得整个集群更加高可用。固然这也是由zk选举算法决定的,一个节点虽然能够为外界提供服务,但只有一个节点的zk还能算做是集群吗?很明显不是,只能说是独立模式运行zk。
假设咱们配置的机器有5台,那么咱们认为只要超过一半(即3)的服务器是可用的,那么整个集群就是可用的,至于为何必定要数量的半数,这是因为zk中采用多数原则决定的,具体能够查看QuorumMaj
类,该类有个校验多数原则的方法,代码以下:
/**
* 这个类实现了对法定服务器的校验
* This class implements a validator for majority quorums. The
* implementation is straightforward.
*/
public class QuorumMaj implements QuorumVerifier {
private static final Logger LOG = LoggerFactory.getLogger(QuorumMaj.class);
// 一半的服务器数量,若是是5,则half=2,只要可用的服务器数量大于2,则整个zk就是可用的
int half;
/**
* Defines a majority to avoid computing it every time.
*/
public QuorumMaj(int n) {
this.half = n / 2;
}
/**
* Returns weight of 1 by default.权重
*/
public long getWeight(long id) {
return (long) 1;
}
/**
* Verifies if a set is a majority.
*/
public boolean containsQuorum(HashSet<Long> set) {
return (set.size() > half);//传入一组服务器id,校验必须大于半数才能正常提供服务
}
}
复制代码
咱们再来看看QuormPeerConfig
类中的parseProperties()
方法中的代码片断:
// 只有2台服务器server
if (servers.size() == 2) {
// 打印日志,警告至少须要3台服务器,但不会报错
LOG.warn("No server failure will be tolerated. " + "You need at least 3 servers.");
} else if (servers.size() % 2 == 0) {
LOG.warn("Non-optimial configuration, consider an odd number of servers.");
}
复制代码
该代码片断对你配置文件中配置的服务器数量进行校验,若是是偶数或者等于2,则会发出诸如“该配置不是推荐配置”的警告,若是服务器数量等于2,则不能容忍哪怕1台服务器崩溃。
为了加深印象,咱们来看看为何zk推荐使用奇数台服务器。