前面讲了zookeeper的API使用和zookeeper之ZkClient的使用,如今看看看看curator的使用。node
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.2.0</version> </dependency>
before,建立会话用,后面不在贴这部分代码apache
CuratorFramework client; @Before public void before() { client = CuratorConnect.getCuratorClient2(); }
CuratorConnect,这边给出2个方式,一个是直接new一个,一个是Fluent风格的,须要调用start方法来开启会话。segmentfault
public class CuratorConnect { static final String CONNECT_STRING = "172.17.0.2:2181,172.17.0.3:2181,172.17.0.4:2181"; static ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 5); public static CuratorFramework getCuratorClient() { CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECT_STRING, 5000, 5000, retryPolicy); client.start(); return client; } public static CuratorFramework getCuratorClient2() { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(CONNECT_STRING) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); return client; } }
跟以前不同的是,这边须要设置重试策略,其余参数雷同。
重试策略RetryPolicy以下:缓存
RetryPolicy的allowRetry有三个参数,分别是已重试的次数、从第一次重试开始到当前话费的时间、用于sleep的时间。网络
测试代码:session
@Test public void testCreate() throws Exception { // 默认持久性节点 client.create().forPath("/node1"); client.create().forPath("/node2","data2".getBytes()); // 须要指定类型,用withMode建立临时节点 client.create().withMode(CreateMode.EPHEMERAL).forPath("/node3"); // 建立多层级用creatingParentsIfNeeded client.create().creatingParentsIfNeeded().forPath("/node4/node4_1"); //方便查看临时节点 TimeUnit.SECONDS.sleep(100); }
客户端查询结果以下:异步
[zk: localhost:2181(CONNECTED) 75] ls / [node1, node2, node3, node4, zookeeper] [zk: localhost:2181(CONNECTED) 76] get /node2 data2 [zk: localhost:2181(CONNECTED) 77] ls /node4 [node4_1]
测试代码:maven
@Test public void testDelete() throws Exception { // 删除节点 client.delete().forPath("/node1"); // guaranteed保证能够删除节点,即使在网络可能波动的状况下 client.delete().guaranteed().forPath("/node5"); // 指定版本号删除 client.delete().withVersion(-1).forPath("/node2"); // 删除递归子节点 client.delete().deletingChildrenIfNeeded().forPath("/node4"); }
客户端查询结果以下:测试
[zk: localhost:2181(CONNECTED) 87] ls / [node1, node2, node4, node5, zookeeper] [zk: localhost:2181(CONNECTED) 88] ls / [zookeeper]
测试代码:ui
@Test public void testGetChildren() throws Exception { List<String> children = client.getChildren().forPath("/node4"); System.out.println(children); }
运行结果以下:
测试代码:
@Test public void testGetData() throws Exception { byte[] bytes = client.getData().forPath("/node2"); System.out.println(new String(bytes)); }
运行结果以下:
测试代码:
@Test public void testWriteData() throws Exception { client.setData().forPath("/node2", "new_data".getBytes()); byte[] bytes = client.getData().forPath("/node2"); System.out.println(new String(bytes)); }
运行结果以下:
测试代码:
@Test public void testExists() throws Exception { Stat stat = client.checkExists().forPath("/node"); System.out.println(stat); // 不存在父节点会建立,但不会建立当前节点 client.checkExists().creatingParentsIfNeeded().forPath("/node/node_1"); stat = client.checkExists().forPath("/node"); System.out.println(stat); }
运行结果以下:
在curator中,BackgroundCallback接口,用来异步接口调用后,处理服务端返回的接口。
接口中的processResult方法有两个参数,一个是客户端实例CuratorFramework,一个是服务端事件CuratorEvent。
在MyBackgroundCallback中,打印了事件类型和响应码,响应码和AsyncCallback的processResult方法的rc是同样的,在原生API的建立方法有提过。
public class MyBackgroundCallback implements BackgroundCallback { public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { System.out.println(curatorEvent.getType() + "-" + curatorEvent.getResultCode()); } }
测试代码:
@Test public void testASyn() throws Exception { BackgroundCallback callback = new MyBackgroundCallback(); client.create().inBackground(callback).forPath("/node"); TimeUnit.SECONDS.sleep(3); client.getData().inBackground(callback).forPath("/node"); TimeUnit.SECONDS.sleep(3); client.setData().inBackground(callback).forPath("/node", "new_data".getBytes()); TimeUnit.SECONDS.sleep(3); client.getData().inBackground(callback).forPath("/node"); TimeUnit.SECONDS.sleep(3); client.delete().inBackground(callback).forPath("/node"); TimeUnit.SECONDS.sleep(3); }
运行结果以下:
在inBackground方法中,除了传递BackgroundCallback,还能够传线程池对象,这样业务逻辑就会该线程池处理,若是没有传,就使用默认的EventThread处理,这边就不作演示了。
POM文件须要导入:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> </dependency>
在curator中是经过NodeCache来监听节点的变化的。
测试代码:
@Test public void testNodeCache4Listener() throws Exception { // 第一个参数,是监听的客户端 // 第二个参数,是监听的节点 final NodeCache nodeCache = new NodeCache(client, "/node1"); // 启动的时候从zookeeper读取对应的数据 nodeCache.start(true); nodeCache.getListenable().addListener(new NodeCacheListener() { public void nodeChanged() throws Exception { System.out.println(nodeCache.getPath() + ":" + new String(nodeCache.getCurrentData().getData())); } }); client.create().forPath("/node1","data".getBytes()); TimeUnit.MILLISECONDS.sleep(200); client.setData().forPath("/node1", "node1_data".getBytes()); TimeUnit.MILLISECONDS.sleep(200); client.delete().forPath("/node1"); TimeUnit.MILLISECONDS.sleep(200); }
测试结果以下:
节点的建立、删除,会触发监听。
测试代码
@Test public void testPathChildrenCache4Listener() throws Exception { // 第一个参数,是监听的客户端 // 第二个参数,是监听的节点 // 第三个参数,是否缓存节点内容 PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node1", true); pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { System.out.println(pathChildrenCacheEvent); } }); System.out.println("--------------1--------------"); client.create().forPath("/node1"); TimeUnit.MILLISECONDS.sleep(200); System.out.println("--------------2--------------"); client.create().forPath("/node1/node1_1"); TimeUnit.MILLISECONDS.sleep(200); System.out.println("--------------3--------------"); client.setData().forPath("/node1/node1_1", "new_data".getBytes()); TimeUnit.MILLISECONDS.sleep(200); System.out.println("--------------4--------------"); client.delete().forPath("/node1/node1_1"); TimeUnit.MILLISECONDS.sleep(200); System.out.println("--------------5--------------"); client.delete().forPath("/node1"); TimeUnit.MILLISECONDS.sleep(2000); }
运行结果以下:子节点的新增、修改、删除,都会获得监听,当前节点的建立也会获得监听。