Zookeeper Client简介

直接使用zk的api实现业务功能比较繁琐。由于要处理session loss,session expire等异常,在发生这些异常后进行重连。又由于ZK的watcher是一次性的,若是要基于wather实现发布/订阅模式,还要本身包装一下,将一次性订阅包装成持久订阅。另外若是要使用抽象级别更高的功能,好比分布式锁,leader选举等,还要本身额外作不少事情。这里介绍下ZK的两个第三方客户端包装小工具,能够分别解决上述小问题。java

1、 zkClient
zkClient主要作了两件事情。一件是在session loss和session expire时自动建立新的ZooKeeper实例进行重连。另外一件是将一次性watcher包装为持久watcher。后者的具体作法是简单的在watcher回调中,从新读取数据的同时再注册相同的watcher实例。node

zkClient简单的使用样例以下:git

  public static void testzkClient(final String serverList) {
        ZkClient zkClient4subChild = new ZkClient(serverList);
        zkClient4subChild.subscribeChildChanges(PATH, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List currentChilds) throws Exception {
                System.out.println(prefix() + "clildren of path " + parentPath + ":" + currentChilds);
            }
        });

上面是订阅children变化,下面是订阅数据变化github

  ZkClient zkClient4subData = new ZkClient(serverList);
        zkClient4subData.subscribeDataChanges(PATH, new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println(prefix() + "Data of " + dataPath + " has changed");
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println(prefix() + dataPath + " has deleted");
            }
        });

订阅链接状态的变化:api

 ZkClient zkClient4subStat = new ZkClient(serverList);
        zkClient4subStat.subscribeStateChanges(new IZkStateListener() {
            @Override
            public void handleNewSession() throws Exception {
                System.out.println(prefix() + "handleNewSession()");
            }

            @Override
            public void handleStateChanged(KeeperState stat) throws Exception {
                System.out.println(prefix() + "handleStateChanged,stat:" + stat);
            }
        });

下面表格列出了写操做与ZK内部产生的事件的对应关系:session

  **event For "/path"** **event For "/path/child"**
**create("/path")** EventType.NodeCreated NA
**delete("/path")** EventType.NodeDeleted NA
**setData("/path")** EventType.NodeDataChanged NA
**create("/path/child")** EventType.NodeChildrenChanged EventType.NodeCreated
**delete("/path/child")** EventType.NodeChildrenChanged EventType.NodeDeleted
**setData("/path/child")** NA EventType.NodeDataChanged

而ZK内部的写事件与所触发的watcher的对应关系以下:并发

**event For "/path"** **defaultWatcher** **exists ("/path")** **getData ("/path")** **getChildren ("/path")**
**EventType.None**
**EventType.NodeCreated**    
**EventType.NodeDeleted**   √(不正常)  
**EventType.NodeDataChanged**    
**EventType.NodeChildrenChanged**      

综合上面两个表,咱们能够总结出各类写操做能够触发哪些watcher,以下表所示:分布式

  **"/path"** **"/path/child"**
  **exists** **getData** **getChildren** **exists** **getData** **getChildren**
**create("/path")** **√** **√** ** ** ** ** ** ** ** **
**delete("/path")** **√** **√** **√** ** ** ** ** ** **
**setData("/path")** **√** **√** ** ** ** ** ** ** ** **
**create("/path/child")** ** ** ** ** **√** **√** **√** ** **
**delete("/path/child")** ** ** ** ** **√** **√** **√** **√**
**setData("/path/child")** ** ** ** ** ** ** **√** **√** ** **

若是发生session close、authFail和invalid,那么全部类型的wather都会被触发 zkClient除了作了一些便捷包装以外,对watcher使用作了一点加强。好比subscribeChildChanges其实是经过exists和getChildren关注了两个事件。这样当create("/path")时,对应path上经过getChildren注册的listener也会被调用。另外subscribeDataChanges实际上只是经过exists注册了事件。由于从上表能够看到,对于一个更新,经过exists和getData注册的watcher要么都会触发,要么都不会触发。 zkClient地址:[https://github.com/sgroschupf/zkclient](https://github.com/sgroschupf/zkclient) Maven工程中使用zkClient须要加的依赖:ide

    <dependency>
        <groupId>zkclient</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.1</version>
    </dependency>

2、 menagerie工具

menagerie基于Zookeeper实现了java.util.concurrent包的一个分布式版本。这个封装是更大粒度上对各类分布式一致性使用场景的抽象。其中最基础和经常使用的是一个分布式锁的实现:
org.menagerie.locks.ReentrantZkLock,经过ZooKeeper的全局有序的特性和EPHEMERAL_SEQUENTIAL类型znode的支持,实现了分布式锁。具体作法是:不一样的client上每一个试图得到锁的线程,都在相同的basepath下面建立一个EPHEMERAL_SEQUENTIAL的node。EPHEMERAL表示要建立的是临时znode,建立链接断开时会自动删除; SEQUENTIAL表示要自动在传入的path后面缀上一个自增的全局惟一后缀,做为最终的path。所以对不一样的请求ZK会生成不一样的后缀,并分别返回带了各自后缀的path给各个请求。由于ZK全局有序的特性,无论client请求怎样前后到达,在ZKServer端都会最终排好一个顺序,所以自增后缀最小的那个子节点,就对应第一个到达ZK的有效请求。而后client读取basepath下的全部子节点和ZK返回给本身的path进行比较,当发现本身建立的sequential node的后缀序号排在第一个时,就认为本身得到了锁;不然的话,就认为本身没有得到锁。这时确定是有其余并发的而且是没有断开的client/线程先建立了node。

基于分布式锁,还实现了其余业务场景,好比leader选举:

public static void leaderElectionTest() {
ZkSessionManager zksm = new DefaultZkSessionManager(“ZK-host-ip:2181”, 5000);
LeaderElector elector = new ZkLeaderElector(“/leaderElectionTest”, zksm, Ids.OPEN_ACL_UNSAFE);
if (elector.nominateSelfForLeader()) {
System.out.println(“Try to become the leader success!”);
}
}

java.util.concurrent包下面的其余接口实现,也主要是基于ReentrantZkLock的,好比ZkHashMap实现了ConcurrentMap。具体请参见menagerie的API文档

menagerie地址:https://github.com/wxuedong/menagerie
Maven工程中使用menagerie须要加的依赖:

    <dependency>
        <groupId>org.menagerie</groupId>
        <artifactId>menagerie</artifactId>
        <version>1.1-SNAPSHOT</version>
    </dependency>
相关文章
相关标签/搜索