这里是zookeeper响应式编程的第二篇——自定义分布式锁,第一篇zookeeper分布式注册配置中心见以下连接:
https://segmentfault.com/a/11...
因为在分布式系统中,任意2个节点的线程须要获取同一个资源的时候就须要锁来维持程序的正确运行,可是呢,若是使用JVM提供的锁只能锁住本身,由于这是2台主机,这就引入了分布式锁的概念。也便是说须要一把锁在主机的外面,而不少框架均可以实现分布式锁,好比redis,mysql和zookeeper,目前最为方便的是使用zookeeper,由于就其高可用性和统一视图而言就比其余的技术方便不少。java
对于zookeeper作分布式锁的分析过程以下:首先对于2台主机抢占同一把锁的时候,只能有一台主机成功抢占,那么有可能出现得到锁的主机“挂了”,那么咱们可使用临时节点解决该问题,那么在一个主机成功抢占该锁以后,若是它释放了锁,其余主机是如何知道它已经成功释放了呢?第一种方式就是能够采用主动轮询的方式不断检测该锁是否已经被释放,可是这种方式有延迟,而且在主机特别多的时候多台主机轮询一把锁会形成zookeeper很大的压力。第二种方式就是使用watch机制,这种方式能够解决延迟的问题,可是在得到锁的主机释放锁的时候,zookeeper会回调哪些全部没有抢到锁的线程,而后那些主机又会发起强锁的操做,会形成较大的通讯压力。第三种方式就可使用watche机制+序列节点,而后让每个临时序列节点都watch前一个节点,这样只有一个编号最小的才能得到锁,而且在释放锁后会只通知后面的一个主机。mysql
首选咱们须要在编写配置中心的Utils工具类,而且建立TestLock类实现分布式锁。而后咱们开辟10个线程模拟多台主机抢占锁的过程,基本流程就是抢占锁,而后执行业务代码(这里使用睡眠来代替),最后再释放锁。redis
package org.qzx.config; import org.apache.zookeeper.ZooKeeper; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:02 下午 * @Description: org.qzx.config * @version: 1.0 */ public class Utils { // zookeeper对象 private static ZooKeeper zooKeeper; // 链接地址 private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test"; private static DefaultWatcher defaultWatcher = new DefaultWatcher(); // 锁 private static CountDownLatch latch = new CountDownLatch(1); public static ZooKeeper getZooKeeper() throws Exception{ zooKeeper = new ZooKeeper(address,3000,defaultWatcher); defaultWatcher.setLatch(latch); latch.await(); return zooKeeper; } }
package org.qzx.lock; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.concurrent.TimeUnit; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 3:54 下午 * @Description: org.qzx.lock * @version: 1.0 */ public class TestLock { private ZooKeeper zooKeeper; @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void TestLock(){ for (int i = 0; i < 10; i++) { new Thread(()->{ try { // 抢占锁 // 业务代码 TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+"is woorking!!!"); // 释放锁 } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } }
咱们在这里提供另一个工具类能够为每个线程实现抢锁和释放锁的过程,同时因为抢占的锁其实是zookeeper的临时序列节点,因此一定会使用wather和回调机制,这里就把这个工具类叫作MyWatcherAndCallBack,该类提供抢占锁、释放锁,节点变化回调方法。其大致框架以下:sql
package org.qzx.lock; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 4:03 下午 * @Description: org.qzx.lock * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher { private ZooKeeper zooKeeper; public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } @Override public void process(WatchedEvent event) { } // 抢占锁 public void tryLock(){ } // 释放锁 public void unlock(){ } }
package org.qzx.lock; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.concurrent.TimeUnit; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 3:54 下午 * @Description: org.qzx.lock * @version: 1.0 */ public class TestLock { private ZooKeeper zooKeeper; @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void TestLock(){ for (int i = 0; i < 10; i++) { new Thread(()->{ MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack(); myWatcherAndCallBack.setZooKeeper(zooKeeper); try { // 抢占锁 myWatcherAndCallBack.tryLock(); // 业务代码 TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+"is woorking!!!"); // 释放锁 myWatcherAndCallBack.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } }
这样框架就已经搭建完毕,接下来就是编写具体的抢占锁和释放的逻辑代码了。
首先对于抢占锁的过程必定是阻塞的,直到抢占成功的时候才会接着往下走,这里使用CountDownLatch实现。而每个线程都会建立属于本身的临时序列节点做为本身的锁,不过只有编号最小的那个才会得到被对应的线程所占有,其余的线程在建立节点后都会阻塞。这里为了方便看到那些线程建立了哪些锁,将线程的名字做为数据写入到节点中。而后咱们在建立节点的回调函数中输出当前线程的名字和节点的名字,目的是为了检验代码写到如今是否正确。apache
package org.qzx.lock; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.concurrent.TimeUnit; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 3:54 下午 * @Description: org.qzx.lock * @version: 1.0 */ public class TestLock { private ZooKeeper zooKeeper; @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void TestLock(){ for (int i = 0; i < 10; i++) { new Thread(()->{ MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack(); myWatcherAndCallBack.setThreadName(Thread.currentThread().getName()); myWatcherAndCallBack.setZooKeeper(zooKeeper); try { // 抢占锁 myWatcherAndCallBack.tryLock(); // 业务代码 TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+"is woorking!!!"); // 释放锁 myWatcherAndCallBack.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } while (true){ } } }
为了防止主线程执行太快致使回调函数尚未执行完毕就结束,在TestLock方法最后加上死循环进行阻塞。编程
@Test public void TestLock(){ for (int i = 0; i < 10; i++) { new Thread(()->{ MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack(); myWatcherAndCallBack.setThreadName(Thread.currentThread().getName()); myWatcherAndCallBack.setZooKeeper(zooKeeper); try { // 抢占锁 myWatcherAndCallBack.tryLock(); // 业务代码 TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+"is woorking!!!"); // 释放锁 myWatcherAndCallBack.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } while (true){ } }
启动测试,在测试前记得建立工做目录/test,结果以下:
能够看到节点建立的是有序的,可是线程是无序的。
接下来在建立节点成功的回调函数中,咱们就须要获取锁了,使用getChildren方法得到工做目录下的孩子节点,也就是建立的临时序列节点,该方法不须要使用watch机制,由于不须要监测父节点,同时对于其回调对象咱们也是一样封装在MyWatcherAndCallBack中。最后因为建立节点的名字在后面会用到,使用pathName属性保存当前线程建立的节点名字。segmentfault
package org.qzx.lock; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.util.List; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 4:03 下午 * @Description: org.qzx.lock * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback { private ZooKeeper zooKeeper; private CountDownLatch latch = new CountDownLatch(1); private String threadName; private String pathName; public void setThreadName(String threadName) { this.threadName = threadName; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // Watcher @Override public void process(WatchedEvent event) { } // 抢占锁 public void tryLock(){ try { // 建立一个临时序列节点做为锁 zooKeeper.create("/lcok",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this,"abc"); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 释放锁 public void unlock(){ } // StringCallback @Override public void processResult(int rc, String path, Object ctx, String name) { if(name!=null){ System.out.println(threadName+"------>"+name); pathName = name;//相似于/lcok0000000000 zooKeeper.getChildren("/",false,this,"aaa"); } } //Children2Callback @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { } }
在getChildren的回调方法中,当前线程必定成功建立了节点,而且能够看到全部在它以前建立的节点。那么咱们如今遍历输出全部的children中的全部节点,目的是为了看到当前线程所看到的全部节点是无序的,这样就为后面须要排序提供了必要性。修改的部分代码以下:框架
//Children2Callback @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { System.out.println(threadName+"能看到的节点以下:"); for (String child : children) { System.out.println(child); } }
输出结果为:
能够看到第一个建立节点的线程为Thread-6,而且看到的节点都是无序的。而且节点的名字少了个/。
接下来就是对于当前线程,得判断它建立的锁是否是第一个,因此咱们先对children进行排序,而后再获取当前锁在children的位置,若是是第一个说明该线程能够得到锁,执行latch.countdown(),这样该线程就能够去执行相应的任务了。若是不是第一个,那么就得判断前一个锁是否已经释放,判断的方法为exists,若是前面一个节点不存在了,说明已经释放,对于exists方法有可能会出现尚未成功监控到前一个节点就出现释放锁的状况,也就是exists执行失败了,没能监控前一个节点,那么说明锁已经释放,当前线程所须要进行的操做不在watcher中执行而是在回调函数中中执行,因此在这里exists的回调函数是必须的。分布式
package org.qzx.lock; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 4:03 下午 * @Description: org.qzx.lock * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback { private ZooKeeper zooKeeper; private CountDownLatch latch = new CountDownLatch(1); private String threadName; private String pathName; public void setThreadName(String threadName) { this.threadName = threadName; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // Watcher @Override public void process(WatchedEvent event) { } // 抢占锁 public void tryLock(){ try { // 建立一个临时序列节点做为锁 zooKeeper.create("/lcok",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this,"abc"); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 释放锁 public void unlock(){ } // StringCallback @Override public void processResult(int rc, String path, Object ctx, String name) { if(name!=null){ System.out.println(threadName+"------>"+name); pathName = name;//相似于/lcok0000000000 zooKeeper.getChildren("/",false,this,"aaa"); } } //Children2Callback @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { Collections.sort(children); int index = children.indexOf(pathName.substring(1)); if(index==0){ // 当前线程建立的锁是第一个,能够得到 latch.countDown(); }else { // 不是第一个,得判断前一个锁是否已经释放 zooKeeper.exists("/"+children.get(index-1),this,this,"azz"); } } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { } }
接下来须要对前一把锁的释放事件作处理,首先是在节点删除后,会触发节点删除时间,在该线程中会作出响应,具体作法就是要么直接latch.coutdown()得到锁或者经过getChildren判断当前锁是不是第一个了,是第一个就得到锁。同时对于当前线程得到锁后释放锁进行处理,直接对其建立的节点进行删除便可。ide
package org.qzx.lock; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 4:03 下午 * @Description: org.qzx.lock * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback { private ZooKeeper zooKeeper; private CountDownLatch latch = new CountDownLatch(1); private String threadName; private String pathName; public void setThreadName(String threadName) { this.threadName = threadName; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // Watcher @Override public void process(WatchedEvent event) { switch (event.getType()) { case None: break; case NodeCreated: break; case NodeDeleted: // 前一把锁被删除,当前线程得到锁 // zooKeeper.getChildren("/",false,this,"aaa"); latch.countDown(); break; case NodeDataChanged: break; case NodeChildrenChanged: break; case DataWatchRemoved: break; case ChildWatchRemoved: break; } } // 抢占锁 public void tryLock(){ try { // 建立一个临时序列节点做为锁 zooKeeper.create("/lcok",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this,"abc"); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 释放锁 public void unlock() throws KeeperException, InterruptedException { zooKeeper.delete(pathName,-1);// -1表明忽略版本号 } // StringCallback @Override public void processResult(int rc, String path, Object ctx, String name) { if(name!=null){ System.out.println(threadName+"------>"+name); pathName = name;//相似于/lcok0000000000 zooKeeper.getChildren("/",false,this,"aaa"); } } //Children2Callback @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { Collections.sort(children); int index = children.indexOf(pathName.substring(1)); if(index==0){ // 当前线程建立的锁是第一个,能够得到 latch.countDown(); }else { // 不是第一个,得判断前一个锁是否已经释放 zooKeeper.exists("/"+children.get(index-1),this,this,"azz"); } } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { } }
咱们如今运行程序能够看到每个线程均可以得到锁而且顺序执行。
对于上述代码,存在一个问题,当前去除主线程业务代码中睡眠的操做,就会出现只有一个线程能够成功得到锁而且执行响应的操做,其余线程会出现相似于死锁的现象,可是这里不是死锁。这里的缘由是执行速度太快了,很快就把当前线程得到的锁删除了,那么后面的线程在执行完排序,监控前面的锁就会出现失败的状况,这里的一种解决方法就是在exists的回调函数中针对节点不存在,也就是stat==null的时候,从新调用getChildren方法判断当前是不是第一把锁,若是是就会执行。
package org.qzx.lock; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 4:03 下午 * @Description: org.qzx.lock * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback { private ZooKeeper zooKeeper; private CountDownLatch latch = new CountDownLatch(1); private String threadName; private String pathName; public void setThreadName(String threadName) { this.threadName = threadName; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // Watcher @Override public void process(WatchedEvent event) { switch (event.getType()) { case None: break; case NodeCreated: break; case NodeDeleted: // 前一把锁被删除,当前线程得到锁 // zooKeeper.getChildren("/",false,this,"aaa"); latch.countDown(); break; case NodeDataChanged: break; case NodeChildrenChanged: break; case DataWatchRemoved: break; case ChildWatchRemoved: break; } } // 抢占锁 public void tryLock(){ try { // 建立一个临时序列节点做为锁 zooKeeper.create("/lcok",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this,"abc"); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 释放锁 public void unlock() throws KeeperException, InterruptedException { zooKeeper.delete(pathName,-1); } // StringCallback @Override public void processResult(int rc, String path, Object ctx, String name) { if(name!=null){ System.out.println(threadName+"------>"+name); pathName = name;//相似于/lcok0000000000 zooKeeper.getChildren("/",false,this,"aaa"); } } //Children2Callback @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { Collections.sort(children); int index = children.indexOf(pathName.substring(1)); if(index==0){ // 当前线程建立的锁是第一个,能够得到 latch.countDown(); }else { // 不是第一个,得判断前一个锁是否已经释放 zooKeeper.exists("/"+children.get(index-1),this,this,"azz"); } } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(stat==null){ // 监控失败,自动获取锁 zooKeeper.getChildren("/",false,this,"aaa"); } } }
在主线程的睡眠操做去除掉后,程序运行的结果以下:
能够看到全部线程又重新正常运行了。
到此,zookeeper自定义分布式锁的小demo就编写完毕。对于zookeeper分布式锁的全部代码整理以下。
package org.qzx.lock; import org.apache.zookeeper.ZooKeeper; import org.qzx.config.DefaultWatcher; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 1:02 下午 * @Description: org.qzx.config * @version: 1.0 */ public class Utils { // zookeeper对象 private static ZooKeeper zooKeeper; // 链接地址 private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test"; private static DefaultWatcher defaultWatcher = new DefaultWatcher(); // 锁 private static CountDownLatch latch = new CountDownLatch(1); public static ZooKeeper getZooKeeper() throws Exception{ zooKeeper = new ZooKeeper(address,3000,defaultWatcher); defaultWatcher.setLatch(latch); latch.await(); return zooKeeper; } }
package org.qzx.lock; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.concurrent.TimeUnit; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 3:54 下午 * @Description: org.qzx.lock * @version: 1.0 */ public class TestLock { private ZooKeeper zooKeeper; @Before public void conn() throws Exception { zooKeeper = Utils.getZooKeeper(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test public void TestLock(){ for (int i = 0; i < 10; i++) { new Thread(()->{ MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack(); myWatcherAndCallBack.setThreadName(Thread.currentThread().getName()); myWatcherAndCallBack.setZooKeeper(zooKeeper); try { // 抢占锁 myWatcherAndCallBack.tryLock(); // 业务代码 // TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+"is woorking!!!"); // 释放锁 myWatcherAndCallBack.unlock(); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } }).start(); } while (true){ } } }
package org.qzx.lock; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; /** * @Auther: qzx * @Date: 2020/10/29 - 10 - 29 - 4:03 下午 * @Description: org.qzx.lock * @version: 1.0 */ public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback { private ZooKeeper zooKeeper; private CountDownLatch latch = new CountDownLatch(1); private String threadName; private String pathName; public void setThreadName(String threadName) { this.threadName = threadName; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // Watcher @Override public void process(WatchedEvent event) { switch (event.getType()) { case None: break; case NodeCreated: break; case NodeDeleted: // 前一把锁被删除,当前线程得到锁 // zooKeeper.getChildren("/",false,this,"aaa"); latch.countDown(); break; case NodeDataChanged: break; case NodeChildrenChanged: break; case DataWatchRemoved: break; case ChildWatchRemoved: break; } } // 抢占锁 public void tryLock(){ try { // 建立一个临时序列节点做为锁 zooKeeper.create("/lcok",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this,"abc"); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 释放锁 public void unlock() throws KeeperException, InterruptedException { zooKeeper.delete(pathName,-1); } // StringCallback @Override public void processResult(int rc, String path, Object ctx, String name) { if(name!=null){ System.out.println(threadName+"------>"+name); pathName = name;//相似于/lcok0000000000 zooKeeper.getChildren("/",false,this,"aaa"); } } //Children2Callback @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { Collections.sort(children); int index = children.indexOf(pathName.substring(1)); if(index==0){ // 当前线程建立的锁是第一个,能够得到 latch.countDown(); }else { // 不是第一个,得判断前一个锁是否已经释放 zooKeeper.exists("/"+children.get(index-1),this,this,"azz"); } } // StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(stat==null){ // 监控失败,自动获取锁 zooKeeper.getChildren("/",false,this,"aaa"); } } }