Zookeeper实现分布式锁

实现分布式锁目前有三种流行方案,分别为基于数据库、Redis、Zookeeper的方案,其中前两种方案网络上有不少资料能够参考,本文不作展开。咱们来看下使用Zookeeper如何实现分布式锁。node

什么是Zookeeper?算法

Zookeeper(业界简称zk)是一种提供配置管理、分布式协同以及命名的中心化服务,这些提供的功能都是分布式系统中很是底层且必不可少的基本功能,可是若是本身实现这些功能并且要达到高吞吐、低延迟同时还要保持一致性和可用性,实际上很是困难。所以zookeeper提供了这些功能,开发者在zookeeper之上构建本身的各类分布式系统。数据库

虽然zookeeper的实现比较复杂,可是它提供的模型抽象倒是很是简单的。Zookeeper提供一个多层级的节点命名空间(节点称为znode),每一个节点都用一个以斜杠(/)分隔的路径表示,并且每一个节点都有父节点(根节点除外),很是相似于文件系统。例如,/foo/doo这个表示一个znode,它的父节点为/foo,父父节点为/,而/为根节点没有父节点。与文件系统不一样的是,这些节点均可以设置关联的数据,而文件系统中只有文件节点能够存放数据而目录节点不行。Zookeeper为了保证高吞吐和低延迟,在内存中维护了这个树状的目录结构,这种特性使得Zookeeper不能用于存放大量的数据,每一个节点的存放数据上限为1M。apache

而为了保证高可用,zookeeper须要以集群形态来部署,这样只要集群中大部分机器是可用的(可以容忍必定的机器故障),那么zookeeper自己仍然是可用的。客户端在使用zookeeper时,须要知道集群机器列表,经过与集群中的某一台机器创建TCP链接来使用服务,客户端使用这个TCP连接来发送请求、获取结果、获取监听事件以及发送心跳包。若是这个链接异常断开了,客户端能够链接到另外的机器上。网络

架构简图以下所示:session

zookeeper数据结构

客户端的读请求能够被集群中的任意一台机器处理,若是读请求在节点上注册了监听器,这个监听器也是由所链接的zookeeper机器来处理。对于写请求,这些请求会同时发给其余zookeeper机器而且达成一致后,请求才会返回成功。所以,随着zookeeper的集群机器增多,读请求的吞吐会提升可是写请求的吞吐会降低。架构

有序性是zookeeper中很是重要的一个特性,全部的更新都是全局有序的,每一个更新都有一个惟一的时间戳,这个时间戳称为zxid(Zookeeper Transaction Id)。而读请求只会相对于更新有序,也就是读请求的返回结果中会带有这个zookeeper最新的zxid。分布式

如何使用zookeeper实现分布式锁?ide

在描述算法流程以前,先看下zookeeper中几个关于节点的有趣的性质:

  • 有序节点:假如当前有一个父节点为/lock,咱们能够在这个父节点下面建立子节点;zookeeper提供了一个可选的有序特性,例如咱们能够建立子节点“/lock/node-”而且指明有序,那么zookeeper在生成子节点时会根据当前的子节点数量自动添加整数序号,也就是说若是是第一个建立的子节点,那么生成的子节点为/lock/node-0000000000,下一个节点则为/lock/node-0000000001,依次类推。

  • 临时节点:客户端能够创建一个临时节点,在会话结束或者会话超时后,zookeeper会自动删除该节点。

  • 事件监听:在读取数据时,咱们能够同时对节点设置事件监听,当节点数据或结构变化时,zookeeper会通知客户端。当前zookeeper有以下四种事件:1)节点建立;2)节点删除;3)节点数据修改;4)子节点变动。

下面描述使用zookeeper实现分布式锁的算法流程,假设锁空间的根节点为/lock:

  1. 客户端链接zookeeper,并在/lock下建立临时的且有序的子节点,第一个客户端对应的子节点为/lock/lock-0000000000,第二个为/lock/lock-0000000001,以此类推。

  2. 客户端获取/lock下的子节点列表,判断本身建立的子节点是否为当前子节点列表中序号最小的子节点,若是是则认为得到锁,不然监听/lock的子节点变动消息,得到子节点变动通知后重复此步骤直至得到锁;

  3. 执行业务代码;

  4. 完成业务流程后,删除对应的子节点释放锁。

步骤1中建立的临时节点可以保证在故障的状况下锁也能被释放,考虑这么个场景:假如客户端a当前建立的子节点为序号最小的节点,得到锁以后客户端所在机器宕机了,客户端没有主动删除子节点;若是建立的是永久的节点,那么这个锁永远不会释放,致使死锁;因为建立的是临时节点,客户端宕机后,过了必定时间zookeeper没有收到客户端的心跳包判断会话失效,将临时节点删除从而释放锁。

另外细心的朋友可能会想到,在步骤2中获取子节点列表与设置监听这两步操做的原子性问题,考虑这么个场景:客户端a对应子节点为/lock/lock-0000000000,客户端b对应子节点为/lock/lock-0000000001,客户端b获取子节点列表时发现本身不是序号最小的,可是在设置监听器前客户端a完成业务流程删除了子节点/lock/lock-0000000000,客户端b设置的监听器岂不是丢失了这个事件从而致使永远等待了?这个问题不存在的。由于zookeeper提供的API中设置监听器的操做与读操做是原子执行的,也就是说在读子节点列表时同时设置监听器,保证不会丢失事件。

最后,对于这个算法有个极大的优化点:假如当前有1000个节点在等待锁,若是得到锁的客户端释放锁时,这1000个客户端都会被唤醒,这种状况称为“羊群效应”;在这种羊群效应中,zookeeper须要通知1000个客户端,这会阻塞其余的操做,最好的状况应该只唤醒新的最小节点对应的客户端。应该怎么作呢?在设置事件监听时,每一个客户端应该对恰好在它以前的子节点设置事件监听,例如子节点列表为/lock/lock-0000000000、/lock/lock-000000000一、/lock/lock-0000000002,序号为1的客户端监听序号为0的子节点删除消息,序号为2的监听序号为1的子节点删除消息。

zookeeper学习中

因此调整后的分布式锁算法流程以下:

  • 客户端链接zookeeper,并在/lock下建立临时的且有序的子节点,第一个客户端对应的子节点为/lock/lock-0000000000,第二个为/lock/lock-0000000001,以此类推;

  • 客户端获取/lock下的子节点列表,判断本身建立的子节点是否为当前子节点列表中序号最小的子节点,若是是则认为得到锁,不然监听恰好在本身以前一位的子节点删除消息,得到子节点变动通知后重复此步骤直至得到锁;

  • 执行业务代码;

  • 完成业务流程后,删除对应的子节点释放锁。

Curator的源码分析

虽然zookeeper原生客户端暴露的API已经很是简洁了,可是实现一个分布式锁仍是比较麻烦的…咱们能够直接使用curator这个开源项目提供的zookeeper分布式锁实现。

代码以下:

public static void main(String[] args) throws Exception {

//建立zookeeper的客户端

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

CuratorFramework client = CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181", retryPolicy);

client.start();

//建立分布式锁, 锁空间的根节点路径为/curator/lock

InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");

mutex.acquire();

//得到了锁, 进行业务流程

System.out.println("Enter mutex");

//完成业务流程, 释放锁

mutex.release();

//关闭客户端

client.close();

}

能够看到关键的核心操做就只有mutex.acquire()和mutex.release(),简直太方便了!

下面来分析下获取锁的源码实现。acquire的方法以下:

/*

* 获取锁,当锁被占用时会阻塞等待,这个操做支持同线程的可重入(也就是重复获取锁),acquire的次数须要与release的次数相同。

@throws Exception ZK errors, connection interruptions

*/

@Override

public void acquire() throws Exception

{

if ( !internalLock(-1, null) )

{

throw new IOException("Lost connection while trying to acquire lock: " + basePath);

}

}

这里有个地方须要注意,当与zookeeper通讯存在异常时,acquire会直接抛出异常,须要使用者自身作重试策略。代码中调用了internalLock(-1, null),参数代表在锁被占用时永久阻塞等待。internalLock的代码以下:

private boolean internalLock(long time, TimeUnit unit) throws Exception

{

//这里处理同线程的可重入性,若是已经得到锁,那么只是在对应的数据结构中增长acquire的次数统计,直接返回成功

Thread currentThread = Thread.currentThread();

LockData lockData = threadData.get(currentThread);

if ( lockData != null )

{

// re-entering

lockData.lockCount.incrementAndGet();

return true;

}

//这里才真正去zookeeper中获取锁

String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());

if ( lockPath != null )

{

//得到锁以后,记录当前的线程得到锁的信息,在重入时只需在LockData中增长次数统计便可

LockData newLockData = new LockData(currentThread, lockPath);

threadData.put(currentThread, newLockData);

return true;

}

//在阻塞返回时仍然获取不到锁,这里上下文的处理隐含的意思为zookeeper通讯异常

return false;

}

代码中增长了具体注释,不作展开。看下zookeeper获取锁的具体实现:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception

{

//参数初始化,此处省略

//...

//自旋获取锁

while ( !isDone )

{

isDone = true;

try

{

//在锁空间下建立临时且有序的子节点

ourPath = driver.createsTheLock(client, path, localLockNodeBytes);

//判断是否得到锁(子节点序号最小),得到锁则直接返回,不然阻塞等待前一个子节点删除通知

hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);

}

catch ( KeeperException.NoNodeException e )

{

//对于NoNodeException,代码中确保了只有发生session过时才会在这里抛出NoNodeException,所以这里根据重试策略进行重试

if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )

{

isDone = false;

}

else

{

throw e;

}

}

}

//若是得到锁则返回该子节点的路径

if ( hasTheLock )

{

return ourPath;

}

return null;

}

上面代码中主要有两步操做:

  • driver.createsTheLock:建立临时且有序的子节点,里面实现比较简单不作展开,主要关注几种节点的模式:1)PERSISTENT(永久);2)PERSISTENT_SEQUENTIAL(永久且有序);3)EPHEMERAL(临时);4)EPHEMERAL_SEQUENTIAL(临时且有序)。

  • internalLockLoop:阻塞等待直到得到锁。

看下internalLockLoop是怎么判断锁以及阻塞等待的,这里删除了一些无关代码,只保留主流程:

//自旋直至得到锁

while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )

{

//获取全部的子节点列表,而且按序号从小到大排序

List<String> children = getSortedChildren();

//根据序号判断当前子节点是否为最小子节点

String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);

if ( predicateResults.getsTheLock() )

{

//若是为最小子节点则认为得到锁

haveTheLock = true;

}

else

{

//不然获取前一个子节点

String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

//这里使用对象监视器作线程同步,当获取不到锁时监听前一个子节点删除消息而且进行wait(),当前一个子节点删除(也就是锁释放)时,回调会经过notifyAll唤醒此线程,此线程继续自旋判断是否得到锁

synchronized(this)

{

try

{

//这里使用getData()接口而不是checkExists()是由于,若是前一个子节点已经被删除了那么会抛出异常并且不会设置事件监听器,而checkExists虽然也能够获取到节点是否存在的信息可是同时设置了监听器,这个监听器其实永远不会触发,对于zookeeper来讲属于资源泄露

client.getData().usingWatcher(watcher).forPath(previousSequencePath);

//若是设置了阻塞等待的时间

if ( millisToWait != null )

{

millisToWait -= (System.currentTimeMillis() - startMillis);

startMillis = System.currentTimeMillis();

if ( millisToWait <= 0 )

{

doDelete = true; // 等待时间到达,删除对应的子节点

break;

}

//等待相应的时间

wait(millisToWait);

}

else

{

//永远等待

wait();

}

}

catch ( KeeperException.NoNodeException e )

{

//上面使用getData来设置监听器时,若是前一个子节点已经被删除那么会抛出NoNodeException,只须要自旋一次便可,无需额外处理

}

}

}

}

具体逻辑见注释,再也不赘述。代码中设置的事件监听器,在事件发生回调时只是简单的notifyAll唤醒当前线程以从新自旋判断,比较简单再也不展开。

curator库:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.7</version>
</dependency>

其余关于zk锁的说明:http://www.javashuo.com/article/p-vvxfodlk-ms.html