原创转载请注明出处:http://www.javashuo.com/article/p-tzhhwqih-hp.htmlhtml
Zookeeper是一种提供“分布式服务协调“的中心化服务,分布式应用程序才能够基于Zookeeper的如下两个特性实现分布式锁功能。java
熟悉了Zookeeper的这两个特性以后,就能够看看Zookeeper是如何实现分布式锁的了。node
首先,须要创建一个父节点,节点类型为持久节点(PERSISTENT) ,每当须要访问共享资源时,就会在父节点下创建相应的顺序子节点,节点类型为临时节点(EPHEMERAL),且标记为有序性(SEQUENTIAL),而且以临时节点名称+父节点名称+顺序号组成特定的名字。spring
在创建子节点后,对父节点下面的全部以临时节点名称name开头的子节点进行排序,判断刚刚创建的子节点顺序号是不是最小的节点,若是是最小节点,则得到锁。apache
若是不是最小节点,则阻塞等待锁,而且得到该节点的上一顺序节点,为其注册监听事件,等待节点对应的操做得到锁。session
当调用完共享资源后,删除该节点,关闭zk,进而能够触发监听事件,释放该锁。并发
以上实现的分布式锁是严格按照顺序访问的并发锁。通常还能够直接引用Curator框架来实现Zookeeper分布式锁,代码以下:框架
Maven Dependency分布式
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> </dependency>
DistributedLock.javaide
1 package org.fool.spring.util; 2 3 import java.util.concurrent.TimeUnit; 4 5 public interface DistributedLock { 6 7 void lock() throws Exception; 8 9 boolean tryLock(long time, TimeUnit unit) throws Exception; 10 11 void unlock() throws Exception; 12 13 boolean isAcquiredInThisProcess(); 14 }
ZkDistributedLock.java
1 package org.fool.spring.util; 2 3 import org.apache.curator.framework.CuratorFramework; 4 import org.apache.curator.framework.recipes.locks.InterProcessMutex; 5 6 import java.util.concurrent.TimeUnit; 7 8 public class ZkDistributedLock implements DistributedLock { 9 10 private InterProcessMutex mutex; 11 12 ZkDistributedLock(ZkClient zkClient, String lockPath) { 13 CuratorFramework client = zkClient.getClient(); 14 this.mutex = new InterProcessMutex(client, lockPath); 15 } 16 17 @Override 18 public void lock() throws Exception { 19 this.mutex.acquire(); 20 } 21 22 @Override 23 public boolean tryLock(long time, TimeUnit unit) throws Exception { 24 return this.mutex.acquire(time, unit); 25 } 26 27 @Override 28 public void unlock() throws Exception { 29 this.mutex.release(); 30 } 31 32 @Override 33 public boolean isAcquiredInThisProcess() { 34 return this.mutex.isAcquiredInThisProcess(); 35 } 36 37 }
ZkClient.java
1 package org.fool.spring.util; 2 3 import org.apache.curator.framework.CuratorFramework; 4 5 public class ZkClient { 6 7 private final CuratorFramework client; 8 9 ZkClient(CuratorFramework client) { 10 this.client = client; 11 } 12 13 CuratorFramework getClient() { 14 return this.client; 15 } 16 17 /** 18 * start the client 19 */ 20 public void start() { 21 this.client.start(); 22 } 23 24 /** 25 * close the client 26 */ 27 public void close() { 28 this.client.close(); 29 } 30 31 }
DistributedLocks.java
1 package org.fool.spring.util; 2 3 import org.apache.curator.RetryPolicy; 4 import org.apache.curator.framework.CuratorFramework; 5 import org.apache.curator.framework.CuratorFrameworkFactory; 6 import org.apache.curator.retry.ExponentialBackoffRetry; 7 8 public final class DistributedLocks { 9 10 private DistributedLocks() { 11 } 12 13 private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 60 * 1000; 14 private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 15 * 1000; 15 private static final int BASE_SLEEP_TIME_MS = 1000; 16 private static final int MAX_RETRIES = 3; 17 18 /** 19 * Define the default retry policy 20 */ 21 private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES); 22 23 /** 24 * Create a new ZkClient with custom connectString, default sessionTimeout and default connectionTimeout. 25 */ 26 public static ZkClient newZkClient(String connectString) { 27 CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, DEFAULT_ZK_SESSION_TIMEOUT_MS, 28 DEFAULT_ZK_CONNECTION_TIMEOUT_MS, DEFAULT_RETRY_POLICY); 29 return new ZkClient(client); 30 } 31 32 /** 33 * Create a new ZkClient with custom connectString, sessionTimeout and connectionTimeout 34 */ 35 public static ZkClient newZkClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs) { 36 CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, 37 connectionTimeoutMs, DEFAULT_RETRY_POLICY); 38 return new ZkClient(client); 39 } 40 41 /** 42 * Create a new DistributedLock with ZkClient and lock path. 43 */ 44 public static DistributedLock newZkDistributedLock(ZkClient zkClient, String lockPath) { 45 return new ZkDistributedLock(zkClient, lockPath); 46 } 47 }
TestZkDistributedLock.java
1 package org.fool.spring.test; 2 3 import org.fool.spring.util.DistributedLock; 4 import org.fool.spring.util.DistributedLocks; 5 import org.fool.spring.util.ZkClient; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 9 import java.util.concurrent.TimeUnit; 10 11 public class TestZkDistributedLock { 12 13 private static final Logger logger = LoggerFactory.getLogger(TestZkDistributedLock.class); 14 15 private static final String lockPath = "/curator/test"; 16 17 public static void main(String[] args) throws Exception { 18 ZkClient zkClient = DistributedLocks.newZkClient("127.0.0.1:2181"); 19 zkClient.start(); 20 21 DistributedLock lock = DistributedLocks.newZkDistributedLock(zkClient, lockPath); 22 23 boolean isAcquired = lock.isAcquiredInThisProcess(); 24 logger.info("==========lock acquired: " + isAcquired + "=========="); 25 26 if (lock.tryLock(3, TimeUnit.SECONDS)) { 27 try { 28 isAcquired = lock.isAcquiredInThisProcess(); 29 logger.info("==========lock acquired: " + isAcquired + "=========="); 30 31 // mock to do business logic 32 Thread.sleep(60000); 33 } finally { 34 lock.unlock(); 35 logger.info("==========release the lock !!!=========="); 36 } 37 } else { 38 logger.info("==========failed to get the lock !!!=========="); 39 } 40 41 zkClient.close(); 42 } 43 }
执行TestZkDistributedLock,模拟业务执行占用60s时间
在60s内,再次执行TestZkDistributedLock,能够看到尝试获取锁失败
打开zk client,查看执行期间内的顺序临时节点的变化状况
Zookeeper实现的分布式锁
优势
缺点