「从入门到放弃-ZooKeeper」ZooKeeper实战-分布式锁

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列中,咱们一块儿写了下如何经过ZooKeeper的持久性顺序节点实现一个分布式队列。java

本文咱们来一块儿写一个ZooKeeper的实现的分布式锁。node

设计

参考以前学习的【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock,实现java.util.concurrent.locks.Lock接口。编程

咱们经过重写接口中的方法实现一个可重入锁。并发

  • lock:请求锁,若是成功则直接返回,不成功则阻塞 直到获取锁。
  • lockInterruptibly:请求锁,若是失败则一直阻塞等待 直到获取锁或线程中断
  • tryLock:一、尝试获取锁,获取失败的话 直接返回false,不会再等待。二、尝试获取锁,获取成功返回true,不然一直请求,直到超时返回false
  • unlock:释放锁

咱们使用ZooKeeper的EPHEMERAL临时节点机制,若是能建立成功的话,则获取锁成功,释放锁或客户端断开链接后,临时节点自动删除,这样能够避免误删除或漏删除的状况。分布式

获取锁失败后,这里咱们使用轮询的方式来不断尝试建立。其实应该使用Watcher机制来实现,这样能避免大量的无用请求。在下一节更优雅的分布式锁实现机制中咱们会用到。学习

DistributedLock

public class DistributedLock implements Lock { private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class); //ZooKeeper客户端,进行ZooKeeper操做 private ZooKeeper zooKeeper; //根节点名称 private String dir; //加锁节点 private String node; //ZooKeeper鉴权信息 private List<ACL> acls; //要加锁节点 private String fullPath; //加锁标识,为0时表示未获取到锁,每获取一次锁则加一,释放锁时减一。减到0时断开链接,删除临时节点。 private volatile int state; /** * Constructor. * * @param zooKeeper the zoo keeper * @param dir the dir * @param node the node * @param acls the acls */ public DistributedLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) { this.zooKeeper = zooKeeper; this.dir = dir; this.node = node; this.acls = acls; this.fullPath = dir.concat("/").concat(node); init(); } private void init() { try { Stat stat = zooKeeper.exists(dir, false); if (stat == null) { zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT); } } catch (Exception e) { logger.error("[DistributedLock#init] error : " + e.toString(), e); } } }
 

lock

public void lock() { //经过state实现重入机制,若是已经获取锁,则将state++便可。 if (addLockCount()) { return; } //一直尝试获取锁,知道获取成功 for (;;) { try { //建立临时节点 zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); //第一次获取锁,state++,这里不须要使用加锁机制保证原子性,由于同一时间,最多只有一个线程能create节点成功。 state++; break; } catch (InterruptedException ie) { //若是捕获中断异常,则设置当前线程为中断状态 logger.error("[DistributedLock#lock] error : " + ie.toString(), ie); Thread.currentThread().interrupt(); } catch (KeeperException ke) { //若是捕获到的异常是 节点已存在 外的其余异常,则设置当前线程为中断状态 logger.error("[DistributedLock#lock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { Thread.currentThread().interrupt(); } } } }
 

lockInterruptibly

public void lockInterruptibly() throws InterruptedException { //经过state实现重入机制,若是已经获取锁,则将state++便可。 if (addLockCount()) { return; } for (;;) { //若是当前线程为中断状态,则抛出中断异常 if (Thread.interrupted()) { throw new InterruptedException(); } try { zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); state++; break; } catch (InterruptedException ie) { //若是捕获中断异常,则设置当前线程为中断状态 logger.error("[DistributedLock#lockInterruptibly] error : " + ie.toString(), ie); Thread.currentThread().interrupt(); } catch (KeeperException ke) { //若是捕获到的异常是 节点已存在 外的其余异常,则设置当前线程为中断状态 logger.error("[DistributedLock#lockInterruptibly] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { Thread.currentThread().interrupt(); } } } }
 

tryLock

public boolean tryLock() { //经过state实现重入机制,若是已经获取锁,则将state++便可。 if (addLockCount()) { return true; } //若是获取成功则返回true,失败则返回false try { zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); state++; return true; } catch (Exception e) { logger.error("[DistributedLock#tryLock] error : " + e.toString(), e); } return false; } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { //经过state实现重入机制,若是已经获取锁,则将state++便可。 if (addLockCount()) { return true; } //若是尝试获取超时,则返回false long nanosTimeout = unit.toNanos(time); if (nanosTimeout <= 0L) { return false; } final long deadline = System.nanoTime() + nanosTimeout; for (;;) { //若是当前线程为中断状态,则抛出中断异常 if (Thread.interrupted()) { throw new InterruptedException(); } //若是尝试获取超时,则返回false nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) { return false; } try { zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); state++; return true; } catch (InterruptedException ie) { //若是捕获中断异常,则返回false logger.error("[DistributedLock#tryLock] error : " + ie.toString(), ie); return false; } catch (KeeperException ke) { //若是捕获到的异常是 节点已存在 外的其余异常,则返回false logger.error("[DistributedLock#tryLock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { return false; } } } }
 

unlock

public void unlock() { //经过state实现重入机制,若是已经获取锁,释放锁时,须要将state--。 delLockCount(); //若是state为0时,说明再也不持有锁,须要将链接关闭,自动删除临时节点 if (state == 0 && zooKeeper != null) { try { zooKeeper.close(); } catch (InterruptedException e) { logger.error("[DistributedLock#unlock] error : " + e.toString(), e); } } }
 

addLockCount

private boolean addLockCount() { //若是state大于0,即已持有锁,将state数量加一 if (state > 0) { synchronized (this) { if (state > 0) { state++; return true; } } } return false; }
 

delLockCount

private boolean delLockCount() { //若是state大于0,即还持有锁,将state数量减一 if (state > 0) { synchronized (this) { if (state > 0) { state--; return true; } } } return false; }
 

总结

上面就是一个经过ZooKeeper实现的分布式可重入锁,利用了临时节点的特性。源代码可见:aloofJr优化

其中有几个能够优化的点。this

  • 轮询的方式换成Watcher机制
  • 可重入锁实现方式的优化
  • 全部线程竞争一个节点的建立,容易出现羊群效应,且是一种不公平的锁竞争模式

下节咱们使用新的方式实现分布式锁来解决上面的几个问题,若是你们好的优化建议,欢迎一块儿讨论。spa

更多文章线程

见个人博客:https://nc2era.com

written by AloofJr,转载请注明出处

 

原文连接

本文为云栖社区原创内容,未经容许不得转载。

相关文章
相关标签/搜索