一个zk的节点能够被监控,包括这个目录中存储的数据的修改,子节点目录的变化,一旦变化能够通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,经过这个特性能够实现的功能包括配置的集中管理,集群管理,分布式锁等等。java
watch机制官方说明:一个Watch事件是一个一次性的触发器,当被设置了Watch的数据发生了改变的时候,则服务器将这个改变发送给设置了Watch的客户端,以便通知它们
。
node
1) 一次性的触发器(one-time trigger) 服务器
当数据改变的时候,那么一个Watch事件会产生而且被发送到客户端中。可是客户端只会收到一次这样的通知,若是之后这个数据再次发生改变的时候,以前设置Watch的客户端将不会再次收到改变的通知,由于Watch机制规定了它是一个一次性的触发器。 网络
当设置监视的数据发生改变时,该监视事件会被发送到客户端,例如,若是客户端调用了 getData("/znode1", true) 而且稍后 /znode1 节点上的数据发生了改变或者被删除了,客户端将会获取到 /znode1 发生变化的监视事件,而若是 /znode1 再一次发生了变化,除非客户端再次对 /znode1 设置监视,不然客户端不会收到事件通知。异步
2)发送给客户端(Sent to the client) 分布式
这个代表了Watch的通知事件是从服务器发送给客户端的,是异步的,这就代表不一样的客户端收到的Watch的时间可能不一样,可是ZooKeeper有保证:当一个客户端在看到Watch事件以前是不会看到结点数据的变化的。例如:A=3,此时在上面设置了一次Watch,若是A忽然变成4了,那么客户端会先收到Watch事件的通知,而后才会看到A=4。函数
Zookeeper 客户端和服务端是经过 Socket 进行通讯的,因为网络存在故障,因此监视事件颇有可能不会成功地到达客户端,监视事件是异步发送至监视者的,Zookeeper 自己提供了保序性(ordering guarantee):即客户端只有首先看到了监视事件后,才会感知到它所设置监视的 znode 发生了变化(a client will never see a change for which it has set a watch until it first sees the watch event). 网络延迟或者其余因素可能致使不一样的客户端在不一样的时刻感知某一监视事件,可是不一样的客户端所看到的一切具备一致的顺序。this
3)被设置Watch的数据(The data for which the watch was set)spa
这意味着 znode 节点自己具备不一样的改变方式。你也能够想象 Zookeeper 维护了两条监视链表:code
数据监视和子节点监视(data watches and child watches)
getData() and exists() 设置数据监视,getChildren() 设置子节点监视。 或者,你也能够想象 Zookeeper 设置的不一样监视返回不一样的数据,getData() 和 exists() 返回 znode 节点的相关信息,而 getChildren() 返回子节点列表。
所以, setData() 会触发设置在某一节点上所设置的数据监视(假定数据设置成功),而一次成功的 create() 操做则会出发当前节点上所设置的数据监视以及父节点的子节点监视。一次成功的 delete() 操做将会触发当前节点的数据监视和子节点监视事件,同时也会触发该节点父节点的child watch。
能够注册watcher的方法:getData、exists、getChildren。
能够触发watcher的方法:create、delete、setData。链接断开的状况下触发的watcher会丢失。
一个Watcher实例是一个回调函数,被回调一次后就被移除了。若是还须要关注数据的变化,须要再次注册watcher。
New ZooKeeper时注册的watcher叫default watcher,它不是一次性的,只对client的链接状态变化做出反应。
什么样的操做会产生什么类型的事件:
event For “/path” | event For “/path/child” | |
create(“/path”) | EventType.NodeCreated | 无 |
delete(“/path”) | EventType.NodeDeleted | 无 |
setData(“/path”) | EventType.NodeDataChanged | 无 |
create(“/path/child”) | EventType.NodeChildrenChanged(getChild) | EventType.NodeCreated |
delete(“/path/child”) | EventType.NodeChildrenChanged(getChild) | EventType.NodeDeleted |
setData(“/path/child”) | 无 | EventType.NodeDataChanged |
事件类型与watcher的对应关系:
event For “/path” | Default Watcher |
exists(“/path”) |
getData(“/path”) |
getChildren(“/path”) |
EventType.None | √ | √ | √ | √ |
EventType.NodeCreated | √ | √ | ||
EventType.NodeDeleted | √ | √ | ||
EventType.NodeDataChanged | √ | √ | ||
EventType.NodeChildrenChanged | √ |
本表总结:exits和getData设置数据监视,而getChildren设置子节点监视
操做与watcher的对应关系:
exits("/path") | getData(“/path”) | getChildren(“/path”) | exits("/path/child") | getData(“/path/child”) | getChildren(“/path/child”) | |
create(“/path”) | √ | √ | 会报错 | |||
delete(“/path”) | √ | √ | √(这个要注意) | |||
setData(“/path”) | √ | √ | ||||
create(“/path/child”) | √ | √ | √ | |||
delete(“/path/child”) | √ | √ | √ | √ | ||
setData(“/path/child”) | √ | √ | ||||
值得注意的是:getChildren("/path")监视/path的子节点,若是(/path)本身删了,也会触发NodeDeleted事件。
因为zookeeper是一次性监听,因此咱们必须在wather的process方法里面再设置监听。一个方法以下:
如下逻辑是实现的是生产者和消费者模型,消费者监听某一路径下面子节点的变化,当生产者有消息发送过来的时候,在该节点下面建立一个子节点,而后把消息放到该子节点里面,这会触发消费者的process方法被调用,而后消费者取到该节点下面的子节点(顺便设置一个再监听该节点的子节点),而后取出子节点的内容,作处理,而后删除该子节点。
public void process(WatchedEvent event) { // TODO Auto-generated method stub if (event.getState() == KeeperState.SyncConnected) { System.out.println("watcher received event"); countDownLatch.countDown(); } System.out.println("回调watcher1实例: 路径" + event.getPath() + " 类型:"+ event.getType()); // 事件类型,状态,和检测的路径 EventType eventType = event.getType(); KeeperState state = event.getState(); String watchPath = event.getPath(); switch (eventType) { case NodeCreated: break; case NodeDataChanged: break; case NodeChildrenChanged: try { //处理收到的消息 handleMessage(watchPath); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (KeeperException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } break; default: break; } } public void handleMessage(String watchPath) throws KeeperException,InterruptedException, UnsupportedEncodingException { System.out.println("收到消息"); //再监听该子节点 List<String> Children = this.getChildren(watchPath); for (String a : Children) { String childrenpath = watchPath + "/" + a; byte[] recivedata = this.getData(childrenpath); String recString = new String(recivedata, "UTF-8"); System.out.println("receive the path:" + childrenpath + ":data:"+ recString); //作完了以后,删除该节点 this.deletNode(childrenpath, -1); } } public List<String> getChildren(String path) throws KeeperException,InterruptedException { //监听该节点子节点的变化状况 return this.zooKeeper.getChildren(path, this); } public Stat setData(String path, byte[] data, int version)throws KeeperException, InterruptedException { return this.zooKeeper.setData(path, data, version); }