(1)redis如何实现分布式锁?java
(2)redis分布式锁有哪些优势?node
(3)redis分布式锁有哪些缺点?mysql
(4)redis实现分布式锁有没有现成的轮子可使用?git
Redis(全称:Remote Dictionary Server 远程字典服务)是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。github
本章咱们将介绍如何基于redis实现分布式锁,并把其实现的进化史从头至尾讲明白,以便你们在面试的时候能讲清楚redis分布式锁的来(忽)龙(悠)去(考)脉(官)。面试
基于前面关于锁(分布式锁)的学习,咱们知道实现锁的条件有三个:redis
(1)状态(共享)变量,它是有状态的,这个状态的值标识了是否已经被加锁,在ReentrantLock中是经过控制state的值实现的,在ZookeeperLock中是经过控制子节点来实现的;算法
(2)队列,它是用来存放排队的线程,在ReentrantLock中是经过AQS的队列实现的,在ZookeeperLock中是经过子节点的有序性实现的;spring
(3)唤醒,上一个线程释放锁以后唤醒下一个等待的线程,在ReentrantLock中结合AQS的队列释放时自动唤醒下一个线程,在ZookeeperLock中是经过其监听机制来实现的;sql
那么上面三个条件是否是必要的呢?
其实否则,实现锁的必要条件只有第一个,对共享变量的控制,若是共享变量的值为null就给他设置个值(java中可使用CAS操做进程内共享变量),若是共享变量有值则不断重复检查其是否有值(重试),待锁内逻辑执行完毕再把共享变量的值设置回null。
说白了,只要有个地方存这个共享变量就好了,并且要保证整个系统(多个进程)内只有这一份便可。
这也是redis实现分布式锁的关键【本篇文章由公众号“彤哥读源码”原创】。
既然上面说了实现分布式锁只须要对共享变量控制到位便可,那么redis咱们怎么控制这个共享变量呢?
首先,咱们知道redis的基础命令有get/set/del,经过这三个命令能够实现分布式锁吗?固然能够。
在获取锁以前先get lock_user_1
看这个锁存不存在,若是不存在则再set lock_user_1 value
,若是存在则等待一段时间后再重试,最后使用完成了再删除这个锁del lock_user_1
便可。
可是,这种方案有个问题,若是一开始这个锁是不存在的,两个线程去同时get,这个时候返回的都是null(nil),而后这两个线程都去set,这时候就出问题了,两个线程均可以set成功,至关于两个线程都获取到同一个锁了。
因此,这种方案不可行!
上面的方案不可行的主要缘由是多个线程同时set都是能够成功的,因此后来有了setnx
这个命令,它是set if not exist
的缩写,也就是若是不存在就set。
能够看到,当重复对同一个key进行setnx的时候,只有第一次是能够成功的。
所以,方案二就是先使用setnx lock_user_1 value
命令,若是返回1则表示加锁成功,若是返回0则表示其它线程先执行成功了,那就等待一段时间后重试,最后同样使用del lock_user_1
释放锁。
可是,这种方案也有个问题,若是获取锁的这个客户端断线了怎么办?这个锁不是一直都不会释放吗?是的,是这样的。
因此,这种方案也不可行!
上面的方案不可行的主要缘由是获取锁以后客户端断线了没法释放锁的问题,那么,我在setnx以后立马再执行setex能够吗?
答案是能够的,2.6.12以前的版本使用redis实现分布式锁你们都是这么玩的。
所以,方案三就是先使用setnx lock_user_1 value
命令拿到锁,再当即使用setex lock_user_1 30 value
设置过时时间,最后使用del lock_user_1
释放锁。
在setnx获取到锁以后再执行setex设置过时时间,这样就很大几率地解决了获取锁以后客户端断线不会释放锁的问题。
可是,这种方案依然有问题,若是setnx以后setex以前这个客户端就断线了呢?嗯~,彷佛无解,不过这种几率实在是很是小,因此2.6.12以前的版本你们也都这么用,几乎没出现过什么问题。
因此,这种方案基本可用,只是不太好!
上面的方案不太好的主要缘由是setnx/setex是两条独立的命令,没法解决前者成功以后客户端断线的问题,那么,把两条命令合在一块儿不就好了吗?
是的,redis官方也意识到这个问题了,因此2.6.12版本给set命令加了一些参数:
SET key value [EX seconds] [PX milliseconds] [NX|XX]
EX,过时时间,单位秒
PX,过时时间,单位毫秒
NX,not exist,若是不存在才设置成功
XX,exist exist?若是存在才设置成功
经过这个命令咱们就不再怕客户端无端断线了【本篇文章由公众号“彤哥读源码”原创】。
所以,方案四就是先使用set lock_user_1 value nx ex 30
获取锁,获取锁以后使用,使用完成了最后del lock_user_1
释放锁。
然而,这种方案就没有问题吗?
固然有问题,其实这里的释放锁只要简单地执行del lock_user_1
便可,并不会检查这个锁是否是当前客户端获取到的。
因此,这种方案还不是很完美。
上面的方案不完美的主要缘由是释放锁这里控制的还不是很到位,那么有没有其它方法能够控制释放锁的线程和加锁的线程必定是同一个客户端呢?
redis官方给出的方案是这样的:
// 加锁 SET resource_name my_random_value NX PX 30000 // 释放锁 if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
加锁的时候,设置随机值,保证这个随机值只有当前客户端本身知道。
释放锁的时候,执行一段lua脚本,把这段lua脚本当成一个完整的命令,先检查这个锁对应的值是否是上面设置的随机值,若是是再执行del释放锁,不然直接返回释放锁失败。
咱们知道,redis是单线程的,因此这段lua脚本中的get和del不会存在并发问题,可是不能在java中先get再del,这样会当成两个命令,会有并发问题,lua脚本至关因而一个命令一块儿传输给redis的。
这种方案算是比较完美了,可是还有一点小缺陷,就是这个过时时间设置成多少合适呢?
设置的太小,有可能上一个线程还没执行完锁内逻辑,锁就自动释放了,致使另外一个线程能够获取锁了,就出现并发问题了;
设置的过大,就要考虑客户端断线了,这个锁要等待很长一段时间。
因此,这里又衍生出一个新的问题,过时时间我设置小一点,可是快到期了它能自动续期就行了。
上面方案的缺陷是过时时间很差把握,虽然也能够本身启一个监听线程来处理续期,可是代码实在不太好写,好在现成的轮子redisson已经帮咱们把这个逻辑都实现好了,咱们拿过来直接用就能够了。
并且,redisson充分考虑了redis演化过程当中留下的各类问题,单机模式、哨兵模式、集群模式,它通通都处理好了,无论是从单机进化到集群仍是从哨兵进化到集群,都只须要简单地修改下配置就能够了,不用改动任何代码,能够说是非(业)常(界)方(良)便(心)。
redisson实现的分布式锁内部使用的是Redlock算法,这是官方推荐的一种算法。
另外,redisson还提供了不少分布式对象(分布式的原子类)、分布式集合(分布式的Map/List/Set/Queue等)、分布式同步器(分布式的CountDownLatch/Semaphore等)、分布式锁(分布式的公平锁/非公平锁/读写锁等),有兴趣的能够去看看,下面贴出连接:
Redlock介绍:https://redis.io/topics/distlock
redisson介绍:https://github.com/redisson/redisson/wiki
由于前面五种方案都已通过时,因此彤哥这里偷个懒,就不去一一实现的,咱们直接看最后一种redisson的实现方式。
添加spring redis及redisson的依赖,我这里使用的是springboot 2.1.6版本,springboot 1.x版本的本身注意下,查看上面的github能够找到方法。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-data-21</artifactId> <version>3.11.0</version> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.11.0</version> </dependency>
配置redis的链接信息,彤哥这里给出了三种方式。
spring: redis: # 单机模式 #host: 192.168.1.102 #port: 6379 # password: <your passowrd> timeout: 6000ms # 链接超时时长(毫秒) # 哨兵模式 【本篇文章由公众号“彤哥读源码”原创】 # sentinel: # master: <your master> # nodes: 192.168.1.101:6379,192.168.1.102:6379,192.168.1.103:6379 # 集群模式(三主三从伪集群) cluster: nodes: - 192.168.1.102:30001 - 192.168.1.102:30002 - 192.168.1.102:30003 - 192.168.1.102:30004 - 192.168.1.102:30005 - 192.168.1.102:30006
定义Locker接口。
public interface Locker { void lock(String key, Runnable command); }
直接使用RedissonClient获取锁,注意这里不须要再单独配置RedissonClient这个bean,redisson框架会根据配置自动生成RedissonClient的实例,咱们后面说它是怎么实现的。
@Component public class RedisLocker implements Locker { @Autowired private RedissonClient redissonClient; @Override public void lock(String key, Runnable command) { RLock lock = redissonClient.getLock(key); try { // 【本篇文章由公众号“彤哥读源码”原创】 lock.lock(); command.run(); } finally { lock.unlock(); } } }
启动1000个线程,每一个线程内部打印一句话,而后睡眠1秒。
@RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class RedisLockerTest { @Autowired private Locker locker; @Test public void testRedisLocker() throws IOException { for (int i = 0; i < 1000; i++) { new Thread(()->{ locker.lock("lock", ()-> { // 可重入锁测试 locker.lock("lock", ()-> { System.out.println(String.format("time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName())); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); }); }, "Thread-"+i).start(); } System.in.read(); } }
运行结果:
能够看到稳定在1000ms左右打印一句话,说明这个锁是可用的,并且是可重入的。
time: 1570100167046, threadName: Thread-756 time: 1570100168067, threadName: Thread-670 time: 1570100169080, threadName: Thread-949 time: 1570100170093, threadName: Thread-721 time: 1570100171106, threadName: Thread-937 time: 1570100172124, threadName: Thread-796 time: 1570100173134, threadName: Thread-944 time: 1570100174142, threadName: Thread-974 time: 1570100175167, threadName: Thread-462 time: 1570100176180, threadName: Thread-407 time: 1570100177194, threadName: Thread-983 time: 1570100178206, threadName: Thread-982 ...
刚才说RedissonClient不须要配置,其实它是在RedissonAutoConfiguration中自动配置的,咱们简单看下它的源码,主要看redisson()这个方法:
@Configuration @ConditionalOnClass({Redisson.class, RedisOperations.class}) @AutoConfigureBefore(RedisAutoConfiguration.class) @EnableConfigurationProperties({RedissonProperties.class, RedisProperties.class}) public class RedissonAutoConfiguration { @Autowired private RedissonProperties redissonProperties; @Autowired private RedisProperties redisProperties; @Autowired private ApplicationContext ctx; @Bean @ConditionalOnMissingBean(name = "redisTemplate") public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> template = new RedisTemplate<Object, Object>(); template.setConnectionFactory(redisConnectionFactory); return template; } @Bean @ConditionalOnMissingBean(StringRedisTemplate.class) public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) { StringRedisTemplate template = new StringRedisTemplate(); template.setConnectionFactory(redisConnectionFactory); return template; } @Bean @ConditionalOnMissingBean(RedisConnectionFactory.class) public RedissonConnectionFactory redissonConnectionFactory(RedissonClient redisson) { return new RedissonConnectionFactory(redisson); } @Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(RedissonClient.class) public RedissonClient redisson() throws IOException { Config config = null; Method clusterMethod = ReflectionUtils.findMethod(RedisProperties.class, "getCluster"); Method timeoutMethod = ReflectionUtils.findMethod(RedisProperties.class, "getTimeout"); Object timeoutValue = ReflectionUtils.invokeMethod(timeoutMethod, redisProperties); int timeout; if(null == timeoutValue){ // 超时未设置则为0 timeout = 0; }else if (!(timeoutValue instanceof Integer)) { // 转毫秒 Method millisMethod = ReflectionUtils.findMethod(timeoutValue.getClass(), "toMillis"); timeout = ((Long) ReflectionUtils.invokeMethod(millisMethod, timeoutValue)).intValue(); } else { timeout = (Integer)timeoutValue; } // 看下是否给redisson单独写了一个配置文件 if (redissonProperties.getConfig() != null) { try { InputStream is = getConfigStream(); config = Config.fromJSON(is); } catch (IOException e) { // trying next format try { InputStream is = getConfigStream(); config = Config.fromYAML(is); } catch (IOException e1) { throw new IllegalArgumentException("Can't parse config", e1); } } } else if (redisProperties.getSentinel() != null) { // 若是是哨兵模式 Method nodesMethod = ReflectionUtils.findMethod(Sentinel.class, "getNodes"); Object nodesValue = ReflectionUtils.invokeMethod(nodesMethod, redisProperties.getSentinel()); String[] nodes; // 看sentinel.nodes这个节点是列表配置仍是逗号隔开的配置 if (nodesValue instanceof String) { nodes = convert(Arrays.asList(((String)nodesValue).split(","))); } else { nodes = convert((List<String>)nodesValue); } // 生成哨兵模式的配置 config = new Config(); config.useSentinelServers() .setMasterName(redisProperties.getSentinel().getMaster()) .addSentinelAddress(nodes) .setDatabase(redisProperties.getDatabase()) .setConnectTimeout(timeout) .setPassword(redisProperties.getPassword()); } else if (clusterMethod != null && ReflectionUtils.invokeMethod(clusterMethod, redisProperties) != null) { // 若是是集群模式 Object clusterObject = ReflectionUtils.invokeMethod(clusterMethod, redisProperties); Method nodesMethod = ReflectionUtils.findMethod(clusterObject.getClass(), "getNodes"); // 集群模式的cluster.nodes是列表配置 List<String> nodesObject = (List) ReflectionUtils.invokeMethod(nodesMethod, clusterObject); String[] nodes = convert(nodesObject); // 生成集群模式的配置 config = new Config(); config.useClusterServers() .addNodeAddress(nodes) .setConnectTimeout(timeout) .setPassword(redisProperties.getPassword()); } else { // 单机模式的配置 config = new Config(); String prefix = "redis://"; Method method = ReflectionUtils.findMethod(RedisProperties.class, "isSsl"); // 判断是否走ssl if (method != null && (Boolean)ReflectionUtils.invokeMethod(method, redisProperties)) { prefix = "rediss://"; } // 生成单机模式的配置 config.useSingleServer() .setAddress(prefix + redisProperties.getHost() + ":" + redisProperties.getPort()) .setConnectTimeout(timeout) .setDatabase(redisProperties.getDatabase()) .setPassword(redisProperties.getPassword()); } return Redisson.create(config); } private String[] convert(List<String> nodesObject) { // 将哨兵或集群模式的nodes转换成标准配置 List<String> nodes = new ArrayList<String>(nodesObject.size()); for (String node : nodesObject) { if (!node.startsWith("redis://") && !node.startsWith("rediss://")) { nodes.add("redis://" + node); } else { nodes.add(node); } } return nodes.toArray(new String[nodes.size()]); } private InputStream getConfigStream() throws IOException { // 读取redisson配置文件 Resource resource = ctx.getResource(redissonProperties.getConfig()); InputStream is = resource.getInputStream(); return is; } }
网上查到的资料中不少配置都是多余的(多是版本问题),看下源码很清楚,这也是看源码的一个好处。
(1)redis因为历史缘由致使有三种模式:单机、哨兵、集群;
(2)redis实现分布式锁的进化史:set -> setnx -> setnx + setex -> set nx ex(或px) -> set nx ex(或px) + lua script -> redisson;
(3)redis分布式锁有现成的轮子redisson可使用;
(4)redisson还提供了不少有用的组件,好比分布式集合、分布式同步器、分布式对象;
redis分布式锁有哪些优势?
答:1)大部分系统都依赖于redis作缓存,不须要额外依赖其它组件(相对于zookeeper来讲);
2)redis能够集群部署,相对于mysql的单点更可靠;
3)不会占用mysql的链接数,不会增长mysql的压力;
4)redis社区相对活跃,redisson的实现更是稳定可靠;
5)利用过时机制解决客户端断线的问题,虽然不太及时;
6)有现成的轮子redisson可使用,锁的种类比较齐全;
redis分布式锁有哪些缺点?
答:1)集群模式下会在全部master节点执行加锁命令,大部分(2N+1)成功了则得到锁,节点越多,加锁的过程越慢;
2)高并发状况下,未得到锁的线程会睡眠重试,若是同一把锁竞争很是激烈,会占用很是多的系统资源;
3)历史缘由致使的坑挺多的,本身很难实现出来健壮的redis分布式锁;
总之,redis分布式锁的优势是大于缺点的,并且社区活跃,这也是咱们大部分系统使用redis做为分布式锁的缘由。
三、死磕 java同步系列之JMM(Java Memory Model)
八、死磕 java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁
九、死磕 java同步系列之ReentrantLock源码解析(二)——条件锁
十、死磕 java同步系列之ReentrantLock VS synchronized
十一、死磕 java同步系列之ReentrantReadWriteLock源码解析
1三、死磕 java同步系列之CountDownLatch源码解析
1五、死磕 java同步系列之StampedLock源码解析
1六、死磕 java同步系列之CyclicBarrier源码解析
欢迎关注个人公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一块儿畅游源码的海洋。