Curator是Netflix公司开源的一套zookeeper客户端框架。java
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, 5000,3000,new RetryNTimes(10, 5000));
一、ZK_ADDRESS 链接zk的地址 格式host1:port1,host2:port2,。。。
二、sessionTimeoutMs 会话超时时间,单位是毫秒,可不填测此参数,默认值为60000ms
三、connectionTimeoutMs
四、retryPolicy 重试策略,内建有四种重试策略,也能够自行实现RetryPolicy接口。
client.start()
四种节点
PERSISTENT:持久化
PERSISTENT_SEQUENTIAL:持久化而且带序列号
EPHEMERAL:临时
EPHEMERAL_SEQUENTIAL:临时而且带序列号node
一、默认持久化节点
client.create().forPath("/data1");
二、建立持久化节点并赋值
client.create().forPath("/data2", "this is data2".getBytes());
三、建立临时空节点
client.create().withMode(CreateMode.EPHEMERAL).forPath("/data3");
四、建立临时节点并赋值
client.create().withMode(CreateMode.EPHEMERAL).forPath("/data4", "this is data4".getBytes());
五、建立临时有序节点并赋值并递归建立父节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPH_SEQ).forPath("/patent/data", "children".getBytes());git
流式风格,拼接顺序能够调整。
一、只能删除叶子节点,不然抛出异常
client.delete().forPath("/data1");
二、删除一个节点并递归删除其全部的子节点
client.delete().deletingChildrenIfNeeded().forPath("/patent/data");
三、保证强制删除一个节点,只要客户端会话有效就会持续进行删除直到删除成功。
client.delete().guaranteed().forPath("/data2");github
一、读节点的数据内容,返回byte数组
byte[] data=client.getData().forPath("/data2");
二、读取一个节点的数据内容,同时获取到该节点的stat
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("data4");
三、获取一个路径下全部的子节点
List<String> childrens = client.getChildren().forPath("/");
四、检查节点是否存在,返回一个stat
stat= client.checkExists().forPath("/data3");apache
一、更新一个节点的数据内容,返回一个stat
Stat stat = client.setData().forPath("/data1","update data1 data".getBytes());api
CuratorFramework的实例包含inTransaction()接口方法,调用此方法开启一个ZooKeeper事务. 能够复合create, setData, check, and/or delete 等操做而后调用commit()做为一个原子操做提交。 数组
client.inTransaction()
.and().create().withMode(CreateMode.PERSISTENT).forPath("/data5", "this is data5".getBytes())
.and().setData().forPath("/data5", "update data5".getBytes())
.and().commit();缓存
Zookeeper原生支持经过注册Watcher来进行事件监听,可是开发者须要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,能够看做是对事件监听的本地缓存视图,可以自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。session
一、Path Cache框架
Path Cache用来监控一个ZNode的子节点. 当一个子节点增长, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将经过PathChildrenCacheListener通知。
package com.vi.test; import com.vi.util.CuratorClientUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.zookeeper.CreateMode; import java.util.List; public class CuratorWatchTest { private static final String PATH = "/path/cache"; public static void main(String[] args) { //建立客户端 CuratorFramework client = CuratorClientUtil.getClient(); client.start(); try { PathChildrenCache cache = pathCacheTest(client); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data1", "first data".getBytes()); Thread.sleep(500); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data2", "second data".getBytes()); Thread.sleep(500); //获取全部子节点 List<ChildData> datas =cache.getCurrentData(); for(ChildData childData : datas){ System.out.println("节点路径:"+childData.getPath() + ",节点值:" + new String(childData.getData())); } client.setData().forPath(PATH + "/data1", "update data".getBytes()); Thread.sleep(500); client.delete().deletingChildrenIfNeeded().forPath("/path"); Thread.sleep(500); cache.close(); client.close(); } catch (Exception e) { e.printStackTrace(); } } public static PathChildrenCache pathCacheTest(CuratorFramework client) throws Exception { //第三个参数不为true时,不会缓存data数据 PathChildrenCache cache = new PathChildrenCache(client, PATH, true); /* 三种启动方式 NORMAL:正常初始化。 BUILD_INITIAL_CACHE:在调用start()以前会调用rebuild()。 POST_INITIALIZED_EVENT: 当Cache初始化数据后发送一个 cache.start(PathChildrenCache.StartMode.NORMAL); */ cache.start(); //增长监听事件 cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("事件为:" + event.getType() + ",数据为:" + new String(event.getData().getData())); } }); return cache; } }
2)Node Cache
Node Cache与Path Cache相似,Node Cache只能监听某一个特定的节点。
package com.vi.test; import com.vi.util.CuratorClientUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.zookeeper.CreateMode; public class CuratorNodeCacheTest { private static final String PATH = "/path/cache"; public static void main(String[] args) { //建立客户端 CuratorFramework client = CuratorClientUtil.getClient(); client.start(); try { NodeCache cache = nodeCacheTest(client); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH, "first data".getBytes()); Thread.sleep(500); client.setData().forPath(PATH, "update data".getBytes()); Thread.sleep(500); client.delete().deletingChildrenIfNeeded().forPath("/path"); Thread.sleep(500); cache.close(); client.close(); } catch (Exception e) { e.printStackTrace(); } } public static NodeCache nodeCacheTest(CuratorFramework client) throws Exception { //只能监控一个节点 NodeCache cache = new NodeCache(client, PATH); cache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { ChildData childData = cache.getCurrentData(); if (childData == null) { System.out.println("节点删除!"); } else { System.out.println("节点数据:" + new String(cache.getCurrentData().getData())); } } }); cache.start(); return cache; } }
3)Tree Cache
Tree Cache能够监控整个树上的全部节点,相似于PathCache和NodeCache的组合。
package com.vi.test; import com.vi.util.CuratorClientUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.zookeeper.CreateMode; public class CuratorTreeCacheTest { private static final String PATH = "/path/cache"; public static void main(String[] args) { //建立客户端 CuratorFramework client = CuratorClientUtil.getClient(); client.start(); try { TreeCache cache = treeCacheTest(client); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data1", "first data".getBytes()); Thread.sleep(500); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data2", "second data".getBytes()); Thread.sleep(500); client.setData().forPath(PATH , "update path data".getBytes()); Thread.sleep(500); client.setData().forPath(PATH + "/data1", "update data".getBytes()); Thread.sleep(500); client.delete().deletingChildrenIfNeeded().forPath("/path"); Thread.sleep(1000); cache.close(); client.close(); } catch (Exception e) { e.printStackTrace(); } } public static TreeCache treeCacheTest(CuratorFramework client) throws Exception { //只能监控一个节点 TreeCache cache = new TreeCache(client, PATH); cache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { System.out.println("事件为:" + event.getType() + ",节点路径为:" + event.getData().getPath() + ",数据为:" + new String(event.getData().getData())); } }); cache.start(); return cache; } }
curator-recipes中有一些高级特性可使用,在后面的章节会具体介绍。
源码地址: https://github.com/binary-vi/binary.github.io/tree/master/zk-curator