Redis单节点的分布式锁只须要注意三点就能够了:api
1.加锁并设置锁的过时时间必须是原子操做;app
2.锁的value值必需要有惟一性;dom
3.释放锁的时候要验证其value值,不是本身加的锁不能释放.异步
可是单节点分布式锁最大的缺点就是,它只做用在一个Redis节点上,若是该节点挂了,那就挂了.async
那可不能够经过哨兵机制来保证高可用呢?分布式
答案是不行.性能
由于Redis在进行主从复制的时候是异步的.测试
假设 clientA 拿到锁后,在 master 还没同步到 slave 时,master 发生了故障,这时候 salve 升级为 master,致使锁丢失.ui
RedLock 的思想是:假设有5个Redis节点.这些节点彻底相互独立,不存在主从或者集群机制,都是 master.而且这5个Redis实例运行在5台机器上,这样保证他们不会同时宕掉.spa
客户端应该按照如下操做来获取锁:
1.获取当前时间戳,假设是T1.
2.依次尝试从这5个Redis实例获取锁.当客户端向Redis请求获取锁时,客户端应该设置超时时间,而且这个超时时间应该小于锁的失效时间.好比你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间.这样能够避免Redis已经挂掉的状况下,客户端还在等待响应结果.若是Redis没有在规定时间内响应,客户端应该尽快尝试去另一个Redis实例请求获取锁.
3.请求完全部的Redis节点后,只有知足以下两点,才算真正的获取到锁:
1)当前时间 - T1 的时间差小于锁的过时时间.好比T1=00:00:00,而后从5个Redis节点都拿到了锁,当前时间是 00:00:05,也就是说获取锁一共用了5秒钟.假设锁的过时时间是3秒,那么此次获取锁的操做就算失败了.
2)从(N/2+1)个Redis节点都获取到锁.这个很好理解,5个节点,你拿2个,我拿2个,到底算谁的?
总结一句话就是:从开始获取锁计时,只要在锁的过时时间内成功获取到一半以上的锁便算成功,不然算失败.
4.当客户端获取到了锁,锁的真正有效时间 = 锁的过时时间 - 获取锁所使用的时间(也就是第3步计算出来的时间).
5.若是客户端因为某些缘由(好比获取锁的实例个数小于N/2+1,或者已经超过了有效时间),没有获取到锁,客户端便会在全部的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功),由于可能已经获取了小于 N/2+1个锁,必须释放掉,不然会影响其余客户端获取锁.
关因而否启动AOF永久存储,须要有所取舍.
1.永久启动,因为Redis的过时机制是按照unix时间戳走的,因此当咱们重启Redis后,依然会按照规定的时间过时.可是永久启动对性能有必定影响;
2.采用默认的1秒1次.若是在1秒内断电,会致使数据丢失,这时候若是马上重启会致使锁的互斥性实效.
因此有效的解决方案是,采用AOF,1秒1次,无论什么缘由宕机后,等待必定时间再重启.这个时间就是锁的过时时间.
Demo:
安装官方提供的 RedLock.net
Startup:
public class Startup { private RedLockFactory _redLockFactory; public void ConfigureServices(IServiceCollection services) { services.AddControllers(); var endPoints = new List<RedLockEndPoint> { new DnsEndPoint("127.0.0.1", 6379), new DnsEndPoint("127.0.0.1", 6380), new DnsEndPoint("127.0.0.1", 6381) }; _redLockFactory = RedLockFactory.Create(endPoints); services.AddSingleton(typeof(IDistributedLockFactory), _redLockFactory); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime applicationLifetime) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } //应用程序结束时释放,由于不是容器建立的对象 applicationLifetime.ApplicationStopping.Register(() => { _redLockFactory.Dispose(); }); app.UseRouting(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); } }
测试api:
[ApiController] public class ValuesController : ControllerBase { private static int _stock = 10; private readonly IDistributedLockFactory _distributedLockFactory; public ValuesController(IDistributedLockFactory distributedLockFactory) { _distributedLockFactory = distributedLockFactory; } [Route("lockTest")] [HttpGet] public async Task<int> DistributedLockTest() { // resource 锁定的资源 var resource = "the-thing-we-are-locking-on"; // expiryTime 锁的过时时间 var expiry = TimeSpan.FromSeconds(5); // waitTime 等待时间 var wait = TimeSpan.FromSeconds(1); // retryTime 等待时间内,多久重试一次 var retry = TimeSpan.FromMilliseconds(250); using (var redLock = await _distributedLockFactory.CreateLockAsync(resource, expiry, wait, retry)) { if (redLock.IsAcquired) { // 模拟执行业务逻辑 await Task.Delay(new Random().Next(100, 500)); if (stock > 0) { stock--; return stock; } return stock; } Console.WriteLine($"{DateTime.Now} : 获取锁失败"); } return -99; } }
测试控制台:
static void Main(string[] args) { HttpClient client = new HttpClient(); var result = Parallel.For(0, 20, (i) => { var stopwatch = new Stopwatch(); stopwatch.Start(); var response = client.GetAsync($"http://localhost:5000/locktest").Result; stopwatch.Stop(); var data = response.Content.ReadAsStringAsync().Result; Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId}, Result:{data}, Time:{stopwatch.ElapsedMilliseconds}"); }); client.Dispose(); Console.ReadKey(); }
测试结果: