原文连接:ZooKeeper 客户端之 Curatorhtml
ZooKeeper 是一个分布式的、开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现。它是集群的管理者,监视着集群中各个节点的状态,根据节点提交的反馈进行下一步合理操做。最终,将简单易用的接口和性能高效、功能稳定的系统提供给用户。java
Curator 是 Netflix 公司开源的一套 Zookeeper 客户端框架,解决了不少 Zookeeper 客户端很是底层的细节开发工做,包括链接重连、反复注册 Watcher 和 NodeExistsException 异常等等。Curator 包含了几个包:node
目前 Curator 有 2.x.x 和 3.x.x 两个系列的版本,支持不一样版本的 Zookeeper。其中Curator 2.x.x 兼容 Zookeeper的 3.4.x 和 3.5.x。而 Curator 3.x.x 只兼容 Zookeeper 3.5.x,而且提供了一些诸如动态从新配置、watch删除等新特性。linux
Curator 2.x.x - compatible with both ZooKeeper 3.4.x and ZooKeeper 3.5.x
Curator 3.x.x - compatible only with ZooKeeper 3.5.x and includes support for new
复制代码
若是跨版本会有兼容性问题,颇有可能致使节点操做失败,当时在使用的时候就踩了这个坑,抛了以下的异常:git
KeeperErrorCode = Unimplemented for /***
复制代码
这里就不对比与原生 API 的区别了,Curator 的 API 直接经过 org.apache.curator.framework.CuratorFramework 接口来看,并结合相应的案例进行使用,以备后用。github
为了能够直观的看到 Zookeeper 的节点信息,能够考虑弄一个 zk 的管控界面,常见的有 zkui 和 zkweb。web
zkui:github.com/DeemOpen/zk…apache
zkweb:github.com/zhitom/zkwe…api
我用的 zkweb ,虽然界面上看起来没有 zkui 精简,可是在层次展现和一些细节上感受比 zkui 好一点缓存
以前写的一个在 Linux 上安装部署 Zookeeper 的笔记,其余操做系统请自行谷歌教程吧。
本文案例工程已经同步到了 github,传送门。
PS : 目前尚未看过Curator的具体源码,因此不会涉及到任何源码解析、实现原理的东西;本篇主要是实际使用时的一些记录,以备后用。若是文中错误之处,但愿各位指出。
在实际的工程中,Zookeeper 客户端的初始化会在程序启动期间完成。
在 Spring 或者 SpringBoot 工程中最多见的就是绑定到容器启动的生命周期或者应用启动的生命周期中:
除了上面的方式以外,还有一种常见的是绑定到 bean 的生命周期中
关于 SpringBoot中的事件机制能够参考以前写过的一篇文章:SpringBoot-SpringBoot中的事件机制。
这里使用 InitializingBean 的这种方式,代码以下:
public class ZookeeperCuratorClient implements InitializingBean {
private CuratorFramework curatorClient;
@Value("${glmapper.zookeeper.address:localhost:2181}")
private String connectString;
@Value("${glmapper.zookeeper.baseSleepTimeMs:1000}")
private int baseSleepTimeMs;
@Value("${glmapper.zookeeper.maxRetries:3}")
private int maxRetries;
@Value("${glmapper.zookeeper.sessionTimeoutMs:6000}")
private int sessionTimeoutMs;
@Value("${glmapper.zookeeper.connectionTimeoutMs:6000}")
private int connectionTimeoutMs;
@Override
public void afterPropertiesSet() throws Exception {
// custom policy
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
// to build curatorClient
curatorClient = CuratorFrameworkFactory.builder().connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs)
.retryPolicy(retryPolicy).build();
curatorClient.start();
}
public CuratorFramework getCuratorClient() {
return curatorClient;
}
}
复制代码
glmapper.zookeeper.xxx 是本例中须要在配置文件中配置的 zookeeper 的一些参数,参数解释以下:
另外,Curator 客户端初始化时还须要指定重试策略,RetryPolicy 接口是 Curator 中重试链接(当zookeeper失去链接时使用)策略的顶级接口,其类继承体系以下图所示:
除上述以外,在一些场景中,须要对不一样的业务进行隔离,这种状况下,能够经过设置 namespace 来解决,namespace 实际上就是指定zookeeper的根路径,设置以后,后面的全部操做都会基于该根目录。
checkExists 方法返回的是一个 ExistsBuilder 构造器,这个构建器将返回一个 Stat 对象,就像调用了 org.apache.zookeeper.ZooKeeper.exists()同样。null 表示它不存在,而实际的 Stat 对象表示存在。
public void checkNodeExist(String path) throws Exception {
Stat stat = curatorClient.checkExists().forPath(path);
if (stat != null){
throw new RuntimeException("path = "+path +" has bean exist.");
}
}
复制代码
建议在实际的应用中,操做节点时对所需操做的节点进行 checkExists。
非递归方式建立节点
curatorClient.create().forPath("/glmapper");
curatorClient.create().forPath("/glmapper/test");
复制代码
先建立/glmapper,而后再在/glmapper 下面建立 /test ,若是直接使用 /glmapper/test 没有先建立 /glmapper 时,会抛出异常:
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /glmapper/test
复制代码
若是须要在建立节点时指定节点中数据,则能够这样:
curatorClient.create().forPath("/glmapper","data".getBytes());
复制代码
指定节点类型(EPHEMERAL 临时节点)
curatorClient.create().withMode(CreateMode.EPHEMERAL).forPath("/glmapper","data".getBytes());
复制代码
递归方式建立节点
递归方式建立节点有两个方法,creatingParentsIfNeeded 和 creatingParentContainersIfNeeded。在新版本的 zookeeper 这两个递归建立方法会有区别; creatingParentContainersIfNeeded() 以容器模式递归建立节点,若是旧版本 zookeeper,此方法等于creatingParentsIfNeeded()。
在非递归方式状况下,若是直接建立 /glmapper/test 会报错,那么在递归的方式下则是能够的
curatorClient.create().creatingParentContainersIfNeeded().forPath("/glmapper/test");
复制代码
在递归调用中,若是不指定 CreateMode,则默认PERSISTENT
,若是指定为临时节点,则最终节点会是临时节点,父节点仍旧是PERSISTENT
非递归删除节点
curatorClient.delete().forPath("/glmapper/test");
复制代码
指定具体版本
curatorClient.delete().withVersion(-1).forPath("/glmapper/test");
复制代码
使用 guaranteed 方式删除,guaranteed 会保证在session有效的状况下,后台持续进行该节点的删除操做,直到删除掉
curatorClient.delete().guaranteed().withVersion(-1).forPath("/glmapper/test");
复制代码
递归删除当前节点及其子节点
curatorClient.delete().deletingChildrenIfNeeded().forPath("/glmapper/test");
复制代码
获取节点数据
byte[] data = curatorClient.getData().forPath("/glmapper/test");
复制代码
根据配置的压缩提供程序对数据进行解压缩处理
byte[] data = curatorClient.getData().decompressed().forPath("/glmapper/test");
复制代码
读取数据并得到Stat信息
Stat stat = new Stat();
byte[] data = curatorClient.getData().storingStatIn(stat).forPath("/glmapper/test");
复制代码
设置指定值
curatorClient.setData().forPath("/glmapper/test","newData".getBytes());
复制代码
设置数据并使用配置的压缩提供程序压缩数据
curatorClient.setData().compressed().forPath("/glmapper/test","newData".getBytes());
复制代码
设置数据,并指定版本
curatorClient.setData().withVersion(-1).forPath("/glmapper/test","newData".getBytes());
复制代码
List<String> childrenList = curatorClient.getChildren().forPath("/glmapper");
复制代码
Curator 也对 Zookeeper 典型场景之事件监听进行封装,这部分能力实在 curator-recipes 包下的。
在使用不一样的方法时会有不一样的事件发生
public enum CuratorEventType
{
//Corresponds to {@link CuratorFramework#create()}
CREATE,
//Corresponds to {@link CuratorFramework#delete()}
DELETE,
//Corresponds to {@link CuratorFramework#checkExists()}
EXISTS,
//Corresponds to {@link CuratorFramework#getData()}
GET_DATA,
//Corresponds to {@link CuratorFramework#setData()}
SET_DATA,
//Corresponds to {@link CuratorFramework#getChildren()}
CHILDREN,
//Corresponds to {@link CuratorFramework#sync(String, Object)}
SYNC,
//Corresponds to {@link CuratorFramework#getACL()}
GET_ACL,
//Corresponds to {@link CuratorFramework#setACL()}
SET_ACL,
//Corresponds to {@link Watchable#usingWatcher(Watcher)} or {@link Watchable#watched()}
WATCHED,
//Event sent when client is being closed
CLOSING
}
复制代码
利用 Watcher 来对节点进行监听操做,能够典型业务场景须要使用可考虑,但通常状况不推荐使用。
byte[] data = curatorClient.getData().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听器 watchedEvent:" + watchedEvent);
}
}).forPath("/glmapper/test");
System.out.println("监听节点内容:" + new String(data));
// 第一次变动节点数据
curatorClient.setData().forPath("/glmapper/test","newData".getBytes());
// 第二次变动节点数据
curatorClient.setData().forPath("/glmapper/test","newChangedData".getBytes());
复制代码
上面这段代码对 /glmapper/test 节点注册了一个 Watcher 监听事件,而且返回当前节点的内容。后面进行两次数据变动,实际上第二次变动时,监听已经失效,没法再次得到节点变更事件了。测试中控制台输出的信息以下:
监听节点内容:data
watchedEvent:WatchedEvent state:SyncConnected type:NodeDataChanged path:/glmapper/test
复制代码
CuratorListener 监听,此监听主要针对 background 通知和错误通知。使用此监听器以后,调用inBackground 方法会异步得到监听,对于节点的建立或修改则不会触发监听事件。
CuratorListener listener = new CuratorListener(){
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("event : " + event);
}
};
// 绑定监听器
curatorClient.getCuratorListenable().addListener(listener);
// 异步获取节点数据
curatorClient.getData().inBackground().forPath("/glmapper/test");
// 更新节点数据
curatorClient.setData().forPath("/glmapper/test","newData".getBytes());
复制代码
测试中控制台输出的信息以下:
event : CuratorEventImpl{type=GET_DATA, resultCode=0, path='/glmapper/test', name='null', children=null, context=null, stat=5867,5867,1555140974671,1555140974671,0,0,0,0,4,0,5867
, data=[100, 97, 116, 97], watchedEvent=null, aclList=null}
复制代码
这里只触发了一次监听回调,就是 getData 。
Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 事件监听能够理解为一个本地缓存视图与远程 Zookeeper 视图的对比过程。Cache 提供了反复注册的功能。Cache 分为两类注册类型:节点监听和子节点监听。
NodeCache
监听数据节点自己的变化。对节点的监听须要配合回调函数来进行处理接收到监听事件以后的业务处理。NodeCache 经过 NodeCacheListener 来完成后续处理。
String path = "/glmapper/test";
final NodeCache nodeCache = new NodeCache(curatorClient,path);
//若是设置为true则在首次启动时就会缓存节点内容到Cache中。 nodeCache.start(true);
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("触发监听回调,当前节点数据为:" + new String(nodeCache.getCurrentData().getData()));
}
});
curatorClient.setData().forPath(path,"1".getBytes());
curatorClient.setData().forPath(path,"2".getBytes());
curatorClient.setData().forPath(path,"3".getBytes());
curatorClient.setData().forPath(path,"4".getBytes());
curatorClient.setData().forPath(path,"5".getBytes());
curatorClient.setData().forPath(path,"6".getBytes());
复制代码
注意:在测试过程当中,nodeCache.start(),NodeCache 在前后屡次修改监听节点的内容时,出现了丢失事件现象,在用例执行的5次中,仅一次监听到了所有事件;若是 nodeCache.start(true),NodeCache 在前后屡次修改监听节点的内容时,不会出现丢失现象。
NodeCache不只能够监听节点内容变化,还能够监听指定节点是否存在。若是本来节点不存在,那么Cache就会在节点被建立时触发监听事件,若是该节点被删除,就没法再触发监听事件。
PathChildrenCache
PathChildrenCache 不会对二级子节点进行监听,只会对子节点进行监听。
String path = "/glmapper";
PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorClient,path,true);
// 若是设置为true则在首次启动时就会缓存节点内容到Cache中。 nodeCache.start(true);
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
System.out.println("-----------------------------");
System.out.println("event:" + event.getType());
if (event.getData()!=null){
System.out.println("path:" + event.getData().getPath());
}
System.out.println("-----------------------------");
}
});
zookeeperCuratorClient.createNode("/glmapper/test","data".getBytes(),CreateMode.PERSISTENT);
Thread.sleep(1000);
curatorClient.setData().forPath("/glmapper/test","1".getBytes());
Thread.sleep(1000);
curatorClient.setData().forPath("/glmapper/test","2".getBytes());
Thread.sleep(1000);
zookeeperCuratorClient.createNode("/glmapper/test/second","data".getBytes(),CreateMode.PERSISTENT);
Thread.sleep(1000);
curatorClient.setData().forPath("/glmapper/test/second","1".getBytes());
Thread.sleep(1000);
curatorClient.setData().forPath("/glmapper/test/second","2".getBytes());
Thread.sleep(1000);
复制代码
注意:在测试过程当中发现,若是连续两个操做之间不进行必定时间的间隔,会致使没法监听到下一次事件。所以只会监听子节点,因此对二级子节点 /second 下面的操做是监听不到的。测试中控制台输出的信息以下:
-----------------------------
event:CHILD_ADDED
path:/glmapper/test
-----------------------------
-----------------------------
event:INITIALIZED
-----------------------------
-----------------------------
event:CHILD_UPDATED
path:/glmapper/test
-----------------------------
-----------------------------
event:CHILD_UPDATED
path:/glmapper/test
-----------------------------
复制代码
TreeCache
TreeCache 使用一个内部类TreeNode
来维护这个一个树结构。并将这个树结构与ZK节点进行了映射。因此TreeCache 能够监听当前节点下全部节点的事件。
String path = "/glmapper";
TreeCache treeCache = new TreeCache(curatorClient,path);
treeCache.getListenable().addListener((client,event)-> {
System.out.println("-----------------------------");
System.out.println("event:" + event.getType());
if (event.getData()!=null){
System.out.println("path:" + event.getData().getPath());
}
System.out.println("-----------------------------");
});
treeCache.start();
zookeeperCuratorClient.createNode("/glmapper/test","data".getBytes(),CreateMode.PERSISTENT);
Thread.sleep(1000);
curatorClient.setData().forPath("/glmapper/test","1".getBytes());
Thread.sleep(1000);
curatorClient.setData().forPath("/glmapper/test","2".getBytes());
Thread.sleep(1000);
zookeeperCuratorClient.createNode("/glmapper/test/second","data".getBytes(),CreateMode.PERSISTENT);
Thread.sleep(1000);
curatorClient.setData().forPath("/glmapper/test/second","1".getBytes());
Thread.sleep(1000);
curatorClient.setData().forPath("/glmapper/test/second","2".getBytes());
Thread.sleep(1000);
复制代码
测试中控制台输出的信息以下:
-----------------------------
event:NODE_ADDED
path:/glmapper
-----------------------------
-----------------------------
event:NODE_ADDED
path:/glmapper/test
-----------------------------
-----------------------------
event:NODE_UPDATED
path:/glmapper/test
-----------------------------
-----------------------------
event:NODE_UPDATED
path:/glmapper/test
-----------------------------
-----------------------------
event:NODE_ADDED
path:/glmapper/test/second
-----------------------------
-----------------------------
event:NODE_UPDATED
path:/glmapper/test/second
-----------------------------
-----------------------------
event:NODE_UPDATED
path:/glmapper/test/second
-----------------------------
复制代码
CuratorFramework 的实例包含 inTransaction( ) 接口方法,调用此方法开启一个 ZooKeeper 事务。 能够复合create、 setData、 check、and/or delete 等操做而后调用 commit() 做为一个原子操做提交。
// 开启事务
CuratorTransaction curatorTransaction = curatorClient.inTransaction();
Collection<CuratorTransactionResult> commit =
// 操做1
curatorTransaction.create().withMode(CreateMode.EPHEMERAL).forPath("/glmapper/transaction")
.and()
// 操做2
.delete().forPath("/glmapper/test")
.and()
// 操做3
.setData().forPath("/glmapper/transaction", "data".getBytes())
.and()
// 提交事务
.commit();
Iterator<CuratorTransactionResult> iterator = commit.iterator();
while (iterator.hasNext()){
CuratorTransactionResult next = iterator.next();
System.out.println(next.getForPath());
System.out.println(next.getResultPath());
System.out.println(next.getType());
}
复制代码
这里debug看了下Collection信息,面板以下:
前面提到的增删改查都是同步的,可是 Curator 也提供了异步接口,引入了 BackgroundCallback 接口用于处理异步接口调用以后服务端返回的结果信息。BackgroundCallback 接口中一个重要的回调值为 CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。
在使用上也是很是简单的,只须要带上 inBackground() 就行,以下:
curatorClient.getData().inBackground().forPath("/glmapper/test");
复制代码
经过查看 inBackground 方法定义能够看到,inBackground 支持自定义线程池来处理返回结果以后的业务逻辑。
public T inBackground(BackgroundCallback callback, Executor executor);
复制代码
这里就不贴代码了。
本文主要围绕 Curator 的基本 API 进行了学习记录,对于原理及源码部分没有涉及。这部分若是有时间在慢慢研究吧。另外像分布式锁、分布式自增序列等实现停留在理论阶段,没有实践,不敢妄论,用到再码吧。