为何要用分布式锁?redis
先上一张截图,这是在浏览别人的博客时看到的.dom
在了解为何要用分布式锁以前,咱们应该知道到底什么是分布式锁.分布式
锁按照不一样的维度,有多种分类.好比测试
1.悲观锁,乐观锁;ui
2.公平锁,非公平锁;lua
3.独享锁,共享锁;spa
4.线程锁,进程锁;操作系统
等等.线程
咱们平时用的锁,好比 lock,它是线程锁,主要用来给方法,代码块加锁.因为进程的内存单元是被其全部线程共享的,因此线程锁控制的实际是多个线程对同一块内存区域的访问.code
有线程锁,就必然有进程锁.顾名思义,进程锁的目的是控制多个进程对共享资源的访问.由于进程之间彼此独立,各个进程是没法控制其余进程对资源的访问,因此只能经过操做系统来控制.好比 Mutex.
可是进程锁有一个前提,那就是须要多个进程在同一个系统中,若是多个进程不在同一个系统,那就只能使用分布式锁来控制了.
分布式锁是控制分布式系统中不一样系统之间访问共享资源的一种锁实现.它和线程锁,进程锁的做用都是同样,只是范围不同.
因此要实现分布式锁,就必须依靠第三方存储介质来存储锁的信息.由于各个进程之间彼此谁都不服谁,只能找一个带头大哥咯;
如下示例需引用NUGET: CSRedisCore
示例一
CSRedisClient redisClient = new CSRedis.CSRedisClient("127.0.0.1:6379,defaultDatabase=0"); var lockKey = "lockKey"; var stock = 5;//商品库存 var taskCount = 10;//线程数量 redisClient.Del(lockKey);//测试前,先把锁删了. for (int i = 0; i < taskCount; i++) { Task.Run(() => { //获取锁 do { //setnx : key不存在才会成功,存在则失败. var success = redisClient.SetNx(lockKey, 1); if (success == true) { break; } Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再尝试获取锁 } while (true); Console.WriteLine($"线程:{Task.CurrentId} 拿到了锁,开始消费"); if (stock <= 0) { Console.WriteLine($"库存不足,线程:{Task.CurrentId} 抢购失败!"); redisClient.Del(lockKey); return; } stock--; //模拟处理业务 Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 3))); Console.WriteLine($"线程:{Task.CurrentId} 消费完毕!剩余 {stock} 个"); //业务处理完后,释放锁. redisClient.Del(lockKey); }); }
运行结果:
看起来貌似没毛病,实际上上述代码有个致命的问题:
当某个线程拿到锁以后,若是系统崩溃了,那么锁永远都不会被释放.所以,咱们应该给锁加一个过时时间,当时间到了,尚未被主动释放,咱们就让redis释放掉它,以保证其余消费者能够拿到锁,进行消费.
这里给锁加过时时间也有讲究,不能拿到锁后再加,好比:
......
//setnx : key不存在才会成功,存在则失败. var success = redisClient.SetNx(lockKey, 1); if (success == true) { redisClient.Set(lockKey, 1, expireSeconds: 5); break; }
这样操做的话,获取锁和设置锁的过时时间就不是原子操做,一样会出现上面提到的问题.Redis 提供了一个合而为一的操做能够解决这个问题.
//set : key存在则失败,不存在才会成功,而且过时时间5秒 var success = redisClient.Set(lockKey, 1, expireSeconds: 5, exists: RedisExistence.Nx);
这个问题虽然解决了,但随之产生了一个新的问题:
假设有3个线程A,B,C
当线程A拿到锁后执行业务的时候超时了,超过了锁的过时时间还没执行完,这时候锁被Redis释放了,
因而线程B拿到了锁并开始执行业务逻辑.
当线程B的业务逻辑还没执行完的时候,线程A的业务逻辑执行完了,因而乎就跑去释放掉了锁.
这时候线程C就能够拿到锁开始执行它的业务逻辑.
这不就乱套了么...
所以,线程在释放锁的时候应该判断这个锁还属不属于本身.
因此,在设置锁的时候,redis的value值不能像上面代码那样,随便给个1,而应该给一个随机值,表明当前线程.
var id = Guid.NewGuid().ToString("N"); //获取锁 do { //set : key存在则失败,不存在才会成功,而且过时时间5秒 var success = redisClient.Set(lockKey, id, expireSeconds: 5, exists: RedisExistence.Nx); if (success == true) { break; } Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再尝试获取锁 } while (true); Console.WriteLine($"线程:{Task.CurrentId} 拿到了锁,开始消费");
.........
//业务处理完后,释放锁. var value = redisClient.Get<string>(lockKey); if (value == id) { redisClient.Del(lockKey); }
完美了吗?
不完美.仍是老生常谈的问题,取value和删除key 分了两步走,不是原子操做.
而且,这里还不能用pipe,由于须要根据取到的value来决定下一个操做.上面设置过时时间却是能够用pipe.
因此,这里只能用lua.
完整的代码以下:
CSRedisClient redisClient = new CSRedis.CSRedisClient("127.0.0.1:6379,defaultDatabase=0"); var lockKey = "lockKey"; var stock = 5;//商品库存 var taskCount = 10;//线程数量 var script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";//释放锁的redis脚本 redisClient.Del(lockKey);//测试前,先把锁删了. for (int i = 0; i < taskCount; i++) { Task.Run(() => { var id = Guid.NewGuid().ToString("N"); //获取锁 do { //set : key存在则失败,不存在才会成功,而且过时时间5秒 var success = redisClient.Set(lockKey, id, expireSeconds: 5, exists: RedisExistence.Nx); if (success == true) { break; } Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再尝试获取锁 } while (true); Console.WriteLine($"线程:{Task.CurrentId} 拿到了锁,开始消费"); if (stock <= 0) { Console.WriteLine($"库存不足,线程:{Task.CurrentId} 抢购失败!"); redisClient.Eval(script,lockKey,id); return; } stock--; //模拟处理业务,这里不考虑失败的状况 Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 3))); Console.WriteLine($"线程:{Task.CurrentId} 消费完毕!剩余 {stock} 个"); //业务处理完后,释放锁. redisClient.Eval(script, lockKey, id); }); }
这篇文章只介绍了单节点Redis的分布式锁,由于单节点,因此不是高可用.
多节点Redis则须要用官方介绍的RedLock,这玩意有点绕,我须要捋一捋.