前言java
对于分布式锁的问题我也查过不少资料,感受不少方式实现的并不完善,或者看着云里雾里的,不知因此然,因而就整理了这篇文章,但愿对您有用,有写的不对的地方,欢迎留言指正。redis
首先我们来聊聊什么是分布式锁,到底解决了什么问题?直接看代码数据库
$stock = $this->getStockFromDb();//查询剩余库存 if ($stock>0){ $this->ReduceStockInDb(); // 在数据库中进行减库存操做 echo "successful"; }else{ echo "库存不足"; }
很简单的一个场景,用户下单,我们查询商品库存够不够,不够的话直接返回库存不足相似的错误信息,若是库存够的话直接在数据库中库存-1,而后返回成功,在业务逻辑上这段代码是没有什么问题的。session
可是,这段代码是存在严重的问题的。
并发
若是库存只剩 1,而且在并发比较高的状况下,好比两个请求同时执行了这段代码,同时查到库存为 1,而后顺利成章的都去数据库执行 stock-1 的操做,这样库存就会变成-1,而后就会引起超卖的现象,刚才说的是两个请求同时执行,若是同时几千个请求打过来,可见形成的损失是很是大的。因而呢有些聪明人就想了个办法,办法以下。分布式
你们都知道 redis 有个 setnx 命令,不知道的话也不要紧,我已经帮你查过了高并发
咱们把上面的代码优化一下优化
version-1this
$lock_key="lock_key"; $res = $redis->setNx($lock_key, 1); if (!$res){ return "error_code"; } $stock = $this->getStockFromDb();//查询剩余库存 if ($stock>0){ $this->ReduceStockInDb(); // 在数据库中进行减库存操做 echo "successful"; }else{ echo "库存不足"; } $redis->delete($lock_key);
看似问题解决了,其实并否则。lua
咱们这里伪代码写的简单,查询一下库存,而后减1操做而已,可是真实的生产环境中的状况是很是复杂的,在一些极端状况下,程序极可能会报错,崩溃,若是第一次执行加锁了以后程序报错了,那这个锁永远存在,接下来的请求永远也请求不进来了,因此我们继续优化
version-2
try{ //新加入try catch处理,这样程序万一报错会把锁删除掉 $lock_key="lock_key"; $expire_time = 5;//新加入过时时间,这样锁不会一直占有 $res = $redis->setNx($lock_key, 1, $expire_time); if (!$res){ return "error_code"; } $stock = $this->getStockFromDb();//查询剩余库存 if ($stock>0){ $this->ReduceStockInDb(); // 在数据库中进行减库存操做 echo "successful"; }else{ echo "库存不足"; } }finally { $redis->delete($lock_key); }
此次是把死锁问题解决了,可是问题仍是存在,你们能够先想想还存在什么问题再接着往下看。
存在的问题以下
我们接着优化
version-3
try{ //新加入try catch处理,这样程序万一报错会把锁删除掉 $lock_key="lock_key"; $expire_time = 5;//新加入过时时间,这样锁不会一直占有 $client_id = session_create_id(); //对每一个请求生成惟一性的id $res = $redis->setNx($lock_key, $client_id, $expire_time); if (!$res){ return "error_code"; } $stock = $this->getStockFromDb();//查询剩余库存 if ($stock>0){ $this->ReduceStockInDb(); // 在数据库中进行减库存操做 echo "successful"; }else{ echo "库存不足"; } }finally { if ($redis->get($lock_key) == $client_id){ //在这里加一个判断,保证每次删除的锁是当次请求加的锁,这样避免误删了别的请求加的锁 $redis->delete($lock_key); } }
可是上面方案还有问题,咱们看最后 redis是先进行了get操做判断,而后再删除,是两步操做,并无保证其原子性,redis的多步操做能够用lua脚原本保证原子性,其实看到lua也不须要感受太陌生,他就是一种语言而已,在这里的做用是把多个redis操做打包成一个命令去执行,保证了原子性而已
version-4
try{ //新加入try catch处理,这样程序万一报错会把锁删除掉 $lock_key="lock_key"; $expire_time = 5;//新加入过时时间,这样锁不会一直占有 $client_id = session_create_id(); //对每一个请求生成惟一性的id $res = $redis->setNx($lock_key, $client_id, $expire_time); if (!$res){ return "error_code"; } $stock = $this->getStockFromDb();//查询剩余库存 if ($stock>0){ $this->ReduceStockInDb(); // 在数据库中进行减库存操做 echo "successful"; }else{ echo "库存不足"; } }finally { $script = ' //此处用lua脚本执行是为了get对比以后再delete的两步操做的原子性 if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end '; return $instance->eval($script, [$lock_key, $client_id], 1); }
这样封装以后,分布式锁应该就比较完善了。固然咱们还能够进一步的优化一下用户体验
version-5
$retry_times = 3; //重试次数 $usleep_times = 5000;//重试间隔时间 try{ //新加入try catch处理,这样程序万一报错会把锁删除掉 $lock_key="lock_key"; $expire_time = 5;//新加入过时时间,这样锁不会一直占有 while($retry_times > 0){ $client_id = session_create_id(); //对每一个请求生成惟一性的id $res = $redis->setNx($lock_key, $client_id, $expire_time); if ($res){ break; } echo "尝试从新获取锁"; $retry_times--; usleep($usleep_times); } if (!$res){ //重试三次以后都没有获取到锁则给用户返回错误信息 return "error_code"; } $stock = $this->getStockFromDb();//查询剩余库存 if ($stock>0){ $this->ReduceStockInDb(); // 在数据库中进行减库存操做 echo "successful"; }else{ echo "库存不足"; } }finally { $script = ' //此处用lua脚本执行是为了get对比以后再delete的两步操做的原子性 if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end '; return $instance->eval($script, [$lock_key, $client_id], 1); }
固然上面的分布式锁仍是不够完善的,好比redis主从同步延迟,就会产生问题,像java中redission实现的思想是很是好的,你们感兴趣能够看看源码,今天就聊到这里,感兴趣的朋友能够留言你们一块儿讨论