Redis修行 — 分布式锁

java

常见的实现方式

  • 基于数据库的分布式锁
  • 基于缓存的分布式锁(redis,memcached等)
  • 基于ZooKeeper的分布式锁(临时有序节点)

本文主要介绍经过Redis本身去实现分布式锁以及使用开源框架Redisson去实现分布式锁,基于数据库和Zookeeper方式简要带过。nginx

特性

  • 互斥性:只能有一个客户端持有锁
  • 防死锁:客户端在持有锁期间崩溃,未能解锁,也有其余方式去解锁,不影响其余客户端获取锁
  • 只有加锁的人才能释放锁

原理

分布式锁本质上能够理解为是一个全部客户端共享的全局变量,当这个全局变量存在时,说明已经有客户端获取到了锁,其余客户端只能等它释放锁(删除这个全局变量)后才能获取到锁(设置全局变量)。git

基于Redis实现分布式锁

按照上面的特性和理论,咱们整理一下基本思路:github

  • 指定一个key做为锁标记,存入Redis中,指定一个惟一的用户标识做为value
  • 当key不存在时才能设置值,确保同一时间只有一个客户端得到锁,知足互斥性特性
  • 设置一个过时时间,防止因系统异常致使没能删除这个key,知足防死锁特性
  • 当处理完业务以后须要清除这个key来释放锁。
  • 清除key时须要校验value值,须要知足只有加锁的人才能释放锁

获取锁

使用如下指令:web

SET mylock userId NX PX 10000
复制代码
  • mylock为锁对应的key
  • userId为惟一的用户标识,用于删除时校验
  • NX表示只有当key不存在时才能set成功,确保只有一个客户端可以请求成功
  • PX 10000表示这个锁有一个10秒的自动过时时间

释放锁

当业务完成后删除key来释放锁,能够执行如下lua脚本:redis

if redis.call("get",KEYS[1]) == ARGV[1then
    return redis.call("del",KEYS[1])
else
    return 0
end
复制代码

执行以上脚本时,须要将mylock做为KEYS[1]传进去,将userId做为ARGV[1]传进去spring

注意点

  • 必需要给锁加一个过时时间:这样即便中间系统异常了,等过时时间到了,也能够自动释放锁,防止出现死锁现象
  • 获取锁时不能分红先设置key,再设置过时时间两步去执行,错误示例以下:
    # 当key不存在时设置值
    setnx mylock userId
    # 设置过时时间
    expire mylock 10
复制代码

这样会存在一个问题,若是系统在执行完setnx以后异常了,expire指令就没法执行,一样会出现死锁现象sql

  • 有必要将value设置为一个惟一的用户标识,用于保证所要释放的锁是本身创建的,由于在极端的状况下会出现下列状况:

A成功获取了锁数据库

A在某个操做上被阻塞了好久apache

A的锁到达过时时间

B获取了锁

A从阻塞中恢复了,执行释放锁操做,把B的锁释放了,致使B操做不受保护

  • 释放锁操做须要保证操做时原子性的,须要经过Lua脚原本实现。它将GET、判断是否相同、DEL三个步骤以一个原子性的方式去完成。若是按逻辑分开执行一样会出现相似上面的问题:

A先判断当前锁的值,肯定了是本身建的锁,准备释放锁了

由于网路问题或者系统卡顿致使A被阻塞了

A的锁过时了

B获取锁

A从阻塞中恢复了

A调用DEL释放了B的锁

缺陷

从上面的描述能够看出来,当出现系统阻塞或者网络延迟等状况下,可能业务尚未执行完成,锁就过时自动释放了,这时它的业务操做时不受保护的。

代码实现

本文样例基于SpringBoot实现

pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- redis Lettuce 模式 链接池 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
复制代码
yml配置文件
spring:
  redis:
    # Redis数据库索引(默认为0)
    database: 0
    # Redis服务器地址
    host: localhost
    # Redis服务器链接端口
    port: 6379
    # Redis服务器链接密码(默认为空)
    # password: admin
    # 链接超时时间(毫秒)
    timeout: 3000ms
    lettuce:
      pool:
        # 链接池最大链接数(使用负值表示没有限制)
        max-active: 20
        # 链接池最大阻塞等待时间(使用负值表示没有限制)
        max-wait: 3000ms
        # 链接池中的最大空闲链接(负数没有限制)
        max-idle: 8
        # 链接池中的最小空闲链接
        min-idle: 0
复制代码
锁操做
@Component
public class RedisLock {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 加锁
     */

    public boolean tryLock(String key, String value) {
        Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, value, 5, TimeUnit.SECONDS);
        if (isLocked == null) {
            return false;
        }
        return isLocked;
    }

    /**
     * 解锁
     */

    public Boolean unLock(String key, String value) {
        // 执行 lua 脚本
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        // 指定 lua 脚本
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("redis/unLock.lua")));
        // 指定返回类型
        redisScript.setResultType(Long.class);
        // 参数一:redisScript,参数二:key列表,参数三:arg(可多个)
        Long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value);
        return result != null && result > 0;
    }
}
复制代码

释放锁须要执行Lua脚本,路径为:resources/redis/unLock.lua

if redis.call("get",KEYS[1]) == ARGV[1then
  return redis.call("del",KEYS[1])
else
  return 0
end
复制代码
测试

模拟一个减库存的操做,先在redis中设置库存量50,key为productKey,建立访问接口:

@RestController
@RequestMapping("/redis")
public class RedisController {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static final String PRODUCT_KEY = "productKey";

    private static final String LOCK_KEY = "redisLock";

    @Autowired
    private RedisLock redisLock;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @GetMapping("/lock")
    public void lockTest() throws InterruptedException {
        // 用户惟一标识
        String lockValue = UUID.randomUUID().toString().replace("-""");
        Random random = new Random();
        int sleepTime;
        while (true) {
            if (redisLock.tryLock(LOCK_KEY, lockValue)) {
                logger.info("[{}]成功获取锁", lockValue);
                break;
            }
            sleepTime = random.nextInt(1000);
            Thread.sleep(sleepTime);
            logger.info("[{}]获取锁失败,{}毫秒后从新尝试获取锁", lockValue, sleepTime);
        }
        // 剩余库存
        String products = stringRedisTemplate.opsForValue().get(PRODUCT_KEY);
        if (products == null) {
            logger.info("[{}]获取剩余库存失败,释放锁:{} @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@", lockValue, redisLock.unLock(LOCK_KEY, lockValue));
            return;
        }
        int surplus = Integer.parseInt(products);
        if (surplus <= 0) {
            logger.info("[{}]库存不足,释放锁:{} ##########################################", lockValue, redisLock.unLock(LOCK_KEY, lockValue));
            return;
        }

        logger.info("[{}]当前库存[{}],操做:库存-1", lockValue, surplus);
        stringRedisTemplate.opsForValue().decrement(PRODUCT_KEY);
        logger.info("[{}]操做完成,开始释放锁,释放结果:{}", lockValue, redisLock.unLock(LOCK_KEY, lockValue));
    }
}
复制代码

启动项目,使用JMeter进行并发测试,设置1秒60次请求,观察控制台输出和最终redis中库存数量

Redisson 实现

Redisson是【Redis官方推荐】官网推荐分布式锁实现的方案。使用起来也很简单。这里只作简单演示,具体能够看官方文档

Redis son 莫非是redis亲儿子的意思

pom.xml

直接引入redisson-spring-boot-starter,它包含了对spring-boot-starter-webspring-boot-starter-data-redis的依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.12.0</version>
</dependency>
复制代码

建立配置文件

@Configuration
public class RedissonConfig {
    /**
     * 这里只配置单节点的,支持集群、哨兵等方式配置
     * 能够用Config.fromYAML加载yml文件中的配置
     */

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://localhost:6379")
                .setDatabase(0);
        return Redisson.create(config);
    }
}
复制代码

注意这里的address须要以 redis://host:port 的格式

建立测试接口

@RestController
@RequestMapping("/redisson")
public class RedissonController {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static final String PRODUCT_KEY = "productKey";

    private static final String LOCK_KEY = "redissonLock";

    @Autowired
    private RedissonClient redissonClient;

    @RequestMapping("/lock")
    public void lock() {
        RLock lock = redissonClient.getLock(LOCK_KEY);
        // 设置5秒过时时间
        lock.lock(5, TimeUnit.SECONDS);
        String lockValue = lock.toString();
        logger.info("[{}]成功获取锁,开始执行业务。。。", lockValue);

        RAtomicLong atomicLong = redissonClient.getAtomicLong(PRODUCT_KEY);
        long surplus = atomicLong.get();
        if (surplus <= 0) {
            lock.unlock();
            logger.info("[{}]库存不足,释放锁 ##########################################", lockValue);
            return;
        }
        logger.info("[{}]当前库存[{}],库存 -1,剩余库存[{}]", lockValue, surplus, atomicLong.decrementAndGet());

        logger.info("[{}]操做完成,释放锁", lockValue);
        lock.unlock();
    }
}
复制代码

启动项目,使用JMeter进行并发测试,一样设置1秒60次请求,观察控制台输出和最终redis中库存数量

基于数据库实现分布式锁

经过惟一索引的方式

# 创建一张记录锁信息的表
lockName -- 锁名称。 加上惟一索引,确保只能有一个客户端得到锁
creater -- 建立人,只有建立者才能解锁
expire -- 过时时间
复制代码
  • 执行前先插入锁数据,lockName作了惟一性约束,若是多个请求同时提交只会有一个请求提交成功。
  • 执行完后删除锁
  • 能够经过定时任务方式去删除已过时的数据,防止死锁

经过乐观锁的形式

  • 在须要操做的表中加一个字段version
  • 操做任务前先查询到当前version的值
    select version from product where product_name = '电脑'
复制代码
  • 更新数据时,将前面查出来的version的值做为条件
    update product set product_count = product_count - 1version = version + 1 where product_name = '电脑' and version = ${version}
复制代码

这样若是在这期间数据被修改了,那么version的值就不一致了,更新操做会失败。这样就确保了在你业务期间没有其余人修改过数据。

基于 ZooKeeper 的分布式锁

ZooKeeper的分布式锁主要是经过建立临时有序节点的方式实现的:

  • 发起加锁请求,在ZooKeeper中建立一个临时有序节点
  • 判断本身建立的节点是不是最小序号
  • 若是是最小的,则成功获取锁
  • 若是不是最小的,则在它的上一节点加上一个监听器
  • 处理完业务后,释放锁,即删除对应的节点
  • ZooKeeper通知监听这个节点的监听器,你的前面已经没有其余节点了,你能够获取锁了
  • 对应节点获取锁

能够发现,ZooKeeper的方式获取锁是有序的,先请求的先获取锁,而经过redis的方式是无序的,谁先抢到谁得到锁

访问源码

全部代码均上传至Github上,方便你们访问

>>>>>> Redis实现分布式锁 <<<<<<

平常求赞

创做不易,若是各位以为有帮助,求点赞 支持

新建公众号,求关注

相关文章
相关标签/搜索