
1 前言
在程序中,咱们想要保证一个变量的可见性及原子性,咱们能够用volatile(对任意单个volatile变量的读/写具备原子性,但相似于volatile++这种复合操做不具备原子性)、synchronized、乐观锁、悲观锁等等来控制。单体应用内能够这样作,而如今随着时代的发展,大多项目都已经告别的单机时代,拥抱微服务时代,这样的状况下不少服务须要作集群,一个应用须要部署到几台机器上而后作负载均衡,在并发状况下使用上面说的机制来保证变量的可见性及原子性就不可行了(以下图),从而产生了不少分布式机制(如分布式事务、分布式锁等),主要的做用仍是用来保证数据的一致性:node

如上图,假设变量a是剩余库存,值为1,这时候三个用户进来下单,正好三个请求被分到了三个不一样的服务节点上面,三个节点 检查剩余库存,发现还有1个,而后都去进行扣减,这样就致使库存为负数,有两个用户没有货发,就是俗称的超卖。这种状况是不被接受的,用户会和业务撕逼、业务会和你领导吵架,而后你就收拾书包回家了!程序员
在这种场景中,咱们就须要一种方法解决这个问题,这就是分布式锁要解决的问题。web
2 分布式锁的实现与特性
2.1 分布式锁的实现
本地锁能够经过语言自己支持,要实现分布式锁,就必须依赖中间件,数据库、redis、zookeeper等,主要有如下几种实现方式:
1)Memcached:利用 Memcached 的 add 命令。此命令是原子性操做,只有在 key 不存在的状况下,才能 add 成功,也就意味着线程获得了锁。
2)Redis:和 Memcached 的方式相似,利用 Redis 的 setnx 命令。此命令一样是原子性操做,只有在 key 不存在的状况下,才能 set 成功。
3)Zookeeper:利用 Zookeeper 的顺序临时节点,来实现分布式锁和等待队列。Zookeeper 设计的初衷,就是为了实现分布式锁服务的。
4)Chubby:Google 公司实现的粗粒度分布式锁服务,底层利用了 Paxos 一致性算法。
redis
2.2 分布式锁的特性
1)在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行。
2)高可用的获取锁与释放锁。
3)高性能的获取锁与释放锁。
4)具有可重入特性。
5)具有锁失效机制,防止死锁。
6)具有非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。
算法
3 Redisson实现Redis分布式锁以及实现原理
3.1 添加依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.12.4</version>
</dependency>
3.2 测试查看
库存数量100,调用一次减1,小于等于0的时候返回false,表示下单失败。spring
@Component
public class RedissonLock {
private static Integer inventory = 100;
/**
* 测试
*
* @return true:下单成功 false:下单失败
*/
public Boolean redisLockTest(){
// 获取锁实例
RLock inventoryLock = RedissonService.getRLock("inventory-number");
try {
// 加锁
inventoryLock.lock();
if (inventory <= 0){
return false;
}
inventory--;
System.out.println("线程名称:" + Thread.currentThread().getName() + "剩余数量:" + RedissonLock.inventory);
}catch (Exception e){
e.printStackTrace();
}finally {
// 释放锁
inventoryLock.unlock();
}
return true;
}
}
用jmeter进行压测:数据库
线程组100执行20秒:微信
响应断言true为正确,false为失败:数据结构
结果:并发
3.3 获取锁的实例
RLock inventoryLock = RedissonService.getRLock("inventory-number");这段就是获取锁的实例,inventory-number为指定锁名称,进去getLock(String name)方法以后就能看到获取锁的实例就是在RedissonLock构造方法中,初始化一些属性。
public RLock getLock(String name) {
return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}
看下RedissonLock的构造函数:
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
//命令执行器
this.commandExecutor = commandExecutor;
//UUID字符串(MasterSlaveConnectionManager类的构造函数 传入UUID)
this.id = commandExecutor.getConnectionManager().getId();
//内部锁过时时间(防止死锁,默认时间为30s)
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
//uuid+传进来的锁名称
this.entryName = this.id + ":" + name;
//redis消息体
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
内部锁过时时间(默认30s,若是超过这个时间业务代码尚未执行完毕,那么过时时间会自动续约):
3.4 加锁
inventoryLock.lock();这段代码表示加锁,一步一步进去源码里面看看,进来首先看到以下lock()方法:
public void lock() {
try {
this.lock(-1L, (TimeUnit)null, false);
} catch (InterruptedException var2) {
throw new IllegalStateException();
}
}
能够看到这里设置了一些默认值,而后继续调用了带参lock()方法,也是在这里,完成了加锁的逻辑,源码以下:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 线程ID
long threadId = Thread.currentThread().getId();
// 尝试获取锁
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
// 若是过时时间等于null,则表示获取到锁,直接返回,不等于null继续往下执行
if (ttl != null) {
// 若是获取锁失败,则订阅到对应这个锁的channel
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
if (interruptibly) {
// 可中断订阅
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
// 不可中断订阅
this.commandExecutor.syncSubscription(future);
}
try {
// 不断循环
while(true) {
// 再次尝试获取锁
ttl = this.tryAcquire(leaseTime, unit, threadId);
// ttl(过时时间)为空,说明成功获取锁,返回
if (ttl == null) {
return;
}
// ttl(过时时间)大于0 则等待ttl时间后继续尝试获取
if (ttl >= 0L) {
try {
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var13) {
if (interruptibly) {
throw var13;
}
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else if (interruptibly) {
((RedissonLockEntry)future.getNow()).getLatch().acquire();
} else {
((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
}
}
} finally {
// 取消对channel的订阅
this.unsubscribe(future, threadId);
}
}
}
再来看下获取锁的tryAcquire方法:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}
进去看下tryAcquireAsync方法:
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
// 有设置过时时间
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 没有设置过时时间
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
-
tryLockInnerAsync方法是真正执行获取锁的逻辑,它是一段LUA脚本代码。在这里,它使用的是hash数据结构。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
// 若是锁不存在,则经过hset设置它的值,并设置过时时间
"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
// 若是锁已存在,而且锁的是当前线程,则经过hincrby给数值递增1(这里显示了redis分布式锁的可重入性)
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
// 若是锁已存在,但并不是本线程,则返回过时时间ttl
return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
KEYS[1]表明的是你加锁的那个key,好比说:RLock inventoryLock = RedissonService.getRLock("inventory-number");这里你本身设置了加锁的那个锁key就是"inventory-number"。
ARGV[1]表明的就是锁key的默认生存时间,上面也截图看了,默认时间为30秒。
ARGV[2]表明的是加锁的客户端的ID,相似于后面这样: 8743c9c0-0795-4907-87fd-6c719a6b4586:1
上面这段LUA代码看起来也不是很复杂,其中有三个判断:
经过exists判断锁存不存在,若是锁不存在,则设置值和过时时间,加锁成功。
经过hexists判断,若是锁已存在,而且锁的是当前线程,则证实是重入锁,加锁成功,ARGV[2]的value+1,原来是1,如今变为2,固然,释放的时候也要释放两次。
若是锁已存在,但锁的不是当前线程,则证实有其余线程持有锁。返回当前锁的过时时间,加锁失败

3.5 解锁
inventoryLock.unlock();这段代码表示解锁,跟刚才同样,一步一步进去源码里面看看,进来首先看到以下unlock()方法:
public void unlock() {
try {
this.get(this.unlockAsync(Thread.currentThread().getId()));
} catch (RedisException var2) {
if (var2.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)var2.getCause();
} else {
throw var2;
}
}
}
进去unlockAsync()查看,这是解锁的方法:
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
// 释放锁的方法
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
// 添加监听器 解锁opStatus:返回值
future.onComplete((opStatus, e) -> {
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
//若是返回null,则证实解锁的线程和当前锁不是同一个线程,抛出异常
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
// 解锁成功
result.trySuccess((Object)null);
}
});
return result;
}
再进去看下释放锁的方法:unlockInnerAsync():
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 若是释放锁的线程和已存在锁的线程不是同一个线程,返回null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end;
// 若是是同一个线程,就经过hincrby减1的方式,释放一次锁
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
// 若剩余次数大于0 ,则刷新过时时间
if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0;
// 其余就证实锁已经释放,删除key并发布锁释放的消息
else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end;
return nil;",
Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
}
上述代码是释放锁的逻辑。一样的,它也是有三个判断:
若是解锁的线程和当前锁的线程不是同一个,解锁失败,抛出异常。
若是解锁的线程和当前锁的线程是同一个,就经过hincrby减1的方式,释放一次锁。若剩余次数还大于0,则证实是重入锁,再次刷新过时时间。
锁已不存在,经过publish发布锁释放的消息,解锁成功

到这里就结束了,眼过千百不如手过一遍,本身试试就明白了,各位老板看到这里能不能点个赞,鄙人想看看恐怖如斯的二级世界,谢谢各位!
本文分享自微信公众号 - 一个快乐又痛苦的程序员(AsuraTechnology)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。