Zookeeper分布式锁

原创转载请注明出处:http://www.javashuo.com/article/p-tzhhwqih-hp.htmlhtml

 

Zookeeper是一种提供“分布式服务协调“的中心化服务,分布式应用程序才能够基于Zookeeper的如下两个特性实现分布式锁功能。java

  • 顺序临时节点:Zookeeper提供一个多层级的节点命名空间(节点称为Znode),每一个节点都用一个以斜杠(/)分隔的路径来表示,并且每一个节点都有父节点(根节点除外),很是相似于文件系统。节点类型能够分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),每一个节点还能被标记为有序性(SEQUENTIAL),一旦节点被标记为有序性,那么整个节点就具备顺序自增的特色。通常能够组合这几类节点来建立所须要的节点,例如,建立一个持久节点做为父节点,在父节点下面建立临时节点,并标记该临时节点为有序性。
  • Watch机制:Zookeeper还提供了另一个重要的特性,Watcher(事件监听器)。ZooKeeper容许用户在指定节点上注册一些Watcher,而且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知给用户。

熟悉了Zookeeper的这两个特性以后,就能够看看Zookeeper是如何实现分布式锁的了。node

首先,须要创建一个父节点,节点类型为持久节点(PERSISTENT) ,每当须要访问共享资源时,就会在父节点下创建相应的顺序子节点,节点类型为临时节点(EPHEMERAL),且标记为有序性(SEQUENTIAL),而且以临时节点名称+父节点名称+顺序号组成特定的名字。spring

在创建子节点后,对父节点下面的全部以临时节点名称name开头的子节点进行排序,判断刚刚创建的子节点顺序号是不是最小的节点,若是是最小节点,则得到锁。apache

若是不是最小节点,则阻塞等待锁,而且得到该节点的上一顺序节点,为其注册监听事件,等待节点对应的操做得到锁。session

当调用完共享资源后,删除该节点,关闭zk,进而能够触发监听事件,释放该锁。并发

以上实现的分布式锁是严格按照顺序访问的并发锁。通常还能够直接引用Curator框架来实现Zookeeper分布式锁,代码以下:框架

Maven Dependency分布式

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>

DistributedLock.javaide

 1 package org.fool.spring.util;
 2 
 3 import java.util.concurrent.TimeUnit;
 4 
 5 public interface DistributedLock {
 6 
 7     void lock() throws Exception;
 8 
 9     boolean tryLock(long time, TimeUnit unit) throws Exception;
10 
11     void unlock() throws Exception;
12 
13     boolean isAcquiredInThisProcess();
14 }

ZkDistributedLock.java

 1 package org.fool.spring.util;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 5 
 6 import java.util.concurrent.TimeUnit;
 7 
 8 public class ZkDistributedLock implements DistributedLock {
 9 
10     private InterProcessMutex mutex;
11 
12     ZkDistributedLock(ZkClient zkClient, String lockPath) {
13         CuratorFramework client = zkClient.getClient();
14         this.mutex = new InterProcessMutex(client, lockPath);
15     }
16 
17     @Override
18     public void lock() throws Exception {
19         this.mutex.acquire();
20     }
21 
22     @Override
23     public boolean tryLock(long time, TimeUnit unit) throws Exception {
24         return this.mutex.acquire(time, unit);
25     }
26 
27     @Override
28     public void unlock() throws Exception {
29         this.mutex.release();
30     }
31 
32     @Override
33     public boolean isAcquiredInThisProcess() {
34         return this.mutex.isAcquiredInThisProcess();
35     }
36 
37 }

ZkClient.java

 1 package org.fool.spring.util;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 
 5 public class ZkClient {
 6 
 7     private final CuratorFramework client;
 8 
 9     ZkClient(CuratorFramework client) {
10         this.client = client;
11     }
12 
13     CuratorFramework getClient() {
14         return this.client;
15     }
16 
17     /**
18      * start the client
19      */
20     public void start() {
21         this.client.start();
22     }
23 
24     /**
25      * close the client
26      */
27     public void close() {
28         this.client.close();
29     }
30 
31 }

DistributedLocks.java

 1 package org.fool.spring.util;
 2 
 3 import org.apache.curator.RetryPolicy;
 4 import org.apache.curator.framework.CuratorFramework;
 5 import org.apache.curator.framework.CuratorFrameworkFactory;
 6 import org.apache.curator.retry.ExponentialBackoffRetry;
 7 
 8 public final class DistributedLocks {
 9 
10     private DistributedLocks() {
11     }
12 
13     private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 60 * 1000;
14     private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 15 * 1000;
15     private static final int BASE_SLEEP_TIME_MS = 1000;
16     private static final int MAX_RETRIES = 3;
17 
18     /**
19      * Define the default retry policy
20      */
21     private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES);
22 
23     /**
24      * Create a new ZkClient with custom connectString, default sessionTimeout and default connectionTimeout.
25      */
26     public static ZkClient newZkClient(String connectString) {
27         CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, DEFAULT_ZK_SESSION_TIMEOUT_MS,
28                 DEFAULT_ZK_CONNECTION_TIMEOUT_MS, DEFAULT_RETRY_POLICY);
29         return new ZkClient(client);
30     }
31 
32     /**
33      * Create a new ZkClient with custom connectString, sessionTimeout and connectionTimeout
34      */
35     public static ZkClient newZkClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs) {
36         CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs,
37                 connectionTimeoutMs, DEFAULT_RETRY_POLICY);
38         return new ZkClient(client);
39     }
40 
41     /**
42      * Create a new DistributedLock with ZkClient and lock path.
43      */
44     public static DistributedLock newZkDistributedLock(ZkClient zkClient, String lockPath) {
45         return new ZkDistributedLock(zkClient, lockPath);
46     }
47 }

TestZkDistributedLock.java

 1 package org.fool.spring.test;
 2 
 3 import org.fool.spring.util.DistributedLock;
 4 import org.fool.spring.util.DistributedLocks;
 5 import org.fool.spring.util.ZkClient;
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8 
 9 import java.util.concurrent.TimeUnit;
10 
11 public class TestZkDistributedLock {
12 
13     private static final Logger logger = LoggerFactory.getLogger(TestZkDistributedLock.class);
14 
15     private static final String lockPath = "/curator/test";
16 
17     public static void main(String[] args) throws Exception {
18         ZkClient zkClient = DistributedLocks.newZkClient("127.0.0.1:2181");
19         zkClient.start();
20 
21         DistributedLock lock = DistributedLocks.newZkDistributedLock(zkClient, lockPath);
22 
23         boolean isAcquired = lock.isAcquiredInThisProcess();
24         logger.info("==========lock acquired: " + isAcquired + "==========");
25 
26         if (lock.tryLock(3, TimeUnit.SECONDS)) {
27             try {
28                 isAcquired = lock.isAcquiredInThisProcess();
29                 logger.info("==========lock acquired: " + isAcquired + "==========");
30 
31                 // mock to do business logic
32                 Thread.sleep(60000);
33             } finally {
34                 lock.unlock();
35                 logger.info("==========release the lock !!!==========");
36             }
37         } else {
38             logger.info("==========failed to get the lock !!!==========");
39         }
40 
41         zkClient.close();
42     }
43 }

 

Test

执行TestZkDistributedLock,模拟业务执行占用60s时间

在60s内,再次执行TestZkDistributedLock,能够看到尝试获取锁失败

打开zk client,查看执行期间内的顺序临时节点的变化状况

 

 

Summary

Zookeeper实现的分布式锁

优势

  • Zookeeper是集群实现,能够避免单点问题,且能保证每次操做均可以有效地释放锁,这是由于一旦应用服务挂掉了,临时节点会由于session链接断开而自动删除掉。

缺点

  • 因为频繁地建立和删除结点,加上大量的Watch事件,对Zookeeper集群来讲,压力很是大。且从性能上来讲,与Redis实现的分布式锁相比,仍是存在必定的差距。

 

Reference

https://time.geekbang.org/column/article/125983

http://curator.apache.org/

相关文章
相关标签/搜索