#ZooKeeper.分布式锁java
public class ZKLock implements Lock{ private static Logger log = Logger.getLogger(ZKLock.class.getSimpleName()); private ZooKeeper zookpeer; private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; private static final String ROOT = "/lock"; private byte[] b = new byte[]{1}; private String lock; private ReentrantLock reenLock = new ReentrantLock(); private Condition condition = reenLock.newCondition(); //private CountDownLatch c; private int i; public ZKLock(String zookpeer,int i) { try { this.i = i; this.zookpeer = new ZooKeeper(zookpeer,3000,new Watcher(){ @Override public void process(WatchedEvent event) { System.out.println("==启动回调=="); }}); init(); } catch (IOException e) { } } @Override public boolean tryLock() { reenLock.lock(); try { System.out.println("==================="); System.out.println("开始抢锁 机器" + i); //c = new CountDownLatch(1); String myLock = zookpeer.create(getLockName(), b, acl, CreateMode.EPHEMERAL); this.lock = getFirstNode(); if(myLock.equals(this.lock)) { //获取到锁 System.out.println("我得到锁,我是 机器" + i + ":" + this.lock + " !!!!!!!!!!!!"); } else { System.out.println("我没抢到锁 机器" + i); } reg(zookpeer); condition.await(); //c.await(); System.out.println("释放锁 机器:" + i); if(zookpeer.exists(myLock, false) != null) { zookpeer.delete(myLock, -1); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ reenLock.unlock(); } return false; } //初始化锁目录 public void init(){ try { if(zookpeer.exists(ROOT, false) == null) { zookpeer.create(ROOT,b, acl, CreateMode.PERSISTENT); } } catch (KeeperException e) { log.log(Level.WARNING,"root already exist",e); } catch (InterruptedException e) { log.log(Level.SEVERE,"connection fail"); } } public void reg(ZooKeeper zk) { try { zk.exists(lock, new M()); } catch (KeeperException e) { } catch (InterruptedException e) { } } class M implements Watcher{ @Override public void process(WatchedEvent event) { reenLock.lock(); try { System.out.println("节点变动 机器i" + i); //c.countDown(); condition.signal(); reg(zookpeer); } finally{ reenLock.unlock(); } } } @Override public void lock() { while(true){ tryLock(); } } private String getFirstNode() throws KeeperException, InterruptedException { List<String> children = zookpeer.getChildren(ROOT, false); Collections.sort(children); System.out.println("当前节点:" + Arrays.toString(children.toArray())); return ROOT + "/" + children.get(0); } private String getLockName() { String name = UUID.randomUUID().toString(); return ROOT + "/" + name.substring(0,name.indexOf("-")); } ..... }
//3线程模拟争夺 手动删除节点,可看到从新争锁 public class LockWatch { public static void main(String[] args) throws Exception { ExecutorService p = Executors.newCachedThreadPool(); for(int i = 0; i < 3; i ++) { p.submit(new T(i)); } } } class T implements Runnable{ private int i; public T(int i){ this.i = i; } @Override public void run() { final ZKLock z = new ZKLock("127.0.0.1",i); z.lock(); } }
==启动回调== ==启动回调== ==启动回调== =================== =================== =================== 开始抢锁 机器2 开始抢锁 机器0 开始抢锁 机器1 当前节点:[a63a788c, c61a117b, decc9d68] 当前节点:[a63a788c, c61a117b, decc9d68] 我没抢到锁 机器1 当前节点:[a63a788c, c61a117b, decc9d68] 我没抢到锁 机器0 我得到锁,我是 机器2:/lock/a63a788c !!!!!!!!!!!! 节点变动 机器i2 节点变动 机器i1 节点变动 机器i0 释放锁 机器:1 释放锁 机器:0 释放锁 机器:2