因为本身最近在学习zookeeper分布式相关的知识,发如今其代码实现上存在较多难以想清楚的点,尤为是响应式编程的操做,为此在这里记录完整的代码书写流程(每一步的思想),这里是第一篇zookeeper分布式注册配置中心的实现代码过程,后面还会有第二篇关于zookeeper分布式锁的简单实现过程。
第二篇zookeeper分布式锁实现:
https://segmentfault.com/a/11...
zookeeper因为拥有watcher机制,使得其拥有发布订阅的功能,而发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据发布到 ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。 应用在启动的时候会主动来获取一次配置,同时,在节点上注册一个 Watcher,这样一来,之后每次配置有更新的时候,都会实时通知到订阅的客户端,历来达到获取最新配置信息的目的。java
首选交代实验环境,本身的zookeeper的版本是3.5.8的版本,代码工具为IDEA,建立了一个MAVEN项目,仅仅添加了以下依赖。apache
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.8</version> </dependency>
因为客户端须要与zookeeper创建链接,获取数据,添加监控等等一系列的事情,因此这里封装一个Utils工具类供咱们使用。编程
而后对于zookeeper链接客户端的地址的后面能够紧跟一个path,做为在根目录下的工做目录。该目录就是做为全部操做的根目录,这里使用/test、segmentfault
同时因为zookeeper基于watch机制实现发布订阅,我们全部的watcher都采用自定义的方式实现,首先是对链接成功的时候的DefaultWatcher。服务器
package org.qzx.config; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:05 下午 * @Description: org.qzx.config * @version: 1.0 */ public class DefaultWatcher implements Watcher { @Override public void process(WatchedEvent event) { System.out.println(event.toString()); } }
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:02 下午 * @Description: org.qzx.config * @version: 1.0 */ public class Utils { // zookeeper对象 private static ZooKeeper zooKeeper; // 链接地址 private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test"; private static DefaultWatcher defaultWatcher = new DefaultWatcher(); public static ZooKeeper getZooKeeper() throws Exception{ zooKeeper = new ZooKeeper(address,3000,defaultWatcher); return zooKeeper; } }
因为zookeeper采用的是异步调用,因此这里须要使用一把锁锁住主线程,在链接成功后自动解锁,主线程再往下进行。这里使用CountDownLatch实现锁,在主线程建立,传递到DafaultWatcher的回掉函数中。框架
package org.qzx.config; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:05 下午 * @Description: org.qzx.config * @version: 1.0 */ public class DefaultWatcher implements Watcher { private CountDownLatch latch; public void setLatch(CountDownLatch latch) { this.latch = latch; } @Override public void process(WatchedEvent event) { switch (event.getState()) { case Unknown: break; case Disconnected: break; case NoSyncConnected: break; case SyncConnected: latch.countDown(); break; case AuthFailed: break; case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; case Closed: break; } System.out.println(event.toString()); } }
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:02 下午 * @Description: org.qzx.config * @version: 1.0 */ public class Utils { // zookeeper对象 private static ZooKeeper zooKeeper; // 链接地址 private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test"; private static DefaultWatcher defaultWatcher = new DefaultWatcher(); // 锁 private static CountDownLatch latch = new CountDownLatch(1); public static ZooKeeper getZooKeeper() throws Exception{ zooKeeper = new ZooKeeper(address,3000,defaultWatcher); defaultWatcher.setLatch(latch); latch.await(); return zooKeeper; } }
接下来就是编写配置类TestConfig,首先是在操做以前进行链接,操做后得关闭,分别对应conn和close方法,而后就是配置方法getConfig,因为并不清楚zookeeper客户端是否必定含有自定义的工做目录,因此通常倾向于使用exists方法来进行测试。又因为exists方法中有1个watcher和一个回调函数,在回调函数中返回存在的话又得调用getData方法获取数据,在getData方法中又存在一个watcher和回调函数,这样会形成代码深度太大不易阅读,因此这里也自定义一个工具类,封装好全部的watcher和回调函数。该类的名称就叫MyWatcherAndCallBack.异步
package org.qzx.config; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:40 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { } // Watcher @Override public void process(WatchedEvent event) { } // DataCallback @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { } }
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.zip.ZipOutputStream; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:29 下午 * @Description: org.qzx.config * @version: 1.0 */ public class TestConfig { private ZooKeeper zooKeeper; private MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack(); @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void getConf(){ // 这里的/AppConf在zookeeper中其实是/test/AppConf, zooKeeper.exists("/AppConf",watcherAndCallBack,watcherAndCallBack,"123"); } }
这个时候就得考虑在成功判断在工做目录下存在AppConf的时候须要作的事情,其实也很简单,就是获取当前节点的数据就好了。分布式
package org.qzx.config; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:40 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { private ZooKeeper zooKeeper; public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(stat!=null){ // 节点存在获取数据 zooKeeper.getData("/AppConf",this,this,"aaa"); } } // Watcher @Override public void process(WatchedEvent event) { } // DataCallback @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { } }
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.zip.ZipOutputStream; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:29 下午 * @Description: org.qzx.config * @version: 1.0 */ public class TestConfig { private ZooKeeper zooKeeper; private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack(); @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void getConf(){ watcherAndCallBack.setZooKeeper(zooKeeper); // 这里的/AppConf在zookeeper中其实是/test/AppConf, zooKeeper.exists("/AppConf",watcherAndCallBack,watcherAndCallBack,"123"); } }
如今,咱们再来考虑另一个问题,当咱们取数据的时候,zookeeper其实是使用的异步调用模型,这里不会等待数据取回而是直接继续执行主线程的任务,那么在数据取回的时候要如何让主线程知道呢?因此在这里我们得准备一个接受数据的对象,该类叫MyConf,对应的代码以下:ide
package org.qzx.config; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 2:00 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyConf { private String confData; public String getConfData() { return confData; } public void setConfData(String confData) { this.confData = confData; } }
因为须要让主线程接受数据,得在TestConfig类中聚合该对象,而且在getData的回调函数中须要为MyConf设置数据,因此在MyWatcherAndCallBack中也得聚合该对象。函数
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.zip.ZipOutputStream; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:29 下午 * @Description: org.qzx.config * @version: 1.0 */ public class TestConfig { private ZooKeeper zooKeeper; private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack(); // 接受数据 MyConf myConf = new MyConf(); @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void getConf(){ watcherAndCallBack.setZooKeeper(zooKeeper); watcherAndCallBack.setMyConf(myConf); // 这里的/AppConf在zookeeper中其实是/test/AppConf, zooKeeper.exists("/AppConf",watcherAndCallBack,watcherAndCallBack,"123"); } }
package org.qzx.config; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:40 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { private ZooKeeper zooKeeper; private MyConf myConf; public void setMyConf(MyConf myConf) { this.myConf = myConf; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(stat!=null){ // 节点存在获取数据 zooKeeper.getData("/AppConf",this,this,"aaa"); } } // Watcher @Override public void process(WatchedEvent event) { } // DataCallback @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if(stat!=null){ myConf.setConfData(new String(data)); } } }
这样在数据取回来后,在TestConfig中就能够看见该数据了。这里存在着一个问题,在exists执行的时候不会等待数据的获取而会一直执行下去,可是对于判断时候,若是有该节点而且获取数据应该是一个原子性的操做,在这里咱们将这两步封装成一部操做完成。咱们能够在MyWatcherAndCallBack类中添加一个方法用来等待该操做的执行,从而获取数据结果,该方法就叫aWait().咱们这里将exists方法移动到aWait方法中,同时使用CountDownLatch阻塞该操做,直到获取数据成功为止解锁。这里使用一个CountDownLatch完成了对于判断节点存在和获取数据的封装,若是在TestConfig中对exists方法进行加锁,那就还得将这把锁传递到MyWatcherAndCallBack中在getData回调结束才能解锁,这种实现方式显然在语义上没有将其移动到aWait方法中的更好。
package org.qzx.config; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:40 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { private ZooKeeper zooKeeper; private MyConf myConf; private CountDownLatch latch = new CountDownLatch(1); public void setMyConf(MyConf myConf) { this.myConf = myConf; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(stat!=null){ // 节点存在获取数据 zooKeeper.getData("/AppConf",this,this,"aaa"); } } // Watcher @Override public void process(WatchedEvent event) { } // DataCallback @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if(stat!=null){ myConf.setConfData(new String(data)); latch.countDown(); } } public void aWait() throws InterruptedException { // 这里的/AppConf在zookeeper中其实是/test/AppConf, zooKeeper.exists("/AppConf",this,this,"123"); latch.await(); } }
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.zip.ZipOutputStream; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:29 下午 * @Description: org.qzx.config * @version: 1.0 */ public class TestConfig { private ZooKeeper zooKeeper; private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack(); // 接受数据 MyConf myConf = new MyConf(); @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void getConf() throws InterruptedException { watcherAndCallBack.setZooKeeper(zooKeeper); watcherAndCallBack.setMyConf(myConf); watcherAndCallBack.aWait(); } }
如今咱们对于判断节点存在和成功获取节点数据的这条路径就编写完毕了,接下来考虑节点被修改的状况,首先是当节点不存在的时候,exists的回调不会执行,在节点被建立的时候,注册在exists的watcher会被执行,那么咱们只须要调用数据便可,其次是节点中的数据被修改,咱们须要从新得到新的节点数据而且设置到confData中,再就是节点被删除,咱们须要将confData的数据置为空。为了观察数据的变化,这里在TestConfig中循环打印设置的数据。
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.concurrent.TimeUnit; import java.util.zip.ZipOutputStream; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:29 下午 * @Description: org.qzx.config * @version: 1.0 */ public class TestConfig { private ZooKeeper zooKeeper; private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack(); // 接受数据 MyConf myConf = new MyConf(); @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void getConf() throws InterruptedException { watcherAndCallBack.setZooKeeper(zooKeeper); watcherAndCallBack.setMyConf(myConf); watcherAndCallBack.aWait(); while (true){ System.out.println(myConf.getConfData()); TimeUnit.SECONDS.sleep(2); } } }
package org.qzx.config; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:40 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { private ZooKeeper zooKeeper; private MyConf myConf; private final CountDownLatch latch = new CountDownLatch(1); public void setMyConf(MyConf myConf) { this.myConf = myConf; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(stat!=null){ // 节点存在获取数据 zooKeeper.getData("/AppConf",this,this,"aaa"); } } // Watcher @Override public void process(WatchedEvent event) { switch (event.getType()) { case None: break; case NodeCreated: // 节点建立须要获取数据 zooKeeper.getData("/AppConf",this,this,"bbb"); break; case NodeDeleted: // 节点删除须要清空数据 myConf.setConfData(""); break; case NodeDataChanged: // 数据修改 zooKeeper.getData("/AppConf",this,this,"bbb"); break; case NodeChildrenChanged: break; case DataWatchRemoved: break; case ChildWatchRemoved: break; } } // DataCallback @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if(stat!=null){ myConf.setConfData(new String(data)); latch.countDown(); } } public void aWait() throws InterruptedException { // 这里的/AppConf在zookeeper中其实是/test/AppConf, zooKeeper.exists("/AppConf",this,this,"123"); latch.await(); } }
接下来就是测试程序是否正确了,首先启动4台zookeeper,而后在根目录下建立test工做目录.
而后开始启动程序,而后在zookeeper客户端手动建立AppConf节点,而且设置数据olddata。
能够看到程序输出olddata.
如今再修改该节点数据为newdata.
而后能够看到程序输出newdata.
在测试的时候发现若是删除了节点会不断的输出空字符串,这个比较占用IO和资源,修改成阻塞等待数据不空。同时在输出的时候若是数据为空打印一句数据为空的提示,这里对于MyWatcherAndCallBack中节点删除的代码须要注意的是,咱们是经过调用aWait方法来实现的阻塞,由于这样会在节点数据存在时候自动解锁,进而输出节点数据,可是因为CountDownLatch已经被减过了,因此这里须要将latch从新赋值。
package org.qzx.config; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:40 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { private ZooKeeper zooKeeper; private MyConf myConf; private CountDownLatch latch = new CountDownLatch(1); public void setMyConf(MyConf myConf) { this.myConf = myConf; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(stat!=null){ // 节点存在获取数据 zooKeeper.getData("/AppConf",this,this,"aaa"); } } // Watcher @Override public void process(WatchedEvent event) { switch (event.getType()) { case None: break; case NodeCreated: // 节点建立须要获取数据 zooKeeper.getData("/AppConf",this,this,"bbb"); break; case NodeDeleted: // 节点删除须要清空数据而且等待数据到达 myConf.setConfData(""); latch = new CountDownLatch(1); break; case NodeDataChanged: // 数据修改 zooKeeper.getData("/AppConf",this,this,"bbb"); break; case NodeChildrenChanged: break; case DataWatchRemoved: break; case ChildWatchRemoved: break; } } // DataCallback @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if(stat!=null){ myConf.setConfData(new String(data)); latch.countDown(); } } public void aWait() throws InterruptedException { // 这里的/AppConf在zookeeper中其实是/test/AppConf, zooKeeper.exists("/AppConf",this,this,"123"); latch.await(); } }
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.concurrent.TimeUnit; import java.util.zip.ZipOutputStream; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:29 下午 * @Description: org.qzx.config * @version: 1.0 */ public class TestConfig { private ZooKeeper zooKeeper; private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack(); // 接受数据 MyConf myConf = new MyConf(); @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void getConf() throws InterruptedException { watcherAndCallBack.setZooKeeper(zooKeeper); watcherAndCallBack.setMyConf(myConf); watcherAndCallBack.aWait(); while (true){ if(myConf.getConfData().equals("")){ System.out.println("数据为空"); watcherAndCallBack.aWait();// 等待数据到达 } System.out.println(myConf.getConfData()); TimeUnit.SECONDS.sleep(2); } } }
接下来从新测试节点被删除的状况.
删除节点后会发发现程序输出数据为空的提示后就阻塞住了。
如今从新建立节点:
会发现又从新得到了节点的数据。
到此对于zookeeper的配置注册的代码就编写完毕。
这里对于zookeeper的配置注册作一个小小的总结,配置注册本质上是在统一管理服务器共享的节点,其配置信息所有写在了那1M的数据中,在一个服务器修改了该节点后,其余的服务器会经过zookeeper的watcher机制接受到该消息,也就成功看到节点的实时变化完成更新配置的操做,这样就完成了分布式服务的协调功能。
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:02 下午 * @Description: org.qzx.config * @version: 1.0 */ public class Utils { // zookeeper对象 private static ZooKeeper zooKeeper; // 链接地址 private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test"; private static DefaultWatcher defaultWatcher = new DefaultWatcher(); // 锁 private static CountDownLatch latch = new CountDownLatch(1); public static ZooKeeper getZooKeeper() throws Exception{ zooKeeper = new ZooKeeper(address,3000,defaultWatcher); defaultWatcher.setLatch(latch); latch.await(); return zooKeeper; } }
package org.qzx.config; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:05 下午 * @Description: org.qzx.config * @version: 1.0 */ public class DefaultWatcher implements Watcher { private CountDownLatch latch; public void setLatch(CountDownLatch latch) { this.latch = latch; } @Override public void process(WatchedEvent event) { switch (event.getState()) { case Unknown: break; case Disconnected: break; case NoSyncConnected: break; case SyncConnected: latch.countDown(); break; case AuthFailed: break; case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; case Closed: break; } System.out.println(event.toString()); } }
package org.qzx.config; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:40 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { private ZooKeeper zooKeeper; private MyConf myConf; private CountDownLatch latch = new CountDownLatch(1); public void setMyConf(MyConf myConf) { this.myConf = myConf; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(stat!=null){ // 节点存在获取数据 zooKeeper.getData("/AppConf",this,this,"aaa"); } } // Watcher @Override public void process(WatchedEvent event) { switch (event.getType()) { case None: break; case NodeCreated: // 节点建立须要获取数据 zooKeeper.getData("/AppConf",this,this,"bbb"); break; case NodeDeleted: // 节点删除须要清空数据而且等待数据到达 myConf.setConfData(""); latch = new CountDownLatch(1); break; case NodeDataChanged: // 数据修改 zooKeeper.getData("/AppConf",this,this,"bbb"); break; case NodeChildrenChanged: break; case DataWatchRemoved: break; case ChildWatchRemoved: break; } } // DataCallback @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if(stat!=null){ myConf.setConfData(new String(data)); latch.countDown(); } } public void aWait() throws InterruptedException { // 这里的/AppConf在zookeeper中其实是/test/AppConf, zooKeeper.exists("/AppConf",this,this,"123"); latch.await(); } }
package org.qzx.config; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 2:00 下午 * @Description: org.qzx.config * @version: 1.0 */ public class MyConf { private String confData; public String getConfData() { return confData; } public void setConfData(String confData) { this.confData = confData; } }
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.concurrent.TimeUnit; import java.util.zip.ZipOutputStream; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:29 下午 * @Description: org.qzx.config * @version: 1.0 */ public class TestConfig { private ZooKeeper zooKeeper; private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack(); // 接受数据 MyConf myConf = new MyConf(); @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void getConf() throws InterruptedException { watcherAndCallBack.setZooKeeper(zooKeeper); watcherAndCallBack.setMyConf(myConf); watcherAndCallBack.aWait(); while (true){ if(myConf.getConfData().equals("")){ System.out.println("数据为空"); watcherAndCallBack.aWait();// 等待数据到达 } System.out.println(myConf.getConfData()); TimeUnit.SECONDS.sleep(2); } } }
可能有人本文篇幅较冗余(尤为是代码部分)并且过于简单,可是本人只是想完整的记录响应式编程的思考过程和完整的代码书写流程,能够供本身复习和为小白提供一个入门zookeeper响应式编程的小demo。