面试官问redis分布式锁,如何设计才能让他满意?

前言java

对于分布式锁的问题我也查过不少资料,感受不少方式实现的并不完善,或者看着云里雾里的,不知因此然,因而就整理了这篇文章,但愿对您有用,有写的不对的地方,欢迎留言指正。redis

首先我们来聊聊什么是分布式锁,到底解决了什么问题?直接看代码数据库

$stock = $this->getStockFromDb();//查询剩余库存
 if ($stock>0){
       $this->ReduceStockInDb(); // 在数据库中进行减库存操做
       echo "successful";
 }else{
    echo  "库存不足";
 }

很简单的一个场景,用户下单,我们查询商品库存够不够,不够的话直接返回库存不足相似的错误信息,若是库存够的话直接在数据库中库存-1,而后返回成功,在业务逻辑上这段代码是没有什么问题的。session

可是,这段代码是存在严重的问题的。并发

若是库存只剩 1,而且在并发比较高的状况下,好比两个请求同时执行了这段代码,同时查到库存为 1,而后顺利成章的都去数据库执行 stock-1 的操做,这样库存就会变成-1,而后就会引起超卖的现象,刚才说的是两个请求同时执行,若是同时几千个请求打过来,可见形成的损失是很是大的。因而呢有些聪明人就想了个办法,办法以下。分布式

你们都知道 redis 有个 setnx 命令,不知道的话也不要紧,我已经帮你查过了
image高并发

咱们把上面的代码优化一下优化

version-1this

$lock_key="lock_key";
 $res = $redis->setNx($lock_key, 1);
 if (!$res){
       return "error_code";
 }

 $stock = $this->getStockFromDb();//查询剩余库存
 if ($stock>0){
       $this->ReduceStockInDb(); // 在数据库中进行减库存操做
       echo "successful";
 }else{
    echo  "库存不足";
 }

$redis->delete($lock_key);
  • 第一次请求进来会去 setNx,固然结果是返回 true,由于 lock_key 不存在,而后下面业务逻辑正常进行,任务执行完了以后把lock_key删除掉,这样下一次请求进来重复上述逻辑
  • 第二次请求进来一样会去执行 setNx,结果返回 false,由于lock_key已经存在,而后直接返回错误信息(你双11抢购秒杀产品的时候给你返回的系统繁忙就是这么来的),不执行库存减 1 的操做
  • 有的同窗可能有疑惑,我们不是说高并发的状况下么?要是两个请求同时 setNx 的话获取的结果不都是 true 了,一样会同时去执行业务逻辑,问题不是同样没解决么?可是你们要明白 redis 是单线程的,具有原子性,不一样的请求执行 setnx 是顺序执行的,因此这个是不用担忧的。

看似问题解决了,其实并否则。lua

咱们这里伪代码写的简单,查询一下库存,而后减1操做而已,可是真实的生产环境中的状况是很是复杂的,在一些极端状况下,程序极可能会报错,崩溃,若是第一次执行加锁了以后程序报错了,那这个锁永远存在,接下来的请求永远也请求不进来了,因此我们继续优化

version-2

try{ //新加入try catch处理,这样程序万一报错会把锁删除掉
   $lock_key="lock_key";
   $expire_time = 5;//新加入过时时间,这样锁不会一直占有
   $res = $redis->setNx($lock_key, 1, $expire_time);
   if (!$res){
         return "error_code";
   }

   $stock = $this->getStockFromDb();//查询剩余库存
   if ($stock>0){
         $this->ReduceStockInDb(); // 在数据库中进行减库存操做
         echo "successful";
   }else{
      echo  "库存不足";
   }
 }finally {
    $redis->delete($lock_key);
}
  • 在setnx的时候给加上过时时间,这样至少不会让锁一直存在成为死锁
  • 作try catch处理,万一程序抛出异常把锁删掉,也是为了解决死锁问题

此次是把死锁问题解决了,可是问题仍是存在,你们能够先想想还存在什么问题再接着往下看。

存在的问题以下

  • 咱们的过时时间是5秒钟,万一这个请求执行了6秒钟怎么办?超出的那一秒,跟没有加锁有什么区别?其实不只仅如此,还有一个更严重的问题存在。好比第二个请求也是执行6秒,那么在第二个请求在超出的那1秒才进来的时候,第一个请求执行完了,固然会删除第二个请求加的锁,若是一直并发都很大的话,锁跟没有加没什么区别。
  • 针对上述问题,最直接的办法是加长过时时间,可是这个不是解决问题的最终办法。把时间设置过长也会产生新的问题,好比各类缘由机器崩溃了,须要重启,而后你把锁设置的时间是1年,同时也没有delete掉,难道机器重启了再等一年?另外这样设置固定值的解决方案在计算机当中是不容许的,曾经的“千年虫”问题就是相似的缘由致使的
  • 在加超时时间的时候必定要注意必定是一次性加上,保证其原子性,不要先setnx以后,再设置expire_time,这样的话万一在setnx以后那一个瞬间系统挂了,这个锁依然会成为一个永久的死锁
  • 其实上述问题的主要缘由在于,请求1会删掉请求2的锁,因此说锁须要保证惟一性。

我们接着优化

version-3

try{ //新加入try catch处理,这样程序万一报错会把锁删除掉
   $lock_key="lock_key";
   $expire_time = 5;//新加入过时时间,这样锁不会一直占有
   
   $client_id = session_create_id();  //对每一个请求生成惟一性的id
   $res = $redis->setNx($lock_key, $client_id, $expire_time);
   if (!$res){
         return "error_code";
   }

   $stock = $this->getStockFromDb();//查询剩余库存
   if ($stock>0){
         $this->ReduceStockInDb(); // 在数据库中进行减库存操做
         echo "successful";
   }else{
      echo  "库存不足";
   }
 }finally {
   if ($redis->get($lock_key) == $client_id){  //在这里加一个判断,保证每次删除的锁是当次请求加的锁,这样避免误删了别的请求加的锁
      $redis->delete($lock_key);
   }
   
}
  • 咱们在每一个请求生成了惟一client_id,而且把该值写入了lock_key中
  • 在最后删除锁的时候会先判断这个lock_key是不是该请求生成的,若是不是的话则不会删除

可是上面方案还有问题,咱们看最后 redis是先进行了get操做判断,而后再删除,是两步操做,并无保证其原子性,redis的多步操做能够用lua脚原本保证原子性,其实看到lua也不须要感受太陌生,他就是一种语言而已,在这里的做用是把多个redis操做打包成一个命令去执行,保证了原子性而已

version-4

try{ //新加入try catch处理,这样程序万一报错会把锁删除掉
   $lock_key="lock_key";
   $expire_time = 5;//新加入过时时间,这样锁不会一直占有
   
   $client_id = session_create_id();  //对每一个请求生成惟一性的id
   $res = $redis->setNx($lock_key, $client_id, $expire_time);
   if (!$res){
         return "error_code";
   }

   $stock = $this->getStockFromDb();//查询剩余库存
   if ($stock>0){
         $this->ReduceStockInDb(); // 在数据库中进行减库存操做
         echo "successful";
   }else{
      echo  "库存不足";
   }
 }finally {
    $script = '  //此处用lua脚本执行是为了get对比以后再delete的两步操做的原子性
            if redis.call("GET", KEYS[1]) == ARGV[1] then
                return redis.call("DEL", KEYS[1])
            else
                return 0
            end
        ';
        return $instance->eval($script, [$lock_key, $client_id], 1);
   
}

这样封装以后,分布式锁应该就比较完善了。固然咱们还能够进一步的优化一下用户体验

  • 如今好比一个请求进来以后,若是请求被锁住,会当即返回给用户请求失败,请从新尝试,咱们能够适当的延长一点这个时间,不要当即返回给用户请求失败,这样体验会更好
  • 具体方式为用户请求进来若是遇到了锁,能够适当的等待一些时间以后重试,重试的时候若是锁释放了,则此次请求就能够成功

version-5

$retry_times = 3; //重试次数
$usleep_times = 5000;//重试间隔时间

  try{ //新加入try catch处理,这样程序万一报错会把锁删除掉
   
   
    $lock_key="lock_key";
    $expire_time = 5;//新加入过时时间,这样锁不会一直占有
    while($retry_times > 0){
      $client_id = session_create_id();  //对每一个请求生成惟一性的id
      $res = $redis->setNx($lock_key, $client_id, $expire_time);
      if ($res){
           break;
      }
      echo "尝试从新获取锁";
      $retry_times--;
      usleep($usleep_times);
   }
   if (!$res){  //重试三次以后都没有获取到锁则给用户返回错误信息
         return "error_code";
   }
   $stock = $this->getStockFromDb();//查询剩余库存
   if ($stock>0){
         $this->ReduceStockInDb(); // 在数据库中进行减库存操做
         echo "successful";
   }else{
      echo  "库存不足";
   }
 }finally {
    $script = '  //此处用lua脚本执行是为了get对比以后再delete的两步操做的原子性
            if redis.call("GET", KEYS[1]) == ARGV[1] then
                return redis.call("DEL", KEYS[1])
            else
                return 0
            end
        ';
        return $instance->eval($script, [$lock_key, $client_id], 1);
   
}

固然上面的分布式锁仍是不够完善的,好比redis主从同步延迟,就会产生问题,像java中redission实现的思想是很是好的,你们感兴趣能够看看源码,今天就聊到这里,感兴趣的朋友能够留言你们一块儿讨论

相关文章
相关标签/搜索