1.基于ZooKeeper分布式锁的流程java
在zookeeper指定节点(locks)下建立临时顺序节点node_n 获取locks下全部子节点children 对子节点按节点自增序号从小到大排序 判断本节点是否是第一个子节点,如果,则获取锁;若不是,则监听比该节点小的那个节点的删除事件 若监听事件生效,则回到第二步从新进行判断,直到获取到锁
2.实现node
zookeeper系列(五)实战分布式锁apache
3.简单实现segmentfault
package zookeeper; import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class TestLock implements Watcher { private ZooKeeper zk=null; private String config; private String root="/locks"; private String lock="/lock_"; private String currentPath; // 计数器 private CountDownLatch countDownLatch; @Override public void process(WatchedEvent event) { System.out.println(event.getPath()+"-"+event.getType()+"-"+event.getState()); } /** * @param config 路径 * @param root 根目录 * @throws IOException * @throws InterruptedException * @throws KeeperException */ public TestLock(String config) throws IOException, KeeperException, InterruptedException { if(null==config) { return; } this.config=config; zk=new ZooKeeper(config,5000,this); Stat st=zk.exists(root, false); if(null==st) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } /** * 获取锁 * @return * @throws InterruptedException * @throws KeeperException */ public synchronized boolean getLock() throws KeeperException, InterruptedException { currentPath=zk.create(root+lock,new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); System.out.println("当前路径:"+currentPath.substring(currentPath.lastIndexOf('/')+1)); while(true) { List<String>children=zk.getChildren(root, false); //排序 Collections.sort(children); //获取位置 System.out.println(children.toString()); int index=children.indexOf(currentPath.substring(currentPath.lastIndexOf("/")+1)); //不是在开头位置 if(index!=0) { System.out.println(root+"/"+children.get(0)); new ZkClient(this.config).subscribeDataChanges(root+"/"+children.get(0),new IZkDataListener(){ // 当修改当前节点的数据 public void handleDataChange(String arg0, Object arg1) throws Exception { System.out.println("---"); } @Override public void handleDataDeleted(String arg0) throws Exception { System.out.println("线程:"+Thread.currentThread().getName()+"释放锁:"+currentPath); countDownLatch.countDown(); } }); this.countDownLatch=new CountDownLatch(1); this.countDownLatch.await(); this.countDownLatch=null; }else { System.out.println("线程:"+Thread.currentThread().getName()+"获取锁:"+currentPath); break; } } return false; } public void unlock() throws InterruptedException, KeeperException { this.zk.delete(currentPath, -1); System.out.println("线程:"+Thread.currentThread().getName()+"释放锁:"+currentPath); } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { Runnable runnable=new Runnable() { @Override public void run() { TestLock tl; try { tl = new TestLock("127.0.0.1:2181"); tl.getLock(); tl.unlock(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (KeeperException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; // for (int i = 0; i < 1; i++) { // Thread t = new Thread(runnable); // t.start(); // } Thread t = new Thread(runnable); t.start(); } }