你们好,我是walking,原文首发于公众号编程大道。感谢你打开这篇文章,请认真阅读下去吧。 今天咱们聊聊redis的一个实际开发的使用场景,那就是大名鼎鼎的分布式锁。html
咱们学习 Java 都知道锁的概念,例如基于 JVM 实现的同步锁 synchronized,以及 jdk 提供的一套代码级别的锁机制 lock,咱们在并发编程中会常常用这两种锁去保证代码在多线程环境下运行的正确性。可是这些锁机制在分布式场景下是不适用的,缘由是在分布式业务场景下,咱们的代码都是跑在不一样的JVM甚至是不一样的机器上,synchronized 和 lock 只能在同一个 JVM 环境下起做用。因此这时候就须要用到分布式锁了。java
例如,如今有个场景就是整点抢消费券(疫情的缘由,支付宝最近在8点、12点整点开放抢消费券),消费券有一个固定的量,先到先得,抢完就没了,线上的服务都是部署多个的,大体架构以下:nginx
因此这个时候咱们就得用分布式锁来保证共享资源的访问的正确性。git
假设不使用分布式锁,咱们看看 synchronized 能不能保证?实际上是不能的,咱们来演示一下。github
下面我写了一个简单的 springboot 项目来模拟这个抢消费券的场景,代码很简单,大体意思是先从 Redis 获取剩余消费券数,而后判断大于0,则减一模拟被某个用户抢到一个,而后减一后再修改 Redis 的剩余消费券数量,打印扣减成功,剩余还有多少,不然扣减失败,就没抢到。整块代码被 synchronized 包裹,Redis 设置的库存数量为50。web
//假设库存编号是00001
private String key = "stock:00001";
@Autowired
private StringRedisTemplate stringRedisTemplate;
/** * 扣减库存 synchronized同步锁 */
@RequestMapping("/deductStock")
public String deductStock(){
synchronized (this){
//获取当前库存
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
if(stock>0){
int afterStock = stock-1;
stringRedisTemplate.opsForValue().set(key,afterStock+"");//修改库存
System.out.println("扣减库存成功,剩余库存"+afterStock);
}else {
System.out.println("扣减库存失败");
}
}
return "ok";
}
复制代码
而后启动两个springboot项目,端口分别为8080,8081,而后在nginx里配置负载均衡redis
upstream redislock{
server 127.0.0.1:8080;
server 127.0.0.1:8081;
}
server {
listen 80;
server_name 127.0.0.1;
location / {
root html;
index index.html index.htm;
proxy_pass http://redislock;
}
}
复制代码
而后用jmeter压测工具进行测试spring
而后咱们看一下控制台输出,能够看到咱们运行的两个web实例,不少一样的消费券被不一样的线程抢到,证实synchronized在这样的状况下是不起做用的,因此就须要使用分布式锁来保证资源的正确性。编程
在实现分布式锁以前,咱们先考虑如何实现,以及都要实现锁的哪些功能。springboot
一、分布式特性(部署在多个机器上的实例都可以访问这把锁)
二、排他性(同一时间只能有一个线程持有锁)
三、超时自动释放的特性(持有锁的线程须要给定必定的持有锁的最大时间,防止线程死掉没法释放锁而形成死锁)
四、...
基于以上列出的分布式锁须要拥有的基本特性,咱们思考一下使用Redis该如何实现?
一、第一个分布式的特性Redis已经支持,多个实例连同一个Redis便可
二、第二个排他性,也就是要实现一个独占锁,可使用Redis的setnx命令实现
三、第三个超时自动释放特性,Redis能够针对某个key设置过时时间
四、执行完毕释放分布式锁
科普时间 Redis Setnx 命令 Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值 语法 redis Setnx 命令基本语法以下: redis 127.0.0.1:6379> SETNX KEY_NAME VALUE 可用版本:>= 1.0.0 返回值:设置成功,返回1, 设置失败,返回0
@RequestMapping("/stock_redis_lock")
public String stock_redis_lock(){
//底层使用setnx命令
Boolean aTrue = stringRedisTemplate.opsForValue().setIfAbsent(lock_key, "true");
stringRedisTemplate.expire(lock_key,10, TimeUnit.SECONDS);//设置过时时间10秒
if (!aTrue) {//设置失败则表示没有拿到分布式锁
return "error";//这里能够给用户一个友好的提示
}
//获取当前库存
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
if(stock>0){
int afterStock = stock-1;
stringRedisTemplate.opsForValue().set(key,afterStock+"");
System.out.println("扣减库存成功,剩余库存"+afterStock);
}else {
System.out.println("扣减库存失败");
}
stringRedisTemplate.delete(lock_key);//执行完毕释放分布式锁
return "ok";
}
复制代码
仍然设置库存数量为50,咱们再用jmeter测试一下,把jmeter的测试地址改成127.0.0.1/stock_redis_lock,一样的设置再来测一次。
测试了5次没有出现脏数据,把发送时间改成0,测了5次也没问题,而后又把线程数改成600,时间为0 ,循环4次,测了几回也是正常的。
上面实现分布式锁的代码已是一个较为成熟的分布式锁的实现了,对大多数软件公司来讲都已经知足需求了。可是上面代码仍是有优化的空间,例如:
1)上面的代码咱们是没有考虑异常状况的,实际状况下代码没有这么简单,可能还会有别的不少复杂的操做,都有可能会出现异常,因此咱们释放锁的代码须要放在finally块里来保证即便是代码抛异常了释放锁的代码他依然会被执行。
2)还有,你有没有注意到,上面咱们的分布式锁的代码的获取和设置过时时间的代码是两步操做第4行和第5行,即非原子操做,就有可能刚执行了第4行还没来得及执行第5行这台机器挂了,那么这个锁就没有设置超时时间,其余线程就一直没法获取,除非人工干预,因此这是一步优化的地方,Redis也提供了原子操做,那就是SET key value EX seconds NX
科普时间 SET key value [EX seconds] [PX milliseconds] [NX|XX] 将字符串值 value 关联到 key 可选参数 从 Redis 2.6.12 版本开始, SET 命令的行为能够经过一系列参数来修改:
- EX second :设置键的过时时间为 second 秒。SET key value EX second 效果等同于 SETEX key second value
- PX millisecond :设置键的过时时间为 millisecond 毫秒。SET key value PX millisecond 效果等同于 PSETEX key millisecond value
- NX :只在键不存在时,才对键进行设置操做。SET key value NX 效果等同于 SETNX key value
- XX :只在键已经存在时,才对键进行设置操做
SpringBoot的StringRedisTemplate也有对应的方法实现,以下代码:
//假设库存编号是00001
private String key = "stock:00001";
private String lock_key = "lock_key:00001";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/stock_redis_lock")
public String stock_redis_lock() {
try {
//原子的设置key及超时时间
Boolean aTrue = stringRedisTemplate.opsForValue().setIfAbsent(lock_key, "true", 30, TimeUnit.SECONDS);
if (!aTrue) {
return "error";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
if (stock > 0) {
int afterStock = stock - 1;
stringRedisTemplate.opsForValue().set(key, afterStock + "");
System.out.println("扣减库存成功,剩余库存" + afterStock);
} else {
System.out.println("扣减库存失败");
}
} catch (NumberFormatException e) {
e.printStackTrace();
} finally {
//避免死锁
stringRedisTemplate.delete(lock_key);
}
return "ok";
}
复制代码
这样实现是否就完美了呢?嗯,对于并发量要求不高或者非大并发的场景的话这样实现已经能够了。可是对于抢购 ,秒杀这样的场景,当流量很大,这时候服务器网卡、磁盘IO、CPU负载均可能会达到极限,那么服务器对于一个请求的的响应时间势必变得比正常状况下慢不少,那么假设就刚才设置的锁的超时时间为10秒,若是某一个线程拿到锁以后由于某些缘由没能在10秒内执行完毕锁就失效了,这时候其余线程就会抢占到分布式锁去执行业务逻辑,而后以前的线程执行完了,会去执行 finally 里的释放锁的代码就会把正在占有分布式锁的线程的锁给释放掉,实际上刚刚正在占有锁的线程还没执行完,那么其余线程就又有机会得到锁了...这样整个分布式锁就失效了,将会产生意想不到的后果。以下图模拟了这个场景。
因此这个问题总结一下,就是由于锁的过时时间设置的不合适或由于某些缘由致使代码执行时间大于锁过时时间而致使并发问题以及锁被别的线程释放,以致于分布式锁混乱。在简单的说就是两个问题,1)本身的锁被别人释放 2)锁超时没法续时间。
第一个问题很好解决,在设置分布式锁时,咱们在当前线程中生产一个惟一串将value设置为这个惟一值,而后在finally块里判断当前锁的value和本身设置的同样时再去执行delete,以下:
String uuid = UUID.randomUUID().toString();
try {
//原子的设置key及超时时间,锁惟一值
Boolean aTrue = stringRedisTemplate.opsForValue().setIfAbsent(lock_key,uuid,30,TimeUnit.SECONDS);
//...
} finally {
//是本身设置的锁再执行delete
if(uuid.equals(stringRedisTemplate.opsForValue().get(lock_key))){
stringRedisTemplate.delete(lock_key);//避免死锁
}
}
复制代码
问题一解决了(设想一下上述代码还有什么问题,一下子讲),那锁的超时时间就很关键了,不能太大也不能过小,这就须要评估业务代码的执行时间,好比设置个10秒,20秒。即便是你的锁设置了合适的超时时间,也避免不了可能会发生上述分析的由于某些缘由代码没在正常评估的时间内执行完毕,因此这时候的解决方案就是给锁续超时时间。大体思路就是,业务线程单独起一个分线程,定时去监听业务线程设置的分布式锁是否还存在,存在就说明业务线程还没执行完,那么就延长锁的超时时间,若锁已不存在则业务线程执行完毕,而后就结束本身。
**“锁续命”**的这套逻辑属实有点复杂啊,要考虑的问题太多了,稍不注意就会有bug。不要看上面实现分布式锁的代码没有几行,就认为实现起来很简单,若是说本身去实现的时候没有实际高并发的经验,确定也会踩不少坑,例如,
1)锁的设置和过时时间的设置是非原子操做的,就可能会致使死锁。
2)还有上面遗留的一个,在finally块里判断锁是不是本身设置的,是的话再删除锁,这两步操做也不是原子的,假设刚判断完为true服务就挂了,那么删除锁的代码不会执行,就会形成死锁,即便是设置了过时时间,在没过时这段时间也会死锁。因此这里也是一个注意的点,要保证原子操做的话,Redis提供了执行Lua脚本的功能来保证操做的原子性,具体怎么使用再也不展开。
因此,**“锁续命”**的这套逻辑实现起来仍是有点复杂的,好在市面上已经有现成的开源框架帮咱们实现了,那就是Redisson。
实现原理:
一、首先Redisson会尝试进行加锁,加锁的原理也是使用相似Redis的setnx命令原子的加锁,加锁成功的话其内部会开启一个子线程 二、子线程主要负责监听,其实就是一个定时器,定时监听主线程是否还持有锁,持有则将锁的时间延时,不然结束线程 三、若是加锁失败则自旋不断尝试加锁 四、执行完代码主线程主动释放锁
那咱们看一下使用后Redisson后的代码是什么样的。
一、首先在pom.xml文件添加Redisson的maven坐标
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.5</version>
</dependency>
复制代码
二、咱们要拿到Redisson的这个对象,以下配置Bean
@SpringBootApplication
public class RedisLockApplication {
public static void main(String[] args) {
SpringApplication.run(RedisLockApplication.class, args);
}
@Bean
public Redisson redisson(){
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379")
.setDatabase(0);
return (Redisson) Redisson.create(config);
}
}
复制代码
三、而后咱们获取Redisson的实例,使用其API进行加锁释放锁操做
//假设库存编号是00001
private String key = "stock:00001";
private String lock_key = "lock_key:00001";
@Autowired
private StringRedisTemplate stringRedisTemplate;
/** * 使用Redisson实现分布式锁 * @return */
@RequestMapping("/stock_redisson_lock")
public String stock_redisson_lock() {
RLock redissonLock = redisson.getLock(lock_key);
try {
redissonLock.lock();
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
if (stock > 0) {
int afterStock = stock - 1;
stringRedisTemplate.opsForValue().set(key, afterStock + "");
System.out.println("扣减库存成功,剩余库存" + afterStock);
} else {
System.out.println("扣减库存失败");
}
} catch (NumberFormatException e) {
e.printStackTrace();
} finally {
redissonLock.unlock();
}
return "ok";
}
复制代码
看这个Redisson的分布式锁提供的API是否是很是的简单?就像Java并发变成里AQS那套Lock机制同样,以下获取一把RedissonLock
RLock redissonLock = redisson.getLock(lock_key);
复制代码
默认返回的是RedissonLock的对象,该对象实现了RLock接口,而RLock接口继承了JDK并发编程报包里的Lock接口
在使用Redisson加锁时,它也提供了不少API,以下
如今咱们选择使用的是最简单的无参lock方法,简单的点进去跟一下看看他的源码,咱们找到最终的执行加锁的代码以下:
咱们能够看到其底层使用了Lua脚原本保证原子性,使用Redis的hash结构实现的加锁,以及可重入锁。
比咱们本身实现分布式锁看起来还要简单,可是咱们本身写的锁功能他都有,咱们没有的他也有。好比,他实现的分布式锁是支持可重入的,也支持可等待,即尝试等待必定时间,没拿到锁就返回false。上述代码中的redissonLock.lock();是一直等待,内部自旋尝试加锁。
Distributed Java locks and synchronizers
Lock
FairLock
MultiLock
RedLock
ReadWriteLock
Semaphore
PermitExpirableSemaphore
CountDownLatch
redisson.org
Redisson提供了丰富的API,内部运用了大量的Lua脚本保证原子操做,篇幅缘由redisson实现锁的代码暂不分析了。
注意:在上述示例代码中,为了方便演示,查询redis库存、修改库存并不是原子操做,实际这两部操做也得保证原子行,能够用redis自带的Lua脚本功能去实现
到这里,Redis分布式锁实战基本就讲完了,总结一下Redis分布式锁吧。
一、若是说是本身实现的话,须要特别注意四点:
二、若是使用现成的分布式锁框架Redisson,就须要熟悉一下其经常使用的API以及实现原理,或者选择其余开源的分布式锁框架,充分考察,选择适合本身业务需求的便可。
参考
doc.redisfans.com/string/set.…
若是以为本文对你有帮助的话,请不要吝啬你的赞哦。
更多干货文章欢迎关注公众号:编程大道
公众号也有好多关于Redis的技术文,欢迎关注哦
另外walking本人呢也在整理Redis相关的知识点,作成思惟导图的形式,不过还没最终整理完,已经整理了好几天啦,关注公众号,整理好了会经过公众号推送给你们~