在以前的文章中介绍过度布式锁的特色和利用Redis实现简单的分布式锁。可是分布式锁的实现还有不少其余方式,可是万变不离其宗,始终遵循一个特色:同一时刻只能有一个操做获取。这篇文章主要介绍如何基于zookeeper实现分布式锁。html
关于分布式锁的相关特性,这里再也不赘述,请参考分布式锁。node
这里回顾下分布式锁的特色:算法
zookeeper中有一种临时顺序节点,它具备如下特征:分布式
利用以上的特色能够知足分布式锁实现的基本要求:ide
由于顺序性,可让最小顺序号的应用获取到锁,从而知足分布式锁的每次只能一个占用锁,由于只有它一个获取到,因此能够实现重复进入,只要设置标识便可。锁的释放,即删除应用在zookeeper上注册的节点,由于每一个节点只被本身注册拥有,因此只有本身才能删除,这样就知足只有占用者才能够解锁性能
zookeeper的序号分配是原子的,分配后即不会再改变,让最小序号者获取锁,因此获取锁是原子的fetch
由于注册的是临时节点,在会话期间内有效,因此不会产生死锁ui
zookeeper注册节点的性能能知足几千,并且支持集群,可以知足大部分状况下的性能this
须要获取分布式锁的应用都向zookeeper的/lock/{resouce}目录下注册sequence-前缀的节点,序号最小者获取到操做资源的权限:线程
Note:
这里的resource须要依据竞争的具体资源肯定,如竞争帐户则能够使用帐户号做为resource。
从图中能够看出,clientA的顺序号最小,由它获取到锁,操做资源。
算法步骤:
流程图:
由于最小的节点只被获取到锁的client持有,因此该锁不可能被其余client释放。同时释放锁只须要将临时顺序节点删除,也是原子性操做。
/** * 基于Zookeeper实现分布式锁 * * @author huaijin */ public class DistributedLockBaseZookeeper implements DistributedLock { private static final Logger log = LoggerFactory.getLogger(DistributedLockBaseZookeeper.class); /** * 利用空串做为各个节点存储的数据 */ private static final String EMPTY_DATA = ""; /** * 分布式锁的根目录 */ private static final String LOCK_ROOT = "/lock"; /** * zookeeper目录分隔符 */ private static final String PATH_SEPARATOR = "/"; /** * 临时顺序节点前缀 */ private static final String LOCK_NODE_PREFIX = "sequence-"; /** * 利用Lock和Condition实现等待通知 */ private Lock waitNotifierLock = new ReentrantLock(); private Condition waitNotifier = waitNotifierLock.newCondition(); /** * 操做zookeeper的client */ private ZkClient zkClient; /** * 分布式资源的路径 */ private String resourcePath; /** * 锁节点完整前缀 */ private String lockNodePrefix; /** * 当前注册的临时顺序节点路径 */ private String currentLockNodePath; public DistributedLockBaseZookeeper(String resource, ZkClient zkClient) { Objects.requireNonNull(zkClient, "zkClient must not be null!"); if (resource == null || resource.isEmpty()) { throw new IllegalArgumentException("resource must not be null!"); } this.zkClient = zkClient; this.resourcePath = LOCK_ROOT + PATH_SEPARATOR + resource; this.lockNodePrefix = resourcePath + PATH_SEPARATOR + LOCK_NODE_PREFIX; // 建立分布式锁根目录 if (!this.zkClient.exists(LOCK_ROOT)) { try { this.zkClient.create(LOCK_ROOT, EMPTY_DATA, CreateMode.PERSISTENT); } catch (ZkNodeExistsException e) { // ignore, logging log.warn("The root path for lock already exists."); } } // 建立资源目录 if (!this.zkClient.exists(resourcePath)) { try { this.zkClient.create(resourcePath, EMPTY_DATA, CreateMode.PERSISTENT); } catch (ZkNodeExistsException e) { // ignore, logging log.warn("The resource path for [" + resourcePath + "] already exists."); } } } @Override public void lock() throws DistributedLockException { if (!acquireLock()) { // 若是获取锁不成功,则等待 waitNotifierLock.lock(); try { waitNotifier.await(); } catch (Exception e) { throw new DistributedLockException("Interrupt when waiting notification."); } finally { waitNotifierLock.unlock(); } } } @Override public void unlock() { // 删除自身节点,释放锁 zkClient.delete(currentLockNodePath); } private boolean acquireLock() throws DistributedLockException { // 若是当前未注册临时顺序节点,则注册 if (this.currentLockNodePath == null) { this.currentLockNodePath = zkClient.create(lockNodePrefix, EMPTY_DATA, CreateMode.EPHEMERAL_SEQUENTIAL); } // 获取顺序号 long lockNodeSeq = fetchSeqFromNodePath(currentLockNodePath); // 获取全部子节点 List<String> childNodePaths = zkClient.getChildren(resourcePath); if (childNodePaths == null || childNodePaths.isEmpty()) { throw new DistributedLockException("Not exists child nodes."); } // 从全部子节点中获取最小子节点的顺序号 long minSeq = 1000000L; int minIndex = -1; for (int i = 0; i < childNodePaths.size(); i++) { long nodeSeq = fetchSeqFromNodePath(resourcePath + childNodePaths.get(i)); if (nodeSeq < minSeq) { minSeq = nodeSeq; minIndex = i; } } // 比较自身顺序号与最小序号 if (lockNodeSeq > minSeq) { // 若是存在更小序号,则监控最小序号的子节点 String minLockNodePath = childNodePaths.get(minIndex); zkClient.subscribeDataChanges(resourcePath + PATH_SEPARATOR + minLockNodePath, new ListenerForLockRelease()); return false; } // 成功获取锁,返回 return true; } private long fetchSeqFromNodePath(String nodePath) { String seq = nodePath.substring(lockNodePrefix.length()); return Long.valueOf(seq); } private class ListenerForLockRelease implements IZkDataListener { @Override public void handleDataChange(String dataPath, Object data) throws Exception { } @Override public void handleDataDeleted(String dataPath) throws Exception { // 若是成功获取锁,则通知,让主线程返回 if (acquireLock()) { waitNotifierLock.lock(); try { waitNotifier.signal(); } finally { waitNotifierLock.unlock(); } } } } }