Curator框架实现ZooKeeper分布式锁

 排他锁(X)

这里主要讲讲分布式锁中的排他锁。排他锁(Exclusive Locks,简称X锁),又称为写锁或独占锁,是一种基本的锁类型。若是事务T1对数据对象O1加上了排他锁,那么在整个加锁期间,只容许T1对O1进行数据的读取和更新操做,其它任何事务都不能对O1进行任何类型的操做,直道T1释放了排他锁。java

定义锁

在ZooKeeper中,能够经过在ZooKeeper中建立一个数据节点来表示一个锁。好比,/exclusive_lock/lock节点(znode)就能够表示为一个锁。node

获取锁

在须要获取排他锁时,全部的客户端都会试图经过create()接口,在/exclusive_lock节点下建立临时的子节点/exclusive_lock/lock,但ZooKeeper的强一致性最终只会保证仅有一个客户单能建立成功,那么就认为该客户端获取了锁。同时,全部没有获取锁的客户端事务只能处于等待状态,这些处于等待状态的客户端事先能够在/exclusive_lock节点上注册一个子节点变动的Watcher监听,以便实时监听到子节点的变动状况。apache

释放锁

 在“定义锁”部分,咱们已经提到/exclusive_lock/lock是一个临时节点,所以在如下两种状况下可能释放锁。服务器

  • 当前获取锁的客户端发生宕机,那么ZooKeeper服务器上保存的临时性节点就会被删除;
  • 正常执行完业务逻辑后,由客户端主动来将本身建立的临时节点删除。

不管什么状况下,临时节点/exclusive_lock/lock被移除,ZooKeeper都会通知在/exclusive_lock注册了子节点变动Watcher监听的客户端。这些客户端在接收到通知之后就会再次发起获取锁的操做,即重复“获取锁”过程。排他锁流程以下:session

下面的代码(java)演示了使用Curator框架来实现ZooKeeper分布式锁框架

import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;


public class ZkLock {

  @SneakyThrows
  public static void main(String[] args) {

    final String connectString = "localhost:2181,localhost:2182,localhost:2183";

    // 重试策略,初始化每次重试之间须要等待的时间,基准等待时间为1秒。
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

    // 使用默认的会话时间(60秒)和链接超时时间(15秒)来建立 Zookeeper 客户端
    @Cleanup CuratorFramework client = CuratorFrameworkFactory.builder().
        connectString(connectString).
        connectionTimeoutMs(15 * 1000).
        sessionTimeoutMs(60 * 100).
        retryPolicy(retryPolicy).
        build();

    // 启动客户端
    client.start();

    final String lockNode = "/lock_node";
    InterProcessMutex lock = new InterProcessMutex(client, lockNode);
    try {
      // 1. Acquire the mutex - blocking until it's available.
      lock.acquire();

      // OR

      // 2. Acquire the mutex - blocks until it's available or the given time expires.
      if (lock.acquire(60, TimeUnit.MINUTES)) {
        Stat stat = client.checkExists().forPath(lockNode);
        if (null != stat){
          // Dot the transaction
        }
      }
    } finally {
      if (lock.isAcquiredInThisProcess()) {
        lock.release();
      }
    }
  }

}

 maven引用maven

<!--curator这个开源项目提供zookeeper分布式锁实现-->
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>2.8.0</version>
</dependency>
相关文章
相关标签/搜索