前言:在zk中,当有节点新增,删除,或者节点内容发生改变的时候,只要对节点注册了监听事件,那么当发生上述节点变化的时候,zk会自动触发监听事件并通知客户端,客户端拿到对应事件通知后,就能够作相应的业务处理java
本文涉及到的节点:node
1.父节点:/disLocks1(zk根目录下的disLocks1目录,CreateMode.PERSISTENT类型)apache
2.全部须要获取锁的线程,都会在/disLocks1目录下创建一个临时顺序的子节点(CreateMode.EPHEMERAL_SEQUENTIAL类型)缓存
3.每次都是序号最小的节点获取锁,当最小的节点业务逻辑处理完毕后,断开本次链接(或者删除当前子节点),则临时顺序的节点自动删除,接着让其余没有获取锁的节点去获取锁多线程
贴代码:并发
一个JVM,10个线程并发获取锁()多jvm,只须要事先创建父节点便可jvm
package zoo.com.max.zoo.lock; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * * zk分布式锁实现 * 基于监听父节点下面的所有子节点实现,效率较低 * */ public class DistributedLock implements Watcher{ public static String host="127.0.0.1:2181"; //缓存时间 private static final int TIME_OUT = 2000; private static String FATHER_PATH = "/disLocks1"; private ZooKeeper zk; private int threadId; protected CountDownLatch countDownLatch=new CountDownLatch(1); public DistributedLock(int threadId){ this.threadId = threadId; } //获取zk链接 public void getZkClient(String host,int timeout) { try { if(null == zk){ zk = new ZooKeeper(host, timeout, this); } } catch (IOException e) { e.printStackTrace(); } } /** * 建立子节点 * * */ public String createNode(){ try { //检测节点是否存在 Stat stat = zk.exists(FATHER_PATH, false); //父节点不存在,则建立父节点,防止多线程并发建立父节点,因此加上同步代码块,防止在同一个jvm中的并发建立,多jvm环境下, 父节点能够事先建立好 if(Objects.isNull(stat)){ synchronized (FATHER_PATH) { Stat stat2 = zk.exists(FATHER_PATH, false); if(Objects.isNull(stat2)){ //父节点是持久节点 String path = zk.create(FATHER_PATH, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("父节点建立成功,返回值【"+path+"】"); } } } //建立持久性父节点下面的临时顺序子节点,/父节点路径/0000000002 String lockPath = zk.create(FATHER_PATH+"/",null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("线程【"+threadId+"】开始执行,子节点建立成功,返回值【"+lockPath+"】"); return lockPath; } catch (KeeperException e1) { e1.printStackTrace(); } catch (InterruptedException e1) { e1.printStackTrace(); } return null; } //校验当前节点是否为序号最小的节点 public boolean checkLockPath(String lockPath){ try { //注册父节点监听事件,当父节点下面的子节点有变化,就会触发Watcher事件 List<String> nodeList = zk.getChildren(FATHER_PATH, this); Collections.sort(nodeList); int index = nodeList.indexOf( lockPath.substring(FATHER_PATH.length()+1)); switch (index){ case -1:{ System.out.println("本节点已不在了"+lockPath); return false; } case 0:{ System.out.println("线程【"+threadId+"】获取锁成功,子节点序号【"+lockPath+"】"); return true; } default:{ String waitPath = nodeList.get(index - 1); System.out.println(waitPath+"在"+nodeList.get(index)+"点前面,须要等待【"+nodeList.get(index)+"】"); return false; } } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } public boolean getLock(){ //建立获取锁的节点(顺序临时节点) String childPath = createNode(); boolean flag = true; if(null != childPath){ try { //轮询等待zk获取锁的通知 while(flag){ if(checkLockPath(childPath)){ //获取锁成功 return true; }else{ //节点建立成功, 则等待zk通知 countDownLatch.await(); } } } catch (InterruptedException e) { e.printStackTrace(); } }else{ System.out.println("节点没有建立成功,获取锁失败"); } return false; } public void process(WatchedEvent event) { //成功链接zk,状态判断 if(event.getState() == KeeperState.SyncConnected){ //子节点有变化 if(event.getType() == EventType.NodeChildrenChanged){ countDownLatch.countDown(); } } } public void unlock(){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } public ZooKeeper getZooKeeper(){ return zk; } public static void main(String[] args) throws KeeperException, InterruptedException { for(int i=0; i < 10; i++){ final int threadId = i+1; new Thread(){ @Override public void run() { try{ DistributedLock dis = new DistributedLock(threadId); dis.getZkClient(host,TIME_OUT); if(dis.getLock()){ Thread.sleep(200); dis.unlock(); } } catch (Exception e){ System.out.println("【第"+threadId+"个线程】 抛出的异常:"); e.printStackTrace(); } } }.start(); } } }
第二遍会改进为向子节点注册监听事件, 这样就不用全部子节点都去向父节点注册事件,子节点只会在本身前面一个节点注册节点删除事件分布式
新手码农,若有错误,但愿你们多多指教,共同进步ide