Zookeeper Curator 事件监听 - 秒懂

疯狂创客圈 Java 分布式聊天室【 亿级流量】实战系列之 -25【 博客园 总入口java


写在前面

​ 你们好,我是做者尼恩。目前和几个小伙伴一块儿,组织了一个高并发的实战社群【疯狂创客圈】。正在开始高并发、亿级流程的 IM 聊天程序 学习和实战node

​ 前面,已经完成一个高性能的 Java 聊天程序的四件大事:面试

接下来,须要进入到分布式开发的环节了。 分布式的中间件,疯狂创客圈的小伙伴们,一致的选择了zookeeper,不只仅是因为其在大数据领域,太有名了。更重要的是,不少的著名框架,都使用了zk。apache

本篇介绍 ZK Curator 的事件监听缓存

1.1. Curator 事件监听

Curator 事件有两种模式,一种是标准的观察模式,一种是缓存监听模式。标准的监听模式是使用Watcher 监听器。第二种缓存监听模式引入了一种本地缓存视图的Cache机制,来实现对Zookeeper服务端事件监听。服务器

Cache事件监听能够理解为一个本地缓存视图与远程Zookeeper视图的对比过程。Cache提供了反复注册的功能。Cache是一种缓存机制,能够借助Cache实现监听。简单来讲,Cache在客户端缓存了znode的各类状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。网络

Watcher 监听器比较简单,只有一种。Cache事件监听的种类有3种Path Cache,Node Cache,Tree Cache。并发

1.1.1. Watcher 标准的事件处理器

在ZooKeeper中,接口类Watcher用于表示一个标准的事件处理器,其定义了事件通知相关的逻辑,包含KeeperState和EventType两个枚举类,分别表明了通知状态和事件类型。框架

Watcher接口定义了事件的回调方法:process(WatchedEvent event)。定义一个Watcher的实例很简单,代码以下:

Watcher w = new Watcher() {
    @Override
    public void process(WatchedEvent watchedEvent) {
        log.info("监听器watchedEvent:" + watchedEvent);
    }
};

使用Watcher监听器实例的方式也很简单,在Curator的调用链上,加上usingWatcher方法便可,代码以下:

byte[] content = client.getData()
        .usingWatcher(w).forPath(workerPath);

一个Watcher监听器在向服务端完成注册后,当服务端的一些事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知,来实现分布式的通知功能。客户收到服务器的通知后,Curator 会封装一个WatchedEvent 事件实例,传递给监听器的回调方法process(WatchedEvent event)。

WatchedEvent包含了三个基本属性:

(1)通知状态(keeperState)

(2)事件类型(EventType)

(3)节点路径(path)

注意,WatchedEvent并非直接从ZooKeeper集群直接传递过来的事件实例,而是Curator 封装过的事件实例。WatchedEvent类型没有实现序列化接口java.io.Serializable,所以不能用于网络传输。ZooKeeper集群直接网络传输传递过来的事件实例是啥呢? 是一个WatcherEvent类型的实例,这个传输实例和Curator 封装过的WatchedEvent实例,在名称上有一个字母之差,并且功能也是同样的,都表示的是同一个事物,都是对一个服务端事件的封装。

所以,这里只讲Curator 封装过的WatchedEvent实例。下边列举了ZooKeeper中最多见的几个通知状态和事件类型。

KeeperState EventType 触发条件 说明
None (-1) 客户端与服务端成功创建链接
SyncConnected (0) NodeCreated (1) Watcher监听的对应数据节点被建立
NodeDeleted (2) Watcher监听的对应数据节点被删除 此时客户端和服务器处于链接状态
NodeDataChanged (3) Watcher监听的对应数据节点的数据内容发生变动
NodeChildChanged (4) Wather监听的对应数据节点的子节点列表发生变动
Disconnected (0) None (-1) 客户端与ZooKeeper服务器断开链接 此时客户端和服务器处于断开链接状态
Expired (-112) Node (-1) 会话超时 此时客户端会话失效,一般同时也会受到SessionExpiredException异常
AuthFailed (4) None (-1) 一般有两种状况,1:使用错误的schema进行权限检查 2:SASL权限检查失败 一般同时也会收到AuthFailedException异常

利用Watcher来对节点进行监听操做,但此监听操做只能监听一次。来看一个简单的实例程序:

@Slf4j

@Data

public class ZkWatcherDemo {

 

    private String workerPath = "/test/listener/node";
    private String subWorkerPath = "/test/listener/node/id-";

 
    @Test
    public void testWatcher() {
        CuratorFramework client = ZKclient.instance.getClient();

        //检查节点是否存在,没有则建立
        boolean isExist = ZKclient.instance.isNodeExist(workerPath);
        if (!isExist) {
            ZKclient.instance.createNode(workerPath, null);
        }

        try {

            Watcher w = new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("监听到的变化 watchedEvent = " + watchedEvent);
                }
            };

            byte[] content = client.getData()
                    .usingWatcher(w).forPath(workerPath);

            log.info("监听节点内容:" + new String(content));

            // 第一次变动节点数据
            client.setData().forPath(workerPath, "第1次更改内容".getBytes());

            // 第二次变动节点数据
            client.setData().forPath(workerPath, "第2次更改内容".getBytes());

            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

        //....

}

运行代码,输出的结果以下:

监听到的变化 watchedEvent = WatchedEvent state:SyncConnected type:NodeDataChanged path:/test/listener/node

程序中,对节点路径 “/test/listener/node”注册一个Watcher监听器实例,随后调用setData方法两次改变节点内容,可是,监听器仅仅监听到了一个事件。也就是说,当第二次改变节点内容时,监听已经失效,没法再次得到节点变更事件。

也就是说,Watcher监听器是一次性的,若是要反复使用,就须要反复的使用usingWatcher提早注册。

因此,Watcher监听器不能应用于节点的数据变更或者节点变更这样的通常业务场景。而是适用于一些特殊的,好比会话超时、受权失败等这样的特殊场景。

既然Watcher监听器是一次性的,在开发过程当中须要反复注册Watcher,比较繁琐。Curator引入了Cache来监听ZooKeeper服务端的事件。Cache对ZooKeeper事件监听进行了封装,可以自动处理反复注册监听。

1.1.2. NodeCache 节点缓存的监听

Curator引入的Cache缓存实现,是一个系列,包括了Node Cache 、Path Cache、Tree Cache三组类。其中Node Cache节点缓存能够用于ZNode节点的监听,Path Cache子节点缓存用于ZNode的子节点的监听,而Tree Cache树缓存是Path Cache的加强,不光能监听子节点,也能监听ZNode节点自身。

Node Cache,能够用于监控本节点的新增,删除,更新。

Node Cache使用的第一步,就是构造一个NodeCache缓存实例。

有两个构造方法,具体以下:

NodeCache(CuratorFramework client, String path) 

NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)

第一个参数就是传入建立的Curator的框架客户端,第二个参数就是监听节点的路径,第三个重载参数dataIsCompressed 表示是否对数据进行压缩。

NodeCache使用的第二步,就是构造一个NodeCacheListener监听器实例。该接口的定义以下:

package org.apache.curator.framework.recipes.cache;

public interface NodeCacheListener {

    void nodeChanged() throws Exception;

}

NodeCacheListener监听器接口,只定义了一个简单的方法 nodeChanged,当节点变化时,这个方法就会被回调到。

在建立完NodeCacheListener的实例以后,须要将这个实例注册到NodeCache缓存实例,使用缓存实例的addListener方法。 而后使用缓存实例nodeCache的start方法,启动节点的事件监听。

nodeCache.getListenable().addListener(l);

nodeCache.start();

强调下,须要调用nodeCache的start方法能进行缓存和事件监听,这个方法有两个版本:

void    start()//Start the cache.

void    start(boolean buildInitial)  //true表明缓存当前节点

惟一的一个参数buildInitial表明着是否将该节点的数据当即进行缓存。若是设置为true的话,在start启动时当即调用NodeCache的getCurrentData方法就可以获得对应节点的信息ChildData类,若是设置为false的就得不到对应的信息。

使用NodeCache来监听节点的事件,完整的实例代码以下:

@Test
    public void testNodeCache() {

        //检查节点是否存在,没有则建立
        boolean isExist = ZKclient.instance.isNodeExist(workerPath);
        if (!isExist) {
            ZKclient.instance.createNode(workerPath, null);
        }

        CuratorFramework client = ZKclient.instance.getClient();
        try {
            NodeCache nodeCache =
                    new NodeCache(client, workerPath, false);
            NodeCacheListener l = new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    ChildData childData = nodeCache.getCurrentData();
                    log.info("ZNode节点状态改变, path={}", childData.getPath());
                    log.info("ZNode节点状态改变, data={}", new String(childData.getData(), "Utf-8"));
                    log.info("ZNode节点状态改变, stat={}", childData.getStat());
                }
            };
            nodeCache.getListenable().addListener(l);
            nodeCache.start();

            // 第1次变动节点数据
            client.setData().forPath(workerPath, "第1次更改内容".getBytes());
            Thread.sleep(1000);

            // 第2次变动节点数据
            client.setData().forPath(workerPath, "第2次更改内容".getBytes());

            Thread.sleep(1000);

            // 第3次变动节点数据
            client.setData().forPath(workerPath, "第3次更改内容".getBytes());
            Thread.sleep(1000);

            // 第4次变动节点数据
//            client.delete().forPath(workerPath);
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            log.error("建立NodeCache监听失败, path={}", workerPath);
        }
    }

运行的结果是,NodeCashe节点缓存可以重复的进行事件节点。代码中的第三次监听的输出节选以下:

\- ZNode节点状态改变, path=/test/listener/node

\- ZNode节点状态改变, data=第3次更改内容

\- ZNode节点状态改变, stat=17179869191,...

最后说明一下,若是NodeCache监听的节点为空(也就是说传入的路径不存在)。那么若是咱们后面建立了对应的节点,也是会触发事件从而回调nodeChanged方法。

1.1.3. PathChildrenCache 子节点监听

PathChildrenCache子节点缓存用于子节点的监听,监控本节点的子节点被建立、更新或者删除。须要强调两点:

(1)只能监听子节点,监听不到当前节点

(2)不能递归监听,子节点下的子节点不能递归监控

PathChildrenCache子节点缓存使用的第一步,就是构造一个缓存实例。

有多个重载版本的构造方法,选择4个进行说明,具体以下:

public PathChildrenCache(CuratorFramework client, String path,boolean cacheData)

public PathChildrenCache(CuratorFramework client, String path,boolean cacheData, 
         boolean dataIsCompressed,final ExecutorService executorService)

public PathChildrenCache(CuratorFramework client, String path,boolean cacheData,
         boolean dataIsCompressed,ThreadFactory threadFactory)

public PathChildrenCache(CuratorFramework client, String path,boolean cacheData,
         ThreadFactory threadFactory)

全部的构造方法,前三个参数,都是同样的。

第一个参数就是传入建立的Curator的框架客户端,第二个参数就是监听节点的路径,第三个重载参数cacheData表示是否把节点内容缓存起来。若是cacheData为true,那么接收到节点列表变动事件的同时,会将得到节点内容。

dataIsCompressed参数(若是有),表示是否对节点数据进行压缩。

executorService 和threadFactory参数差很少,表示经过传入的线程池或者线程工厂,来异步处理监听事件。

threadFactory参数(若是有)表示线程池工厂,当PathChildrenCache内部须要开启新的线程执行时,使用该线程池工厂来建立线程。

PathChildrenCache子节点缓存使用的第二步,就是构造一个子节点缓存监听器PathChildrenCacheListener实例。该接口的定义以下:

package org.apache.curator.framework.recipes.cache;

import org.apache.curator.framework.CuratorFramework;
 
public interface PathChildrenCacheListener {

   void childEvent(CuratorFramework client, PathChildrenCacheEvent e) throws Exception;

}

PathChildrenCacheListener监听器接口中,也只定义了一个简单的方法 childEvent,当子节点有变化时,这个方法就会被回调到。

在建立完PathChildrenCacheListener的实例以后,须要将这个实例注册到PathChildrenCache缓存实例,使用缓存实例的addListener方法。 而后使用缓存实例nodeCache的start方法,启动节点的事件监听。

这里的start方法,须要传入启动的模式。能够传入三种模式,也就是API列表中看到的StartMode,其中定义了下面三种枚举:

(1)NORMAL——异步初始化cache

(2)BUILD_INITIAL_CACHE——同步初始化cache

(3)POST_INITIALIZED_EVENT——异步初始化cache,并触发完成事件

对于start模式的三种启动方式,详细的说明以下:

BUILD_INITIAL_CACHE:启动时,同步初始化cache,以及建立cache后,就从服务器拉取对应的数据。

POST_INITIALIZED_EVENT:启动时,异步初始化cache,初始化完成触发PathChildrenCacheEvent.Type#INITIALIZED事件,cache中Listener会收到该事件的通知。

最后是第一个枚举常量,NORMAL:启动时,异步初始化cache,完成后不会发出通知。

使用PathChildrenCache来监听节点的事件,完整的实例代码以下:

@Test
    public void testPathChildrenCache() {

        //检查节点是否存在,没有则建立
        boolean isExist = ZKclient.instance.isNodeExist(workerPath);
        if (!isExist) {
            ZKclient.instance.createNode(workerPath, null);
        }

        CuratorFramework client = ZKclient.instance.getClient();

        try {
            PathChildrenCache cache =
                    new PathChildrenCache(client, workerPath, true);
            PathChildrenCacheListener l =
                    new PathChildrenCacheListener() {
                        @Override
                        public void childEvent(CuratorFramework client,
                                               PathChildrenCacheEvent event) {
                            try {
                                ChildData data = event.getData();
                                switch (event.getType()) {
                                    case CHILD_ADDED:

                                        log.info("子节点增长, path={}, data={}",
                                                data.getPath(), new String(data.getData(), "UTF-8"));

                                        break;
                                    case CHILD_UPDATED:
                                        log.info("子节点更新, path={}, data={}",
                                                data.getPath(), new String(data.getData(), "UTF-8"));
                                        break;
                                    case CHILD_REMOVED:
                                        log.info("子节点删除, path={}, data={}",
                                                data.getPath(), new String(data.getData(), "UTF-8"));
                                        break;
                                    default:
                                        break;
                                }

                            } catch (
                                    UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                        }
                    };
            cache.getListenable().addListener(l);
            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
            Thread.sleep(1000);
            for (int i = 0; i < 3; i++) {
                ZKclient.instance.createNode(subWorkerPath + i, null);
            }

            Thread.sleep(1000);
            for (int i = 0; i < 3; i++) {
                ZKclient.instance.deleteNode(subWorkerPath + i);
            }

             } catch (Exception e) {
            log.error("PathCache监听失败, path=", workerPath);
        }

    }

运行的结果以下:

\- 子节点增长, path=/test/listener/node/id-0, data=to set content

\- 子节点增长, path=/test/listener/node/id-2, data=to set content

\- 子节点增长, path=/test/listener/node/id-1, data=to set content

......

\- 子节点删除, path=/test/listener/node/id-2, data=to set content

\- 子节点删除, path=/test/listener/node/id-0, data=to set content

\- 子节点删除, path=/test/listener/node/id-1, data=to set content

能够看到,PathChildrenCache 可以反复的监听到节点的新增和删除。

简单说下Curator的监听原理,不管是PathChildrenCache,仍是TreeCache,所谓的监听,都是进行Curator本地缓存视图和ZooKeeper服务器远程的数据节点的对比。

在什么场景下触发事件呢?

以节点增长事件NODE_ADDED为例,所在本地缓存视图开始的时候,本地视图为空,在数据同步的时候,本地的监听器就能监听到NODE_ADDED事件。这是由于,刚开始本地缓存并无内容,而后本地缓存和服务器缓存进行对比,发现ZooKeeper服务器有节点而本地缓存没有,这才将服务器的节点缓存到本地,就会触发本地缓存的NODE_ADDED事件。

1.1.4. Tree Cache 节点树缓存

前面已经讲完了两个系列的缓存监听。简单回顾一下:

Node Cache用来观察ZNode自身,若是ZNode节点自己被建立,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器。Node Cache是经过NodeCache类来实现的,监听器对应的接口为NodeCacheListener。

Path Cache子节点缓存用来观察ZNode的子节点、并缓存子节点的状态,若是ZNode的子节点被建立,更新或者删除,那么Path Cache会更新缓存,而且触发事件给注册的监听器。Path Cache是经过PathChildrenCache类来实现的,监听器注册是经过PathChildrenCacheListener。

最后的一个系列,是Tree Cache。Tree Cache能够看作是上两种的合体,Tree Cache观察的是当前ZNode节点的全部数据。而TreeCache节点树缓存是PathChildrenCache的加强,不光能监听子节点,也能监听节点自身。

Tree Cache使用的第一步,就是构造一个TreeCache缓存实例。

有两个构造方法,具体以下:

TreeCache(CuratorFramework client, String path) 
 

TreeCache(CuratorFramework client, String path,
          boolean cacheData, boolean dataIsCompressed, int maxDepth, 
         ExecutorService executorService, boolean createParentNodes,
         TreeCacheSelector selector)

第一个参数就是传入建立的Curator的框架客户端,第二个参数就是监听节点的路径,第三个重载参数dataIsCompressed 表示是否对数据进行压缩。maxDepth表示缓存的层次深度,默认为整数最大值。executorService 表示监听的的执行线程池,默认会建立一个单一线程的线程池。createParentNodes 表示是否建立父亲节点,默认为false。

通常状况下,使用第一个构造函数便可。

TreeCache使用的第二步,就是构造一个TreeCacheListener监听器实例。该接口的定义以下:

package org.apache.curator.framework.recipes.cache;

 import org.apache.curator.framework.CuratorFramework;

public interface TreeCacheListener {
    void childEvent(CuratorFramework var1, TreeCacheEvent var2) throws Exception;

}

TreeCacheListener 监听器接口中,也只定义了一个简单的方法 childEvent,当子节点有变化时,这个方法就会被回调到。

在建立完TreeCacheListener 的实例以后,使用缓存实例的addListener方法,将TreeCacheListener 监听器实例注册到TreeCache 缓存实例。 而后使用缓存实例nodeCache的start方法,启动节点的事件监听。

整个实例的代码以下:

@Test
    public void testTreeCache() {

        //检查节点是否存在,没有则建立
        boolean isExist = ZKclient.instance.isNodeExist(workerPath);
        if (!isExist) {
            ZKclient.instance.createNode(workerPath, null);
        }

        CuratorFramework client = ZKclient.instance.getClient();

        try {
            TreeCache treeCache  =
                    new TreeCache(client, workerPath);
            TreeCacheListener l =
                    new TreeCacheListener() {
                        @Override
                        public void childEvent(CuratorFramework client,
                                               TreeCacheEvent event) {
                            try {
                                ChildData data = event.getData();
                                if(data==null)
                                {
                                    log.info("数据为空");
                                    return;
                                }
                                switch (event.getType()) {
                                    case NODE_ADDED:

                                        log.info("[TreeCache]节点增长, path={}, data={}",
                                                data.getPath(), new String(data.getData(), "UTF-8"));

                                        break;
                                    case NODE_UPDATED:
                                        log.info("[TreeCache]节点更新, path={}, data={}",
                                                data.getPath(), new String(data.getData(), "UTF-8"));
                                        break;
                                    case NODE_REMOVED:
                                        log.info("[TreeCache]节点删除, path={}, data={}",
                                                data.getPath(), new String(data.getData(), "UTF-8"));
                                        break;
                                    default:
                                        break;
                                }

                            } catch (
                                    UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                        }
                    };
            treeCache.getListenable().addListener(l);
            treeCache.start();
            Thread.sleep(1000);
            for (int i = 0; i < 3; i++) {
                ZKclient.instance.createNode(subWorkerPath + i, null);
            }

            Thread.sleep(1000);
            for (int i = 0; i < 3; i++) {
                ZKclient.instance.deleteNode(subWorkerPath + i);
            }
            Thread.sleep(1000);

            ZKclient.instance.deleteNode(workerPath);

            Thread.sleep(Integer.MAX_VALUE);

        } catch (Exception e) {
            log.error("PathCache监听失败, path=", workerPath);
        }

    }

运行的结果以下:

\- [TreeCache]节点增长, path=/test/listener/node, data=to set content

 

\- [TreeCache]节点增长, path=/test/listener/node/id-0, data=to set content

\- [TreeCache]节点增长, path=/test/listener/node/id-1, data=to set content

\- [TreeCache]节点增长, path=/test/listener/node/id-2, data=to set content

 

\- [TreeCache]节点删除, path=/test/listener/node/id-2, data=to set content

\- [TreeCache]节点删除, path=/test/listener/node/id-1, data=to set content

\- [TreeCache]节点删除, path=/test/listener/node/id-0, data=to set content

 

\- [TreeCache]节点删除, path=/test/listener/node, data=to set content

最后,说明下事件的类型,对应于节点的增长、修改、删除,TreeCache 的事件类型为:

(1)NODE_ADDED

(2)NODE_UPDATED

(3)NODE_REMOVED

这一点,与Path Cache 的事件类型不一样,与Path Cache 的事件类型为:

(1)CHILD_ADDED

(2)CHILD_UPDATED

(3)CHILD_REMOVED

写在最后

​ 下一篇:基于zk,实现分布式锁。


疯狂创客圈 亿级流量 高并发IM 实战 系列

  • Java (Netty) 聊天程序【 亿级流量】实战 开源项目实战

  • Netty 源码、原理、JAVA NIO 原理
  • Java 面试题 一网打尽
  • 疯狂创客圈 【 博客园 总入口 】

相关文章
相关标签/搜索