构建操做包装类(Builder): GetDataBuilder getData() ---- CuratorFrameworknode
GetDataBuilderapache
storingStatIn(org.apache.zookeeper.data.Stat stat) // 把服务器端获取的状态数据存储到stat对象api
Byte[] forPath (String path) // 节点路劲缓存
public void readNode(String path) throws Exception { Stat stat = new Stat(); byte[] data = client.getData().storingStatIn(stat).forPath(path); System.out.println("读取节点" + path + "的数据:" + new String(data)); System.out.println(stat.toString()); }
构建操做包装类 (Builder) SetDataBuilder setData() ---- CuratorFramework服务器
SetDataBuilder异步
withVersion (int version) // 特定版本号函数
forPath (String path, byte[] data) // 节点路劲ui
forPath (String path) // 节点路劲this
public void updateNode(String path, byte[] data, int version) throws Exception { client.setData().withVersion(version).forPath(path, data); }
构建操做包装类(Builder):GetChildrenBuilder getChildren() --- CuratorFramework线程
GetChildrenBuilder
storingStatIn(org.apache.zookeeper.data.Stat stat) // 把服务器端获取的状态数据存储到stat对象
Byte[] forPath(String path) // 节点路径
usingWatcher(org.apache.zookeeper.Watcher watcher) // 设置watcher ,相似zk自己api ,也只能使用一次
usingWatcher(CuratorWatcher watcher) // 设置watcher ,相似于zk自己的api,也只能使用一次
public void getChildren(String path) throws Exception { List<String> children = client.getChildren().usingWatcher(new WatcherTest()).forPath("/curator"); for (String pth : children) { System.out.println("child=" + pth); } }
监听数据节点的内容变动
监听节点的建立,即若是指定的节点不存在,则节点建立后,会触发这个监听
监听指定节点的子节点变化状况
包括:新增子节点 子节点数据变动和子节点删除
构造函数
NodeCache(CuratorFramework client, String path)
NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
client : 客户端实例
path : 数据节点路径
dataIsCompressed : 是否进行数据压缩
public interface NodeCacheListener { // 没有参数,怎么获取事件信息以及节点数据 void nodeChanged() throws Exception; }
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData) { this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); } public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory) { this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); } public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory) { this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); } public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ExecutorService executorService) { this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService)); }
public interface PathChildrenCacheListener { void childEvent(CuratorFramework var1, PathChildrenCacheEvent var2) throws Exception; }
client : 客户端实例
path : 数据节点路径
dataIsCompressed : 是否进行数据压缩
cacheData : 用于配置是否把节点内容缓存起来,若是配置true,那么客户端在接收到节点列表变动时,也
可以获取到节点的数据内容,若是为false则没法取到数据内容
threadFactory : 经过这两个参数构造专门的线程池来处理事件通知
ExecutorService
public static enum Type { CHILD_ADDED, // 新增子节点(CHILD_ADDED) CHILD_UPDATED, // 子节点数据变动(CHILD_UPDATED) CHILD_REMOVED, // 子节点删除(CHILD_REMOVED) CONNECTION_SUSPENDED, CONNECTION_RECONNECTED, CONNECTION_LOST, INITIALIZED; private Type() { } }
public static enum StartMode { // 异步初始化cache NORMAL, // 同步初始化客户端的cache,及建立cache后,就从服务器端拉入对应的数据 BUILD_INITIAL_CACHE, // 异步初始化,初始化完成触发事件, PathChildrenCacheEvent.Type.INITIALIZED POST_INITIALIZED_EVENT; private StartMode() { } }
public void addChildWatcher(String path) throws Exception { final PathChildrenCache cache = new PathChildrenCache(this.client, path, true); cache.start(StartMode.POST_INITIALIZED_EVENT); System.out.println(cache.getCurrentData().size()); //byte childone[] = cache.getCurrentData().get(0).getData(); // System.out.println("childone:" // + cache.getCurrentData().get(0).getPath() + ";data=" // + new String(childone)); cache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){ System.out.println("客户端子节点cache初始化数据完成"); System.out.println("size="+cache.getCurrentData().size()); }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){ System.out.println("添加子节点:"+event.getData().getPath()); System.out.println("修改子节点数据:"+new String(event.getData().getData())); }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){ System.out.println("删除子节点:"+event.getData().getPath()); }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ System.out.println("修改子节点数据:"+event.getData().getPath()); System.out.println("修改子节点数据:"+new String(event.getData().getData())); } } }); }