说道Redis分布式锁大部分人都会想到:setnx+lua
,或者知道set key value px milliseconds nx
。后一种方式的核心实现命令以下:java
- 获取锁(unique_value能够是UUID等) SET resource_name unique_value NX PX 30000 - 释放锁(lua脚本中,必定要比较value,防止误解锁) if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
这种实现方式有3大要点(也是面试几率很是高的地方):面试
set key value px milliseconds nx
;事实上这类琐最大的缺点就是它加锁时只做用在一个Redis节点上,即便Redis经过sentinel保证高可用,若是这个master节点因为某些缘由发生了主从切换,那么就会出现锁丢失的状况:redis
正由于如此,Redis做者antirez基于分布式环境下提出了一种更高级的分布式锁的实现方式:Redlock。笔者认为,Redlock也是Redis全部分布式锁实现方式中惟一能让面试官高潮的方式。算法
antirez提出的redlock算法大概是这样的:服务器
在Redis的分布式环境中,咱们假设有N个Redis master。这些节点彻底互相独立,不存在主从复制或者其余集群协调机制。咱们确保将在N个实例上使用与在Redis单实例下相同方法获取和释放锁。如今咱们假设有5个Redis master节点,同时咱们须要在5台服务器上面运行这些Redis实例,这样保证他们不会同时都宕掉。网络
为了取到锁,客户端应该执行如下操做:并发
redisson已经有对redlock算法封装,接下来对其用法进行简单介绍,并对核心源码进行分析(假设5个redis实例)。dom
<!-- https://mvnrepository.com/artifact/org.redisson/redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.3.2</version> </dependency>
首先,咱们来看一下redission封装的redlock算法实现的分布式锁用法,很是简单,跟重入锁(ReentrantLock)有点相似:机器学习
Config config = new Config(); config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389") .setMasterName("masterName") .setPassword("password").setDatabase(0); RedissonClient redissonClient = Redisson.create(config); // 还能够getFairLock(), getReadWriteLock() RLock redLock = redissonClient.getLock("REDLOCK_KEY"); boolean isLock; try { isLock = redLock.tryLock(); // 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。 isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS); if (isLock) { //TODO if get lock success, do something; } } catch (Exception e) { } finally { // 不管如何, 最后都要解锁 redLock.unlock(); } 惟一ID 实现分布式锁的一个很是重要的点就是set的value要具备惟一性,redisson的value是怎样保证value的惟一性呢?答案是UUID+threadId。入口在redissonClient.getLock("REDLOCK_KEY"),源码在Redisson.java和RedissonLock.java中: protected final UUID id = UUID.randomUUID(); String getLockName(long threadId) { return id + ":" + threadId; }
获取锁的代码为redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),二者的最终核心源码都是下面这段代码,只不过前者获取锁的默认租约时间(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:分布式
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); // 获取锁时向5个redis实例发送的命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // 首先分布式锁的KEY不能存在,若是确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并经过pexpire设置失效时间(也是锁的租约时间) "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 若是分布式锁的KEY已经存在,而且value也匹配,表示是当前线程持有的锁,那么重入次数加1,而且设置失效时间 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 获取分布式锁的KEY的失效时间毫秒数 "return redis.call('pttl', KEYS[1]);", // 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2] Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
获取锁的命令中,
释放锁的代码为redLock.unlock(),核心源码以下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) { // 向5个redis实例都执行以下命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 若是分布式锁KEY不存在,那么向channel发布一条消息 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + // 若是分布式锁存在,可是value不匹配,表示锁已经被占用,那么直接返回 "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + // 若是就是当前线程占有分布式锁,那么将重入次数减1 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 重入次数减1后的值若是大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除 "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + // 重入次数减1后的值若是为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息 "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3] Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
做者: 阿飞的博客
免费Java资料领取,涵盖了Java、Redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo/Kafka、Hadoop、Hbase、Flink等高并发分布式、大数据、机器学习等技术。
传送门:https://mp.weixin.qq.com/s/JzddfH-7yNudmkjT0IRL8Q