学 无 止 境 , 与 君 共 勉 。java
本文主要介绍经过Redis本身去实现分布式锁以及使用开源框架Redisson去实现分布式锁,基于数据库和Zookeeper方式简要带过。nginx
分布式锁本质上能够理解为是一个全部客户端共享的全局变量,当这个全局变量存在时,说明已经有客户端获取到了锁,其余客户端只能等它释放锁(删除这个全局变量)后才能获取到锁(设置全局变量)。git
按照上面的特性和理论,咱们整理一下基本思路:github
使用如下指令:web
SET mylock userId NX PX 10000
复制代码
当业务完成后删除key来释放锁,能够执行如下lua脚本:redis
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
复制代码
执行以上脚本时,须要将mylock
做为KEYS[1]
传进去,将userId
做为ARGV[1]
传进去spring
# 当key不存在时设置值
setnx mylock userId
# 设置过时时间
expire mylock 10
复制代码
这样会存在一个问题,若是系统在执行完
setnx
以后异常了,expire
指令就没法执行,一样会出现死锁现象sql
value
设置为一个惟一的用户标识,用于保证所要释放的锁是本身创建的,由于在极端的状况下会出现下列状况:A成功获取了锁数据库
A在某个操做上被阻塞了好久apache
A的锁到达过时时间
B获取了锁
A从阻塞中恢复了,执行释放锁操做,把B的锁释放了,致使B操做不受保护
Lua
脚原本实现。它将GET、判断是否相同、DEL三个步骤以一个原子性的方式去完成。若是按逻辑分开执行一样会出现相似上面的问题:A先判断当前锁的值,肯定了是本身建的锁,准备释放锁了
由于网路问题或者系统卡顿致使A被阻塞了
A的锁过时了
B获取锁
A从阻塞中恢复了
A调用DEL释放了B的锁
从上面的描述能够看出来,当出现系统阻塞或者网络延迟等状况下,可能业务尚未执行完成,锁就过时自动释放了,这时它的业务操做时不受保护的。
本文样例基于SpringBoot实现
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis Lettuce 模式 链接池 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
复制代码
spring:
redis:
# Redis数据库索引(默认为0)
database: 0
# Redis服务器地址
host: localhost
# Redis服务器链接端口
port: 6379
# Redis服务器链接密码(默认为空)
# password: admin
# 链接超时时间(毫秒)
timeout: 3000ms
lettuce:
pool:
# 链接池最大链接数(使用负值表示没有限制)
max-active: 20
# 链接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: 3000ms
# 链接池中的最大空闲链接(负数没有限制)
max-idle: 8
# 链接池中的最小空闲链接
min-idle: 0
复制代码
@Component
public class RedisLock {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 加锁
*/
public boolean tryLock(String key, String value) {
Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, value, 5, TimeUnit.SECONDS);
if (isLocked == null) {
return false;
}
return isLocked;
}
/**
* 解锁
*/
public Boolean unLock(String key, String value) {
// 执行 lua 脚本
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// 指定 lua 脚本
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("redis/unLock.lua")));
// 指定返回类型
redisScript.setResultType(Long.class);
// 参数一:redisScript,参数二:key列表,参数三:arg(可多个)
Long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value);
return result != null && result > 0;
}
}
复制代码
释放锁须要执行Lua脚本,路径为:resources/redis/unLock.lua
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
复制代码
模拟一个减库存的操做,先在redis中设置库存量50,key为productKey,建立访问接口:
@RestController
@RequestMapping("/redis")
public class RedisController {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private static final String PRODUCT_KEY = "productKey";
private static final String LOCK_KEY = "redisLock";
@Autowired
private RedisLock redisLock;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@GetMapping("/lock")
public void lockTest() throws InterruptedException {
// 用户惟一标识
String lockValue = UUID.randomUUID().toString().replace("-", "");
Random random = new Random();
int sleepTime;
while (true) {
if (redisLock.tryLock(LOCK_KEY, lockValue)) {
logger.info("[{}]成功获取锁", lockValue);
break;
}
sleepTime = random.nextInt(1000);
Thread.sleep(sleepTime);
logger.info("[{}]获取锁失败,{}毫秒后从新尝试获取锁", lockValue, sleepTime);
}
// 剩余库存
String products = stringRedisTemplate.opsForValue().get(PRODUCT_KEY);
if (products == null) {
logger.info("[{}]获取剩余库存失败,释放锁:{} @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@", lockValue, redisLock.unLock(LOCK_KEY, lockValue));
return;
}
int surplus = Integer.parseInt(products);
if (surplus <= 0) {
logger.info("[{}]库存不足,释放锁:{} ##########################################", lockValue, redisLock.unLock(LOCK_KEY, lockValue));
return;
}
logger.info("[{}]当前库存[{}],操做:库存-1", lockValue, surplus);
stringRedisTemplate.opsForValue().decrement(PRODUCT_KEY);
logger.info("[{}]操做完成,开始释放锁,释放结果:{}", lockValue, redisLock.unLock(LOCK_KEY, lockValue));
}
}
复制代码
启动项目,使用JMeter进行并发测试,设置1秒60次请求,观察控制台输出和最终redis中库存数量
Redisson是【Redis官方推荐】官网推荐分布式锁实现的方案。使用起来也很简单。这里只作简单演示,具体能够看官方文档。
Redis son 莫非是redis亲儿子的意思
直接引入redisson-spring-boot-starter
,它包含了对spring-boot-starter-web
和spring-boot-starter-data-redis
的依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.12.0</version>
</dependency>
复制代码
@Configuration
public class RedissonConfig {
/**
* 这里只配置单节点的,支持集群、哨兵等方式配置
* 能够用Config.fromYAML加载yml文件中的配置
*/
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://localhost:6379")
.setDatabase(0);
return Redisson.create(config);
}
}
复制代码
注意这里的address须要以 redis://host:port 的格式
@RestController
@RequestMapping("/redisson")
public class RedissonController {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private static final String PRODUCT_KEY = "productKey";
private static final String LOCK_KEY = "redissonLock";
@Autowired
private RedissonClient redissonClient;
@RequestMapping("/lock")
public void lock() {
RLock lock = redissonClient.getLock(LOCK_KEY);
// 设置5秒过时时间
lock.lock(5, TimeUnit.SECONDS);
String lockValue = lock.toString();
logger.info("[{}]成功获取锁,开始执行业务。。。", lockValue);
RAtomicLong atomicLong = redissonClient.getAtomicLong(PRODUCT_KEY);
long surplus = atomicLong.get();
if (surplus <= 0) {
lock.unlock();
logger.info("[{}]库存不足,释放锁 ##########################################", lockValue);
return;
}
logger.info("[{}]当前库存[{}],库存 -1,剩余库存[{}]", lockValue, surplus, atomicLong.decrementAndGet());
logger.info("[{}]操做完成,释放锁", lockValue);
lock.unlock();
}
}
复制代码
启动项目,使用JMeter进行并发测试,一样设置1秒60次请求,观察控制台输出和最终redis中库存数量
# 创建一张记录锁信息的表
lockName -- 锁名称。 加上惟一索引,确保只能有一个客户端得到锁
creater -- 建立人,只有建立者才能解锁
expire -- 过时时间
复制代码
lockName
作了惟一性约束,若是多个请求同时提交只会有一个请求提交成功。version
version
的值 select version from product where product_name = '电脑'
复制代码
version
的值做为条件 update product set product_count = product_count - 1, version = version + 1 where product_name = '电脑' and version = ${version}
复制代码
这样若是在这期间数据被修改了,那么version的值就不一致了,更新操做会失败。这样就确保了在你业务期间没有其余人修改过数据。
ZooKeeper的分布式锁主要是经过建立临时有序节点的方式实现的:
能够发现,ZooKeeper的方式获取锁是有序的,先请求的先获取锁,而经过redis的方式是无序的,谁先抢到谁得到锁
全部代码均上传至Github上,方便你们访问
创做不易,若是各位以为有帮助,求点赞 支持