<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.4-beta</version> </dependency>
/** * 建立 zookeeper 会话 * <p> * <p> * zookeeper 客户端 和 服务端建立会话的过程是异步的。也就是客户度经过构造方法建立会话后当即返回,此时的链接并无彻底创建。 * 当真正的会话创建完成后,zk服务端会给客户端通知一个事件,客户端获取通知以后在代表链接正在创建。 */ public class ZooKeeperClientSession implements Watcher { //用于等待zk服务端通知 private static CountDownLatch latch = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2183", 5000, new ZooKeeperClientSession()); System.out.println(zooKeeper.getState()); latch.await(); long sessionId = zooKeeper.getSessionId(); byte[] sessionPasswd = zooKeeper.getSessionPasswd(); System.out.println(zooKeeper.getSessionId()); /** * 利用 sessionId 和 sessionPasswd 复用会话链接 */ ZooKeeper zooKeeper1 = new ZooKeeper("127.0.0.1:2183", 5000, new ZooKeeperClientSession(), sessionId, sessionPasswd); System.out.println(zooKeeper1.getSessionId()); } /** * 处理 zookeeper 服务端的 Watcher 通知 * @param watchedEvent */ public void process(WatchedEvent watchedEvent) { System.out.println("receive watch event : " + watchedEvent); if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { latch.countDown(); } } }
1.构造函数说明
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
参数 | 做用 |
---|---|
connectString | zk服务器列表,由英文逗号分开的字符串,例如:127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183;也能够是带有目录的字符:127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/zk-book |
sessionTimeout | 会话超时时间,以毫秒为单位。在一个会话周期内,zk客户端和服务端经过心跳来检查链接的有效性,一旦在sessionTimeout时间内没有进行心跳检测,则会话失效 |
watcher | zk容许客户端在构造方法中传入一个Watcher接口实现类做为事件通知处理器 |
sessionId、sessionPasswd | 利用sessionId 和 sessionPasswd 确保复用会话链接 |
canBeReadOnly | 用于标识当前会话是否支付只读模式。在zk集群模式中,若是一台集群和集群中过半以上的机器都都失去了网络链接,那么这个机器将再也不处理客户端请求,包括读写请求。但在某些状况下出现相似问题,咱们但愿该台机器可以处理读请求,此时为 read-only 模式 |
1.同步建立临时节点
/** * 同步 api 建立临时节点 */ public class CreatedSyncNode implements Watcher { private static CountDownLatch latch = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new CreatedSyncNode()); latch.await(); //临时节点:建立接口返回该节点路径L,例如返回值 /zk-test String path = zooKeeper.create("/zk-test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("created path success : " + path); // 临时顺序节点:会自动的在节点路径后加一个数字,例如返回值:/zk-test0000000001 String path1 = zooKeeper.create("/zk-test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("created path success : " + path1); } public void process(WatchedEvent watchedEvent) { if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { latch.countDown(); } } }
2.异步建立节点
/** * 异步 api 建立持久化节点 */ public class CreatedAsyncNode implements Watcher { private static CountDownLatch latch = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZooKeeper zooKeeper = new ZooKeeper( "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new CreatedAsyncNode()); latch.await(); //持久化节点:建立接口返回该节点路径,无返回值;异步建立的节点:/zk-test zooKeeper.create("/zk-test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new CallBack(), "PERSISTENT"); //持久化顺序节点:会自动的在节点路径后加一个数字,该方法无返回值,建立后节点:/zk-test-seq0000000002 zooKeeper.create("/zk-test-seq", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, new CallBack(), "PERSISTENT_SEQUENTIAL"); Thread.sleep(1000); } public void process(WatchedEvent watchedEvent) { if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) { latch.countDown(); } } static class CallBack implements AsyncCallback.StringCallback { /** * 服务端回调方法 * * @param code 服务端响应码:0 接口调用成功;-4 客户端和服务端链接断开;-110 节点已存在 ;-112 会话过时 * @param path 建立节点传入的路径参数 * @param ctx 异步建立api传入的ctx参数 * @param name 服务端真正建立节点的名称,业务逻辑应该以该值为准 */ public void processResult(int code, String path, Object ctx, String name) { System.out.println("created success : " + code + "," + path + "," + ctx + "," + name); } } }
读取数据包括获取子节点列表和节点的数据内容。zookeeper 中分别提供了 getChildren 和 getData 方法。html
参数名 | 参数类型 | 做用 |
---|---|---|
path | String | 指定节点的路径 |
watcher | Watcher | 注册一个Watcher,一旦在本次子节点获取到以后,子节点列表发送变动时,那么会向客户端发送通知 |
watch | boolean | 代表是否须要注册一个 |
stat | org.apache.zookeeper.data.Stat | 节点的状态信息,有时候咱们不只须要最新的子节点列表,还要获取这个节点的最新状态信息,咱们能够将一个旧的 stat 传入到api方法中,在方法执行过程当中 stat 会被来自服务的新的 stat 替换掉 |
cb | AsyncCallback.ChildrenCallback | 异步回调函数 |
ctx | Object | 用于传递上下文信息 |
/** * 同步 api 建立节点,而后同步获取节点列表 */ public class CreatedSyncNode implements Watcher { private static CountDownLatch latch = new CountDownLatch(1); private static ZooKeeper zooKeeper; public static void main(String[] args) throws Exception { zooKeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new CreatedSyncNode()); latch.await(); String nodePath = "/zk-data"; zooKeeper.create(nodePath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); List<String> list = zooKeeper.getChildren(nodePath, true); System.out.println("get children : " + list); //给节点 /zk-data 再添加子节点 /conf zooKeeper.create(nodePath + "/conf", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Thread.sleep(1000); } /** * 监听子节点的变化 * * @param watchedEvent */ public void process(WatchedEvent watchedEvent) { if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) { latch.countDown(); } else if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) { //监听到子节点变化后,主动的去获取节点列表 try { List<String> list = zooKeeper.getChildren(watchedEvent.getPath(), true); System.out.println("get children node changed : " + list); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
重点是在获取节点列表 api 中传入回调对象,该对象实现 Children2Callback
接口java
/** * 异步查询节点列表 */ public class AsyncQueryNode implements Watcher { private static CountDownLatch latch = new CountDownLatch(1); private static ZooKeeper zooKeeper; public static void main(String[] args) throws Exception { //建立异步会话 zooKeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncQueryNode()); latch.await(); String nodePath = "/zk-book"; zooKeeper.create(nodePath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //表示异步获取子节点列表,在 ChildrenCallback 中对节点列表进行处理 zooKeeper.getChildren(nodePath, true, new ChildrenCallback(), "query children"); //给节点 /zk-data 再添加子节点 /conf zooKeeper.create(nodePath + "/src", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Thread.sleep(1000); } /** * 监听子节点的变化 * * @param watchedEvent */ public void process(WatchedEvent watchedEvent) { if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) { latch.countDown(); } else if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) { //监听到子节点变化后,主动的去获取节点列表 try { List<String> list = zooKeeper.getChildren(watchedEvent.getPath(), true); System.out.println("get children node changed : " + list); } catch (Exception e) { e.printStackTrace(); } } } } static class ChildrenCallback implements AsyncCallback.Children2Callback { /** * @param code 服务端响应码 * @param path 建立节点传入的路径参数 * @param ctx 上下文信息 * @param list 子节点列表 * @param stat 节点的状态信息 */ public void processResult(int code, String path, Object ctx, List<String> list, Stat stat) { System.out.println("code : " + code + ";path : " + path + ";ctx : " + ctx + ";children list : " + list + ";stat : " + stat); } } }
获取节点数据内容方法,getData的用法和getChildren差很少,getData 方法的参数中也有一个 Watcher 参数,该 Watcher 的做用是客户端拿到节点的数据以后,能够进行 Watcher 注册,一旦该节点的状态发送变化,服务端会发送 NoteDataChanged
事件告诉客户端。node
/** * 同步获取节点数据 */ public class SyncGetData implements Watcher { private static CountDownLatch latch = new CountDownLatch(1); private static ZooKeeper zooKeeper; private static Stat stat = new Stat(); public static void main(String[] args) throws Exception { String path = "/zk-demo"; zooKeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new SyncGetData()); latch.await(); zooKeeper.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //直接获取节点数据 byte[] datas = zooKeeper.getData(path, true, stat); System.out.println("结果:" + new String(datas)); System.out.println(stat); //修改节点数据 zooKeeper.setData(path, "123".getBytes(), -1); Thread.sleep(1000); } public void process(WatchedEvent watchedEvent) { if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) { latch.countDown(); //节点变化时进行通知,EventType.NodeDataChanged 事件 } else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) { try { byte[] datas = zooKeeper.getData(watchedEvent.getPath(), true, stat); System.out.println("回调通知:" + new String(datas)); System.out.println(stat); } catch (Exception e) { e.printStackTrace(); } } } } }
运行结果:apache
实现异步回调接口 DataCallback
,接收回调通知api
public class AsyncGetData implements Watcher { private static CountDownLatch latch = new CountDownLatch(1); private static ZooKeeper zooKeeper; public static void main(String[] args) throws Exception { String path = "/zk-async"; zooKeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncGetData()); latch.await(); zooKeeper.create(path, "456".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //设置异步获取节点数据 zooKeeper.getData(path, true, new DataCallback(), null); //再次更新节点数据 zooKeeper.setData(path, "789".getBytes(), -1); Thread.sleep(1000); } public void process(WatchedEvent watchedEvent) { if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) { latch.countDown(); //节点变化时进行通知,EventType.NodeDataChanged 事件 } else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) { try { //再次异步获取变化后的节点数据 zooKeeper.getData(watchedEvent.getPath(), true, new DataCallback(), null); } catch (Exception e) { e.printStackTrace(); } } } } //接收服务端回调 static class DataCallback implements AsyncCallback.DataCallback { public void processResult(int code, String path, Object ctx, byte[] data, Stat stat) { System.out.println("回调通知的节点数据:" + new String(data)); System.out.println("code : " + code + ";path : " + path + ";stat : " + stat); } } }
测试结果:服务器