上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列中,咱们一块儿写了下如何经过ZooKeeper的持久性顺序节点实现一个分布式队列。java
本文咱们来一块儿写一个ZooKeeper的实现的分布式锁。node
参考以前学习的【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock,实现java.util.concurrent.locks.Lock接口。编程
咱们经过重写接口中的方法实现一个可重入锁。并发
咱们使用ZooKeeper的EPHEMERAL临时节点机制,若是能建立成功的话,则获取锁成功,释放锁或客户端断开链接后,临时节点自动删除,这样能够避免误删除或漏删除的状况。分布式
获取锁失败后,这里咱们使用轮询的方式来不断尝试建立。其实应该使用Watcher机制来实现,这样能避免大量的无用请求。在下一节更优雅的分布式锁实现机制中咱们会用到。学习
public class DistributedLock implements Lock { private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class); //ZooKeeper客户端,进行ZooKeeper操做 private ZooKeeper zooKeeper; //根节点名称 private String dir; //加锁节点 private String node; //ZooKeeper鉴权信息 private List<ACL> acls; //要加锁节点 private String fullPath; //加锁标识,为0时表示未获取到锁,每获取一次锁则加一,释放锁时减一。减到0时断开链接,删除临时节点。 private volatile int state; /** * Constructor. * * @param zooKeeper the zoo keeper * @param dir the dir * @param node the node * @param acls the acls */ public DistributedLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) { this.zooKeeper = zooKeeper; this.dir = dir; this.node = node; this.acls = acls; this.fullPath = dir.concat("/").concat(node); init(); } private void init() { try { Stat stat = zooKeeper.exists(dir, false); if (stat == null) { zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT); } } catch (Exception e) { logger.error("[DistributedLock#init] error : " + e.toString(), e); } } }
public void lock() { //经过state实现重入机制,若是已经获取锁,则将state++便可。 if (addLockCount()) { return; } //一直尝试获取锁,知道获取成功 for (;;) { try { //建立临时节点 zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); //第一次获取锁,state++,这里不须要使用加锁机制保证原子性,由于同一时间,最多只有一个线程能create节点成功。 state++; break; } catch (InterruptedException ie) { //若是捕获中断异常,则设置当前线程为中断状态 logger.error("[DistributedLock#lock] error : " + ie.toString(), ie); Thread.currentThread().interrupt(); } catch (KeeperException ke) { //若是捕获到的异常是 节点已存在 外的其余异常,则设置当前线程为中断状态 logger.error("[DistributedLock#lock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { Thread.currentThread().interrupt(); } } } }
public void lockInterruptibly() throws InterruptedException { //经过state实现重入机制,若是已经获取锁,则将state++便可。 if (addLockCount()) { return; } for (;;) { //若是当前线程为中断状态,则抛出中断异常 if (Thread.interrupted()) { throw new InterruptedException(); } try { zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); state++; break; } catch (InterruptedException ie) { //若是捕获中断异常,则设置当前线程为中断状态 logger.error("[DistributedLock#lockInterruptibly] error : " + ie.toString(), ie); Thread.currentThread().interrupt(); } catch (KeeperException ke) { //若是捕获到的异常是 节点已存在 外的其余异常,则设置当前线程为中断状态 logger.error("[DistributedLock#lockInterruptibly] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { Thread.currentThread().interrupt(); } } } }
public boolean tryLock() { //经过state实现重入机制,若是已经获取锁,则将state++便可。 if (addLockCount()) { return true; } //若是获取成功则返回true,失败则返回false try { zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); state++; return true; } catch (Exception e) { logger.error("[DistributedLock#tryLock] error : " + e.toString(), e); } return false; } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { //经过state实现重入机制,若是已经获取锁,则将state++便可。 if (addLockCount()) { return true; } //若是尝试获取超时,则返回false long nanosTimeout = unit.toNanos(time); if (nanosTimeout <= 0L) { return false; } final long deadline = System.nanoTime() + nanosTimeout; for (;;) { //若是当前线程为中断状态,则抛出中断异常 if (Thread.interrupted()) { throw new InterruptedException(); } //若是尝试获取超时,则返回false nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) { return false; } try { zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); state++; return true; } catch (InterruptedException ie) { //若是捕获中断异常,则返回false logger.error("[DistributedLock#tryLock] error : " + ie.toString(), ie); return false; } catch (KeeperException ke) { //若是捕获到的异常是 节点已存在 外的其余异常,则返回false logger.error("[DistributedLock#tryLock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { return false; } } } }
public void unlock() { //经过state实现重入机制,若是已经获取锁,释放锁时,须要将state--。 delLockCount(); //若是state为0时,说明再也不持有锁,须要将链接关闭,自动删除临时节点 if (state == 0 && zooKeeper != null) { try { zooKeeper.close(); } catch (InterruptedException e) { logger.error("[DistributedLock#unlock] error : " + e.toString(), e); } } }
private boolean addLockCount() { //若是state大于0,即已持有锁,将state数量加一 if (state > 0) { synchronized (this) { if (state > 0) { state++; return true; } } } return false; }
private boolean delLockCount() { //若是state大于0,即还持有锁,将state数量减一 if (state > 0) { synchronized (this) { if (state > 0) { state--; return true; } } } return false; }
上面就是一个经过ZooKeeper实现的分布式可重入锁,利用了临时节点的特性。源代码可见:aloofJr优化
其中有几个能够优化的点。this
下节咱们使用新的方式实现分布式锁来解决上面的几个问题,若是你们好的优化建议,欢迎一块儿讨论。spa
更多文章线程
见个人博客:https://nc2era.com
written by AloofJr,转载请注明出处
本文为云栖社区原创内容,未经容许不得转载。