做者:leesf 掌控之中,才会成功;掌控以外,注定失败; 出处:http://www.cnblogs.com/leesf456/ (尊重原创,感谢做者整理的这么好,做者的部份内容添加了个人理解和阐述,供你们一块儿学习讨论)html
1、前言java
上一篇博客咱们经过命令行来操做Zookeper的客户端和服务端并进行相应的操做,这篇主要介绍如何经过API(JAVA)来操做Zookeeper。node
2、开发环境配置apache
首先打开Zookeeper服务端(上一篇博客有具体的方法),方便客户端链接。api
配置开发环境环境能够有两种方式:① 直接下载相关的依赖Jar包,而后在IDE中添加依赖 ② 创建maven项目,使用maven进行依赖管理。数组
① 手动添加依赖至IDE服务器
步骤一:点击这里下载对应版本的Jar包,包括(jar、javadoc.jar、sources.jar),笔者对应下载的Zookeeper3.4.6版本。网络
步骤二:打开IDE(我用的是MyEclipse),新建名为zookeeper_examples_none_maven的java项目。因为须要单独添加依赖,为了方便管理,笔者在项目下新建了jar文件夹,用于存放本项目的jar包(将步骤一下载的3个jar包存放至此文件夹下)。session
步骤三:在MyEclipse中添加依赖,选中项目右键-->build path -->Libraries-->Add JARs引入jar文件夹下的jar包;(须要加入log4j和slf4j-api俩个jar包)并发
步骤四:新建包、Java类进行测试
Zookeeper_Constructor_Usage_Simple.java
package com.hust.grid.leesf.examples; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.KeeperState; /** * 使用zookeeper的javaAPI实现客户端对zookeeper服务端的链接; */ public class Zookeeper_Constructor_Usage_Simple implements Watcher { private static final int SESSION_TIMEOUT = 5000; //对执行中的线程进行管理,等待线程完成某些操做后,再对此线程作处理(起到过河拆桥、卸磨杀驴的做用) private static CountDownLatch connectedSemaphore = new CountDownLatch(1); ZooKeeper zookeeper; /** * 在链接函数中建立了zookeeper的实例;而后创建与服务器的链接; * 创建链接函数会当即返回,因此咱们须要等待链接创建成功后再进行其余的操做; * 咱们使用connectedSemaphore.await()来阻塞当前线程,直到zookeeper准备就绪; */ public void connect(String host) throws IOException, InterruptedException{ /** * host 127.0.0.1:2181,服务器端主机名称以及端口号; * SESSION_TIMEOUT 客户端链接服务器session的超时时间; * this 表示Watcher接口的一个实例,Watcher实例负责接收Zookeeper数据变化时产生的事件回调; */ zookeeper = new ZooKeeper(host, SESSION_TIMEOUT, this); System.out.println("zk的链接状态:"+zookeeper.getState()); connectedSemaphore.await(); System.out.println("Zookeeper session established"); } /** * 当客户端链接上了zookeeper服务器,Watcher接口会使用process()方法接收一个链接成功的事件, * 接下来调用CountDownLatch释放以前的阻塞; */ public void process(WatchedEvent event) { System.out.println("Receive watched event : " + event); if (KeeperState.SyncConnected == event.getState()) { System.out.println("链接成功:再也不阻塞当前线程"); connectedSemaphore.countDown(); } } public static void main(String[] args) throws IOException, InterruptedException { Zookeeper_Constructor_Usage_Simple zookeeperConstructor = new Zookeeper_Constructor_Usage_Simple(); zookeeperConstructor.connect("127.0.0.1:2181"); } }
打印结果为下图所示:
表示客户端已经成功链接至服务器了。
能够看到方法一相对而言比较麻烦,须要手动管理不一样的依赖jar包,能够采用更成熟的依赖管理方法,即便用maven来管理Jar包。
② 使用maven管理依赖
步骤一:新建maven项目,使用IDE工具直接建立便可;
直接next便可:
依然是Next:
依然是Next:
步骤二:配置pom.xml文件以下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hust.grid.leesf</groupId> <artifactId>zookeeper_examples</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>zookeeper_examples</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> <type>bundle</type> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.21</version> </dependency> </dependencies> </project>
步骤三:新建java类进行测试
Zookeeper_Constructor_Usage_Simple.java,代码同上(执行结果不在重复展现)。
先说一下CountDownLatch这个类,这是个很牛逼的门闩,能够对当前执行线程进行管理;我使用该类的源码作一个说明,模拟军队撤退的过程;
package com.hust.grid.leesf.curator; import java.util.concurrent.CountDownLatch; /** * 模拟军队撤退过程:军官先跑,而后小兵再跑 * CountDownLatch相似于一个门闩,当计数器不为0,当前线程不能执行,使用await方法将线程关在门外; * 执行一次countDown,计数器就减1,一旦计数器减完后是0,那么await方法当即打开门,让线程执行; * @author songzl * */ public class Driver { public static void main(String[] arg) throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(5); for (int i = 0; i < 5; ++i){ // create and start threads new Thread(new Worker(startSignal, doneSignal,i)).start(); } //军座先跑 doSomethingElse("军座先撤退..."); //而后计算器减1 startSignal.countDown();//军座撤退完毕,打开门闩,让小兵开始跑 doSomethingElse("军座撤退成功,让小兵撤退..."); //清点人数,确认小兵是否所有撤退,若没有须要列队等待 doneSignal.await(); // wait for all to finish System.out.println("全军撤退完毕!"); } private static void doSomethingElse(String commond) { System.out.println(commond); } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; private int count = 0; public Worker(CountDownLatch startSignal, CountDownLatch doneSignal,int count) { this.startSignal = startSignal; this.doneSignal = doneSignal; this.count = count; } public void run() { try { System.out.println("第"+count+"个小兵试图逃跑,被要求滚回战场..."); startSignal.await(); doWork(count); doneSignal.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } private void doWork(int count) { System.out.println("军座都跑完了,咱们小兵也跑吧,第"+count+"个小兵逃跑:"+Thread.currentThread().getName()); } }
3、操做示例
3.1 建立节点
建立节点有异步和同步两种方式。不管是异步或者同步,Zookeeper都不支持递归调用,即没法在父节点不存在的状况下建立一个子节点,如在/zk-ephemeral节点不存在的状况下建立/zk-ephemeral/ch1节点;而且若是一个节点已经存在,那么建立同名节点时,会抛出NodeExistsException异常。
① 同步方式
package com.hust.grid.leesf.examples; import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.ACL; /** * 使用zookeeper的javaAPI实现客户端对zookeeper服务端的链接;而且给服务端建立znode节点 */ public class Zookeeper_Constructor_Usage_Simple implements Watcher { private static final int SESSION_TIMEOUT = 5000; //对执行中的线程进行管理,等待线程完成某些操做后,再对此线程作处理(起到过河拆桥、卸磨杀驴的做用) private static CountDownLatch connectedSemaphore = new CountDownLatch(1); ZooKeeper zookeeper; /** * 在链接函数中建立了zookeeper的实例;而后创建与服务器的链接; * 创建链接函数会当即返回,因此咱们须要等待链接创建成功后再进行其余的操做; * 咱们使用CountDownLatch来阻塞当前线程,直到zookeeper准备就绪; */ public void connect(String host) throws IOException, InterruptedException{ /** * host 127.0.0.1:2181,服务器端主机名称以及端口号; * SESSION_TIMEOUT 客户端链接服务器session的超时时间; * this 表示Watcher接口的一个实例,Watcher实例负责接收Zookeeper数据变化时产生的事件回调; */ zookeeper = new ZooKeeper(host, SESSION_TIMEOUT, this); System.out.println("zk的链接状态:"+zookeeper.getState()); connectedSemaphore.await(); System.out.println("Zookeeper session established"); } /** * 当客户端链接上了zookeeper服务器,Watcher接口会使用process()方法接收一个链接成功的事件, * 接下来调用CountDownLatch释放以前的阻塞; */ public void process(WatchedEvent event) { System.out.println("Receive watched event : " + event); if (KeeperState.SyncConnected == event.getState()) { System.out.println("链接成功:再也不阻塞当前线程"); connectedSemaphore.countDown(); } } /** * 同步建立节点:方法中调用zookeeper实例的create()方法来建立一个znode; * @param path znode节点的绝对路径 * @param bytes znode节点的内容(一个二进制数组) * @param ACL access control list(ACL,访问控制列表,这里使用彻底开放模式) * @param createMode znode的性质,分为EPHEMERAL(临时)、PERSISTENT(持久)、EPHEMERAL_SEQUENTIAL临时顺序和PERSISTENT_SEQUENTIAL持久顺序 * @throws KeeperException * @throws InterruptedException */ public void create(String path,byte[] bytes,ArrayList<ACL> ACL,CreateMode createMode) throws KeeperException, InterruptedException{ String znodePath = zookeeper.create(path, bytes, ACL, createMode); System.out.println("Success create znode: " + znodePath); } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { Zookeeper_Constructor_Usage_Simple zookeeperConstructor = new Zookeeper_Constructor_Usage_Simple(); zookeeperConstructor.connect("127.0.0.1:2181"); System.out.println("开始给zookeeper服务端建立节点========="); zookeeperConstructor.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zookeeperConstructor.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } }
运行结果以下:
结果代表已经成功建立了临时节点和临时顺序节点,在建立顺序节点时,系统会在后面自动增长一串数字。
② 异步方式
使用异步方式于同步方式的区别在于节点的建立过程(包括网络通讯和服务端的节点建立过程)是异步的,在同步接口调用过程当中,开发者须要关注接口抛出异常的可能,可是在异步接口中,接口自己不会抛出异常,全部异常都会在回调函数中经过Result Code来体现。
package com.hust.grid.leesf.examples; import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.ZooKeeper; /** * 使用zookeeper的javaAPI实现客户端对zookeeper服务端的链接,而且使用异步方法建立znode节点 * @author songzl */ public class Zookeeper_Create_API_ASync_Usage implements Watcher { private static final int SESSION_TIMEOUT = 5000; //对执行中的线程进行管理,等待线程完成某些操做后,再对此线程作处理(起到过河拆桥、卸磨杀驴的做用) private static CountDownLatch connectedSemaphore = new CountDownLatch(1); ZooKeeper zookeeper; /** * 在链接函数中建立了zookeeper的实例;而后创建与服务器的链接; * 创建链接函数会当即返回,因此咱们须要等待链接创建成功后再进行其余的操做; * 咱们使用CountDownLatch来阻塞当前线程,直到zookeeper准备就绪; */ public void connect(String host) throws IOException, InterruptedException{ /** * host 127.0.0.1:2181,服务器端主机名称以及端口号; * SESSION_TIMEOUT 客户端链接服务器session的超时时间; * this 表示Watcher接口的一个实例,Watcher实例负责接收Zookeeper数据变化时产生的事件回调; */ zookeeper = new ZooKeeper(host, SESSION_TIMEOUT, this); System.out.println("zk的链接状态:"+zookeeper.getState()); connectedSemaphore.await(); System.out.println("Zookeeper session established"); } /** * 当客户端链接上了zookeeper服务器,Watcher接口会使用process()方法接收一个链接成功的事件, * 接下来调用CountDownLatch释放以前的阻塞; */ public void process(WatchedEvent event) { System.out.println("Receive watched event : " + event); if (KeeperState.SyncConnected == event.getState()) { System.out.println("链接成功:再也不阻塞当前线程"); connectedSemaphore.countDown(); } } /** * 异步建立节点:方法中调用zookeeper实例的create()方法来建立一个znode; * @param path znode节点的绝对路径 * @param bytes znode节点的内容(一个二进制数组) * @param ACL access control list(ACL,访问控制列表,这里使用彻底开放模式) * @param createMode znode的性质,分为EPHEMERAL(临时)、PERSISTENT(持久)、EPHEMERAL_SEQUENTIAL临时顺序和PERSISTENT_SEQUENTIAL持久顺序 * @param string 一个java的字符串 * @param iStringCallback 回调函数 * @throws KeeperException * @throws InterruptedException */ public void create(String path,byte[] bytes,ArrayList<ACL> ACL,CreateMode createMode, IStringCallback iStringCallback, String string) throws KeeperException, InterruptedException{ String znodePath = zookeeper.create(path, bytes, ACL, createMode); System.out.println("Success create znode: " + znodePath); } public static void main(String[] args) throws Exception { Zookeeper_Create_API_ASync_Usage zookeeperCreateAPIASyncUsage = new Zookeeper_Create_API_ASync_Usage(); zookeeperCreateAPIASyncUsage.connect("127.0.0.1:2181"); //异步建立临时节点 zookeeperCreateAPIASyncUsage.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(), "I am context. "); //异步建立临时顺序节点 zookeeperCreateAPIASyncUsage.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new IStringCallback(), "I am context. "); //若是不阻塞线程,session超时后临时节点会自动删除 Thread.sleep(Integer.MAX_VALUE); } } class IStringCallback implements AsyncCallback.StringCallback { public void processResult(int rc, String path, Object ctx, String name) { System.out.println("Create path result: [" + rc + ", " + path + ", " + ctx + ", real path name: " + name); } }
运行结果以下:
3.2 删除节点
只容许删除叶子节点,即一个节点若是有子节点,那么该节点将没法直接删除,必须先删掉其全部子节点。一样也有同步和异步两种方式。
同步和异步方式
package com.hust.grid.leesf.examples; import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.AsyncCallback.VoidCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.ZooKeeper; public class Delete_API_Sync_Usage implements Watcher{ private static final int SESSION_TIMEOUT = 5000; //对执行中的线程进行管理,等待线程完成某些操做后,再对此线程作处理(起到过河拆桥、卸磨杀驴的做用) private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zookeeper; /** * 在链接函数中建立了zookeeper的实例;而后创建与服务器的链接; * 创建链接函数会当即返回,因此咱们须要等待链接创建成功后再进行其余的操做; * 咱们使用CountDownLatch来阻塞当前线程,直到zookeeper准备就绪; */ public void connect(String host) throws IOException, InterruptedException{ /** * host 127.0.0.1:2181,服务器端主机名称以及端口号; * SESSION_TIMEOUT 客户端链接服务器session的超时时间; * this 表示Watcher接口的一个实例,Watcher实例负责接收Zookeeper数据变化时产生的事件回调; */ zookeeper = new ZooKeeper(host, SESSION_TIMEOUT, this); System.out.println("zk的链接状态:"+zookeeper.getState()); connectedSemaphore.await(); System.out.println("Zookeeper session established"); } /** * 当客户端链接上了zookeeper服务器,Watcher接口会使用process()方法接收一个链接成功的事件, * 接下来调用CountDownLatch释放以前的阻塞; */ public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); } } } /** * 方法中调用zookeeper实例的create()方法来建立一个znode; * @param path znode节点的绝对路径 * @param bytes znode节点的内容(一个二进制数组) * @param ACL access control list(ACL,访问控制列表,这里使用彻底开放模式) * @param createMode znode的性质,分为EPHEMERAL(临时)、PERSISTENT(持久)、EPHEMERAL_SEQUENTIAL临时顺序和PERSISTENT_SEQUENTIAL持久顺序 * @throws KeeperException * @throws InterruptedException */ public void create(String path,byte[] bytes,ArrayList<ACL> ACL,CreateMode createMode) throws KeeperException, InterruptedException{ String znodePath = zookeeper.create(path, bytes, ACL, createMode); System.out.println("Success create znode: " + znodePath); } /** * 同步删除znode的节点方法 * @param path znode节点的绝对路径 * @param version znode节点的版本号(-1表示不匹配版本号) * @throws InterruptedException * @throws KeeperException */ public void syncDelete(String path,int version) throws InterruptedException, KeeperException{ zookeeper.delete(path, version); } /** * 异步删除znode的节点方法 * @param path znode节点的绝对路径 * @param version znode节点的版本号(-1表示不匹配版本号) * @param cb * @param ctx * @throws InterruptedException * @throws KeeperException */ public void AsyncDelete(String path, int version, VoidCallback cb, Object ctx) throws InterruptedException, KeeperException{ zookeeper.delete(path, version, cb, ctx); } public static void main(String[] args) throws Exception { String path = "/zk-book"; Delete_API_Sync_Usage delete_API_Sync_Usage = new Delete_API_Sync_Usage(); delete_API_Sync_Usage.connect("127.0.0.1:2181"); //建立path为"/zk-book"的节点 delete_API_Sync_Usage.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("success create znode: " + path); //建立path为"/zk-book/c1"的节点 delete_API_Sync_Usage.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("success create znode: " + path + "/c1"); //============调用同步删除节点开始======================================= try { //直接删除path为"/zk-book"的节点失败,由于包含子节点 delete_API_Sync_Usage.syncDelete(path, -1); } catch (Exception e) { System.out.println("fail to delete znode: " + path); } //先删除子节点 delete_API_Sync_Usage.syncDelete(path + "/c1", -1); System.out.println("success delete znode: " + path + "/c1"); //再删除父节点 delete_API_Sync_Usage.syncDelete(path, -1); System.out.println("success delete znode: " + path); //============调用同步删除节点结束======================================= //============调用异步步删除节点开始======================================= delete_API_Sync_Usage.AsyncDelete(path, -1, new IVoidCallback(), null); delete_API_Sync_Usage.AsyncDelete(path + "/c1", -1, new IVoidCallback(), null); delete_API_Sync_Usage.AsyncDelete(path, -1, new IVoidCallback(), null); Thread.sleep(Integer.MAX_VALUE); } } /** * 异步删除时,接口的一个回调函数 * 参数rc表示返回码; * 参数path和ctx与客户端调用的方法中的参数相等,这两个参数一般用来肯定回调中得到的响应是来自于哪一个请求的; * 参数ctx能够是任意对象,只有当path参数不能消灭请求的歧义时才会用到,若是不须要参数ctx,能够设置为null */ class IVoidCallback implements AsyncCallback.VoidCallback { public void processResult(int rc, String path, Object ctx) { System.out.println(rc + ", " + path + ", " + ctx); } }
备注:ZooKeeper的API提供一个delete()方法来删除一个znode。咱们经过输入znode的path和版本号(version number)来删除想要删除的znode。除了使用path来定位咱们要删除的znode,还须要一个参数是版本号。只有当咱们指定要删除的本版号,与znode当前的版本号一致时,ZooKeeper才容许咱们将znode删除掉。这是一种optimistic locking(乐观锁)机制,用来处理znode的读写冲突。咱们也能够忽略版本号一致检查,作法就是版本号赋值为-1。
3.3 子节点获取
读取节点的子节点列表,一样可使用同步和异步的方式进行操做。
package com.hust.grid.leesf.examples; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.ZooKeeper; /** * 获取zookeeper的子节点:建立一个父节点和三个子节点 */ public class Zookeeper_GetChildren_API_Sync_Usage implements Watcher { private static final int SESSION_TIMEOUT = 5000; //对执行中的线程进行管理,等待线程完成某些操做后,再对此线程作处理(起到过河拆桥、卸磨杀驴的做用) private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zookeeper; /** * 在链接函数中建立了zookeeper的实例;而后创建与服务器的链接; * 创建链接函数会当即返回,因此咱们须要等待链接创建成功后再进行其余的操做; * 咱们使用CountDownLatch来阻塞当前线程,直到zookeeper准备就绪; */ public void connect(String host) throws IOException, InterruptedException{ /** * host 127.0.0.1:2181,服务器端主机名称以及端口号; * SESSION_TIMEOUT 客户端链接服务器session的超时时间; * this 表示Watcher接口的一个实例,Watcher实例负责接收Zookeeper数据变化时产生的事件回调; */ zookeeper = new ZooKeeper(host, SESSION_TIMEOUT, this); System.out.println("zk的链接状态:"+zookeeper.getState()); connectedSemaphore.await(); System.out.println("Zookeeper session established"); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); } else if (event.getType() == EventType.NodeChildrenChanged) { try { //获取子节点 System.out.println("ReGet Child:" + zookeeper.getChildren(event.getPath(), true)); } catch (Exception e) { } } } } /** * 方法中调用zookeeper实例的create()方法来建立一个znode; * @param path znode节点的绝对路径 * @param bytes znode节点的内容(一个二进制数组) * @param ACL access control list(ACL,访问控制列表,这里使用彻底开放模式) * @param createMode znode的性质,分为EPHEMERAL(临时)、PERSISTENT(持久)、EPHEMERAL_SEQUENTIAL临时顺序和PERSISTENT_SEQUENTIAL持久顺序 * @throws KeeperException * @throws InterruptedException */ public void create(String path,byte[] bytes,ArrayList<ACL> ACL,CreateMode createMode) throws KeeperException, InterruptedException{ String znodePath = zookeeper.create(path, bytes, ACL, createMode); System.out.println("Success create znode: " + znodePath); } public static void main(String[] args) throws KeeperException, InterruptedException, IOException { Zookeeper_GetChildren_API_Sync_Usage zookeeper_GetChildren_API_Sync_Usage = new Zookeeper_GetChildren_API_Sync_Usage(); zookeeper_GetChildren_API_Sync_Usage.connect("127.0.0.1:2181"); String path = "/zk-book-1"; //建立父节点 zookeeper_GetChildren_API_Sync_Usage.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //建立子节点 zookeeper_GetChildren_API_Sync_Usage.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); List<String> childrenList = zookeeper.getChildren(path, true); System.out.println(childrenList); zookeeper_GetChildren_API_Sync_Usage.create(path + "/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Thread.sleep(1000); zookeeper_GetChildren_API_Sync_Usage.create(path + "/c3", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Thread.sleep(Integer.MAX_VALUE); } }
值得注意的是,Watcher通知是一次性的,即一旦触发一次通知后,该Watcher就失效了,所以客户端须要反复注册Watcher,即程序中在process里面又注册了Watcher,不然,将没法获取c3节点的建立而致使子节点变化的事件。
3.4 数据节点获取(再也不展现异步的获取方式)
package com.hust.grid.leesf.examples; import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; /** * 数据节点获取 */ public class GetData_API_Sync_Usage implements Watcher{ private static final int SESSION_TIMEOUT = 5000; private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zookeeper; private static Stat stat = new Stat(); /** * 在链接函数中建立了zookeeper的实例;而后创建与服务器的链接; * 创建链接函数会当即返回,因此咱们须要等待链接创建成功后再进行其余的操做; * 咱们使用CountDownLatch来阻塞当前线程,直到zookeeper准备就绪; */ public void connect(String host) throws IOException, InterruptedException{ /** * host 127.0.0.1:2181,服务器端主机名称以及端口号; * SESSION_TIMEOUT 客户端链接服务器session的超时时间; * this 表示Watcher接口的一个实例,Watcher实例负责接收Zookeeper数据变化时产生的事件回调; */ zookeeper = new ZooKeeper(host, SESSION_TIMEOUT, this); System.out.println("zk的链接状态:"+zookeeper.getState()); connectedSemaphore.await(); System.out.println("Zookeeper session established"); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); } else if (event.getType() == EventType.NodeDataChanged) {//NodeDeleted表示本节点被变更 try { System.out.println("the data of znode " + event.getPath() + " is : " + new String(zookeeper.getData(event.getPath(), true, stat))); System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion()); } catch (Exception e) { } } } } /** * 方法中调用zookeeper实例的create()方法来建立一个znode; * @param path znode节点的绝对路径 * @param bytes znode节点的内容(一个二进制数组) * @param ACL access control list(ACL,访问控制列表,这里使用彻底开放模式) * @param createMode znode的性质,分为EPHEMERAL(临时)、PERSISTENT(持久)、EPHEMERAL_SEQUENTIAL临时顺序和PERSISTENT_SEQUENTIAL持久顺序 * @throws KeeperException * @throws InterruptedException */ public void create(String path,byte[] bytes,ArrayList<ACL> ACL,CreateMode createMode) throws KeeperException, InterruptedException{ String znodePath = zookeeper.create(path, bytes, ACL, createMode); System.out.println("Success create znode: " + znodePath); } public static void main(String[] args) throws Exception, InterruptedException { String path = "/zk-book"; GetData_API_Sync_Usage getData_API_Sync_Usage = new GetData_API_Sync_Usage(); getData_API_Sync_Usage.connect("127.0.0.1:2181"); //create启动的观察模式 zookeeper.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success create znode: " + path); System.out.println("the data of znode " + path + " is : " + new String(zookeeper.getData(path, true, stat))); System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion()); //setData启动的观察模式 zookeeper.setData(path, "123".getBytes(), -1); Thread.sleep(Integer.MAX_VALUE); } }
3.5 更新数据
在更新数据时,setData方法存在一个version参数,其用于指定节点的数据版本,代表本次更新操做是针对指定的数据版本进行的,可是,在getData方法中,并无提供根据指定数据版原本获取数据的接口,那么,这里为什么要指定数据更新版本呢,这里方便理解,能够等效于CAS(compare and swap),对于值V,每次更新以前都会比较其值是不是预期值A,只有符合预期,才会将V原子化地更新到新值B。Zookeeper的setData接口中的version参数能够对应预期值,代表是针对哪一个数据版本进行更新,假如一个客户端试图进行更新操做,它会携带上次获取到的version值进行更新,而若是这段时间内,Zookeeper服务器上该节点的数据已经被其余客户端更新,那么其数据版本也会相应更新,而客户端携带的version将没法匹配,没法更新成功,所以能够有效地避免分布式更新的并发问题。
package com.hust.grid.leesf.examples; import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; public class SetData_API_Sync_Usage implements Watcher { private static final int SESSION_TIMEOUT = 5000; //对执行中的线程进行管理,等待线程完成某些操做后,再对此线程作处理(起到过河拆桥、卸磨杀驴的做用) private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zookeeper; /** * 在链接函数中建立了zookeeper的实例;而后创建与服务器的链接; * 创建链接函数会当即返回,因此咱们须要等待链接创建成功后再进行其余的操做; * 咱们使用CountDownLatch来阻塞当前线程,直到zookeeper准备就绪; */ public void connect(String host) throws IOException, InterruptedException{ /** * host 127.0.0.1:2181,服务器端主机名称以及端口号; * SESSION_TIMEOUT 客户端链接服务器session的超时时间; * this 表示Watcher接口的一个实例,Watcher实例负责接收Zookeeper数据变化时产生的事件回调; */ zookeeper = new ZooKeeper(host, SESSION_TIMEOUT, this); System.out.println("zk的链接状态:"+zookeeper.getState()); connectedSemaphore.await(); System.out.println("Zookeeper session established"); } /** * 方法中调用zookeeper实例的create()方法来建立一个znode; * @param path znode节点的绝对路径 * @param bytes znode节点的内容(一个二进制数组) * @param ACL access control list(ACL,访问控制列表,这里使用彻底开放模式) * @param createMode znode的性质,分为EPHEMERAL(临时)、PERSISTENT(持久)、EPHEMERAL_SEQUENTIAL临时顺序和PERSISTENT_SEQUENTIAL持久顺序 * @throws KeeperException * @throws InterruptedException */ public void create(String path,byte[] bytes,ArrayList<ACL> ACL,CreateMode createMode) throws KeeperException, InterruptedException{ String znodePath = zookeeper.create(path, bytes, ACL, createMode); System.out.println("Success create znode: " + znodePath); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); } } } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { SetData_API_Sync_Usage setData_API_Sync_Usage = new SetData_API_Sync_Usage(); setData_API_Sync_Usage.connect("127.0.0.1:2181"); String path = "/zk-book"; setData_API_Sync_Usage.create(path, "songzl".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.getData(path, true, null); Stat stat = zookeeper.setData(path, "wangxn".getBytes(), -1); System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion()); Stat stat2 = zookeeper.setData(path, "songcy".getBytes(), stat.getVersion()); System.out.println("czxID: " + stat2.getCzxid() + ", mzxID: " + stat2.getMzxid() + ", version: " + stat2.getVersion()); try { zookeeper.setData(path, "456".getBytes(), stat.getVersion()); } catch (KeeperException e) { System.out.println("Error: " + e.code() + "," + e.getMessage()); } Thread.sleep(Integer.MAX_VALUE); } }
运行结果以下
success create znode: /zk-book czxID: 2936, mzxID: 2937, version: 1 czxID: 2936, mzxID: 2938, version: 2 Error: BADVERSION,KeeperErrorCode = BadVersion for /zk-book
结果代表因为携带的数据版本不正确,而没法成功更新节点。其中,setData中的version参数设置-1含义为客户端须要基于数据的最新版本进行更新操做。
3.6 检测节点是否存在
在调用接口时注册Watcher的话,还能够对节点是否存在进行监听,一旦节点被建立、被删除、数据更新,都会通知客户端。
package com.hust.grid.leesf.examples; import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; /** * 检查节点是否存在 */ public class Exist_API_Sync_Usage implements Watcher{ private static final int SESSION_TIMEOUT = 5000; //对执行中的线程进行管理,等待线程完成某些操做后,再对此线程作处理(起到过河拆桥、卸磨杀驴的做用) private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zookeeper; /** * 在链接函数中建立了zookeeper的实例;而后创建与服务器的链接; * 创建链接函数会当即返回,因此咱们须要等待链接创建成功后再进行其余的操做; * 咱们使用CountDownLatch来阻塞当前线程,直到zookeeper准备就绪; */ public void connect(String host) throws IOException, InterruptedException{ /** * host 127.0.0.1:2181,服务器端主机名称以及端口号; * SESSION_TIMEOUT 客户端链接服务器session的超时时间; * this 表示Watcher接口的一个实例,Watcher实例负责接收Zookeeper数据变化时产生的事件回调; */ zookeeper = new ZooKeeper(host, SESSION_TIMEOUT, this); System.out.println("zk的链接状态:"+zookeeper.getState()); connectedSemaphore.await(); System.out.println("Zookeeper session established"); } /** * 方法中调用zookeeper实例的create()方法来建立一个znode; * @param path znode节点的绝对路径 * @param bytes znode节点的内容(一个二进制数组) * @param ACL access control list(ACL,访问控制列表,这里使用彻底开放模式) * @param createMode znode的性质,分为EPHEMERAL(临时)、PERSISTENT(持久)、EPHEMERAL_SEQUENTIAL临时顺序和PERSISTENT_SEQUENTIAL持久顺序 * @throws KeeperException * @throws InterruptedException */ public void create(String path,byte[] bytes,ArrayList<ACL> ACL,CreateMode createMode) throws KeeperException, InterruptedException{ String znodePath = zookeeper.create(path, bytes, ACL, createMode); System.out.println("Success create znode: " + znodePath); } /** * 当客户端链接上了zookeeper服务器,Watcher接口会使用process()方法接收一个链接成功的事件, * 接下来调用CountDownLatch释放以前的阻塞;若对节点有变更也会根据状态触发该事件; */ public void process(WatchedEvent event) { try { if (KeeperState.SyncConnected == event.getState()) {//判断状态是否已成功链接 if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); System.out.println("链接服务器成功,关闭阻塞线程"); } else if (EventType.NodeCreated == event.getType()) {//建立节点时触发 System.out.println("success create znode: " + event.getPath()); zookeeper.exists(event.getPath(), true); } else if (EventType.NodeDeleted == event.getType()) {//删除节点时触发 System.out.println("success delete znode: " + event.getPath()); zookeeper.exists(event.getPath(), true); } else if (EventType.NodeDataChanged == event.getType()) {//变动当前节点时触发 System.out.println("data changed of znode: " + event.getPath()); zookeeper.exists(event.getPath(), true); } } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { String path = "/zk-book"; Exist_API_Sync_Usage exist_API_Sync_Usage = new Exist_API_Sync_Usage(); exist_API_Sync_Usage.connect("127.0.0.1:2181"); //若节点不存在返回null,不然返回节点的状态,参数传true是给节点添加观察事件,当节点变更时触发观察事件 Stat stat = zookeeper.exists(path, true); zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.setData(path, "123".getBytes(), -1); zookeeper.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("success create znode: " + path + "/c1"); zookeeper.delete(path + "/c1", -1); zookeeper.delete(path, -1); Thread.sleep(Integer.MAX_VALUE); } }
运行结果以下:
结果代表:
· 不管节点是否存在,均可以经过exists接口注册Watcher。
· 注册的Watcher,对节点建立、删除、数据更新事件进行监听。
· 对于指定节点的子节点的各类变化,不会通知客户端(由于指有父节点调用exists了)。
3.7 权限控制
经过设置Zookeeper服务器上数据节点的ACL控制,就能够对其客户端对该数据节点的访问权限:若是符合ACL控制,则能够进行访问,不然没法访问。
① 使用无权限信息的Zookeeper会话访问含权限信息的数据节点
package com.hust.grid.leesf.examples; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class AuthSample_Get { final static String PATH = "/zk-book-auth_test"; public static void main(String[] args) throws Exception { ZooKeeper zookeeper1 = new ZooKeeper("127.0.0.1:2181", 5000, null); zookeeper1.addAuthInfo("digest", "foo:true".getBytes()); zookeeper1.create(PATH, "init".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL); System.out.println("success create znode: " + PATH); ZooKeeper zookeeper2 = new ZooKeeper("127.0.0.1:2181", 5000, null); zookeeper2.getData(PATH, false, null); } }
运行结果以下:表示权限不够,不能进行操做
success create znode: /zk-book-auth_test Exception in thread "main" org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /zk-book-auth_test at org.apache.zookeeper.KeeperException.create(KeeperException.java:113) at org.apache.zookeeper.KeeperException.create(KeeperException.java:51) at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1155) at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1184) at com.hust.grid.leesf.examples.AuthSample_Get.main(AuthSample_Get.java:17)
② 删除带权限控制的节点
package com.hust.grid.leesf.examples; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class AuthSample_Delete { final static String PATH = "/zk-book-auth_test"; final static String PATH2 = "/zk-book-auth_test/child"; public static void main(String[] args) throws Exception { ZooKeeper zookeeper1 = new ZooKeeper("127.0.0.1:2181", 5000, null); zookeeper1.addAuthInfo("digest", "foo:true".getBytes()); zookeeper1.create(PATH, "init".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); zookeeper1.create(PATH2, "init".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL); try { ZooKeeper zookeeper2 = new ZooKeeper("127.0.0.1:2181", 5000, null); zookeeper2.delete(PATH2, -1); } catch (Exception e) { System.out.println("fail to delete: " + e.getMessage()); } ZooKeeper zookeeper3 = new ZooKeeper("127.0.0.1:2181", 5000, null); zookeeper3.addAuthInfo("digest", "foo:true".getBytes()); zookeeper3.delete(PATH2, -1); System.out.println("success delete znode: " + PATH2); ZooKeeper zookeeper4 = new ZooKeeper("127.00.1:2181", 5000, null); zookeeper4.delete(PATH, -1); System.out.println("success delete znode: " + PATH); } }
总结:
一、应该使用同步API仍是异步API
两种API提供了相同的功能,须要使用哪一种API取决于你程序的模式。例如,你设计的程序模式是一个事件驱动模式的程序,那么你最好使用异步API。异步API也能够被用在追求一个比较好的数据吞吐量的场景。想象一下,若是你须要得去大量的znode数据,而且依靠独立的进程来处理他们。若是使用同步API,每次读取操做都会被阻塞住,直到返回结果。不如使用异步API,读取操做能够没必要等待返回结果,继续执行。而使用另外的线程来处理返回结果。
二、观察模式触发器 Watch triggers
读操做,例如:exists、getChildren、getData会在znode上开启观察模式,而且写操做会触发观察模式事件,例如:create、delete和setData。可是ACL(Access Control List)操做不会启动观察模式。观察模式被触发时,会生成一个事件,这个事件的类型取决于触发他的操做:
● exists启动的观察模式,由建立znode,删除znode和更新znode操做来触发。
● getData启动的观察模式,由删除znode和更新znode操做触发。建立znode不会触发,是由于getData操做成功的前提是znode必须已经存在。
● getChildren启动的观察模式,由子节点建立和删除,或者本节点被删除时才会被触发。咱们能够经过事件的类型来判断是本节点被删除仍是子节点被删除:NodeChildrenChanged表示子节点被删除,而NodeDeleted表示本节点删除。