zookeeper
源码分析系列文章:html
原创博客,纯手敲,转载请注明出处,谢谢!java
Zookeeper
单机启动原理Zookeeper
属于C/S
架构,也就是传统的客户端-服务器模式,客户端发送请求,服务器响应请求。这和高性能网络框架Netty
是同样的,所以咱们也能够猜测到它的启动方式无非就是从main()
方法开始,客户端和服务器各有一个main()
方法。算法
那咱们先来看看Zookeeper
服务器端的启动过程,当你打开Zookeeper
目录下/bin
目录中zkServer.cmd
文件你就会发现,其实Zookeeper
的启动入口为org.apache.zookeeper.server.quorum.QuorumPeerMain
类的main
方法,不管你是单机模式启动Zookeeper
仍是复制模式启动Zookeeper
,执行入口都是这个类,至于如何区别是哪一种模式启动,该类会根据你配置文件的配置进行判断,具体的判断接下来将会详细讲解。apache
zkServer.cmd
详细源代码:缓存
setlocal
call "%~dp0zkEnv.cmd"
set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain // 设置主类入口
echo on
call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %* // 执行该类的main()方法
endlocal
复制代码
下面先看看单机启动时序图:bash
一、首先执行main
方法 二、解析传进来的配置文件路径,默认会去${baseDir}\conf\zoo.cfg
找配置文件 三、建立NIOServerCnxnFactory
进行监听客户端的链接请求,在Zookeeper
中有两种ServerCnxnFactory
,一种是NIOServerCnxnFactory
,另外一种是NettyServerCnxnFactory
,前者为默认工厂,后者除非你在启动main
方法时指定System
的zookeeper.serverCnxnFactory
属性值为NettyServerCnxnFactory
。服务器
下面将详细深刻源码分析各个阶段是如何实现以及工做的。网络
Zookeeper
单机模式(standalone
)启动Zookeeper
是如何解析配置文件的?zk的属性配置分为两种:session
一、
Java System property
:Java系统环境变量,也就是System.setProperty()
设置的参数架构二、
No Java system property
配置文件属性,也就是你在配置文件中配置的属性
配置文件的解析原理很简单,无非就是解析一些.properties
文件中的键值对,其实Java
已经提供了Properties
类来表明.properties
文件中全部键值对集合,咱们可使用Properties
对象的load()
方法将一个配置文件装载进内存,而后对该对象进行遍历就获得咱们锁配置的属性值集合了。
说到Zookeeper
中的配置文件解析,原理也和上面差很少,只不过是在变量键值对的时候多了一些Zookeeper
自身的逻辑判断。ZooKeeper
中的配置文件解析从QuorumPeerConfig
类的parse()
方法提及,源代码以下:
/**
* Parse a ZooKeeper configuration file 解析一个配置文件
* @param path the patch of the configuration file
* @throws ConfigException error processing configuration
*/
public void parse(String path) throws ConfigException {
File configFile = new File(path);
LOG.info("Reading configuration from: " + configFile);
try {
if (!configFile.exists()) {
throw new IllegalArgumentException(configFile.toString() + " file is missing");
}
// 声明一个Properties对象
Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
// 传入一个配置文件输入流便可装载全部配置
cfg.load(in);
} finally {
// 涉及到流的操做记得最后将流关闭
in.close();
}
// 此处是zk自身的逻辑处理
parseProperties(cfg);
} catch (IOException e) {
throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
throw new ConfigException("Error processing " + path, e);
}
}
复制代码
接下来咱们来看看上面的parseProperties(cfg)
方法,该方法太长了,硬着头皮啃完:
/**
* Parse config from a Properties.
* @param zkProp Properties to parse from.
* @throws IOException
* @throws ConfigException
*/
public void parseProperties(Properties zkProp) throws IOException, ConfigException {
int clientPort = 0;
String clientPortAddress = null;
// 遍历全部的key-value键值对
for (Entry<Object, Object> entry : zkProp.entrySet()) {
// 注意这里要把首尾空格去掉
String key = entry.getKey().toString().trim();
String value = entry.getValue().toString().trim();
// 存储快照文件snapshot的目录配置
if (key.equals("dataDir")) {
dataDir = value;
// 事务日志存储目录
} else if (key.equals("dataLogDir")) {
dataLogDir = value;
// 客户端链接server的端口,zk启动总得有个端口吧!若是你没有配置,则会报错!通常咱们会将端口配置为2181
} else if (key.equals("clientPort")) {
clientPort = Integer.parseInt(value);
// 服务器IP地址
} else if (key.equals("clientPortAddress")) {
clientPortAddress = value.trim();
// zk中的基本事件单位,用于心跳和session最小过时时间为2*tickTime
} else if (key.equals("tickTime")) {
tickTime = Integer.parseInt(value);
// 客户端并发链接数量,注意是一个客户端跟一台服务器的并发链接数量,也就是说,假设值为3,那么某个客户端不能同时并发链接3次到同一台服务器(并发嘛!),不然会出现下面错误too many connections from /127.0.0.1 - max is 3
} else if (key.equals("maxClientCnxns")) {
maxClientCnxns = Integer.parseInt(value);
} else if (key.equals("minSessionTimeout")) {
minSessionTimeout = Integer.parseInt(value);
} else if (key.equals("maxSessionTimeout")) {
maxSessionTimeout = Integer.parseInt(value);
} else if (key.equals("initLimit")) {
initLimit = Integer.parseInt(value);
} else if (key.equals("syncLimit")) {
syncLimit = Integer.parseInt(value);
} else if (key.equals("electionAlg")) {
electionAlg = Integer.parseInt(value);
} else if (key.equals("quorumListenOnAllIPs")) {
quorumListenOnAllIPs = Boolean.parseBoolean(value);
} else if (key.equals("peerType")) {
if (value.toLowerCase().equals("observer")) {
peerType = LearnerType.OBSERVER;
} else if (value.toLowerCase().equals("participant")) {
peerType = LearnerType.PARTICIPANT;
} else {
throw new ConfigException("Unrecognised peertype: " + value);
}
......
复制代码
下面对解析的全部配置项用表格总结下: 全部的配置项均可以在官网查询到。
下面咱们一块儿看看Zookkeeper
的配置文件属性:
配置项 | 说明 | 异常状况 | 是否报错? | 错误 or 备注 |
---|---|---|---|---|
clientPort |
服务端server监听客户端链接的端口 | 不配置 | 是 | clientPort is not set |
clientPortAddress |
客户端链接的服务器ip地址 | 不配置 | 否 | 默认使用网卡的地址 |
dataDir |
数据快照目录 | 不配置 | 是 | dataDir is not set |
dataLogDir |
事务日志存放目录 | 不配置 | 否 | 默认跟dataDir 目录相同 |
tickTime |
ZK基本时间单元(毫秒),用于心跳和超时.minSessionTimeout 默认是两倍ticket |
不配置 | 是 | tickTime is not set |
maxClientCnxns |
同一ip地址最大并发链接数(也就是说同一个ip最多能够同时维持与服务器连接的个数) | 不配置 | 否 | 默认最大链接数为60,设置为0则无限制 |
minSessionTimeout |
最小会话超时时间,默认2*ticket | 不配置 | 否,若minSessionTimeout > maxSessionTimeout ,则报错 |
minSessionTimeout must not be larger than maxSessionTimeout |
maxSessionTimeout |
最大会话超时时间,默认20*ticket | 不配置 | 否 | 不能小于minSessionTimeout |
initLimit |
容许follower 同步和链接到leader 的时间总量,以ticket 为单位 |
不配置 | 是 | initLimit is not set ,若是zk管理的数据量特别大,则辞职应该调大 |
syncLimit |
follower 与leader 之间同步的世间量 |
不配置 | 是 | syncLimit is not set |
electionAlg |
zk选举算法选择,默认值为3,表示采用快速选举算法 |
不配置 | 否 | 若是没有配置选举地址server ,则抛Missing election port for server: serverid |
quorumListenOnAllIPs |
当设置为true时,ZooKeeper服务器将侦听来自全部可用IP地址的对等端的链接,而不只仅是在配置文件的服务器列表中配置的地址(即集群中配置的server.1,server.2。。。。)。 它会影响处理ZAB协议和Fast Leader Election协议的链接。 默认值为false | 不配置 | 否 | |
peerType |
服务器的角色,是观察者observer 仍是参与选举或成为leader ,默认为PARTICIPANT |
不配置 | 否 | 若配置了不知支持的角色,则报Unrecognised peertype: |
autopurge.snapRetainCount |
数据快照保留个数,默认是3,最小也是3 | 不配置 | 否 | |
autopurge.purgeInterval |
执行日志、快照清除任务的时间间隔(小时) | 不配置 | 否 | 默认是 0 |
server.x=[hostname]:nnnnn[:nnnnn] |
集群服务器配置 | 不配置 | 单机:否;集群:是 | zk集群启动将加载该该配置,每台zk服务器必须有一个myid文件,里边存放服务器的id,该id值必须匹配server.x中的x ; 第一个端口表示与leader 链接的端口,第二个端口表示用于选举的端口,第二个端口是可选的 |
Zookeeper
是如何判断何种模式启动服务器的?由于Zookeeper
的ZkServer.cmd
启动文件指定的统一入口为org.apache.zookeeper.server.quorum.QuorumPeerMain
,那么咱们就要问了,那ZK
是怎么判断我要单机模式启动仍是集群方式启动呢?答案是明显的,也就是取决于你在配置文件zoo.cfg
中是否有配置server.x=hostname:port1:port2
,以上的配置项代表咱们想让ZK
以集群模式运行,那么在代码中是如何体现的呢?
上面讲到ZK
解析配置文件的原理,咱们依旧走进parseProperties()
方法,看看以下代码:
.....
// 此处解析配置文件以server.开头的配置
} else if (key.startsWith("server.")) {
// server.3
int dot = key.indexOf('.');
long sid = Long.parseLong(key.substring(dot + 1));
String parts[] = splitWithLeadingHostname(value);
if ((parts.length != 2) && (parts.length != 3) && (parts.length != 4)) {
LOG.error(value + " does not have the form host:port or host:port:port "
+ " or host:port:port:type");
}
LearnerType type = null;
String hostname = parts[0];
Integer port = Integer.parseInt(parts[1]);
Integer electionPort = null;
if (parts.length > 2) {
electionPort = Integer.parseInt(parts[2]);
}
if (parts.length > 3) {
if (parts[3].toLowerCase().equals("observer")) {
type = LearnerType.OBSERVER;
} else if (parts[3].toLowerCase().equals("participant")) {
type = LearnerType.PARTICIPANT;
} else {
throw new ConfigException("Unrecognised peertype: " + value);
}
}
if (type == LearnerType.OBSERVER) {
observers.put(Long.valueOf(sid),
new QuorumServer(sid, hostname, port, electionPort, type));
} else {
// 若是配置了,那么就加进servers中,其中servers是一个本地缓存Map,用于存储配置的ip地址
servers.put(Long.valueOf(sid), new QuorumServer(sid, hostname, port, electionPort, type));
}
复制代码
若是配置了,那么servers
的size>0
,解析完成以后,回到QuorumPeerMain
的initializeAndRun()
方法:
// 若是servers长度大于0,则集群方式启动,不然,单机启动
if (args.length == 1 && config.servers.size() > 0) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
复制代码
从上面能够看出,单机启动的入口为ZooKeeperServerMain
类,而统一的入口类为QuorumPeerMain
,因此,在ZK
中,服务器端的启动类就只有这两个了。
Zookeeper
是如何处理客户端请求的?不管是哪一种方式启动Zookeeper
,它都必须对客户端的请求进行处理,那么ZK
是如何处理客户端请求的呢?让咱们一块儿来看看源码是怎么写的!
上面说到,Zk
单机启动的入口类为ZooKeeperServerMain
,咱们一块儿看下其runFromConfig()
方法:
/**
* Run from a ServerConfig.
* @param config ServerConfig to use.
* @throws IOException
*/
public void runFromConfig(ServerConfig config) throws IOException {
LOG.info("Starting server");
FileTxnSnapLog txnLog = null;
try {
// 建立一个ZooKeeperServer,ZooKeeperServer表明具体运行的zk服务器,包含监听客户端请求
final ZooKeeperServer zkServer = new ZooKeeperServer();
// 这个是代表上面建立的ZooKeeperServer线程执行完以后,当前主线程才结束,相似Thread的join()方法
final CountDownLatch shutdownLatch = new CountDownLatch(1);
// 关闭服务器时的回调处理器
zkServer.registerServerShutdownHandler(
new ZooKeeperServerShutdownHandler(shutdownLatch));
// 执行快照数据,日志的定时保存操做,指定保存路径
txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
config.dataDir));
zkServer.setTxnLogFactory(txnLog);
zkServer.setTickTime(config.tickTime);
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
// 建立ServerCnxnFactory,默认实现为NIOServerCnxnFactory,也能够指定为NettyServerCnxnFactory
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
// 启动服务器,将一个服务器zkServer丢给工厂,而后启动
cnxnFactory.startup(zkServer);
// 这里将会等待,除非调用shutdown()方法
shutdownLatch.await();
shutdown();
// 这里会等待直到zkServer线程完成
cnxnFactory.join();
if (zkServer.canShutdown()) {
zkServer.shutdown(true);
}
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Server interrupted", e);
} finally {
if (txnLog != null) {
txnLog.close();
}
}
}
复制代码
了解完上面的代码,咱们明白单机启动ZooKeeperServer
时ZK
作了什么工做,主要点在zk
建立的是哪一种工厂,至于NIOServerCnxnFactory
的代码,我就不说了,你们有兴趣能够去看看。
回归正题,让咱们进入NIOServerCnxnFactory
的run()
方法中看看:
public void run() {
while (!ss.socket().isClosed()) {
try {
// 每一秒轮询一次
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
Collections.shuffle(selectedList);
for (SelectionKey k : selectedList) {
// 若是有读请求或者链接请求,则接收请求
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
// 这里对maxClientCnxns作出判断,防止DOS攻击
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns);
sc.close();
} else {
LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
NIOServerCnxn cnxn = createConnection(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
}
// 若是有读请求且客户端以前有链接过的,则直接处理
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select " + k.readyOps());
}
}
}
selected.clear();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring exception", e);
}
}
closeAll();
LOG.info("NIOServerCnxn factory exited run method");
}
复制代码
看到这,我以为对于Zk
如何监听处理客户端的请求就清晰多了,上面的代码主要采用轮询机制,每一秒轮询一次,经过selector.select(1000)
方法指定,这里的监听方式和传统的BIO不一样,传统的网络监听采用阻塞的accept()
方法,zk采用java的nio实现。
谢谢阅读~~