ZooKeeper简介
ZooKeeper是分布式服务框架,主要是用来解决分布式应用中常常遇到的一些数据管理问题,如:
统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等等。
ZooKeeper基本概念
角色
ZooKeeper中的角色主要有如下三类,以下表所示:
设计目的
1. 最终一致性:client不论链接到哪一个Server,展现给它都是同一个视图,这是ZooKeeper最重要的性能。
2. 可靠性:具备简单、健壮、良好的性能,若是消息m被到一台服务器接受,那么它将被全部的服务器接受。
3. 实时性:ZooKeeper保证客户端将在一个时间间隔范围内得到服务器的更新信息,或者服务器失效的信息。但因为网络延时等缘由,ZooKeeper不能保证两个客户端能同时获得刚更新的数据,若是须要最新数据,应该在读数据以前调用sync()接口。
4. 等待无关(wait-free):慢的或者失效的client不得干预快速的client的请求,使得每一个client都能有效的等待。
5. 原子性:更新只能成功或者失败,没有中间状态。
6. 顺序性:包括全局有序和偏序两种:全局有序是指若是在一台服务器上消息a在消息b前发布,则在全部Server上消息a都将在消息b前被发布;偏序是指若是一个消息b在消息a后被同一个发送者发布,a必将排在b前面。
ZooKeeper工做原理
ZooKeeper的核心是广播,这个机制保证了各个Server之间的同步。实现这个机制的协议叫作Zab协议。Zab协议有两种模式,它们分别是恢复模式(选主)和广播模式(同步)。当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数Server完成了和leader的状态同步之后,恢复模式就结束了。状态同步保证了leader和Server具备相同的系统状态。为了保证事务的顺序一致性,ZooKeeper采用了递增的事务id号 (zxid)来标识事务。全部的提议(proposal)都在被提出的时候加上了zxid。实现中zxid是一个64位的数字,它高32位是epoch用 来标识leader关系是否改变,每次一个leader被选出来,它都会有一个新的epoch,标识当前属于那个leader的统治时期。低32位用于递增计数。
每一个Server在工做过程当中有三种状态:
LOOKING:当前Server不知道leader是谁,正在搜寻。
LEADING:当前Server即为选举出来的leader。
FOLLOWING:leader已经选举出来,当前Server与之同步。
选主流程
当 leader崩溃或者leader失去大多数的follower,这时候zk进入恢复模式,恢复模式须要从新选举出一个新的leader,让全部的 Server都恢复到一个正确的状态。Zk的选举算法有两种:一种是基于basic paxos实现的,另一种是基于fast paxos算法实现的。系统默认的选举算法为fast paxos。
basic paxos流程:
1 .选举线程由当前Server发起选举的线程担任,其主要功能是对投票结果进行统计,并选出推荐的Server;
2 .选举线程首先向全部Server发起一次询问(包括本身);
3 .选举线程收到回复后,验证是不是本身发起的询问(验证zxid是否一致),而后获取对方的id(myid),并存储到当前询问对象列表中,最后获取对方提议的leader相关信息(id,zxid),并将这些信息存储到当次选举的投票记录表中;
4. 收到全部Server回复之后,就计算出zxid最大的那个Server,并将这个Server相关信息设置成下一次要投票的Server;
5. 线程将当前zxid最大的Server设置为当前Server要推荐的Leader,若是此时获胜的Server得到n/2 + 1的Server票数, 设置当前推荐的leader为获胜的Server,将根据获胜的Server相关信息设置本身的状态,不然,继续这个过程,直到leader被选举出来。
通 过流程分析咱们能够得出:要使Leader得到多数Server的支持,则Server总数必须是奇数2n+1,且存活的Server的数目不得少于 n+1.每一个Server启动后都会重复以上流程。在恢复模式下,若是是刚从崩溃状态恢复的或者刚启动的server还会从磁盘快照中恢复数据和会话信 息,zk会记录事务日志并按期进行快照,方便在恢复时进行状态恢复。
选主的具体流程图以下所示:
fast paxos流程是在选举过程当中,某Server首先向全部Server提议本身要成为leader,当其它Server收到提议之后,解决epoch和 zxid的冲突,并接受对方的提议,而后向对方发送接受提议完成的消息,重复这个流程,最后必定能选举出Leader。其流程图以下所示:
同步流程
选完leader之后,zk就进入状态同步过程。
1. leader等待server链接;
2 .Follower链接leader,将最大的zxid发送给leader;
3 .Leader根据follower的zxid肯定同步点;
4 .完成同步后通知follower 已经成为uptodate状态;
5 .Follower收到uptodate消息后,又能够从新接受client的请求进行服务了。
流程图以下所示:
工做流程
Leader工做流程
Leader主要有三个功能:
1 .恢复数据;
2 .维持与Learner的心跳,接收Learner请求并判断Learner的请求消息类型;
3 .Learner的消息类型主要有PING消息、REQUEST消息、ACK消息、REVALIDATE消息,根据不一样的消息类型,进行不一样的处理。
PING 消息是指Learner的心跳信息;REQUEST消息是Follower发送的提议信息,包括写请求及同步请求;ACK消息是Follower的对提议 的回复,超过半数的Follower经过,则commit该提议;REVALIDATE消息是用来延长SESSION有效时间。
Leader的工做流程简图以下所示:
Follower工做流程
Follower主要有四个功能:
1. 向Leader发送请求(PING消息、REQUEST消息、ACK消息、REVALIDATE消息);
2 .接收Leader消息并进行处理;
3 .接收Client的请求,若是为写请求,发送给Leader进行投票;
4 .返回Client结果。
Follower的消息循环处理以下几种来自Leader的消息:
1 .PING消息: 心跳消息;
2 .PROPOSAL消息:Leader发起的提案,要求Follower投票;
3 .COMMIT消息:服务器端最新一次提案的信息;
4 .UPTODATE消息:代表同步完成;
5 .REVALIDATE消息:根据Leader的REVALIDATE结果,关闭待revalidate的session仍是容许其接受消息;
6 .SYNC消息:返回SYNC结果到客户端,这个消息最初由客户端发起,用来强制获得最新的更新。
Follower的工做流程简图以下所示:
Zookeeper 会维护一个具备层次关系的数据结构,它很是相似于一个标准的文件系统。
Zookeeper 这种数据结构有以下这些特色:
1. 每一个子目录项如 NameService 都被称做为 znode,这个 znode 是被它所在的路径惟一标识,如 Server1 这个 znode 的标识为 /NameService/Server1(图上有一个错误,你们自行发现吧:-))。
2. znode 能够有子节点目录,而且每一个 znode 能够存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录。
3. znode 是有版本的,每一个 znode 中存储的数据能够有多个版本,也就是一个访问路径中能够存储多份数据。
4. znode 能够是临时节点,一旦建立这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通讯采用长链接方式,每一个客户端和服务器经过心跳来保持链接,这个链接状态称为 session,若是 znode 是临时节点,这个 session 失效,znode 也就删除了。
5. znode 的目录名能够自动编号,如 App1 已经存在,再建立的话,将会自动命名为 App2。
6. znode 能够被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化能够通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的不少功能都是基于这个特性实现的,后面在典型的应用场景中会有实例介绍。
ZooKeeper安装配置
ZooKeeper的安装模式分为三种,分别为:单机模式(stand-alone)、集群模式和集群伪分布模式(略)。
单机模式
下载ZooKeeper的安装包以后, 解压到合适目录. 进入ZooKeeper目录下的conf子目录, 建立zoo.cfg(或者直接更名zoo_sample.cfg):
tickTime=2000
dataDir=D:/devtools/zookeeper-3.2.2/build
clientPort=2181
|
- clientPort:这个端口就是客户端链接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
当这些配置项配置好后,你如今就能够启动 Zookeeper 了,启动后要检查 Zookeeper 是否已经在服务,能够经过 netstat -ano 命令查看是否有你配置的 clientPort 端口号在监听服务。
集群模式
Zookeeper 不只能够单机提供服务,同时也支持多机组成集群来提供服务。实际上 Zookeeper 还支持另一种伪集群的方式,也就是能够在一台物理机上运行多个 Zookeeper 实例,下面将介绍集群模式的安装和配置。
Zookeeper 的集群模式的安装和配置也不是很复杂,所要作的就是增长几个配置项。集群模式除了上面的三个配置项还要增长下面几个配置项:
initLimit=5
syncLimit=2
server.1=192.168.211.1:2888:3888
server.2=192.168.211.2:2888:3888
|
-
initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户链接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中链接到 Leader 的 Follower 服务器)初始化链接时最长能忍受多少个心跳时间间隔数。当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器尚未收到客户端的返回信息,那么代表这个客户端链接失败。总的时间长度就是 5*2000=10 秒
-
syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 2*2000=4 秒
-
server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,须要一个端口来从新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通讯的端口。若是是伪集群的配置方式,因为 B 都是同样,因此不一样的 Zookeeper 实例通讯端口号不能同样,因此要给它们分配不一样的端口号。
除了修改 zoo.cfg 配置文件,集群模式下还要配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面就有一个数据就是 A 的值,Zookeeper 启动时会读取这个文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断究竟是那个 server。
ZooKeeper经常使用命令
ZooKeeper服务命令:
1. 启动ZK服务: ./zkServer.sh start
2. 查看ZK服务状态: ./zkServer.sh status
3. 中止ZK服务: ./zkServer.sh stop
4. 重启ZK服务: ./zkServer.sh restart
ZooKeeper客户端命令:
ZooKeeper 命令行工具相似于Linux的shell环境,使用它能够对ZooKeeper进行访问,数据建立,数据修改等操做. 使用
zkCli.sh -server 127.0.0.1:2181 链接到 ZooKeeper 服务,链接成功后,系统会输出 ZooKeeper 的相关环境以及配置信息。
命令行工具的一些简单操做以下:
1. 显示根目录下、文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容
2. 显示根目录下、文件: ls2 / 查看当前节点数据并能看到更新次数等数据
3. 建立文件,并设置初始内容: create /zk "test" 建立一个新的 znode节点“ zk ”以及与它关联的字符串
4. 获取文件内容: get /zk 确认 znode 是否包含咱们所建立的字符串
5. 修改文件内容: set /zk "zkbak" 对 zk 所关联的字符串进行设置
6. 删除文件: delete /zk 将刚才建立的 znode 删除
7. 退出客户端: quit
8. 帮助命令: help
ZooKeeper 经常使用四字命令:
ZooKeeper 支持某些特定的四字命令字母与其的交互。它们大可能是查询命令,用来获取 ZooKeeper 服务的当前状态及相关信息。用户在客户端能够经过 telnet 或 nc 向 ZooKeeper 提交相应的命令
1. 能够经过命令:echo stat|nc 127.0.0.1 2181 来查看哪一个节点被选择做为follower或者leader
2. 使用echo ruok|nc 127.0.0.1 2181 测试是否启动了该Server,若回复imok表示已经启动。
3. echo dump| nc 127.0.0.1 2181 ,列出未经处理的会话和临时节点。
4. echo kill | nc 127.0.0.1 2181 ,关掉server
5. echo conf | nc 127.0.0.1 2181 ,输出相关服务配置的详细信息。
6. echo cons | nc 127.0.0.1 2181 ,列出全部链接到服务器的客户端的彻底的链接 / 会话的详细信息。
7. echo envi |nc 127.0.0.1 2181 ,输出关于服务环境的详细信息(区别于 conf 命令)。
8. echo reqs | nc 127.0.0.1 2181 ,列出未经处理的请求。
9. echo wchs | nc 127.0.0.1 2181 ,列出服务器 watch 的详细信息。
10. echo wchc | nc 127.0.0.1 2181 ,经过 session 列出服务器 watch 的详细信息,它的输出是一个与 watch 相关的会话的列表。
11. echo wchp | nc 127.0.0.1 2181 ,经过路径列出服务器 watch 的详细信息。它输出一个与 session 相关的路径。
ZooKeeper的Java客户端API
客户端要链接 Zookeeper 服务器能够经过建立 org.apache.zookeeper. ZooKeeper 的一个实例对象,而后调用这个类提供的接口来和服务器交互。
前面说了 ZooKeeper 主要是用来维护和监控一个目录节点树中存储的数据的状态,因此咱们可以操做 ZooKeeper 的节点也和操做目录节点树大致同样,如建立一个目录节点,给某个目录节点设置数据,获取某个目录节点的全部子目录节点,给某个目录节点设置权限和监控这个目录节点的状态变化。
类org.apache.zookeeper. ZooKeeper 的方法以下表所示:
|
String create(String path, byte[] data, List<ACL> acl,CreateMode createMode) |
建立一个给定的目录节点 path, 并给它设置数据,CreateMode 标识有四种形式的目录节点,分别是 PERSISTENT:持久化目录节点,这个目录节点存储的数据不会丢失;PERSISTENT_SEQUENTIAL:顺序自动编号的目录节点,这种目录节点会根据当前已近存在的节点数自动加 1,而后返回给客户端已经成功建立的目录节点名;EPHEMERAL:临时目录节点,一旦建立这个节点的客户端与服务器端口也就是 session 超时,这种节点会被自动删除;EPHEMERAL_SEQUENTIAL:临时自动编号节点 |
Stat exists(String path, boolean watch) |
判断某个 path 是否存在,并设置是否监控这个目录节点,这里的 watcher 是在建立 ZooKeeper 实例时指定的 watcher,exists方法还有一个重载方法,能够指定特定的watcher |
Stat exists(String path,Watcher watcher) |
重载方法,这里给某个目录节点设置特定的 watcher,Watcher 在 ZooKeeper 是一个核心功能,Watcher 能够监控目录节点的数据变化以及子目录的变化,一旦这些状态发生变化,服务器就会通知全部设置在这个目录节点上的 Watcher,从而每一个客户端都很快知道它所关注的目录节点的状态发生变化,而作出相应的反应 |
void delete(String path, int version) |
删除 path 对应的目录节点,version 为 -1 能够匹配任何版本,也就删除了这个目录节点全部数据 |
List<String>getChildren(String path, boolean watch) |
获取指定 path 下的全部子目录节点,一样 getChildren方法也有一个重载方法能够设置特定的 watcher 监控子节点的状态 |
Stat setData(String path, byte[] data, int version) |
给 path 设置数据,能够指定这个数据的版本号,若是 version 为 -1 怎能够匹配任何版本 |
byte[] getData(String path, boolean watch, Stat stat) |
获取这个 path 对应的目录节点存储的数据,数据的版本等信息能够经过 stat 来指定,同时还能够设置是否监控这个目录节点数据的状态 |
voidaddAuthInfo(String scheme, byte[] auth) |
客户端将本身的受权信息提交给服务器,服务器将根据这个受权信息验证客户端的访问权限。 |
Stat setACL(String path,List<ACL> acl, int version) |
给某个目录节点从新设置访问权限,须要注意的是 Zookeeper 中的目录节点权限不具备传递性,父目录节点的权限不能传递给子目录节点。目录节点 ACL 由两部分组成:perms 和 id。 Perms 有 ALL、READ、WRITE、CREATE、DELETE、ADMIN 几种 而 id 标识了访问目录节点的身份列表,默认状况下有如下两种: ANYONE_ID_UNSAFE = new Id("world", "anyone") 和 AUTH_IDS = new Id("auth", "") 分别表示任何人均可以访问和建立者拥有访问权限。 |
List<ACL>getACL(String path,Stat stat) |
获取某个目录节点的访问权限列表 |
除了以上这些列出的方法以外还有一些重载方法,如都提供了一个回调类的重载方法以及能够设置特定 Watcher 的重载方法,具体的方法能够参考 org.apache.zookeeper. ZooKeeper 类的 API 说明。
下面给出基本的操做 ZooKeeper 的示例代码,这样你就能对 ZooKeeper 有直观的认识了。下面的清单包括了建立与 ZooKeeper 服务器的链接以及最基本的数据操做:
// 建立一个与服务器的链接
ZooKeeper zk = new ZooKeeper("localhost:" + CLIENT_PORT,
ClientBase.CONNECTION_TIMEOUT, new Watcher() {
// 监控全部被触发的事件
public void process(WatchedEvent event) {
System.out.println("已经触发了" + event.getType() + "事件!");
}
});
// 建立一个目录节点
zk.create("/testRootPath", "testRootData".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
// 建立一个子目录节点
zk.create("/testRootPath/testChildPathOne", "testChildDataOne".getBytes(),
Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
System.out.println(new String(zk.getData("/testRootPath",false,null)));
// 取出子目录节点列表
System.out.println(zk.getChildren("/testRootPath",true));
// 修改子目录节点数据
zk.setData("/testRootPath/testChildPathOne","modifyChildDataOne".getBytes(),-1);
System.out.println("目录节点状态:["+zk.exists("/testRootPath",true)+"]");
// 建立另一个子目录节点
zk.create("/testRootPath/testChildPathTwo", "testChildDataTwo".getBytes(),
Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
System.out.println(new String(zk.getData("/testRootPath/testChildPathTwo",true,null)));
// 删除子目录节点
zk.delete("/testRootPath/testChildPathTwo",-1);
zk.delete("/testRootPath/testChildPathOne",-1);
// 删除父目录节点
zk.delete("/testRootPath",-1);
// 关闭链接
zk.close();
|
输出的结果以下:
已经触发了 None 事件!
testRootData
[testChildPathOne]
目录节点状态:[5,5,1281804532336,1281804532336,0,1,0,0,12,1,6]
已经触发了 NodeChildrenChanged 事件!
testChildDataTwo
已经触发了 NodeDeleted 事件!
已经触发了 NodeDeleted 事件!
|
当对目录节点监控状态打开时,一旦目录节点的状态发生变化,Watcher 对象的 process 方法就会被调用。
直接使用zk的api实现业务功能比较繁琐。由于要处理session loss,session expire等异常,在发生这些异常后进行重连。又由于ZK的watcher是一次性的,若是要基于wather实现发布/订阅模式,还要本身包装一下,将一次性订阅包装成持久订阅。另外若是要使用抽象级别更高的功能,好比分布式锁,leader选举等,还要本身额外作不少事情。这里介绍下ZK的两个第三方客户端包装小工具,能够分别解决上述小问题。
zkClient主要作了两件事情。一件是在session loss和session expire时自动建立新的ZooKeeper实例进行重连。另外一件是将一次性watcher包装为持久watcher。后者的具体作法是简单的在watcher回调中,从新读取数据的同时再注册相同的watcher实例。
ZkConnection 类: 对zookeeper API的简单分装,提供了连接zookeeper server和数据CRUD的操做;此类实现了IZkConnection接口,一般状况下,若是I0Itec-zkclient不能知足须要的时候,我 们能够重写ZkConnection便可.ZkClient类: 核心类,也是开发者须要直接使用的类,它内部维护了zookeeper的连接管理和Event处理逻辑等,同时也暴露了zookeeper znode的CRUD方法列表.IZkChildListener接口: znode 子节点事件侦听器,当ZkClient接收到某个path节点变动或者子节点变动事件时,会触发lisntener.IZkDataListener接 口:IZkStateListener接口: 当zookeeper客户端状态变动时,触发.
public static void testzkClient(final String serverList) {
ZkClient zkClient4subChild = new ZkClient(serverList);
zkClient4subChild.subscribeChildChanges(PATH, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List currentChilds) throws Exception {
System.out.println(prefix() + "clildren of path " + parentPath + ":" + currentChilds);
}
});
}
上面是订阅children变化,下面是订阅数据变化
ZkClient zkClient4subData = new ZkClient(serverList);
zkClient4subData.subscribeDataChanges(PATH, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println(prefix() + "Data of " + dataPath + " has changed");
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println(prefix() + dataPath + " has deleted");
}
});
订阅链接状态的变化:
ZkClient zkClient4subStat = new ZkClient(serverList);
zkClient4subStat.subscribeStateChanges(new IZkStateListener() {
@Override public void handleNewSession() throws Exception {
System.out.println(prefix() + "handleNewSession()");
}
@Override
public void handleStateChanged(KeeperState stat) throws Exception {
System.out.println(prefix() + "handleStateChanged,stat:" + stat);
}
});
下面表格列出了写操做与ZK内部产生的事件的对应关系:node
event For “/path”
event For “/path/child”
|
create(“/path”) |
EventType.NodeCreated |
NA |
delete(“/path”) |
EventType.NodeDeleted |
NA |
setData(“/path”) |
EventType.NodeDataChanged |
NA |
create(“/path/child”) |
EventType.NodeChildrenChanged |
EventType.NodeCreated |
delete(“/path/child”) |
EventType.NodeChildrenChanged |
EventType.NodeDeleted |
setData(“/path/child”) |
NA |
EventType.NodeDataChanged |
而ZK内部的写事件与所触发的watcher的对应关系以下:git
event For “/path”
defaultWatcher
exists
(“/path”)
getData
(“/path”)
getChildren
(“/path”)
|
EventType.None |
√ |
√ |
√ |
√ |
EventType.NodeCreated |
|
√ |
√ |
|
EventType.NodeDeleted |
|
√(不正常) |
√ |
|
EventType.NodeDataChanged |
|
√ |
√ |
|
EventType.NodeChildrenChanged |
|
|
|
√ |
综合上面两个表,咱们能够总结出各类写操做能够触发哪些watcher,以下表所示:github
“/path”
“/path/child”
exists
getData
getChildren
exists
getData
getChildren
|
|
create(“/path”) |
√ |
√ |
|
|
|
|
delete(“/path”) |
√ |
√ |
√ |
|
|
|
setData(“/path”) |
√ |
√ |
|
|
|
|
create(“/path/child”) |
|
|
√ |
√ |
√ |
|
delete(“/path/child”) |
|
|
√ |
√ |
√ |
√ |
setData(“/path/child”) |
|
|
|
√ |
√ |
|
若是发生session close、authFail和invalid,那么全部类型的wather都会被触发。算法
zkClient除了作了一些便捷包装以外,对watcher使用作了一点加强。好比subscribeChildChanges其实是经过exists和getChildren关注了两个事件。这样当create(“/path”)时,对应path上经过getChildren注册的listener也会被调用。另外subscribeDataChanges实际上只是经过exists注册了事件。由于从上表能够看到,对于一个更新,经过exists和getData注册的watcher要么都会触发,要么都不会触发。shell
zkClient地址:https://github.com/sgroschupf/zkclient
Maven工程中使用zkClient须要加的依赖:数据库
<dependency>
<groupId>zkclient</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
menagerie基于Zookeeper实现了java.util.concurrent包的一个分布式版本。这个封装是更大粒度上对各类分布式一致性使用场景的抽象。其中最基础和经常使用的是一个分布式锁的实现:
org.menagerie.locks.ReentrantZkLock,经过ZooKeeper的全局有序的特性和EPHEMERAL_SEQUENTIAL类型znode的支持,实现了分布式锁。具体作法是:不一样的client上每一个试图得到锁的线程,都在相同的basepath下面建立一个EPHEMERAL_SEQUENTIAL的node。EPHEMERAL表示要建立的是临时znode,建立链接断开时会自动删除; SEQUENTIAL表示要自动在传入的path后面缀上一个自增的全局惟一后缀,做为最终的path。所以对不一样的请求ZK会生成不一样的后缀,并分别返回带了各自后缀的path给各个请求。由于ZK全局有序的特性,无论client请求怎样前后到达,在ZKServer端都会最终排好一个顺序,所以自增后缀最小的那个子节点,就对应第一个到达ZK的有效请求。而后client读取basepath下的全部子节点和ZK返回给本身的path进行比较,当发现本身建立的sequential node的后缀序号排在第一个时,就认为本身得到了锁;不然的话,就认为本身没有得到锁。这时确定是有其余并发的而且是没有断开的client/线程先建立了node。
基于分布式锁,还实现了其余业务场景,好比leader选举:
public static void leaderElectionTest() {
ZkSessionManager zksm = new DefaultZkSessionManager(“ZK-host-ip:2181″, 5000);
LeaderElector elector = new ZkLeaderElector(“/leaderElectionTest”, zksm, Ids.OPEN_ACL_UNSAFE);
if (elector.nominateSelfForLeader()) {
System.out.println(“Try to become the leader success!”);
}
}apache
java.util.concurrent包下面的其余接口实现,也主要是基于ReentrantZkLock的,好比ZkHashMap实现了ConcurrentMap。具体请参见menagerie的API文档设计模式
menagerie地址:https://github.com/openUtility/menagerie
Maven工程中使用menagerie须要加的依赖:
<dependency>
<groupId>org.menagerie</groupId>
<artifactId>menagerie</artifactId>
<version>1.1-SNAPSHOT</version>
</dependency>
Zookeeper 从设计模式角度来看,是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理你们都关心的数据,而后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者作出相应的反应,从而实现集群中相似 Master/Slave 管理模式,关于 Zookeeper 的详细架构等内部细节能够阅读 Zookeeper 的源码。
下面详细介绍这些典型的应用场景,也就是 Zookeeper 到底能帮咱们解决那些问题?下面将给出答案。
分布式应用中,一般须要有一套完整的命名规则,既可以产生惟一的名称又便于人识别和记住,一般状况下用树形的名称结构是一个理想的选择,树形的名称结构是一个有层次的目录结构,既对人友好又不会重复。说到这里你可能想到了 JNDI,没错 Zookeeper 的 Name Service 与 JNDI 可以完成的功能是差很少的,它们都是将有层次的目录结构关联到必定资源上,可是 Zookeeper 的 Name Service 更加是普遍意义上的关联,也许你并不须要将名称关联到特定资源上,你可能只须要一个不会重复名称,就像数据库中产生一个惟一的数字主键同样。
Name Service 已是 Zookeeper 内置的功能,你只要调用 Zookeeper 的 API 就能实现。如调用 create 接口就能够很容易建立一个目录节点。
配置管理(Configuration Management)
配置的管理在分布式应用环境中很常见,例如同一个应用系统须要多台 PC Server 运行,可是它们运行的应用系统的某些配置项是相同的,若是要修改这些相同的配置项,那么就必须同时修改每台运行这个应用系统的 PC Server,这样很是麻烦并且容易出错。
像这样的配置信息彻底能够交给 Zookeeper 来管理,将配置信息保存在 Zookeeper 的某个目录节点中,而后将全部须要修改的应用机器监控配置信息的状态,一旦配置信息发生变化,每台应用机器就会收到 Zookeeper 的通知,而后从 Zookeeper 获取新的配置信息应用到系统中。
Zookeeper 可以很容易的实现集群管理的功能,若有多台 Server 组成一个服务集群,那么必需要一个“总管”知道当前集群中每台机器的服务状态,一旦有机器不能提供服务,集群中其它集群必须知道,从而作出调整从新分配服务策略。一样当增长集群的服务能力时,就会增长一台或多台 Server,一样也必须让“总管”知道。
Zookeeper 不只可以帮你维护当前的集群中机器的服务状态,并且可以帮你选出一个“总管”,让这个总管来管理集群,这就是 Zookeeper 的另外一个功能 Leader Election。
它们的实现方式都是在 Zookeeper 上建立一个 EPHEMERAL 类型的目录节点,而后每一个 Server 在它们建立目录节点的父目录节点上调用
getChildren(
String path, boolean watch) 方法并设置 watch 为 true,因为是 EPHEMERAL 目录节点,当建立它的 Server 死去,这个目录节点也随之被删除,因此 Children 将会变化,这时
getChildren上的 Watch 将会被调用,因此其它 Server 就知道已经有某台 Server 死去了。新增 Server 也是一样的原理。
Zookeeper 如何实现 Leader Election,也就是选出一个 Master Server。和前面的同样每台 Server 建立一个 EPHEMERAL 目录节点,不一样的是它仍是一个 SEQUENTIAL 目录节点,因此它是个 EPHEMERAL_SEQUENTIAL 目录节点。之因此它是 EPHEMERAL_SEQUENTIAL 目录节点,是由于咱们能够给每台 Server 编号,咱们能够选择当前是最小编号的 Server 为 Master,假如这个最小编号的 Server 死去,因为是 EPHEMERAL 节点,死去的 Server 对应的节点也被删除,因此当前的节点列表中又出现一个最小编号的节点,咱们就选择这个节点为当前 Master。这样就实现了动态选择 Master,避免了传统意义上单 Master 容易出现单点故障的问题。
void findLeader() throws InterruptedException {
byte[] leader = null;
try {
leader = zk.getData(root + "/leader", true, null);
} catch (Exception e) {
logger.error(e);
}
if (leader != null) {
following();
} else {
String newLeader = null;
try {
byte[] localhost = InetAddress.getLocalHost().getAddress();
newLeader = zk.create(root + "/leader", localhost,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e) {
logger.error(e);
}
if (newLeader != null) {
leading();
} else {
mutex.wait();
}
}
}
|
共享锁在同一个进程中很容易实现,可是在跨进程或者在不一样 Server 之间就很差实现了。Zookeeper 却很容易实现这个功能,实现方式也是须要得到锁的 Server 建立一个 EPHEMERAL_SEQUENTIAL 目录节点,而后调用
getChildren方法获取当前的目录节点列表中最小的目录节点是否是就是本身建立的目录节点,若是正是本身建立的,那么它就得到了这个锁,若是不是那么它就调用
exists(
String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到本身建立的节点是列表中最小编号的目录节点,从而得到锁,释放锁很简单,只要删除前面它本身所建立的目录节点就好了。
void getLock() throws KeeperException, InterruptedException{
List<String> list = zk.getChildren(root, false);
String[] nodes = list.toArray(new String[list.size()]);
Arrays.sort(nodes);
if(myZnode.equals(root+"/"+nodes[0])){
doAction();
}
else{
waitForLock(nodes[0]);
}
}
void waitForLock(String lower) throws InterruptedException, KeeperException {
Stat stat = zk.exists(root + "/" + lower,true);
if(stat != null){
mutex.wait();
}
else{
getLock();
}
}
|
Zookeeper 能够处理两种类型的队列:
(1)当一个队列的成员都聚齐时,这个队列才可用,不然一直等待全部成员到达,这种是同步队列。
(2)队列按照 FIFO 方式进行入队和出队操做,例如实现生产者和消费者模型。
同步队列用 Zookeeper 实现的实现思路以下:
建立一个父目录 /synchronizing,每一个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,而后每一个成员都加入这个队列,加入队列的方式就是建立 /synchronizing/member_i 的临时目录节点,而后每一个成员获取 / synchronizing 目录的全部目录节点,也就是 member_i。判断 i 的值是否已是成员的个数,若是小于成员个数等待 /synchronizing/start 的出现,若是已经相等就建立 /synchronizing/start。
用下面的流程图更容易理解:
同步队列的关键代码以下:
void addQueue() throws KeeperException, InterruptedException{
zk.exists(root + "/start",true);
zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
synchronized (mutex) {
List<String> list = zk.getChildren(root, false);
if (list.size() < size) {
mutex.wait();
} else {
zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
}
}
|
当队列没尽是进入 wait(),而后会一直等待 Watch 的通知,Watch 的代码以下:
public void process(WatchedEvent event) {
if(event.getPath().equals(root + "/start") &&
event.getType() == Event.EventType.NodeCreated){
System.out.println("获得通知");
super.process(event);
doAction();
}
}
|
FIFO 队列用 Zookeeper 实现思路以下:
实现的思路也很是简单,就是在特定的目录下建立 SEQUENTIAL 类型的子目录 /queue_i,这样就能保证全部成员加入队列时都是有编号的,出队列时经过 getChildren( ) 方法能够返回当前全部的队列中的元素,而后消费其中最小的一个,这样就能保证 FIFO。
下面是生产者和消费者这种队列形式的示例代码:
boolean produce(int i) throws KeeperException, InterruptedException{
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;
b.putInt(i);
value = b.array();
zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}
|
int consume() throws KeeperException, InterruptedException{
int retvalue = -1;
Stat stat = null;
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() == 0) {
mutex.wait();
} else {
Integer min = new Integer(list.get(0).substring(7));
for(String s : list){
Integer tempValue = new Integer(s.substring(7));
if(tempValue < min) min = tempValue;
}
byte[] b = zk.getData(root + "/element" + min,false, stat);
zk.delete(root + "/element" + min, 0);
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();
return retvalue;
}
}
}
}
|
参考资料