1.导入jarjava
package com.toov5.zookeeper; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class JavaZKTest { private static final String CONNECTSTRING ="192.168.91.5"; private static int SESSIONTIMEOUT=5000; //超时时间 //使用Java并发包的 信号量 控制zk链接成功以后 开始建立 private static final CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, KeeperException, InterruptedException { ZooKeeper zooKeeper = null; try { //1zk建立了一个链接 zooKeeper = new ZooKeeper(CONNECTSTRING, SESSIONTIMEOUT, new Watcher() { public void process(WatchedEvent event) { //监听节点是否发生变化 链接成功 (代码从上往下执行,建立节点 直接copy) // 获取事件状态 KeeperState keeperState = event.getState(); // 获取事件类型 EventType eventType = event.getType(); if (KeeperState.SyncConnected == keeperState) { //状态判断 if (EventType.None == eventType) { countDownLatch.countDown(); //--操做 到0时候 await启动了哦 System.out.println("zk 启动链接..."); //才能够去建立节点的逻辑执行 须要用到信号量 } } } }); countDownLatch.await(); //不为0 一直等待~~ //建立持久节点 //Ids.OPEN_ACL_UNSAFE链接权限 //// CreateMode对应好多模式 关于 SEQENTIAL 重名状况下 加了个id 保证惟一性 String createNode = zooKeeper.create("/test666", "toov5".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("节点名称"+createNode); } catch (Exception e) { }finally { if (zooKeeper != null) { //关闭链接 zooKeeper.close(); } } } }
2.能够经过图形化界面进行操做使用的工具是 zookeeper-dev-ZooInspector.jarnode
建立节点(znode) 方法:web
create:
提供了两套建立节点的方法,同步和异步建立节点方式。
同步方式:
参数1,节点路径《名称) : InodeName (不容许递归建立节点,也就是说在父节点不存在
的状况下,不容许建立子节点)
参数2,节点内容: 要求类型是字节数组(也就是说,不支持序列化方式,若是须要实现序
列化,可以使用java相关序列化框架,如Hessian、Kryo框架)
参數3,节点权限: 使用Ids.OPEN_ACL_UNSAFE开放权限便可。(这个参数通常在权展
没有过高要求的场景下,不必关注)
参数4,节点类型: 建立节点的类型: CreateMode,提供四种首点象型apache
在ZooKeeper中,接口类Watcher用于表示一个标准的事件处理器,其定义了事件通知相关的逻辑,包含KeeperState和EventType两个枚举类,分别表明了通知状态和事件类型,同时定义了事件的回调方法:process(WatchedEvent event)。数组
同一个事件类型在不一样的通知状态中表明的含义有所不一样,表列举了常见的通知状态和事件类型。服务器
表7-3中列举了ZooKeeper中最多见的几个通知状态和事件类型。websocket
回调方法process()网络
process方法是Watcher接口中的一个回调方法,当ZooKeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理。process方法的定义以下:session
abstract public void process(WatchedEvent event);数据结构
这个回调方法的定义很是简单,咱们重点看下方法的参数定义:WatchedEvent。
WatchedEvent包含了每个事件的三个基本属性:通知状态(keeperState),事件类型(EventType)和节点路径(path),其数据结构如图7-5所示。ZooKeeper使用WatchedEvent对象来封装服务端事件并传递给Watcher,从而方便回调方法process对服务端事件进行处理。
提到WatchedEvent,不得不讲下WatcherEvent实体。笼统地讲,二者表示的是同一个事物,都是对一个服务端事件的封装。不一样的是,WatchedEvent是一个逻辑事件,用于服务端和客户端程序执行过程当中所需的逻辑对象,而WatcherEvent由于实现了序列化接口,所以能够用于网络传输。
服务端在生成WatchedEvent事件以后,会调用getWrapper方法将本身包装成一个可序列化的WatcherEvent事件,以便经过网络传输到客户端。客户端在接收到服务端的这个事件对象后,首先会将WatcherEvent还原成一个WatchedEvent事件,并传递给process方法处理,回调方法process根据入参就可以解析出完整的服务端事件了。
须要注意的一点是,不管是WatchedEvent仍是WatcherEvent,其对ZooKeeper服务端事件的封装都是机及其简单的。举个例子来讲,当/zk-book这个节点的数据发生变动时,服务端会发送给客户端一个“ZNode数据内容变动”事件,客户端只可以接收到以下信
public class ZkClientWatcher implements Watcher { // 集群链接地址 private static final String CONNECT_ADDRES = "192.168.110.159:2181,192.168.110.160:2181,192.168.110.162:2181"; // 会话超时时间 private static final int SESSIONTIME = 2000; // 信号量,让zk在链接以前等待,链接成功后才能往下走. private static final CountDownLatch countDownLatch = new CountDownLatch(1); private static String LOG_MAIN = "【main】 "; private ZooKeeper zk; public void createConnection(String connectAddres, int sessionTimeOut) { try { zk = new ZooKeeper(connectAddres, sessionTimeOut, this); System.out.println(LOG_MAIN + "zk 开始启动链接服务器...."); countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } } public boolean createPath(String path, String data) { try { this.exists(path, true); this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(LOG_MAIN + "节点建立成功, Path:" + path + ",data:" + data); } catch (Exception e) { e.printStackTrace(); return false; } return true; } /** * 判断指定节点是否存在 * * @param path * 节点路径 */ public Stat exists(String path, boolean needWatch) { try { return this.zk.exists(path, needWatch); } catch (Exception e) { e.printStackTrace(); return null; } } public boolean updateNode(String path,String data) throws KeeperException, InterruptedException { exists(path, true); this.zk.setData(path, data.getBytes(), -1); return false; } public void process(WatchedEvent watchedEvent) { // 获取事件状态 KeeperState keeperState = watchedEvent.getState(); // 获取事件类型 EventType eventType = watchedEvent.getType(); // zk 路径 String path = watchedEvent.getPath(); System.out.println("进入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path); // 判断是否创建链接 if (KeeperState.SyncConnected == keeperState) { if (EventType.None == eventType) { // 若是创建创建成功,让后程序往下走 System.out.println(LOG_MAIN + "zk 创建链接成功!"); countDownLatch.countDown(); } else if (EventType.NodeCreated == eventType) { System.out.println(LOG_MAIN + "事件通知,新增node节点" + path); } else if (EventType.NodeDataChanged == eventType) { System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被修改...."); } else if (EventType.NodeDeleted == eventType) { System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被删除...."); } } System.out.println("--------------------------------------------------------"); } public static void main(String[] args) throws KeeperException, InterruptedException { ZkClientWatcher zkClientWatcher = new ZkClientWatcher(); zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME); // boolean createResult = zkClientWatcher.createPath("/p15", "pa-644064"); zkClientWatcher.updateNode("/pa2","7894561"); } }
注意在建立节点时候,必定要写 “/” 好比“/test” !!!!!!
下面的例子颇有意义的,监听的:
package com.toov5.zookeeper; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.jboss.netty.handler.codec.http.websocketx.WebSocketHandshakeException; import org.apache.zookeeper.ZooKeeper; public class JavaZKTest { private static final String CONNECTSTRING ="192.168.91.5"; private static int SESSIONTIMEOUT=5000; //超时时间 //使用Java并发包的 信号量 控制zk链接成功以后 开始建立 private static final CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, KeeperException, InterruptedException { ZooKeeper zooKeeper = null; try { //1zk建立了一个链接 zooKeeper = new ZooKeeper(CONNECTSTRING, SESSIONTIMEOUT, new Watcher() { public void process(WatchedEvent event) { //监听节点是否发生变化 链接成功 (代码从上往下执行,建立节点 直接copy) // 获取事件状态 KeeperState keeperState = event.getState(); // 获取事件类型 EventType eventType = event.getType(); if (KeeperState.SyncConnected == keeperState) { //状态判断 if (EventType.None == eventType) { countDownLatch.countDown(); //--操做 到0时候 await启动了哦 System.out.println("zk 启动链接..."); //才能够去建立节点的逻辑执行 须要用到信号量 } if (EventType.NodeCreated==eventType) { System.out.println("zk时间通知,获取当前在建立节点.."); } } } }); countDownLatch.await(); //不为0 一直等待~~ String path="/tooov5"; zooKeeper.exists(path, true); //true时候有事件通知 //建立持久节点 //Ids.OPEN_ACL_UNSAFE链接权限 //// CreateMode对应好多模式 关于 SEQENTIAL 重名状况下 加了个id 保证惟一性 String createNode = zooKeeper.create(path, "toov5".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("节点名称"+createNode); } catch (Exception e) { }finally { if (zooKeeper != null) { //关闭链接 zooKeeper.close(); } } } }
节点的 增长 删除 修改 均可以监听到!!!!!!!!!
package com.toov5.zookeeper;
import java.io.IOException;import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.Watcher.Event.EventType;import org.apache.zookeeper.Watcher.Event.KeeperState;import org.apache.zookeeper.ZooDefs.Ids;import org.jboss.netty.handler.codec.http.websocketx.WebSocketHandshakeException;import org.apache.zookeeper.ZooKeeper;
public class JavaZKTest { private static final String CONNECTSTRING ="192.168.91.5"; private static int SESSIONTIMEOUT=5000; //超时时间 //使用Java并发包的 信号量 控制zk链接成功以后 开始建立 private static final CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, KeeperException, InterruptedException { ZooKeeper zooKeeper = null; try { //1zk建立了一个链接 zooKeeper = new ZooKeeper(CONNECTSTRING, SESSIONTIMEOUT, new Watcher() { public void process(WatchedEvent event) { //监听节点是否发生变化 链接成功 (代码从上往下执行,建立节点 直接copy) // 获取事件状态 KeeperState keeperState = event.getState(); // 获取事件类型 EventType eventType = event.getType(); if (KeeperState.SyncConnected == keeperState) { //状态判断 if (EventType.None == eventType) { countDownLatch.countDown(); //--操做 到0时候 await启动了哦 System.out.println("zk 启动链接..."); //才能够去建立节点的逻辑执行 须要用到信号量 } if (EventType.NodeCreated==eventType) { System.out.println("zk时间通知,获取当前在建立节点.."); } } } }); countDownLatch.await(); //不为0 一直等待~~ String path="/tooov5"; zooKeeper.exists(path, true); //true时候有事件通知 //建立持久节点 //Ids.OPEN_ACL_UNSAFE链接权限 //// CreateMode对应好多模式 关于 SEQENTIAL 重名状况下 加了个id 保证惟一性 String createNode = zooKeeper.create(path, "toov5".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("节点名称"+createNode); } catch (Exception e) { }finally { if (zooKeeper != null) { //关闭链接 zooKeeper.close(); } } }}