zookeeper Watcher API 说明

  Watcher 在 ZooKeeper 是一个核心功能,Watcher 能够监控目录节点的数据变化以及子目录的变化,一旦这些状态发生变化,服务器就会通知全部设置在这个目录节点上的 Watcher,从而每一个客户端都很快知道它所关注的目录节点的状态发生变化,而作出相应的反应.html

能够设置观察的操做:exists,getChildren,getDatanode

能够触发观察的操做:create,delete,setData服务器

znode以某种方式发生变化时,“观察”(watch)机制可让客户端获得通知.能够针对ZooKeeper服务的“操做”来设置观察,该服务的其余 操做能够触发观察.网络

好比,客户端能够对某个客户端调用exists操做,同时在它上面设置一个观察,若是此时这个znode不存在,exists返回 false,若是一段时间以后,这个znode被其余客户端建立,则这个观察会被触发,以前的那个客户端就会获得通知.

说明: zookeeper客户端对server的操做都是不可回退的。
意思是说,zk的客户端每次和server进行通讯的时候,会记住server上最新的zxid。若是某个时刻,客户端和server断开了链接,那么等到下次从新链接到集群中的机器上时,会检查当前链接上的那个server是否和client有相同的zxid,或者已是更新的zxid了。一旦客户端发现server的zxid比本身小,那么客户端会断开和这个server的链接,而且从新链接集群中的其它server.

一、连接Zookeeper服务器
session

/** * <p>链接Zookeeper</p> * <pre> * [关于connectString服务器地址配置] * 格式: 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 * 这个地址配置有多个ip:port之间逗号分隔,底层操做 * ConnectStringParser connectStringParser = new ConnectStringParser(“192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181”); * 这个类主要就是解析传入地址列表字符串,将其它保存在一个ArrayList中 * ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>(); * 接下去,这个地址列表会被进一步封装成StaticHostProvider对象,而且在运行过程当中,一直是这个对象来维护整个地址列表。 * ZK客户端将全部Server保存在一个List中,而后随机打乱(这个随机过程是一次性的),而且造成一个环,具体使用的时候,从0号位开始一个一个使用。 * 所以,Server地址可以重复配置,这样可以弥补客户端没法设置Server权重的缺陷,可是也会加大风险。 * * [客户端和服务端会话说明] * ZooKeeper中,客户端和服务端创建链接后,会话随之创建,生成一个全局惟一的会话ID(Session ID)。 * 服务器和客户端之间维持的是一个长链接,在SESSION_TIMEOUT时间内,服务器会肯定客户端是否正常链接(客户端会定时向服务器发送heart_beat,服务器重置下次SESSION_TIMEOUT时间)。 * 所以,在正常状况下,Session一直有效,而且ZK集群全部机器上都保存这个Session信息。 * 在出现网络或其它问题状况下(例如客户端所链接的那台ZK机器挂了,或是其它缘由的网络闪断),客户端与当前链接的那台服务器之间链接断了, * 这个时候客户端会主动在地址列表(实例化ZK对象的时候传入构造方法的那个参数connectString)中选择新的地址进行链接。 * * [会话时间] * 客户端并非能够随意设置这个会话超时时间,在ZK服务器端对会话超时时间是有限制的,主要是minSessionTimeout和maxSessionTimeout这两个参数设置的。 * 若是客户端设置的超时时间不在这个范围,那么会被强制设置为最大或最小时间。 默认的Session超时时间是在2 * tickTime ~ 20 * tickTime * </pre> * @param connectString Zookeeper服务地址 * @param sessionTimeout Zookeeper链接超时时间 */
    public void connectionZookeeper(String connectString, int sessionTimeout){ this.releaseConnection(); try { // ZK客户端容许咱们将ZK服务器的全部地址都配置在这里
            zk = new ZooKeeper(connectString, sessionTimeout, this ); // 使用CountDownLatch.await()的线程(当前线程)阻塞直到全部其它拥有CountDownLatch的线程执行完毕(countDown()结果为0)
 connectedSemaphore.await(); } catch ( InterruptedException e ) { LOG.error("链接建立失败,发生 InterruptedException , e " + e.getMessage(), e); } catch ( IOException e ) { LOG.error( "链接建立失败,发生 IOException , e " + e.getMessage(), e ); } }

 

二、建立节点ide

    /** * <p>建立zNode节点, String create(path<节点路径>, data[]<节点内容>, List(ACL访问控制列表), CreateMode<zNode建立类型>) </p><br/> * <pre> * 节点建立类型(CreateMode) * 一、PERSISTENT:持久化节点 * 二、PERSISTENT_SEQUENTIAL:顺序自动编号持久化节点,这种节点会根据当前已存在的节点数自动加 1 * 三、EPHEMERAL:临时节点客户端,session超时这类节点就会被自动删除 * 四、EPHEMERAL_SEQUENTIAL:临时自动编号节点 * </pre> * @param path zNode节点路径 * @param data zNode数据内容 * @return 建立成功返回true, 反之返回false. */
    public boolean createPath( String path, String data ) { try { String zkPath =  this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); LOG.info( "节点建立成功, Path: " + zkPath + ", content: " + data ); return true; } catch ( KeeperException e ) { LOG.error( "节点建立失败, 发生KeeperException! path: " + path + ", data:" + data + ", errMsg:" + e.getMessage(), e ); } catch ( InterruptedException e ) { LOG.error( "节点建立失败, 发生 InterruptedException! path: " + path + ", data:" + data + ", errMsg:" + e.getMessage(), e ); } return false; }

三、删除节点this

    /** * <p>删除一个zMode节点, void delete(path<节点路径>, stat<数据版本号>)</p><br/> * <pre> * 说明 * 一、版本号不一致,没法进行数据删除操做. * 二、若是版本号与znode的版本号不一致,将没法删除,是一种乐观加锁机制;若是将版本号设置为-1,不会去检测版本,直接删除. * </pre> * @param path zNode节点路径 * @return 删除成功返回true,反之返回false. */
    public boolean deletePath( String path ){ try { this.zk.delete(path,-1); LOG.info( "节点删除成功, Path: " + path); return true; } catch ( KeeperException e ) { LOG.error( "节点删除失败, 发生KeeperException! path: " + path + ", errMsg:" + e.getMessage(), e ); } catch ( InterruptedException e ) { LOG.error( "节点删除失败, 发生 InterruptedException! path: " + path + ", errMsg:" + e.getMessage(), e ); } return false; }

四、节点赋值/更新节点spa

    /** * <p>更新指定节点数据内容, Stat setData(path<节点路径>, data[]<节点内容>, stat<数据版本号>)</p> * <pre> * 设置某个znode上的数据时若是为-1,跳过版本检查 * </pre> * @param path zNode节点路径 * @param data zNode数据内容 * @return 更新成功返回true,返回返回false */
    public boolean writeData( String path, String data){ try { Stat stat = this.zk.setData(path, data.getBytes(), -1); LOG.info( "更新数据成功, path:" + path + ", stat: " + stat ); return true; } catch (KeeperException e) { LOG.error( "更新数据失败, 发生KeeperException! path: " + path + ", data:" + data + ", errMsg:" + e.getMessage(), e ); } catch (InterruptedException e) { LOG.error( "更新数据失败, 发生InterruptedException! path: " + path + ", data:" + data + ", errMsg:" + e.getMessage(), e ); } return false; }

五、读取节点值线程

    /** * <p>读取指定节点数据内容,byte[] getData(path<节点路径>, watcher<监视器>, stat<数据版本号>)</p> * @param path zNode节点路径 * @return 节点存储的值,有值返回,无值返回null */
    public String readData( String path ){ String data = null; try { data = new String( this.zk.getData( path, false, null ) ); LOG.info( "读取数据成功, path:" + path + ", content:" + data); } catch (KeeperException e) { LOG.error( "读取数据失败,发生KeeperException! path: " + path + ", errMsg:" + e.getMessage(), e ); } catch (InterruptedException e) { LOG.error( "读取数据失败,发生InterruptedException! path: " + path + ", errMsg:" + e.getMessage(), e ); } return data; }

六、判断节点是否存在code

    /** * <p>判断某个zNode节点是否存在, Stat exists(path<节点路径>, watch<并设置是否监控这个目录节点,这里的 watcher 是在建立 ZooKeeper 实例时指定的 watcher>)</p> * @param path zNode节点路径 * @return 存在返回true,反之返回false */
    public boolean isExists( String path ){ try { Stat stat = this.zk.exists( path, false ); return null != stat; } catch (KeeperException e) { LOG.error( "读取数据失败,发生KeeperException! path: " + path + ", errMsg:" + e.getMessage(), e ); } catch (InterruptedException e) { LOG.error( "读取数据失败,发生InterruptedException! path: " + path + ", errMsg:" + e.getMessage(), e ); } return false; }

七、获取某个节点下的子节点

    /** * <p>获取某个节点下的全部子节点,List getChildren(path<节点路径>, watcher<监视器>)该方法有多个重载</p> * @param path zNode节点路径 * @return 子节点路径集合 说明,这里返回的值为节点名 * <pre> * eg. * /node * /node/child1 * /node/child2 * getChild( "node" )户的集合中的值为["child1","child2"] * </pre> * * * * @throws KeeperException * @throws InterruptedException */
    public List<String> getChild( String path ){ try{ List<String> list=this.zk.getChildren( path, false ); if(list.isEmpty()){ LOG.info( "中没有节点" + path ); } return list; }catch (KeeperException e) { LOG.error( "读取子节点数据失败,发生KeeperException! path: " + path + ", errMsg:" + e.getMessage(), e ); } catch (InterruptedException e) { LOG.error( "读取子节点数据失败,发生InterruptedException! path: " + path + ", errMsg:" + e.getMessage(), e ); } return null; }

八、释放连接

  /** * 关闭ZK链接 */
    public void releaseConnection() { if ( null != zk ) { try { this.zk.close(); } catch ( InterruptedException e ) { LOG.error("release connection error ," + e.getMessage() ,e); } } }


转载请注明出处:[http://www.cnblogs.com/dennisit/p/4340688.html]

相关文章
相关标签/搜索