最近懒成一坨屎,学不动系列一波接一波,大多还都是底层原理相关的。上周末抽时间重读了周志明大湿的 JVM 高效并发部分,每读一遍都有不一样的感悟。路漫漫,借此,把前段时间搞着玩的秒杀案例中的分布式锁深刻了解一下。html
在尝试了解分布式锁以前,你们能够想象一下,什么场景下会使用分布式锁?算法
单机应用架构中,秒杀案例使用ReentrantLcok或者synchronized来达到秒杀商品互斥的目的。然而在分布式系统中,会存在多台机器并行去实现同一个功能。也就是说,在多进程中,若是还使用以上JDK提供的进程锁,来并发访问数据库资源就可能会出现商品超卖的状况。所以,须要咱们来实现本身的分布式锁。数据库
实现一个分布式锁应该具有的特性:apache
高可用、高性能的获取锁与释放锁缓存
在分布式系统环境下,一个方法或者变量同一时间只能被一个线程操做网络
具有锁失效机制,网络中断或宕机没法释放锁时,锁必须被删除,防止死锁session
具有阻塞锁特性,即没有获取到锁,则继续等待获取锁架构
具有非阻塞锁特性,即没有获取到锁,则直接返回获取锁失败并发
具有可重入特性,一个线程中能够屡次获取同一把锁,好比一个线程在执行一个带锁的方法,该方法中又调用了另外一个须要相同锁的方法,则该线程能够直接执行调用的方法,而无需从新得到锁分布式
在以前的秒杀案例中,咱们曾介绍过关于分布式锁几种实现方式:
基于数据库实现分布式锁
基于 Redis 实现分布式锁
基于 Zookeeper 实现分布式锁
前两种对于分布式生产环境来讲并非特别推荐,高并发下数据库锁性能太差,Redis在锁时间限制和缓存一致性存在必定问题。这里咱们重点介绍一下 Zookeeper 如何实现分布式锁。
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能存在惟一文件名。
ZooKeeper数据模型与文件系统目录树(源自网络)
PERSISTENT 持久化节点,节点建立后,不会由于会话失效而消失
EPHEMERAL 临时节点, 客户端session超时此类节点就会被自动删除
EPHEMERAL_SEQUENTIAL 临时自动编号节点
PERSISTENT_SEQUENTIAL 顺序自动编号持久化节点,这种节点会根据当前已存在的节点数自动加 1
当建立一个节点时,能够注册一个该节点的监视器,当节点状态发生改变时,watch被触发时,ZooKeeper将会向客户端发送且仅发送一条通知,由于watch只能被触发一次。
根据zookeeper的这些特性,咱们来看看如何利用这些特性来实现分布式锁:
建立一个锁目录lock
线程A获取锁会在lock目录下,建立临时顺序节点
获取锁目录下全部的子节点,而后获取比本身小的兄弟节点,若是不存在,则说明当前线程顺序号最小,得到锁
线程B建立临时节点并获取全部兄弟节点,判断本身不是最小节点,设置监听(watcher)比本身次小的节点
线程A处理完,删除本身的节点,线程B监听到变动事件,判断本身是最小的节点,得到锁
尽管ZooKeeper已经封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。可是若是让一个普通开发者去手撸一个分布式锁仍是比较困难的,在秒杀案例中咱们直接使用 Apache 开源的curator 开实现 Zookeeper 分布式锁。
这里咱们使用如下版本,截止目前最新版4.0.1:
<!-- zookeeper 分布式锁、注意zookeeper版本 这里对应的是3.4.6-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.10.0</version>
</dependency>
首先,咱们看下InterProcessLock接口中的几个方法:
/**
* 获取锁、阻塞等待、可重入
*/
public void acquire() throws Exception;
/**
* 获取锁、阻塞等待、可重入、超时则获取失败
*/
public boolean acquire(long time, TimeUnit unit) throws Exception;
/**
* 释放锁
*/
public void release() throws Exception;
/**
* Returns true if the mutex is acquired by a thread in this JVM
*/
boolean isAcquiredInThisProcess();
//获取锁
public void acquire() throws Exception
{
if ( !internalLock(-1, null) )
{
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
具体实现:
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
实现同一个线程可重入性,若是当前线程已经得到锁,
则增长锁数据中lockCount的数量(重入次数),直接返回成功
*/
//获取当前线程
Thread currentThread = Thread.currentThread();
//获取当前线程重入锁相关数据
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
//原子递增一个当前值,记录重入次数,后面锁释放会用到
lockData.lockCount.incrementAndGet();
return true;
}
//尝试链接zookeeper获取锁
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
//建立可重入锁数据,用于记录当前线程重入次数
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
//获取锁超时或者zk通讯异常返回失败
return false;
}
Zookeeper获取锁实现:
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
//获取当前时间戳
final long startMillis = System.currentTimeMillis();
//若是unit不为空(非阻塞锁),把当前传入time转为毫秒
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
//子节点标识
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
//尝试次数
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
//自旋锁,循环获取锁
while ( !isDone )
{
isDone = true;
try
{
//在锁节点下建立临时且有序的子节点,例如:_c_008c1b07-d577-4e5f-8699-8f0f98a013b4-lock-000000001
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//若是当前子节点序号最小,得到锁则直接返回,不然阻塞等待前一个子节点删除通知(release释放锁)
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
//异常处理,若是找不到节点,这可能发生在session过时等时,所以,若是重试容许,只需重试一次便可
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
//若是获取锁则返回当前锁子节点路径
if ( hasTheLock )
{
return ourPath;
}
return null;
}
判断是否为最小节点:
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
//自旋获取锁
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
//获取全部子节点集合
List<String> children = getSortedChildren();
//判断当前子节点是否为最小子节点
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
//若是是最小节点则获取锁
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
//获取前一个节点,用于监听
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
//这里使用getData()接口而不是checkExists()是由于,若是前一个子节点已经被删除了那么会抛出异常并且不会设置事件监听器,而checkExists虽然也能够获取到节点是否存在的信息可是同时设置了监听器,这个监听器其实永远不会触发,对于Zookeeper来讲属于资源泄露
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
//若是设置了获取锁等待时间
if ( millisToWait <= 0 )
{
doDelete = true; // 超时则删除子节点
break;
}
//等待超时时间
wait(millisToWait);
}
else
{
wait();//一直等待
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
//若是前一个子节点已经被删除则deException,只须要自旋获取一次便可
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);//获取锁超时则删除节点
}
}
return haveTheLock;
}
public void release() throws Exception
{
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
//没有获取锁,你释放个球球,若是为空抛出异常
if ( lockData == null )
{
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
//获取重入数量
int newLockCount = lockData.lockCount.decrementAndGet();
//若是重入锁次数大于0,直接返回
if ( newLockCount > 0 )
{
return;
}
//若是重入锁次数小于0,抛出异常
if ( newLockCount < 0 )
{
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try
{
//释放锁
internals.releaseLock(lockData.lockPath);
}
finally
{
//移除当前线程锁数据
threadData.remove(currentThread);
}
}
为了更好的理解其原理和代码分析中获取锁的过程,这里咱们实现一个简单的Demo:
/**
* 基于curator的zookeeper分布式锁
*/
public class CuratorUtil {
private static String address = "192.168.1.180:2181";
public static void main(String[] args) {
//一、重试策略:初试时间为1s 重试3次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//二、经过工厂建立链接
CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy);
//三、开启链接
client.start();
//4 分布式锁
final InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
//读写锁
//InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/readwriter");
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
fixedThreadPool.submit(new Runnable() {
public void run() {
boolean flag = false;
try {
//尝试获取锁,最多等待5秒
flag = mutex.acquire(5, TimeUnit.SECONDS);
Thread currentThread = Thread.currentThread();
if(flag){
System.out.println("线程"+currentThread.getId()+"获取锁成功");
}else{
System.out.println("线程"+currentThread.getId()+"获取锁失败");
}
//模拟业务逻辑,延时4秒
Thread.sleep(4000);
} catch (Exception e) {
e.printStackTrace();
} finally{
if(flag){
try {
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
});
}
}
}
这里咱们开启5个线程,每一个线程获取锁的最大等待时间为5秒,为了模拟具体业务场景,方法中设置4秒等待时间。开始执行main方法,经过ZooInspector监控/curator/lock下的节点以下图:
对,没错,设置4秒的业务处理时长就是为了观察生成了几个顺序节点。果真如案例中所述,每一个线程都会生成一个节点而且仍是有序的。
观察控制台,咱们会发现只有两个线程获取锁成功,另外三个线程超时获取锁失败会自动删除节点。线程执行完毕咱们刷新一下/curator/lock节点,发现刚才建立的五个子节点已经不存在了。
经过分析第三方开源工具实现的分布式锁方式,收获仍是满满的。学习自己就是一个由浅入深的过程,从如何调用API,到理解其代码逻辑实现。想要更深刻能够去挖掘Zookeeper的核心算法ZAB协议。
最后为了方便你们学习,小结了学习过程当中遇到的几个关键词:重入锁、自旋锁、有序节点、阻塞、非阻塞、监听,但愿对你们有所帮助。
https://yq.aliyun.com/articles/60663
http://www.hollischuang.com/archives/1716
http://www.cnblogs.com/sunddenly/p/4033574.html