导入zookeeper和junitjava
<dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.6</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies>
before,建立会话用,后面不在贴这部分代码node
private static ZooKeeper zooKeeper; private static Stat stat = new Stat(); @Before public void before() { zooKeeper = ZookeeperConnect.getZookeeper(); }
CountDownLatch的知识点,参考java并发编程学习之CountDownLatch。ZooKeeperZookeeperConnect代码以下:apache
public class ZookeeperConnect { private static final String CONNECT_STRING = "172.17.0.2:2181,172.17.0.3:2181,172.17.0.4:2181"; public static CountDownLatch countDownLatch = new CountDownLatch(1); static ZooKeeper zooKeeper = null; public static ZooKeeper getZookeeper() { try { // 第一个参数,是zookeeper集群各个节点的地址,多个地址用逗号隔开 // 第二个参数,会话超时时间,毫秒为单位。 // 第三个参数,是Watcher,这边用CountDownLatch控制链接成功后,才返回zooKeeper。 zooKeeper = new ZooKeeper(CONNECT_STRING, 5000, new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("watchedEvent:" + watchedEvent); if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) { countDownLatch.countDown(); } } } }); System.out.println("等待中"); countDownLatch.await(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("获取到链接"); return zooKeeper; } }
测试代码编程
@Test public void testConnect(){ }
运行结果:segmentfault
包括同步和异步。测试代码以下:服务器
@Test public void testCreate() throws KeeperException, InterruptedException { // 第一个参数,是节点路径,一样不能直接多级设置,会报KeeperErrorCode = NoNode for的错误 // 第二个参数,是节点的内容 // 第三个参数,是节点的ACL策略,这边先用OPEN_ACL_UNSAFE放宽权限操做 // 第四个参数,是节点的类型,好比持久化节点、持久化有序节点、临时节点、临时有序节点 String result = zooKeeper.create("/node", "node".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("新增节点成功:" + result); } @Test public void testCreateASync() { // 第五个参数,是用于异步回调 // 第六个参数,是用于回调的时候,传递一个上下文信息 zooKeeper.create("/nodeASync", "nodeASync".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new MyStringCallBack(), "testCreateASync"); System.out.println("新增节点成功"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }
MyStringCallBack并发
public class MyStringCallBack implements AsyncCallback.StringCallback { public void processResult(int rc, String path, Object ctx, String name) { System.out.println("rc:" + rc + ",path:" + path + ",ctx:" + ctx + "name," + name); } }
同步运行结果以下:
异步运行结果以下:
在上面的例子中,能够看到,testCreate()
是有抛异常的,而异步是没有抛异常的,异常的信息经过Result Code来响应。
回调函数的接口,都是继承了AsyncCallback
。在AsyncCallback
类中,里面还有DataCallback
、VoidCallback
、ACLCallback
、StatCallback
等回调接口,根据ZooKeeper类中,不一样的方法,调用不一样的接口。后续几个方法的演示,这个异步的就略过了,大同小异。
接口中的processResult()
方法,参数说明以下:异步
测试代码函数
@Test public void testDelete() throws KeeperException, InterruptedException { // 第一个参数,是节点路径 // 第二个参数,是节点的版本,,-1表示不参与原子性操做 // 其余回调参数、上下文跟上面雷同 zooKeeper.delete("/node", -1); System.out.println("删除节点成功"); }
运行结果以下:学习
测试代码
@Test public void testGetChildren() throws KeeperException, InterruptedException { // 第一个参数,是获取这个节点路径下面的子节点 // 第二个参数,true表示使用默认的Watcher,false表示不用Watcher来监听。这个默认的watcher就是建立会话的时候的Watcher // 其余参数,好比注册Watcher、回调、上下文、stat信息就略过 List<String> children = zooKeeper.getChildren("/", true); System.out.println(children); }
运行结果以下,在这以前,又运行了node添加,因此就有三个节点:
测试代码:
@Test public void testGetData() throws KeeperException, InterruptedException { // 第一个参数,是获取这个路径的数据 // 第二个参数,true表示使用默认的Watcher,false表示不用Watcher来监听。这个默认的watcher就是建立会话的时候的Watcher。 // 第三个参数,接收新的stat信息 byte[] data = zooKeeper.getData("/node", true, stat); System.out.println(stat); System.out.println("node数据为:" + new String(data)); }
运行结果以下:
测试代码:
@Test public void testSetData() throws KeeperException, InterruptedException { byte[] data = zooKeeper.getData("/node", true, stat); System.out.println("node数据为:" + new String(data)); // 第一个参数,是设置节点的路径 // 第二个参数,是设置节点的数据 // 第三个参数,是设置节点的版本,-1表示不参与原子性操做 zooKeeper.setData("/node", "newData".getBytes(), -1); data = zooKeeper.getData("/node", true, stat); System.out.println("node数据为:" + new String(data)); }
运行结果以下:
测试代码:
@Test public void testExists() throws KeeperException, InterruptedException { // 第一个参数,须要获取信息的节点的路径 // 第二个参数,true表示使用默认的Watcher,false表示不用Watcher来监听。这个默认的watcher就是建立会话的时候的Watcher System.out.println("node节点信息:" + zooKeeper.exists("/node", true)); }
运行结果以下:
当客户端获取到节点的信息或者节点的子列表信息时,能够经过注册Watcher来监听节点或子列表的变化信息。指的一提的是,Watcher的通知,是一次性的,一旦触发后,这个Watcher就失效了,因此想要一直监听,就要一直反复的注册Watcher。
WatchedEvent有如下几种:
ZookeeperConnect修改为:
public class ZookeeperConnect { private static final String CONNECT_STRING = "172.17.0.2:2181,172.17.0.3:2181,172.17.0.4:2181"; public static CountDownLatch countDownLatch = new CountDownLatch(1); static ZooKeeper zooKeeper = null; public static ZooKeeper getZookeeper() { try { zooKeeper = new ZooKeeper(CONNECT_STRING, 5000, new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("watchedEvent:" + watchedEvent); if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) { countDownLatch.countDown(); } else { switch (watchedEvent.getType()) { case NodeChildrenChanged: System.out.println("NodeChildrenChanged"); try { zooKeeper.getChildren(watchedEvent.getPath(), true); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } break; case NodeDataChanged: try { if (watchedEvent.getPath().equals("/node/node_1")) { zooKeeper.getData(watchedEvent.getPath(), true, new Stat()); } else { zooKeeper.exists(watchedEvent.getPath(), true); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("NodeDataChanged"); break; case NodeCreated: try { zooKeeper.exists(watchedEvent.getPath(), true); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("NodeCreated"); break; case NodeDeleted: System.out.println("NodeDeleted"); break; default: break; } } } } }); System.out.println("等待中"); countDownLatch.await(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("获取到链接"); return zooKeeper; } }
测试代码:
@Test public void testGetChildren4Watcher() throws KeeperException, InterruptedException { zooKeeper.getChildren("/node", true); System.out.println("create prepare"); zooKeeper.create("/node/node_1", "node_1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("setData prepare"); zooKeeper.setData("/node/node_1", "node_1_new".getBytes(), -1); System.out.println("delete prepare"); zooKeeper.delete("/node/node_1", -1); }
运行结果以下:
监听了/node的子节点,能够看出,子节点的建立、删除,都获得了监听。
测试代码:
@Test public void testGetData4Watcher() throws KeeperException, InterruptedException { zooKeeper.create("/node/node_1", "node_1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zooKeeper.getData("/node/node_1", true, stat); System.out.println("setData prepare"); zooKeeper.setData("/node/node_1", "node_1_new".getBytes(), -1); TimeUnit.MILLISECONDS.sleep(200); System.out.println(); System.out.println("delete prepare"); zooKeeper.delete("/node/node_1", -1); }
运行结果以下:
监听了/node/node_1节点,能够看出,/node/node_1节点的修改、删除,都获得了监听。
测试代码:
@Test public void testExists4Watcher() throws KeeperException, InterruptedException { if(null==zooKeeper.exists("/node/node_2", true)){ System.out.println("create prepare"); zooKeeper.create("/node/node_2", "node_2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zooKeeper.getData("/node/node_2", true, stat); TimeUnit.MILLISECONDS.sleep(200); System.out.println(); System.out.println("setData prepare"); zooKeeper.setData("/node/node_2", "node_2_new".getBytes(), -1); TimeUnit.MILLISECONDS.sleep(200); System.out.println("delete prepare"); System.out.println(); zooKeeper.delete("/node/node_2", -1); } }
运行结果以下:
监听了/node/node_2,能够看出,/node/node_2节点的建立、修改、删除,都获得了监听。
zookeeper提供了ACL的权限控制机制,经过服务器数据节点的ACL,来控制客户端对该节点的控制权限。
ACL的权限控制模式:
测试代码:
@Test public void testAuth() throws IOException, KeeperException, InterruptedException { ZooKeeper zooKeeper1 = new ZooKeeper(ZookeeperConnect.CONNECT_STRING, 5000, null); zooKeeper1.addAuthInfo("digest", "name:dajun".getBytes()); zooKeeper1.create("/node2", "node2".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL); System.out.println("zooKeeper1:"+new String(zooKeeper1.getData("/node2", false, stat))); ZooKeeper zooKeeper2 = new ZooKeeper(ZookeeperConnect.CONNECT_STRING, 5000, null); System.out.println("zooKeeper2:"+new String(zooKeeper2.getData("/node2", false, stat))); }
运行结果以下:
zooKeeper2获取节点信息的时候,抛异常了。
下面咱们来看看特殊的delete操做。
测试代码:
@Test public void testAuth4Delete() throws IOException, KeeperException, InterruptedException { ZooKeeper zooKeeper1 = new ZooKeeper(ZookeeperConnect.CONNECT_STRING, 5000, null); zooKeeper1.addAuthInfo("digest", "name:dajun".getBytes()); zooKeeper1.create("/node2", "node2".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); zooKeeper1.create("/node2/node2_1", "node2_1".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL); ZooKeeper zooKeeper2 = new ZooKeeper(ZookeeperConnect.CONNECT_STRING, 5000, null); try { zooKeeper2.delete("/node2/node2_1", -1); } catch (Exception e) { System.out.println("delete error:" + e.getMessage()); } ZooKeeper zooKeeper3 = new ZooKeeper(ZookeeperConnect.CONNECT_STRING, 5000, null); zooKeeper3.addAuthInfo("digest", "name:dajun".getBytes()); zooKeeper3.delete("/node2/node2_1", -1); System.out.println("delete /node2/node2_1 sucess"); ZooKeeper zooKeeper4 = new ZooKeeper(ZookeeperConnect.CONNECT_STRING, 5000, null); zooKeeper4.delete("/node2", -1); System.out.println("delete /node2 sucess"); }
运行结果以下:zookeeper1,对/node2和/node2/node2_1设置了权限。zookeeper2,删除的时候,由于没权限,删除失败。zookeeper3,有权限,删除成功。zookeeper4,没有权限,也删除成功。能够看出,删除的时候,权限的做用访问是其子节点,须要权限才能够删除,可是这个对于该节点,仍是能够任意的删除。固然,读取等权限,也是不容许的,只对删除节点有用。