ZkClient简单的操做demo,经过修改节点或节点数据,监控其数据的变化。java
一、对数据节点的修改操做session
public class ZKClientChanger { private static ZkClient zkClient; /*** * 测试修改path数据 */ private static class ChangerThread extends Thread { @Override public void run() { if(!zkClient.exists("/root")) { //一、建立root节点 二、初始化节点数据 三、节点为永久性建立 zkClient.create("/root", "root".getBytes(), CreateMode.PERSISTENT); } for(int i = 0; i < 50; i++) { zkClient.writeData("/root", "root" + String.valueOf(i)); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { //对zookeeper封装的client,zookeeper发布订阅是一次的性的,监控一次后会断开链接; //client超时过时时货自动建立zookeeper链接 zkClient = new ZkClient("192.168.52.128:2181", 1000); new ChangerThread().start(); } }
二、监控数据节点的变化状况ide
public class ZKclientWatcher { private static ZkClient zkClient; private static int count = 0; /** * 测试监控path下的数据变化状况 */ private static class WatcherThread extends Thread { @Override public void run() { //监控path下的数据的变化 zkClient.subscribeDataChanges("/root", new IZkDataListener() { @Override public void handleDataChange(String path, Object data) throws Exception { System.out.println("handleDataChange===========>path:" + path + " data: " + data); count++; } @Override public void handleDataDeleted(String path) throws Exception { System.out.println("handleDataDeleted=========>path:" + path); } }); //监控path下的节点的变化 zkClient.subscribeChildChanges("/root", new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { // TODO 节点变化处理逻辑 } }); //监控链接状态的变化 zkClient.subscribeStateChanges(new IZkStateListener() { @Override public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception { // TODO 监控zookeeper链接状态变化逻辑 } @Override public void handleNewSession() throws Exception { // TODO 监控zookeeper Session过时并建立新Session时的逻辑 } @Override public void handleSessionEstablishmentError(Throwable error) throws Exception { // TODO 监控链接失败,session没法从新建立时的逻辑 } }); //取消全部的订阅[unsubscribeChildChanges/unsubscribeDataChanges/unsubscribeStateChanges] // zkClient.unsubscribeAll(); } } public static void main(String[] args) throws InterruptedException { zkClient = new ZkClient("192.168.52.128:2181", 2000); new WatcherThread().start(); while (count < 10) { System.out.println("count : " + count); Thread.sleep(1000); } } }
主要依赖zkclient的包:
测试
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.7</version> </dependency>