文章很长,并且持续更新,建议收藏起来,慢慢读! Java 高并发 发烧友社群:疯狂创客圈(总入口) 奉上如下珍贵的学习资源:javascript
入大厂 、作架构、大力提高Java 内功 必备的精彩博文 | 2021 秋招涨薪1W + 必备的精彩博文 |
---|---|
1:Redis 分布式锁 (图解-秒懂-史上最全) | 2:Zookeeper 分布式锁 (图解-秒懂-史上最全) |
3: Redis与MySQL双写一致性如何保证? (面试必备) | 4: 面试必备:秒杀超卖 解决方案 (史上最全) |
5:面试必备之:Reactor模式 | 6: 10分钟看懂, Java NIO 底层原理 |
7:TCP/IP(图解+秒懂+史上最全) | 8:Feign原理 (图解) |
9:DNS图解(秒懂 + 史上最全 + 高薪必备) | 10:CDN图解(秒懂 + 史上最全 + 高薪必备) |
10: 分布式事务( 图解 + 史上最全 + 吐血推荐 ) |
Java 面试题 30个专题 , 史上最全 , 面试必刷 | 阿里、京东、美团... 随意挑、横着走!!! |
---|---|
1: JVM面试题(史上最强、持续更新、吐血推荐) | 2:Java基础面试题(史上最全、持续更新、吐血推荐 |
3:架构设计面试题 (史上最全、持续更新、吐血推荐) | 4:设计模式面试题 (史上最全、持续更新、吐血推荐) |
1七、分布式事务面试题 (史上最全、持续更新、吐血推荐) | 一致性协议 (史上最全) |
2九、多线程面试题(史上最全) | 30、HR面经,过五关斩六将后,当心阴沟翻船! |
9.网络协议面试题(史上最全、持续更新、吐血推荐) | 更多专题, 请参见【 疯狂创客圈 高并发 总目录 】 |
SpringCloud 精彩博文 | |
---|---|
nacos 实战(史上最全) | sentinel (史上最全+入门教程) |
SpringCloud gateway (史上最全) | 更多专题, 请参见【 疯狂创客圈 高并发 总目录 】 |
在单体的应用开发场景中,在多线程的环境下,涉及并发同步的时候,为了保证一个代码块在同一时间只能由一个线程访问,咱们通常能够使用synchronized语法和ReetrantLock去保证,这其实是本地锁的方式。html
也就是说,在同一个JVM内部,你们每每采用synchronized或者Lock的方式来解决多线程间的安全问题。但在分布式集群工做的开发场景中,在JVM之间,那么就须要一种更加高级的锁机制,来处理种跨JVM进程之间的线程安全问题.java
解决方案是:使用分布式锁node
总之,对于分布式场景,咱们能够使用分布式锁,它是控制分布式系统之间互斥访问共享资源的一种方式。mysql
好比说在一个分布式系统中,多台机器上部署了多个服务,当客户端一个用户发起一个数据插入请求时,若是没有分布式锁机制保证,那么那多台机器上的多个服务可能进行并发插入操做,致使数据重复插入,对于某些不容许有多余数据的业务来讲,这就会形成问题。而分布式锁机制就是为了解决相似这类问题,保证多个服务之间互斥的访问共享资源,若是一个服务抢占了分布式锁,其余服务没获取到锁,就不进行后续操做。git
大体意思以下图所示(不必定准确):程序员
何为分布式锁?github
分布式锁的条件:面试
分布式锁的实现:redis
分布式锁的实现由不少种,文件锁、数据库、redis等等,比较多;分布式锁常见的多种实现方式:
在实践中,仍是redis作分布式锁性能会高一些
所谓悲观锁,悲观锁是对数据被的修改持悲观态度(认为数据在被修改的时候必定会存在并发问题),所以在整个数据处理过程当中将数据锁定。
悲观锁的实现,每每依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,不然,即便在应用层中实现了加锁机制,也没法保证外部系统不会修改数据)。
数据库的行锁、表锁、排他锁等都是悲观锁,这里以行锁为例,进行介绍。以咱们经常使用的MySQL为例,咱们经过使用select...for update语句, 执行该语句后,会在表上加持行锁,一直到事务提交,解除行锁。
使用场景举例:
在秒杀案例中,生成订单和扣减库存的操做,能够经过商品记录的行锁,进行保护。们经过使用select...for update语句,在查询商品表库存时将该条记录加锁,待下单减库存完成后,再释放锁。
示例的SQL以下:
//0.开始事务 begin; //1.查询出商品信息 select stockCount from seckill_good where id=1 for update; //2.根据商品信息生成订单 insert into seckill_order (id,good_id) values (null,1); //3.修改商品stockCount减一 update seckill_good set stockCount=stockCount-1 where id=1; //4.提交事务 commit;
以上,在对id = 1的记录修改前,先经过for update的方式进行加锁,而后再进行修改。这就是比较典型的悲观锁策略。
若是以上修改库存的代码发生并发,同一时间只有一个线程能够开启事务并得到id=1的锁,其它的事务必须等本次事务提交以后才能执行。这样咱们能够保证当前的数据不会被其它事务修改。
咱们使用select_for_update,另一定要写在事务中.
注意:要使用悲观锁,咱们必须关闭mysql数据库中自动提交的属性,命令set autocommit=0;便可关闭,由于MySQL默认使用autocommit模式,也就是说,当你执行一个更新操做后,MySQL会马上将结果进行提交。
悲观锁的实现,每每依靠数据库提供的锁机制。在数据库中,悲观锁的流程以下:
使用乐观锁就不须要借助数据库的锁机制了。
乐观锁的概念中其实已经阐述了他的具体实现细节:主要就是两个步骤:冲突检测和数据更新。其实现方式有一种比较典型的就是Compare and Swap(CAS)技术。
CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知此次竞争中失败,并能够再次尝试。
CAS的实现中,在表中增长一个version字段,操做前先查询version信息,在数据提交时检查version字段是否被修改,若是没有被修改则进行提交,不然认为是过时数据。
好比前面的扣减库存问题,经过乐观锁能够实现以下:
//1.查询出商品信息 select stockCount, version from seckill_good where id=1; //2.根据商品信息生成订单 insert into seckill_order (id,good_id) values (null,1); //3.修改商品库存 update seckill_good set stockCount=stockCount-1, version = version+1 where id=1, version=version;
以上,咱们在更新以前,先查询一下库存表中当前版本(version),而后在作update的时候,以version 做为一个修改条件。
当咱们提交更新的时候,判断数据库表对应记录的当前version与第一次取出来的version进行比对,若是数据库表当前version与第一次取出来的version相等,则予以更新,不然认为是过时数据。
CAS 乐观锁有两个问题:
(1) CAS 存在一个比较重要的问题,即ABA问题. 解决的办法是version字段顺序递增。
(2) 乐观锁的方式,在高并发时,只有一个线程能执行成功,会形成大量的失败,这给用户的体验显然是很很差的。
除了在数据库层面加分布式锁,一般还能够使用如下更高性能、更高可用的分布式锁:
有关zookeeper分布式锁的原理和实现,具体请参见下面的博客:
Zookeeper 分布式锁 (图解+秒懂+史上最全)
或者阅读笔者的《Java高并发核心编程(卷1)》
本文重点介绍Redis分布式锁,分为两个维度进行介绍:
(1)基于Jedis手工造轮子分布式锁
(2)介绍Redission 分布式锁的使用和原理。
咱们首先讲解 Jedis 普通分布式锁实现,而且是纯手工的模式,从最为基础的Redis命令开始。
只有充分了解与分布式锁相关的普通Redis命令,才能更好的了解高级的Redis分布式锁的实现,由于高级的分布式锁的实现彻底基于普通Redis命令。
Redis发展到如今,几种常见的部署架构有:
从分布式锁的角度来讲, 不管是单机模式、主从模式、哨兵模式、集群模式,其原理都是类同的。 只是主从模式、哨兵模式、集群模式的更加的高可用、或者更加高并发。
因此,接下来先基于单机模式,基于Jedis手工造轮子实现本身的分布式锁。
Redis分布式锁机制,主要借助setnx和expire两个命令完成。
setnx命令:
SETNX 是SET if Not eXists的简写。将 key 的值设为 value,当且仅当 key 不存在; 若给定的 key 已经存在,则 SETNX 不作任何动做。
下面为客户端使用示例:
127.0.0.1:6379> set lock "unlock" OK 127.0.0.1:6379> setnx lock "unlock" (integer) 0 127.0.0.1:6379> setnx lock "lock" (integer) 0 127.0.0.1:6379>
expire命令:
expire命令为 key 设置生存时间,当 key 过时时(生存时间为 0 ),它会被自动删除. 其格式为:
EXPIRE key seconds
下面为客户端使用示例:
127.0.0.1:6379> expire lock 10 (integer) 1 127.0.0.1:6379> ttl lock 8
经过Redis的setnx、expire命令能够实现简单的锁机制:
线程调用setnx方法成功返回1认为加锁成功,其余线程要等到当前线程业务操做完成释放锁后,才能再次调用setnx加锁成功。
以上简单redis分布式锁的问题:
若是出现了这么一个问题:若是setnx
是成功的,可是expire
设置失败,一旦出现了释放锁失败,或者没有手工释放,那么这个锁永远被占用,其余线程永远也抢不到锁。
因此,须要保障setnx和expire两个操做的原子性,要么所有执行,要么所有不执行,两者不能分开。
解决的办法有两种:
使用set的命令时,同时设置过时时间的示例以下:
127.0.0.1:6379> set unlock "234" EX 100 NX (nil) 127.0.0.1:6379> 127.0.0.1:6379> set test "111" EX 100 NX OK
这样就完美的解决了分布式锁的原子性; set 命令的完整格式:
set key value [EX seconds] [PX milliseconds] [NX|XX]
EX seconds:设置失效时长,单位秒 PX milliseconds:设置失效时长,单位毫秒 NX:key不存在时设置value,成功返回OK,失败返回(nil) XX:key存在时设置value,成功返回OK,失败返回(nil)
使用set命令实现加锁操做,先展现加锁的简单代码实习,再带你们慢慢解释为何这样实现。
加锁的简单代码实现
package com.crazymaker.springcloud.standard.lock; @Slf4j @Data @AllArgsConstructor public class JedisCommandLock { private RedisTemplate redisTemplate; private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; /** * 尝试获取分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } }
能够看到,咱们加锁用到了Jedis的set Api:
jedis.set(String key, String value, String nxxx, String expx, int time)
这个set()方法一共有五个形参:
第一个为key,咱们使用key来当锁,由于key是惟一的。
第二个为value,咱们传的是requestId,不少童鞋可能不明白,有key做为锁不就够了吗,为何还要用到value?缘由就是咱们在上面讲到可靠性时,分布式锁要知足第四个条件解铃还须系铃人,经过给value赋值为requestId,咱们就知道这把锁是哪一个请求加的了,在解锁的时候就能够有依据。
requestId能够使用
UUID.randomUUID().toString()
方法生成。
第三个为nxxx,这个参数咱们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,咱们进行set操做;若key已经存在,则不作任何操做;
第四个为expx,这个参数咱们传的是PX,意思是咱们要给这个key加一个过时的设置,具体时间由第五个参数决定。
第五个为time,与第四个参数相呼应,表明key的过时时间。
总的来讲,执行上面的set()方法就只会致使两种结果:
- 当前没有锁(key不存在),那么就进行加锁操做,并对锁设置个有效期,同时value表示加锁的客户端。
- 已有锁存在,不作任何操做。
心细的童鞋就会发现了,咱们的加锁代码知足前面描述的四个条件中的三个。
首先,set()加入了NX参数,能够保证若是已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,知足互斥性。
其次,因为咱们对锁设置了过时时间,即便锁的持有者后续发生崩溃而没有解锁,锁也会由于到了过时时间而自动解锁(即key被删除),不会被永远占用(而发生死锁)。
最后,由于咱们将value赋值为requestId,表明加锁的客户端请求标识,那么在客户端在解锁的时候就能够进行校验是不是同一个客户端。
因为咱们只考虑Redis单机部署的场景,因此容错性咱们暂不考虑。
仍是先展现代码,再带你们慢慢解释为何这样实现。
解锁的简单代码实现:
package com.crazymaker.springcloud.standard.lock; @Slf4j @Data @AllArgsConstructor public class JedisCommandLock { private static final Long RELEASE_SUCCESS = 1L; /** * 释放分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @return 是否释放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } }
那么这段Lua代码的功能是什么呢?
其实很简单,首先获取锁对应的value值,检查是否与requestId相等,若是相等则删除锁(解锁)。
第一行代码,咱们写了一个简单的Lua脚本代码。
第二行代码,咱们将Lua代码传到jedis.eval()
方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为requestId。eval()方法是将Lua代码交给Redis服务端执行。
那么为何要使用Lua语言来实现呢?
由于要确保上述操做是原子性的。那么为何执行eval()方法能够确保原子性,源于Redis的特性.
简单来讲,就是在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,而且直到eval命令执行完成,Redis才会执行其余命
错误示例1
最多见的解锁代码就是直接使用 jedis.del() 方法删除锁,这种不先判断锁的拥有者而直接解锁的方式,会致使任何客户端均可以随时进行解锁,即便这把锁不是它的。
public static void wrongReleaseLock1(Jedis jedis, String lockKey) { jedis.del(lockKey); }
错误示例2
这种解锁代码乍一看也是没问题,甚至我以前也差点这样实现,与正确姿式差很少,惟一区别的是分红两条命令去执行,代码以下:
public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) { // 判断加锁与解锁是否是同一个客户端 if (requestId.equals(jedis.get(lockKey))) { // 若在此时,这把锁忽然不是这个客户端的,则会误解锁 jedis.del(lockKey); } }
前面提到,在redis中执行lua脚本,有以下的好处:
那么为何要使用Lua语言来实现呢?
由于要确保上述操做是原子性的。那么为何执行eval()方法能够确保原子性,源于Redis的特性.
简单来讲,就是在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,而且直到eval命令执行完成,Redis才会执行其余命
因此:
大部分的开源框架(如 redission)中的分布式锁组件,都是用纯lua脚本实现的。
题外话: lua脚本是高并发、高性能的必备脚本语言
有关lua的详细介绍,请参见如下书籍:
那么,咱们也来模拟一下
加锁和删除锁的操做,使用纯lua进行封装,保障其执行时候的原子性。
基于纯Lua脚本实现分布式锁的执行流程,大体以下:
--- -1 failed --- 1 success --- local key = KEYS[1] local requestId = KEYS[2] local ttl = tonumber(KEYS[3]) local result = redis.call('setnx', key, requestId) if result == 1 then --PEXPIRE:以毫秒的形式指定过时时间 redis.call('pexpire', key, ttl) else result = -1; -- 若是value相同,则认为是同一个线程的请求,则认为重入锁 local value = redis.call('get', key) if (value == requestId) then result = 1; redis.call('pexpire', key, ttl) end end -- 若是获取锁成功,则返回 1 return result
--- -1 failed --- 1 success -- unlock key local key = KEYS[1] local requestId = KEYS[2] local value = redis.call('get', key) if value == requestId then redis.call('del', key); return 1; end return -1
下一步,实现Lock接口, 完成JedisLock的分布式锁。
其加锁操做,经过调用 lock.lua脚本完成,代码以下:
package com.crazymaker.springcloud.standard.lock; import com.crazymaker.springcloud.common.exception.BusinessException; import com.crazymaker.springcloud.common.util.ThreadUtil; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @Slf4j @Data @AllArgsConstructor public class JedisLock implements Lock { private RedisTemplate redisTemplate; RedisScript<Long> lockScript = null; RedisScript<Long> unLockScript = null; public static final int DEFAULT_TIMEOUT = 2000; public static final Long LOCKED = Long.valueOf(1); public static final Long UNLOCKED = Long.valueOf(1); public static final Long WAIT_GAT = Long.valueOf(200); public static final int EXPIRE = 2000; String key; String lockValue; // lockValue 锁的value ,表明线程的uuid /** * 默认为2000ms */ long expire = 2000L; public JedisLock(String lockKey, String lockValue) { this.key = lockKey; this.lockValue = lockValue; } private volatile boolean isLocked = false; private Thread thread; /** * 获取一个分布式锁 , 超时则返回失败 * * @return 获锁成功 - true | 获锁失败 - false */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { //本地可重入 if (isLocked && thread == Thread.currentThread()) { return true; } expire = unit != null ? unit.toMillis(time) : DEFAULT_TIMEOUT; long startMillis = System.currentTimeMillis(); Long millisToWait = expire; boolean localLocked = false; int turn = 1; while (!localLocked) { localLocked = this.lockInner(expire); if (!localLocked) { millisToWait = millisToWait - (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if (millisToWait > 0L) { /** * 尚未超时 */ ThreadUtil.sleepMilliSeconds(WAIT_GAT); log.info("睡眠一下,从新开始,turn:{},剩余时间:{}", turn++, millisToWait); } else { log.info("抢锁超时"); return false; } } else { isLocked = true; localLocked = true; } } return isLocked; } /** * 有返回值的抢夺锁 * * @param millisToWait */ public boolean lockInner(Long millisToWait) { if (null == key) { return false; } try { List<String> redisKeys = new ArrayList<>(); redisKeys.add(key); redisKeys.add(lockValue); redisKeys.add(String.valueOf(millisToWait)); Long res = (Long) redisTemplate.execute(lockScript, redisKeys); return res != null && res.equals(LOCKED); } catch (Exception e) { e.printStackTrace(); throw BusinessException.builder().errMsg("抢锁失败").build(); } } }
其解锁操做,经过调用unlock.lua脚本完成,代码以下:
package com.crazymaker.springcloud.standard.lock; import com.crazymaker.springcloud.common.exception.BusinessException; import com.crazymaker.springcloud.common.util.ThreadUtil; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @Slf4j @Data @AllArgsConstructor public class JedisLock implements Lock { private RedisTemplate redisTemplate; RedisScript<Long> lockScript = null; RedisScript<Long> unLockScript = null; //释放锁 @Override public void unlock() { if (key == null || requestId == null) { return; } try { List<String> redisKeys = new ArrayList<>(); redisKeys.add(key); redisKeys.add(requestId); Long res = (Long) redisTemplate.execute(unLockScript, redisKeys); } catch (Exception e) { e.printStackTrace(); throw BusinessException.builder().errMsg("释放锁失败").build(); } } }
编写个分布式锁服务,用于加载lua脚本,建立 分布式锁,代码以下:
package com.crazymaker.springcloud.standard.lock; import com.crazymaker.springcloud.common.util.IOUtil; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.RedisScript; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @Slf4j @Data public class RedisLockService { private RedisTemplate redisTemplate; static String lockLua = "script/lock.lua"; static String unLockLua = "script/unlock.lua"; static RedisScript<Long> lockScript = null; static RedisScript<Long> unLockScript = null; { String script = IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),lockLua); // String script = FileUtil.readString(lockLua, Charset.forName("UTF-8" )); if(StringUtils.isEmpty(script)) { log.error("lua load failed:"+lockLua); } lockScript = new DefaultRedisScript<>(script, Long.class); // script = FileUtil.readString(unLockLua, Charset.forName("UTF-8" )); script = IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),unLockLua); if(StringUtils.isEmpty(script)) { log.error("lua load failed:"+unLockLua); } unLockScript = new DefaultRedisScript<>(script, Long.class); } public RedisLockService(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } public Lock getLock(String lockKey, String lockValue) { JedisLock lock=new JedisLock(lockKey,lockValue); lock.setRedisTemplate(redisTemplate); lock.setLockScript(lockScript); lock.setUnLockScript(unLockScript); return lock; } }
接下来,终于能够上测试用例了
package com.crazymaker.springcloud.lock; @Slf4j @RunWith(SpringRunner.class) @SpringBootTest(classes = {DemoCloudApplication.class}) // 指定启动类 public class RedisLockTest { @Resource RedisLockService redisLockService; private ExecutorService pool = Executors.newFixedThreadPool(10); @Test public void testLock() { int threads = 10; final int[] count = {0}; CountDownLatch countDownLatch = new CountDownLatch(threads); long start = System.currentTimeMillis(); for (int i = 0; i < threads; i++) { pool.submit(() -> { String lockValue = UUID.randomUUID().toString(); try { Lock lock = redisLockService.getLock("test:lock:1", lockValue); boolean locked = lock.tryLock(10, TimeUnit.SECONDS); if (locked) { for (int j = 0; j < 1000; j++) { count[0]++; } log.info("count = " + count[0]); lock.unlock(); } else { System.out.println("抢锁失败"); } } catch (Exception e) { e.printStackTrace(); } countDownLatch.countDown(); }); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("10个线程每一个累加1000为: = " + count[0]); //输出统计结果 float time = System.currentTimeMillis() - start; System.out.println("运行的时长为(ms):" + time); System.out.println("每一次执行的时长为(ms):" + time / count[0]); } }
执行用例,结果以下:
2021-05-04 23:02:11.900 INFO 22120 --- [pool-1-thread-7] c.c.springcloud.lock.RedisLockTest LN:50 count = 6000 2021-05-04 23:02:11.901 INFO 22120 --- [pool-1-thread-1] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,从新开始,turn:3,剩余时间:9585 2021-05-04 23:02:11.902 INFO 22120 --- [pool-1-thread-1] c.c.springcloud.lock.RedisLockTest LN:50 count = 7000 2021-05-04 23:02:12.100 INFO 22120 --- [pool-1-thread-4] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,从新开始,turn:3,剩余时间:9586 2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,从新开始,turn:3,剩余时间:9585 2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-8] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,从新开始,turn:3,剩余时间:9585 2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-4] c.c.springcloud.lock.RedisLockTest LN:50 count = 8000 2021-05-04 23:02:12.102 INFO 22120 --- [pool-1-thread-8] c.c.springcloud.lock.RedisLockTest LN:50 count = 9000 2021-05-04 23:02:12.304 INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,从新开始,turn:4,剩余时间:9383 2021-05-04 23:02:12.307 INFO 22120 --- [pool-1-thread-5] c.c.springcloud.lock.RedisLockTest LN:50 count = 10000 10个线程每一个累加1000为: = 10000 运行的时长为(ms):827.0 每一次执行的时长为(ms):0.0827
下面有一个简单的使用锁的例子,在10秒内占着锁:
//写数据到文件 function writeData(filename, data) { boolean locked = lock.tryLock(10, TimeUnit.SECONDS); if (!locked) { throw 'Failed to acquire lock'; } try { //将数据写到文件 var file = storage.readFile(filename); var updated = updateContents(file, data); storage.writeFile(filename, updated); } finally { lock.unlock(); } }
问题是:若是在写文件过程当中,发生了 fullGC,而且其时间跨度较长, 超过了10秒, 那么,分布式就自动释放了。
在此过程当中,client2 抢到锁,写了文件。
client1 的fullGC完成后,也继续写文件,注意,此时client1 的并无占用锁,此时写入会致使文件数据错乱,发生线程安全问题。
这就是STW致使的锁过时问题。
STW致使的锁过时问题,具体以下图所示:
STW致使的锁过时问题,大概的解决方案,有:
1: 模拟CAS乐观锁的方式,增长版本号
2:watch dog自动延期机制
1: 模拟CAS乐观锁的方式,增长版本号(以下图中的token)
此方案若是要实现,须要调整业务逻辑,与之配合,因此会入侵代码。
2:watch dog自动延期机制
客户端1加锁的锁key默认生存时间才30秒,若是超过了30秒,客户端1还想一直持有这把锁,怎么办呢?
简单!只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,若是客户端1还持有锁key,那么就会不断的延长锁key的生存时间。
redission,采用的就是这种方案, 此方案不会入侵业务代码。
做为 Java 开发人员,咱们若想在程序中集成 Redis,必须使用 Redis 的第三方库。目前你们使用的最多的第三方库是jedis。
和SpringCloud gateway同样,Redisson也是基于Netty实现的,是更高性能的第三方库。 因此,这里推荐你们使用Redission替代 jedis。
在使用Redission以前,建议你们先掌握Netty的知识。
推荐你们阅读被不少小伙伴评价为史上最为易懂的NIO、Netty书籍:《Java高并发核心编程(卷1)》
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不只提供了一系列的分布式的Java经常使用对象,还实现了可重入锁(Reentrant Lock)、公平锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等,还提供了许多分布式服务。
Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者可以将精力更集中地放在处理业务逻辑上。
1.概况对比
Jedis是Redis的java实现的客户端,其API提供了比较全面的的Redis命令的支持,Redisson实现了分布式和可扩展的的java数据结构,和Jedis相比,功能较为简单,不支持字符串操做,不支持排序,事物,管道,分区等Redis特性。Redisson的宗旨是促进使用者对Redis的关注分离,从而让使用者可以将精力更集中的放在处理业务逻辑上。
2.可伸缩性
Jedis使用阻塞的I/O,且其方法调用都是同步的,程序流程要等到sockets处理完I/O才能执行,不支持异步,Jedis客户端实例不是线程安全的,因此须要经过链接池来使用Jedis。
Redisson使用非阻塞的I/O和基于Netty框架的事件驱动的通讯层,其方法调用时异步的。Redisson的API是线程安全的,因此操做单个Redisson链接来完成各类操做。
3.第三方框架整合
Redisson在Redis的基础上实现了java缓存标准规范;Redisson还提供了Spring Session回话管理器的实现。
github: https://github.com/redisson/redisson#quick-start
支持 Redis 单节点(single)模式、哨兵(sentinel)模式、主从(Master/Slave)模式以及集群(Redis Cluster)模式
程序接口调用方式采用异步执行和异步流执行两种方式
数据序列化,Redisson 的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在 Redis 里的读取和存储
单个集合数据分片,在集群模式下,Redisson 为单个 Redis 集合类型提供了自动分片的功能
提供多种分布式对象,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等
提供丰富的分布式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等
分布式锁和同步器的实现,可重入锁(Reentrant Lock),公平锁(Fair Lock),联锁(MultiLock),红锁(Red Lock),信号量(Semaphonre),可过时性信号锁(PermitExpirableSemaphore)等
提供先进的分布式服务,如分布式远程服务(Remote Service),分布式实时对象(Live Object)服务,分布式执行服务(Executor Service),分布式调度任务服务(Schedule Service)和分布式映射概括服务(MapReduce)
安装 Redisson 最便捷的方法是使用 Maven 或者 Gradle:
•Maven
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.11.4</version> </dependency>
•Gradle
compile group: 'org.redisson', name: 'redisson', version: '3.11.4'
目前 Redisson 最新版是 3.11.4,固然你也能够经过搜索 Maven 中央仓库 mvnrepository[1] 来找到 Redisson 的各类版本。
RedissonClient有多种模式,主要的模式有:
单节点模式
哨兵模式
主从模式
集群模式
首先介绍单节点模式。
单节点模式的程序化配置方法,大体以下:
Config config = new Config(); config.useSingleServer().setAddress("redis://myredisserver:6379"); RedissonClient redisson = Redisson.create(config);xxxxxxxxxx Config config = new Config();config.useSingleServer().setAddress("redis://myredisserver:6379");RedissonClient redisson = Redisson.create(config);// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();
SingleServerConfig singleConfig = config.useSingleServer();
SingleServerConfig类的设置参数以下:
address(节点地址)
能够经过
host:port
的格式来指定节点地址。subscriptionConnectionMinimumIdleSize(发布和订阅链接的最小空闲链接数)
默认值:
1
用于发布和订阅链接的最小保持链接数(长链接)。Redisson内部常常经过发布和订阅来实现许多功能。长期保持必定数量的发布订阅链接是必须的。
subscriptionConnectionPoolSize(发布和订阅链接池大小)
默认值:
50
用于发布和订阅链接的链接池最大容量。链接池的链接数量自动弹性伸缩。
connectionMinimumIdleSize(最小空闲链接数)
默认值:
32
最小保持链接数(长链接)。长期保持必定数量的链接有利于提升瞬时写入反应速度。
connectionPoolSize(链接池大小)
默认值:
64
链接池最大容量。链接池的链接数量自动弹性伸缩。
dnsMonitoring(是否启用DNS监测)
默认值:
false
在启用该功能之后,Redisson将会监测DNS的变化状况。
dnsMonitoringInterval(DNS监测时间间隔,单位:毫秒)
默认值:
5000
监测DNS的变化状况的时间间隔。
idleConnectionTimeout(链接空闲超时,单位:毫秒)
默认值:
10000
若是当前链接池里的链接数量超过了最小空闲链接数,而同时有链接空闲时间超过了该数值,那么这些链接将会自动被关闭,并从链接池里去掉。时间单位是毫秒。
connectTimeout(链接超时,单位:毫秒)
默认值:
10000
同节点创建链接时的等待超时。时间单位是毫秒。
timeout(命令等待超时,单位:毫秒)
默认值:
3000
等待节点回复命令的时间。该时间从命令发送成功时开始计时。
retryAttempts(命令失败重试次数)
默认值:
3
若是尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。若是尝试在此限制以内发送成功,则开始启用 timeout(命令等待超时) 计时。
retryInterval(命令重试发送时间间隔,单位:毫秒)
默认值:
1500
在一条命令发送失败之后,等待重试发送的时间间隔。时间单位是毫秒。
reconnectionTimeout(从新链接时间间隔,单位:毫秒)
默认值:
3000
当与某个节点的链接断开时,等待与其从新创建链接的时间间隔。时间单位是毫秒。
failedAttempts(执行失败最大次数)
默认值:
3
在某个节点执行相同或不一样命令时,连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除,直到 reconnectionTimeout(从新链接时间间隔) 超时之后再次尝试。
database(数据库编号)
默认值:
0
尝试链接的数据库编号。
password(密码)
默认值:
null
用于节点身份验证的密码。
subscriptionsPerConnection(单个链接最大订阅数量)
默认值:
5
每一个链接的最大订阅数量。
clientName(客户端名称)
默认值:
null
在Redis节点里显示的客户端名称。
sslEnableEndpointIdentification(启用SSL终端识别)
默认值:
true
开启SSL终端识别能力。
sslProvider(SSL实现方式)
默认值:
JDK
肯定采用哪一种方式(JDK或OPENSSL)来实现SSL链接。
sslTruststore(SSL信任证书库路径)
默认值:
null
指定SSL信任证书库的路径。
sslTruststorePassword(SSL信任证书库密码)
默认值:
null
指定SSL信任证书库的密码。
sslKeystore(SSL钥匙库路径)
默认值:
null
指定SSL钥匙库的路径。
sslKeystorePassword(SSL钥匙库密码)
默认值:
null
指定SSL钥匙库的密码。
Redisson有多种模式,首先介绍单机模式的整合。
<!-- redisson-springboot --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.11.4</version> </dependency>
spring: redis: host: 127.0.0.1 port: 6379 database: 0 timeout: 5000
RedissonConfig.java
import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.data.redis.RedisProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RedissonConfig { @Autowired private RedisProperties redisProperties; @Bean public RedissonClient redissonClient() { Config config = new Config(); String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + ""); config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword()); config.useSingleServer().setDatabase(3); return Redisson.create(config); } }
因为redission能够有多种模式,处于学习的目的,将多种模式封装成一个start,能够学习一下starter的制做。
封装一个RedissonManager,经过策略模式,根据不一样的配置类型,建立 RedissionConfig实例,而后建立RedissionClient对象。
Redission模拟了Java的面向对象编程思想,能够简单理解为一切皆为对象。
每个 Redisson 对象 实现了RObject and RExpirable 两个interfaces.
Usage example:
RObject object = redisson.get...() object.sizeInMemory(); object.delete(); object.rename("newname"); object.isExists(); // catch expired event object.addListener(new ExpiredObjectListener() { ... }); // catch delete event object.addListener(new DeletedObjectListener() { ... });
每个Redisson 对象的名字,就是 Redis中的 Key.
RMap map = redisson.getMap("mymap"); map.getName(); // = mymap
能够经过 RKeys 接口操做Redis中的keys.
Usage example:
RKeys keys = redisson.getKeys(); Iterable<String> allKeys = keys.getKeys(); Iterable<String> foundedKeys = keys.getKeysByPattern('key*'); long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3"); long deletedKeysAmount = keys.deleteByPattern("test?"); String randomKey = keys.randomKey(); long keysAmount = keys.count(); keys.flushall(); keys.flushdb();
Redisson经过RBucket接口表明能够访问任何类型的基础对象,或者普通对象。
RBucket有一系列的工具方法,如compareAndSet(),get(),getAndDelete(),getAndSet(),set(),size(),trySet()等等,用于设值/取值/获取尺寸。
RBucket普通对象的最大大小,为512兆字节。
RBucket<AnyObject> bucket = redisson.getBucket("anyObject"); bucket.set(new AnyObject(1)); AnyObject obj = bucket.get(); bucket.trySet(new AnyObject(3)); bucket.compareAndSet(new AnyObject(4), new AnyObject(5)); bucket.getAndSet(new AnyObject(6));
下面是一个完整的实例:
public class RedissionTest { @Resource RedissonManager redissonManager; @Test public void testRBucketExamples() { // 默认链接上 127.0.0.1:6379 RedissonClient client = redissonManager.getRedisson(); // RList 继承了 java.util.List 接口 RBucket<String> rstring = client.getBucket("redission:test:bucket:string"); rstring.set("this is a string"); RBucket<UserDTO> ruser = client.getBucket("redission:test:bucket:user"); UserDTO dto = new UserDTO(); dto.setToken(UUID.randomUUID().toString()); ruser.set(dto); System.out.println("string is: " + rstring.get()); System.out.println("dto is: " + ruser.get()); client.shutdown(); } }
运行上面的代码时,能够得到如下输出:
string is: this is a string dto is: UserDTO(id=null, userId=null, username=null, password=null, nickname=null, token=183b6eeb-65a8-4b2a-80c6-cf17c08332ce, createTime=null, updateTime=null, headImgUrl=null, mobile=null, sex=null, enabled=null, type=null, openId=null, isDel=false)
下面的代码简单演示了如何在 Redisson 中使用 RList
对象。RList
是 Java 的 List 集合的分布式并发实现。
考虑如下代码:
public class RedissionTest { @Resource RedissonManager redissonManager; @Test public void testListExamples() { // 默认链接上 127.0.0.1:6379 RedissonClient client = redissonManager.getRedisson(); // RList 继承了 java.util.List 接口 RList<String> nameList = client.getList("redission:test:nameList"); nameList.clear(); nameList.add("张三"); nameList.add("李四"); nameList.add("王五"); nameList.remove(-1); System.out.println("List size: " + nameList.size()); boolean contains = nameList.contains("李四"); System.out.println("Is list contains name '李四': " + contains); nameList.forEach(System.out::println); client.shutdown(); } }
运行上面的代码时,能够得到如下输出:
List size: 2 Is list contains name '李四': true 张三 李四
Redisson 还包括 RMap,它是 Java Map 集合的分布式并发实现,考虑如下代码:
public class RedissionTest { @Resource RedissonManager redissonManager; @Test public void testListExamples() { // 默认链接上 127.0.0.1:6379 RedissonClient client = redissonManager.getRedisson(); // RMap 继承了 java.util.concurrent.ConcurrentMap 接口 RMap<String, Object> map = client.getMap("redission:test:personalMap"); map.put("name", "张三"); map.put("address", "北京"); map.put("age", new Integer(50)); System.out.println("Map size: " + map.size()); boolean contains = map.containsKey("age"); System.out.println("Is map contains key 'age': " + contains); String value = String.valueOf(map.get("name")); System.out.println("Value mapped by key 'name': " + value); client.shutdown(); } }
运行上面的代码时,将会看到如下输出:
Map size: 3 Is map contains key 'age': true Value mapped by key 'name': 张三
Lua是一种开源、简单易学、轻量小巧的脚本语言,用标准C语言编写。
其设计的目的就是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。
Redis从2.6版本开始支持Lua脚本,Redis使用Lua能够:
public class RedissionTest { @Resource RedissonManager redissonManager; @Test public void testLuaExamples() { // 默认链接上 127.0.0.1:6379 RedissonClient redisson = redissonManager.getRedisson(); redisson.getBucket("redission:test:foo").set("bar"); String r = redisson.getScript().eval(RScript.Mode.READ_ONLY, "return redis.call('get', 'redission:test:foo')", RScript.ReturnType.VALUE); System.out.println("foo: " + r); // 经过预存的脚本进行一样的操做 RScript s = redisson.getScript(); // 首先将脚本加载到Redis String sha1 = s.scriptLoad("return redis.call('get', 'redission:test:foo')"); // 返回值 res == 282297a0228f48cd3fc6a55de6316f31422f5d17 System.out.println("sha1: " + sha1); // 再经过SHA值调用脚本 Future<Object> r1 = redisson.getScript().evalShaAsync(RScript.Mode.READ_ONLY, sha1, RScript.ReturnType.VALUE, Collections.emptyList()); try { System.out.println("res: " + r1.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } client.shutdown(); } }
运行上面的代码时,将会看到如下输出:
foo: bar sha1: 282297a0228f48cd3fc6a55de6316f31422f5d17 res: bar
RLock 是 Java 中可重入锁的分布式实现,下面的代码演示了 RLock 的用法:
public class RedissionTest { @Resource RedissonManager redissonManager; @Test public void testLockExamples() { // 默认链接上 127.0.0.1:6379 RedissonClient redisson = redissonManager.getRedisson(); // RLock 继承了 java.util.concurrent.locks.Lock 接口 RLock lock = redisson.getLock("redission:test:lock:1"); final int[] count = {0}; int threads = 10; ExecutorService pool = Executors.newFixedThreadPool(10); CountDownLatch countDownLatch = new CountDownLatch(threads); long start = System.currentTimeMillis(); for (int i = 0; i < threads; i++) { pool.submit(() -> { for (int j = 0; j < 1000; j++) { lock.lock(); count[0]++; lock.unlock(); } countDownLatch.countDown(); }); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("10个线程每一个累加1000为: = " + count[0]); //输出统计结果 float time = System.currentTimeMillis() - start; System.out.println("运行的时长为:" + time); System.out.println("每一次执行的时长为:" + time/count[0]); } }
此代码将产生如下输出:
10个线程每一个累加1000为: = 10000 运行的时长为:14172.0 每一次执行的时长为:1.4172
RAtomicLong 是 Java 中 AtomicLong 类的分布式“替代品”,用于在并发环境中保存长值。如下示例代码演示了 RAtomicLong 的用法:
public class RedissionTest { @Resource RedissonManager redissonManager; @Test public void testRAtomicLongExamples() { // 默认链接上 127.0.0.1:6379 RedissonClient redisson = redissonManager.getRedisson(); RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong"); // 线程数 final int threads = 10; // 每条线程的执行轮数 final int turns = 1000; ExecutorService pool = Executors.newFixedThreadPool(threads); for (int i = 0; i < threads; i++) { pool.submit(() -> { try { for (int j = 0; j < turns; j++) { atomicLong.incrementAndGet(); } } catch (Exception e) { e.printStackTrace(); } }); } ThreadUtil.sleepSeconds(5); System.out.println("atomicLong: " + atomicLong.get()); redisson.shutdown(); } }
此代码的输出将是:
atomicLong: 10000
基于Redis的Redisson分布式整长型累加器(LongAdder)采用了与java.util.concurrent.atomic.LongAdder相似的接口。经过利用客户端内置的LongAdder对象,为分布式环境下递增和递减操做提供了很高得性能。据统计其性能最高比分布式AtomicLong
对象快 12000 倍。
完美适用于分布式统计计量场景。下面是RLongAdder的使用案例:
RLongAdder atomicLong = redisson.getLongAdder("myLongAdder"); atomicLong.add(12); atomicLong.increment(); atomicLong.decrement(); atomicLong.sum();
如下示例代码演示了 RLongAdder 的用法:
public class RedissionTest { @Resource RedissonManager redissonManager; @Test public void testRAtomicLongExamples() { // 默认链接上 127.0.0.1:6379 RedissonClient redisson = redissonManager.getRedisson(); RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong"); // 线程数 final int threads = 10; // 每条线程的执行轮数 final int turns = 1000; ExecutorService pool = Executors.newFixedThreadPool(threads); for (int i = 0; i < threads; i++) { pool.submit(() -> { try { for (int j = 0; j < turns; j++) { atomicLong.incrementAndGet(); } } catch (Exception e) { e.printStackTrace(); } }); } ThreadUtil.sleepSeconds(5); System.out.println("atomicLong: " + atomicLong.get()); redisson.shutdown(); } }
此代码将产生如下输出:
longAdder: 10000 运行的时长为:5085.0 每一次执行的时长为:0.5085
当再也不使用整长型累加器对象的时候应该自行手动销毁,若是Redisson对象被关闭(shutdown)了,则不用手动销毁。
RLongAdder atomicLong = ... atomicLong.destroy();
Redisson的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在Redis里的读取和存储。Redisson提供了如下几种的对象编码应用,以供你们选择:
编码类名称 | 说明 |
---|---|
org.redisson.codec.JsonJacksonCodec |
Jackson JSON 编码 默认编码 |
org.redisson.codec.AvroJacksonCodec |
Avro 一个二进制的JSON编码 |
org.redisson.codec.SmileJacksonCodec |
Smile 另外一个二进制的JSON编码 |
org.redisson.codec.CborJacksonCodec |
CBOR 又一个二进制的JSON编码 |
org.redisson.codec.MsgPackJacksonCodec |
MsgPack 再来一个二进制的JSON编码 |
org.redisson.codec.IonJacksonCodec |
Amazon Ion 亚马逊的Ion编码,格式与JSON相似 |
org.redisson.codec.KryoCodec |
Kryo 二进制对象序列化编码 |
org.redisson.codec.SerializationCodec |
JDK序列化编码 |
org.redisson.codec.FstCodec |
FST 10倍于JDK序列化性能并且100%兼容的编码 |
org.redisson.codec.LZ4Codec |
LZ4 压缩型序列化对象编码 |
org.redisson.codec.SnappyCodec |
Snappy 另外一个压缩型序列化对象编码 |
org.redisson.client.codec.JsonJacksonMapCodec |
基于Jackson的映射类使用的编码。可用于避免序列化类的信息,以及用于解决使用byte[] 遇到的问题。 |
org.redisson.client.codec.StringCodec |
纯字符串编码(无转换) |
org.redisson.client.codec.LongCodec |
纯整长型数字编码(无转换) |
org.redisson.client.codec.ByteArrayCodec |
字节数组编码 |
org.redisson.codec.CompositeCodec |
用来组合多种不一样编码在一块儿 |
由Redisson默认的编码器为二进制编码器,为了序列化后的内容可见,须要使用Json文本序列化编码工具类。Redisson提供了编码器 JsonJacksonCodec,做为Json文本序列化编码工具类。
问题是:JsonJackson在序列化有双向引用的对象时,会出现无限循环异常。而fastjson在检查出双向引用后会自动用引用符$ref替换,终止循环。
因此,一些特殊场景中:用fastjson能 正常序列化到redis,而JsonJackson则抛出无限循环异常。
为了序列化后的内容可见,因此不用redission其余自带的,自行实现fastjson编码器:
package com.crayon.distributedredissionspringbootstarter.codec; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; import org.redisson.client.codec.BaseCodec; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; import java.io.IOException; public class FastjsonCodec extends BaseCodec { private final Encoder encoder = in -> { ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); try { ByteBufOutputStream os = new ByteBufOutputStream(out); JSON.writeJSONString(os, in, SerializerFeature.WriteClassName); return os.buffer(); } catch (IOException e) { out.release(); throw e; } catch (Exception e) { out.release(); throw new IOException(e); } }; private final Decoder<Object> decoder = (buf, state) -> JSON.parseObject(new ByteBufInputStream(buf), Object.class); @Override public Decoder<Object> getValueDecoder() { return decoder; } @Override public Encoder getValueEncoder() { return encoder; } }
替换的方法以下:
*/ @Slf4j public class StandaloneConfigImpl implements RedissonConfigService { @Override public Config createRedissonConfig(RedissonConfig redissonConfig) { Config config = new Config(); try { String address = redissonConfig.getAddress(); String password = redissonConfig.getPassword(); int database = redissonConfig.getDatabase(); String redisAddr = GlobalConstant.REDIS_CONNECTION_PREFIX.getConstant_value() + address; config.useSingleServer().setAddress(redisAddr); config.useSingleServer().setDatabase(database); //密码能够为空 if (!StringUtils.isEmpty(password)) { config.useSingleServer().setPassword(password); } log.info("初始化[单机部署]方式Config,redisAddress:" + address); // config.setCodec( new FstCodec()); config.setCodec( new FastjsonCodec()); } catch (Exception e) { log.error("单机部署 Redisson init error", e); } return config; } }
哨兵模式即sentinel模式,配置Redis哨兵服务的官方文档在这里。
哨兵模式实现代码和单机模式几乎同样,惟一的不一样就是Config的构造.
程序化配置哨兵模式的方法以下:
Config config = new Config(); config.useSentinelServers() .setMasterName("mymaster") // use "rediss://" for SSL connection .addSentinelAddress("redis://127.0.0.1:26389", "redis://127.0.0.1:26379") .addSentinelAddress("redis://127.0.0.1:26319"); RedissonClient redisson = Redisson.create(config);
Redisson的哨兵模式的使用方法以下:
SentinelServersConfig sentinelConfig = config.useSentinelServers();
SentinelServersConfig配置参数以下:
配置Redis哨兵服务的官方文档在这里。Redisson的哨兵模式的使用方法以下:
SentinelServersConfig sentinelConfig = config.useSentinelServers();
SentinelServersConfig
类的设置参数以下:dnsMonitoringInterval(DNS监控间隔,单位:毫秒)
默认值:
5000
用来指定检查节点DNS变化的时间间隔。使用的时候应该确保JVM里的DNS数据的缓存时间保持在足够低的范围才有意义。用
-1
来禁用该功能。masterName(主服务器的名称)
主服务器的名称是哨兵进程中用来监测主从服务切换状况的。
addSentinelAddress(添加哨兵节点地址)
能够经过
host:port
的格式来指定哨兵节点的地址。多个节点能够一次性批量添加。readMode(读取操做的负载均衡模式)
默认值:
SLAVE
(只在从服务节点里读取)注:在从服务节点里读取的数听说明已经至少有两个节点保存了该数据,确保了数据的高可用性。
设置读取操做选择节点的模式。可用值为:
SLAVE
- 只在从服务节点里读取。MASTER
- 只在主服务节点里读取。MASTER_SLAVE
- 在主从服务节点里均可以读取。subscriptionMode(订阅操做的负载均衡模式)
默认值:
SLAVE
(只在从服务节点里订阅)设置订阅操做选择节点的模式。可用值为:
SLAVE
- 只在从服务节点里订阅。MASTER
- 只在主服务节点里订阅。loadBalancer(负载均衡算法类的选择)
默认值:
org.redisson.connection.balancer.RoundRobinLoadBalancer
在使用多个Redis服务节点的环境里,能够选用如下几种负载均衡方式选择一个节点:
org.redisson.connection.balancer.WeightedRoundRobinBalancer
- 权重轮询调度算法org.redisson.connection.balancer.RoundRobinLoadBalancer
- 轮询调度算法org.redisson.connection.balancer.RandomLoadBalancer
- 随机调度算法subscriptionConnectionMinimumIdleSize(从节点发布和订阅链接的最小空闲链接数)
默认值:
1
多从节点的环境里,每一个 从服务节点里用于发布和订阅链接的最小保持链接数(长链接)。Redisson内部常常经过发布和订阅来实现许多功能。长期保持必定数量的发布订阅链接是必须的。
subscriptionConnectionPoolSize(从节点发布和订阅链接池大小)
默认值:
50
多从节点的环境里,每一个 从服务节点里用于发布和订阅链接的链接池最大容量。链接池的链接数量自动弹性伸缩。
slaveConnectionMinimumIdleSize(从节点最小空闲链接数)
默认值:
32
多从节点的环境里,每一个 从服务节点里用于普通操做(非 发布和订阅)的最小保持链接数(长链接)。长期保持必定数量的链接有利于提升瞬时读取反映速度。
slaveConnectionPoolSize(从节点链接池大小)
默认值:
64
多从节点的环境里,每一个 从服务节点里用于普通操做(非 发布和订阅)链接的链接池最大容量。链接池的链接数量自动弹性伸缩。
masterConnectionMinimumIdleSize(主节点最小空闲链接数)
默认值:
32
多从节点的环境里,每一个 主节点的最小保持链接数(长链接)。长期保持必定数量的链接有利于提升瞬时写入反应速度。
masterConnectionPoolSize(主节点链接池大小)
默认值:
64
主节点的链接池最大容量。链接池的链接数量自动弹性伸缩。
idleConnectionTimeout(链接空闲超时,单位:毫秒)
默认值:
10000
若是当前链接池里的链接数量超过了最小空闲链接数,而同时有链接空闲时间超过了该数值,那么这些链接将会自动被关闭,并从链接池里去掉。时间单位是毫秒。
connectTimeout(链接超时,单位:毫秒)
默认值:
10000
同任何节点创建链接时的等待超时。时间单位是毫秒。
timeout(命令等待超时,单位:毫秒)
默认值:
3000
等待节点回复命令的时间。该时间从命令发送成功时开始计时。
retryAttempts(命令失败重试次数)
默认值:
3
若是尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。若是尝试在此限制以内发送成功,则开始启用 timeout(命令等待超时) 计时。
retryInterval(命令重试发送时间间隔,单位:毫秒)
默认值:
1500
在一条命令发送失败之后,等待重试发送的时间间隔。时间单位是毫秒。
reconnectionTimeout(从新链接时间间隔,单位:毫秒)
默认值:
3000
当与某个节点的链接断开时,等待与其从新创建链接的时间间隔。时间单位是毫秒。
failedAttempts(执行失败最大次数)
默认值:
3
在某个节点执行相同或不一样命令时,连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除,直到 reconnectionTimeout(从新链接时间间隔) 超时之后再次尝试。
database(数据库编号)
默认值:
0
尝试链接的数据库编号。
password(密码)
默认值:
null
用于节点身份验证的密码。
subscriptionsPerConnection(单个链接最大订阅数量)
默认值:
5
每一个链接的最大订阅数量。
clientName(客户端名称)
默认值:
null
在Redis节点里显示的客户端名称。
sslEnableEndpointIdentification(启用SSL终端识别)
默认值:
true
开启SSL终端识别能力。
sslProvider(SSL实现方式)
默认值:
JDK
肯定采用哪一种方式(JDK或OPENSSL)来实现SSL链接。
sslTruststore(SSL信任证书库路径)
默认值:
null
指定SSL信任证书库的路径。
sslTruststorePassword(SSL信任证书库密码)
默认值:
null
指定SSL信任证书库的密码。
sslKeystore(SSL钥匙库路径)
默认值:
null
指定SSL钥匙库的路径。
sslKeystorePassword(SSL钥匙库密码)
默认值:
null
指定SSL钥匙库的密码。
经过属性文件,配置的示例以下:
--- sentinelServersConfig: idleConnectionTimeout: 10000 connectTimeout: 10000 timeout: 3000 retryAttempts: 3 retryInterval: 1500 failedSlaveReconnectionInterval: 3000 failedSlaveCheckInterval: 60000 password: null subscriptionsPerConnection: 5 clientName: null loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {} subscriptionConnectionMinimumIdleSize: 1 subscriptionConnectionPoolSize: 50 slaveConnectionMinimumIdleSize: 24 slaveConnectionPoolSize: 64 masterConnectionMinimumIdleSize: 24 masterConnectionPoolSize: 64 readMode: "SLAVE" subscriptionMode: "SLAVE" sentinelAddresses: - "redis://127.0.0.1:26379" - "redis://127.0.0.1:26389" masterName: "mymaster" database: 0 threads: 16 nettyThreads: 32 codec: !<org.redisson.codec.MarshallingCodec> {} transportMode: "NIO"
介绍配置Redis主从服务组态的文档在这里.
程序化配置主从模式的方法以下:
Config config = new Config(); config.useMasterSlaveServers() // use "rediss://" for SSL connection .setMasterAddress("redis://127.0.0.1:6379") .addSlaveAddress("redis://127.0.0.1:6389", "redis://127.0.0.1:6332", "redis://127.0.0.1:6419") .addSlaveAddress("redis://127.0.0.1:6399"); RedissonClient redisson = Redisson.create(config);
主从模式使用到MasterSlaveServersConfig :
MasterSlaveServersConfig masterSlaveConfig = config.useMasterSlaveServers();
MasterSlaveServersConfig
类的设置参数以下:
dnsMonitoringInterval(DNS监控间隔,单位:毫秒)
默认值:
5000
用来指定检查节点DNS变化的时间间隔。使用的时候应该确保JVM里的DNS数据的缓存时间保持在足够低的范围才有意义。用
-1
来禁用该功能。masterAddress(主节点地址)
能够经过
host:port
的格式来指定主节点地址。addSlaveAddress(添加从主节点地址)
能够经过
host:port
的格式来指定从节点的地址。多个节点能够一次性批量添加。readMode(读取操做的负载均衡模式)
默认值:
SLAVE
(只在从服务节点里读取)注:在从服务节点里读取的数听说明已经至少有两个节点保存了该数据,确保了数据的高可用性。
设置读取操做选择节点的模式。可用值为:
SLAVE
- 只在从服务节点里读取。MASTER
- 只在主服务节点里读取。MASTER_SLAVE
- 在主从服务节点里均可以读取。subscriptionMode(订阅操做的负载均衡模式)
默认值:
SLAVE
(只在从服务节点里订阅)设置订阅操做选择节点的模式。可用值为:
SLAVE
- 只在从服务节点里订阅。MASTER
- 只在主服务节点里订阅。loadBalancer(负载均衡算法类的选择)
默认值:
org.redisson.connection.balancer.RoundRobinLoadBalancer
在使用多个Redis服务节点的环境里,能够选用如下几种负载均衡方式选择一个节点:
org.redisson.connection.balancer.WeightedRoundRobinBalancer
- 权重轮询调度算法org.redisson.connection.balancer.RoundRobinLoadBalancer
- 轮询调度算法org.redisson.connection.balancer.RandomLoadBalancer
- 随机调度算法subscriptionConnectionMinimumIdleSize(从节点发布和订阅链接的最小空闲链接数)
默认值:
1
多从节点的环境里,每一个 从服务节点里用于发布和订阅链接的最小保持链接数(长链接)。Redisson内部常常经过发布和订阅来实现许多功能。长期保持必定数量的发布订阅链接是必须的。
subscriptionConnectionPoolSize(从节点发布和订阅链接池大小)
默认值:
50
多从节点的环境里,每一个 从服务节点里用于发布和订阅链接的链接池最大容量。链接池的链接数量自动弹性伸缩。
slaveConnectionMinimumIdleSize(从节点最小空闲链接数)
默认值:
32
多从节点的环境里,每一个 从服务节点里用于普通操做(非 发布和订阅)的最小保持链接数(长链接)。长期保持必定数量的链接有利于提升瞬时读取反映速度。
slaveConnectionPoolSize(从节点链接池大小)
默认值:
64
多从节点的环境里,每一个 从服务节点里用于普通操做(非 发布和订阅)链接的链接池最大容量。链接池的链接数量自动弹性伸缩。
masterConnectionMinimumIdleSize(主节点最小空闲链接数)
默认值:
32
多从节点的环境里,每一个 主节点的最小保持链接数(长链接)。长期保持必定数量的链接有利于提升瞬时写入反应速度。
masterConnectionPoolSize(主节点链接池大小)
默认值:
64
主节点的链接池最大容量。链接池的链接数量自动弹性伸缩。
idleConnectionTimeout(链接空闲超时,单位:毫秒)
默认值:
10000
若是当前链接池里的链接数量超过了最小空闲链接数,而同时有链接空闲时间超过了该数值,那么这些链接将会自动被关闭,并从链接池里去掉。时间单位是毫秒。
connectTimeout(链接超时,单位:毫秒)
默认值:
10000
同任何节点创建链接时的等待超时。时间单位是毫秒。
timeout(命令等待超时,单位:毫秒)
默认值:
3000
等待节点回复命令的时间。该时间从命令发送成功时开始计时。
retryAttempts(命令失败重试次数)
默认值:
3
若是尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。若是尝试在此限制以内发送成功,则开始启用 timeout(命令等待超时) 计时。
retryInterval(命令重试发送时间间隔,单位:毫秒)
默认值:
1500
在一条命令发送失败之后,等待重试发送的时间间隔。时间单位是毫秒。
reconnectionTimeout(从新链接时间间隔,单位:毫秒)
默认值:
3000
当与某个节点的链接断开时,等待与其从新创建链接的时间间隔。时间单位是毫秒。
failedAttempts(执行失败最大次数)
默认值:
3
在某个节点执行相同或不一样命令时,连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除,直到 reconnectionTimeout(从新链接时间间隔) 超时之后再次尝试。
database(数据库编号)
默认值:
0
尝试链接的数据库编号。
password(密码)
默认值:
null
用于节点身份验证的密码。
subscriptionsPerConnection(单个链接最大订阅数量)
默认值:
5
每一个链接的最大订阅数量。
clientName(客户端名称)
默认值:
null
在Redis节点里显示的客户端名称。
sslEnableEndpointIdentification(启用SSL终端识别)
默认值:
true
开启SSL终端识别能力。
sslProvider(SSL实现方式)
默认值:
JDK
肯定采用哪一种方式(JDK或OPENSSL)来实现SSL链接。
sslTruststore(SSL信任证书库路径)
默认值:
null
指定SSL信任证书库的路径。
sslTruststorePassword(SSL信任证书库密码)
默认值:
null
指定SSL信任证书库的密码。
sslKeystore(SSL钥匙库路径)
默认值:
null
指定SSL钥匙库的路径。
sslKeystorePassword(SSL钥匙库密码)
默认值:
null
指定SSL钥匙库的密码。
集群模式除了适用于Redis集群环境,也适用于任何云计算服务商提供的集群模式,例如AWS ElastiCache集群版、Azure Redis Cache和阿里云(Aliyun)的云数据库Redis版。
介绍配置Redis集群组态的文档在这里。 Redis集群组态的最低要求是必须有三个主节点。
集群模式构造Config以下:
Config config = new Config(); config.useClusterServers() .setScanInterval(2000) // 集群状态扫描间隔时间,单位是毫秒 //能够用"rediss://"来启用SSL链接 .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001") .addNodeAddress("redis://127.0.0.1:7002"); RedissonClient redisson = Redisson.create(config);
集群模式使用到ClusterServersConfig :
ClusterServersConfig clusterConfig = config.useClusterServers();
ClusterServersConfig 配置参数以下:
nodeAddresses(添加节点地址)
能够经过
host:port
的格式来添加Redis集群节点的地址。多个节点能够一次性批量添加。scanInterval(集群扫描间隔时间)
默认值:
1000
对Redis集群节点状态扫描的时间间隔。单位是毫秒。
slots(分片数量)
默认值:
231
用于指定数据分片过程当中的分片数量。支持数据分片/框架结构有:集(Set)、映射(Map)、BitSet、Bloom filter, Spring Cache和Hibernate Cache等.readMode(读取操做的负载均衡模式)
默认值:
SLAVE
(只在从服务节点里读取)注:在从服务节点里读取的数听说明已经至少有两个节点保存了该数据,确保了数据的高可用性。
设置读取操做选择节点的模式。可用值为:
SLAVE
- 只在从服务节点里读取。MASTER
- 只在主服务节点里读取。MASTER_SLAVE
- 在主从服务节点里均可以读取。subscriptionMode(订阅操做的负载均衡模式)
默认值:
SLAVE
(只在从服务节点里订阅)设置订阅操做选择节点的模式。可用值为:
SLAVE
- 只在从服务节点里订阅。MASTER
- 只在主服务节点里订阅。loadBalancer(负载均衡算法类的选择)
默认值:
org.redisson.connection.balancer.RoundRobinLoadBalancer
在多Redis服务节点的环境里,能够选用如下几种负载均衡方式选择一个节点:
org.redisson.connection.balancer.WeightedRoundRobinBalancer
- 权重轮询调度算法org.redisson.connection.balancer.RoundRobinLoadBalancer
- 轮询调度算法org.redisson.connection.balancer.RandomLoadBalancer
- 随机调度算法subscriptionConnectionMinimumIdleSize(从节点发布和订阅链接的最小空闲链接数)
默认值:
1
多从节点的环境里,每一个 从服务节点里用于发布和订阅链接的最小保持链接数(长链接)。Redisson内部常常经过发布和订阅来实现许多功能。长期保持必定数量的发布订阅链接是必须的。
subscriptionConnectionPoolSize(从节点发布和订阅链接池大小)
默认值:
50
多从节点的环境里,每一个 从服务节点里用于发布和订阅链接的链接池最大容量。链接池的链接数量自动弹性伸缩。
slaveConnectionMinimumIdleSize(从节点最小空闲链接数)
默认值:
32
多从节点的环境里,每一个 从服务节点里用于普通操做(非 发布和订阅)的最小保持链接数(长链接)。长期保持必定数量的链接有利于提升瞬时读取反映速度。
slaveConnectionPoolSize(从节点链接池大小)
默认值:
64
多从节点的环境里,每一个 从服务节点里用于普通操做(非 发布和订阅)链接的链接池最大容量。链接池的链接数量自动弹性伸缩。
masterConnectionMinimumIdleSize(主节点最小空闲链接数)
默认值:
32
多节点的环境里,每一个 主节点的最小保持链接数(长链接)。长期保持必定数量的链接有利于提升瞬时写入反应速度。
masterConnectionPoolSize(主节点链接池大小)
默认值:
64
多主节点的环境里,每一个 主节点的链接池最大容量。链接池的链接数量自动弹性伸缩。
idleConnectionTimeout(链接空闲超时,单位:毫秒)
默认值:
10000
若是当前链接池里的链接数量超过了最小空闲链接数,而同时有链接空闲时间超过了该数值,那么这些链接将会自动被关闭,并从链接池里去掉。时间单位是毫秒。
connectTimeout(链接超时,单位:毫秒)
默认值:
10000
同任何节点创建链接时的等待超时。时间单位是毫秒。
timeout(命令等待超时,单位:毫秒)
默认值:
3000
等待节点回复命令的时间。该时间从命令发送成功时开始计时。
retryAttempts(命令失败重试次数)
默认值:
3
若是尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。若是尝试在此限制以内发送成功,则开始启用 timeout(命令等待超时) 计时。
retryInterval(命令重试发送时间间隔,单位:毫秒)
默认值:
1500
在一条命令发送失败之后,等待重试发送的时间间隔。时间单位是毫秒。
reconnectionTimeout(从新链接时间间隔,单位:毫秒)
默认值:
3000
当与某个节点的链接断开时,等待与其从新创建链接的时间间隔。时间单位是毫秒。
failedAttempts(执行失败最大次数)
默认值:
3
在某个节点执行相同或不一样命令时,连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除,直到 reconnectionTimeout(从新链接时间间隔) 超时之后再次尝试。
password(密码)
默认值:
null
用于节点身份验证的密码。
subscriptionsPerConnection(单个链接最大订阅数量)
默认值:
5
每一个链接的最大订阅数量。
clientName(客户端名称)
默认值:
null
在Redis节点里显示的客户端名称。
sslEnableEndpointIdentification(启用SSL终端识别)
默认值:
true
开启SSL终端识别能力。
sslProvider(SSL实现方式)
默认值:
JDK
肯定采用哪一种方式(JDK或OPENSSL)来实现SSL链接。
sslTruststore(SSL信任证书库路径)
默认值:
null
指定SSL信任证书库的路径。
sslTruststorePassword(SSL信任证书库密码)
默认值:
null
指定SSL信任证书库的密码。
sslKeystore(SSL钥匙库路径)
默认值:
null
指定SSL钥匙库的路径。
sslKeystorePassword(SSL钥匙库密码)
默认值:
null
指定SSL钥匙库的密码。
Redis发展到如今,几种常见的部署架构有:
先介绍,基于单机模式的简单Redision锁的使用。
单机模式下,简单Redision锁的使用以下:
// 构造redisson实现分布式锁必要的Config Config config = new Config(); config.useSingleServer().setAddress("redis://172.29.1.180:5379").setPassword("a123456").setDatabase(0); // 构造RedissonClient RedissonClient redissonClient = Redisson.create(config); // 设置锁定资源名称 RLock disLock = redissonClient.getLock("DISLOCK"); //尝试获取分布式锁 boolean isLock= disLock.tryLock(500, 15000, TimeUnit.MILLISECONDS); if (isLock) { try { //TODO if get lock success, do something; Thread.sleep(15000); } catch (Exception e) { } finally { // 不管如何, 最后都要解锁 disLock.unlock(); } }
经过代码可知,通过Redisson的封装,实现Redis分布式锁很是方便,和显式锁的使用方法是同样的。RLock接口继承了 Lock接口。
咱们再看一下Redis中的value是啥,和前文分析同样,hash结构, redis 的key就是资源名称。
hash结构的key就是UUID+threadId,hash结构的value就是重入值,在分布式锁时,这个值为1(Redisson还能够实现重入锁,那么这个值就取决于重入次数了):
172.29.1.180:5379> hgetall DISLOCK 1) "01a6d806-d282-4715-9bec-f51b9aa98110:1" 2) "1"
使用客户端工具看到的效果以下:
能够看到,调用getLock()方法后实际返回一个RedissonLock对象
下面来看下tryLock方法,源码以下:
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(threadId); return false; } try { time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
以上代码使用了异步回调模式,RFuture 继承了 java.util.concurrent.Future
在RedissonLock对象的lock()方法主要调用tryAcquire()方法
因为leaseTime == -1,因而走tryLockInnerAsync()方法,这个方法才是关键
首先,看一下evalWriteAsync方法的定义
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
这和前面的jedis调用lua脚本相似,最后两个参数分别是keys和params。
单独将调用的那一段摘出来看,实际调用是这样的:
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
结合上面的参数声明,咱们能够知道,这里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)
假设:
那么KEYS[1]=DISLOCK,ARGV[2]=01a6d806-d282-4715-9bec-f51b9aa98110:1
所以,这段脚本的意思是
一、判断有没有一个叫“DISLOCK”的key
二、若是没有,则在其下设置一个字段为“01a6d806-d282-4715-9bec-f51b9aa98110:1”,值为“1”的键值对 ,并设置它的过时时间
三、若是存在,则进一步判断“01a6d806-d282-4715-9bec-f51b9aa98110:1”是否存在,若存在,则其值加1,并从新设置过时时间
四、返回“DISLOCK”的生存时间(毫秒)
这里用的数据结构是hash,hash的结构是: key 字段1 值1 字段2 值2 。。。
用在锁这个场景下,key就表示锁的名称,也能够理解为临界资源,字段就表示当前得到锁的线程
全部竞争这把锁的线程都要判断在这个key下有没有本身线程的字段,若是没有则不能得到锁,若是有,则至关于重入,字段值加1(次数)
为什么要使用lua语言?
由于一大堆复杂的业务逻辑,能够经过封装在lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性
回顾一下evalWriteAsync方法的定义
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
注意,其最后两个参数分别是keys和params。
KEYS[1]表明的是你加锁的那个key,好比说:
RLock lock = redisson.getLock("DISLOCK");
这里你本身设置了加锁的那个锁key就是“DISLOCK”。
ARGV[1]表明的就是锁key的默认生存时间
调用的时候,传递的参数为 internalLockLeaseTime ,该值默认30秒。
ARGV[2]表明的是加锁的客户端的ID,相似于下面这样:
01a6d806-d282-4715-9bec-f51b9aa98110:1
lua脚本的第一段if判断语句,就是用“exists DISLOCK”命令判断一下,若是你要加锁的那个锁key不存在的话,你就进行加锁。
如何加锁呢?很简单,用下面的redis命令:
hset DISLOCK 01a6d806-d282-4715-9bec-f51b9aa98110:1 1
经过这个命令设置一个hash数据结构,这行命令执行后,会出现一个相似下面的数据结构:
DISLOCK: { 8743c9c0-0795-4907-87fd-6c719a6b4586:1 1 }
接着会执行“pexpire DISLOCK 30000”命令,设置DISLOCK这个锁key的生存时间是30秒(默认)
那么在这个时候,若是客户端2来尝试加锁,执行了一样的一段lua脚本,会咋样呢?
很简单,第一个if判断会执行“exists DISLOCK”,发现DISLOCK 这个锁key已经存在了。
接着第二个if判断,判断一下,DISLOCK锁key的hash数据结构中,是否包含客户端2的ID,可是明显不是的,由于那里包含的是客户端1的ID。
因此,客户端2会获取到pttl DISLOCK返回的一个数字,这个数字表明了DISLOCK 这个锁key的剩余生存时间。好比还剩15000毫秒的生存时间。
此时客户端2会进入一个while循环,不停的尝试加锁。
若是客户端1都已经持有了这把锁了,结果可重入的加锁会怎么样呢?
RLock lock = redisson.getLock("DISLOCK") lock.lock(); //业务代码 lock.lock(); //业务代码 lock.unlock(); lock.unlock();
分析上面那段lua脚本。
第一个if判断确定不成立,“exists DISLOCK”会显示锁key已经存在了。
第二个if判断会成立,由于DISLOCK的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”
此时就会执行可重入加锁的逻辑,他会用:
incrby DISLOCK
8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
经过这个命令,对客户端1的加锁次数,累加1。
此时DISLOCK数据结构变为下面这样:
DISLOCK: { 8743c9c0-0795-4907-87fd-6c719a6b4586:1 2 }
若是执行lock.unlock(),就能够释放分布式锁,此时的业务逻辑也是很是简单的。
其实说白了,就是每次都对DISLOCK数据结构中的那个加锁次数减1。
若是发现加锁次数是0了,说明这个客户端已经再也不持有锁了,此时就会用:
“del DISLOCK”命令,从redis里删除这个key。
而后呢,另外的客户端2就能够尝试完成加锁了。
@Override public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } // Future<Void> future = unlockAsync(); // future.awaitUninterruptibly(); // if (future.isSuccess()) { // return; // } // if (future.cause() instanceof IllegalMonitorStateException) { // throw (IllegalMonitorStateException)future.cause(); // } // throw commandExecutor.convertException(future); }
再深刻一下,实际调用的是unlockInnerAsync方法
上图没有截取完整,完整的源码以下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
咱们仍是假设name=DISLOCK,假设线程ID是1
同理,咱们能够知道
KEYS[1]是getName(),即KEYS[1]=DISLOCK
KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{DISLOCK}
ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0
ARGV[2]是生存时间
ARGV[3]是getLockName(threadId),即ARGV[3]=8743c9c0-0795-4907-87fd-6c719a6b4586:1
所以,上面脚本的意思是:
一、判断是否存在一个叫“DISLOCK”的key
二、若是不存在,返回nil
三、若是存在,使用Redis Hincrby 命令用于为哈希表中的字段值加上指定增量值 -1 ,表明减去1
四、若counter >,返回空,若字段存在,则字段值减1
五、若减完之后,counter > 0 值仍大于0,则返回0
六、减完后,若字段值小于或等于0,则用 publish 命令广播一条消息,广播内容是0,并返回1;
能够猜想,广播0表示资源可用,即通知那些等待获取锁的线程如今能够得到锁了
以上是正常状况下获取到锁的状况,那么当没法当即获取到锁的时候怎么办呢?
再回到前面获取锁的位置
@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } // 订阅 RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { unsubscribe(future, threadId); } // get(lockAsync(leaseTime, unit)); } protected static final LockPubSub PUBSUB = new LockPubSub(); protected RFuture<RedissonLockEntry> subscribe(long threadId) { return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); } protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) { PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); }
这里会订阅Channel,当资源可用时能够及时知道,并抢占,防止无效的轮询而浪费资源
这里的channel为:
redisson_lock__channel:{DISLOCK}
当资源可用用的时候,循环去尝试获取锁,因为多个线程同时去竞争资源,因此这里用了信号量,对于同一个资源只容许一个线程得到锁,其它的线程阻塞
这点,有点儿相似 Zookeeper分布式锁:
有关zookeeper分布式锁的原理和实现,具体请参见下面的博客:
Zookeeper 分布式锁 (图解+秒懂+史上最全)
客户端1加锁的锁key默认生存时间才30秒,若是超过了30秒,客户端1还想一直持有这把锁,怎么办呢?
简单!只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,若是客户端1还持有锁key,那么就会不断的延长锁key的生存时间。
可是聪明的同窗确定会问:
有效时间设置多长,假如个人业务操做比有效时间长,个人业务代码还没执行完,就自动给我解锁了,不就完蛋了吗。
这个问题就有点棘手了,在网上也有不少讨论:
第一种解决方法就是靠程序员本身去把握,预估一下业务代码须要执行的时间,而后设置有效期时间比执行时间长一些,保证不会由于自动解锁影响到客户端业务代码的执行。
可是这并非万全之策,好比网络抖动这种状况是没法预测的,也有可能致使业务代码执行的时间变长,因此并不安全。
第二种方法,使用监事狗watchDog机制实现锁的续期。
第二种方法比较靠谱一点,并且无业务入侵。
在Redisson框架实现分布式锁的思路,就使用watchDog机制实现锁的续期。
当加锁成功后,同时开启守护线程,默认有效期是30秒,每隔10秒就会给锁续期到30秒,只要持有锁的客户端没有宕机,就能保证一直持有锁,直到业务代码执行完毕由客户端本身解锁,若是宕机了天然就在有效期失效后自动解锁。
这里,和前面解决 JVM STW的锁过时问题有点相似,只不过,watchDog自动续期,也没有彻底解决JVM STW的锁过时问题。
如何完全解决 JVM STW的锁过时问题,能够来疯狂创客圈的社群讨论。
实际上,redisson加锁的基本流程图以下:
这里专一于介绍watchdog。
首先watchdog的具体思路是 加锁时,默认加锁 30秒,每10秒钟检查一次,若是存在就从新设置 过时时间为30秒。
而后设置默认加锁时间的参数是 lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)
官方文档描述以下
lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)
默认值:
30000
监控锁的看门狗超时时间单位为毫秒。该参数只适用于分布式锁的加锁请求中未明确使用
leaseTimeout
参数的状况。若是该看门狗未使用lockWatchdogTimeout
去从新调整一个分布式锁的lockWatchdogTimeout
超时,那么这个锁将变为失效状态。这个参数能够用来避免由Redisson客户端节点宕机或其余缘由形成死锁的状况。
须要注意的是
1.watchDog 只有在未显示指定加锁时间时才会生效。(这点很重要)
2.lockWatchdogTimeout设定的时间不要过小 ,好比我以前设置的是 100毫秒,因为网络直接致使加锁完后,watchdog去延期时,这个key在redis中已经被删除了。
在调用lock方法时,会最终调用到tryAcquireAsync。详细解释以下:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { //若是指定了加锁时间,会直接去加锁 if (leaseTime != -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } //没有指定加锁时间 会先进行加锁,而且默认时间就是 LockWatchdogTimeout的时间 //这个是异步操做 返回RFuture 相似netty中的future RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); //这里也是相似netty Future 的addListener,在future内容执行完成后执行 ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { //这里是定时执行 当前锁自动延期的动做 scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
scheduleExpirationRenewal 中会调用renewExpiration。
这里咱们能够看到是 启用了一个timeout定时,去执行延期动做
private void renewExpiration() { Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } if (res) { //若是 没有报错,就再次定时延期 // reschedule itself renewExpiration(); } }); } // 这里咱们能够看到定时任务 是 lockWatchdogTimeout 的1/3时间去执行 renewExpirationAsync }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
最终 scheduleExpirationRenewal会调用到 renewExpirationAsync,
执行下面这段 lua脚本。他主要判断就是 这个锁是否在redis中存在,若是存在就进行 pexpire 延期。
protected RFuture<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
1.要使 watchLog机制生效 ,lock时 不要设置 过时时间
2.watchlog的延时时间 能够由 lockWatchdogTimeout指定默认延时时间,可是不要设置过小。如100
3.watchdog 会每 lockWatchdogTimeout/3时间,去延时。
4.watchdog 经过 相似netty的 Future功能来实现异步延时
5.watchdog 最终仍是经过 lua脚原本进行延时
Redisson框架十分强大,除了前面介绍的 getLock方法获取的分布式锁(输入可重入锁的类型),还有不少其余的分布式锁类型。
整体的Redisson框架的分布式锁类型,大体以下:
Redisson的分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口,同时还支持自动过时解锁。
public void testReentrantLock(RedissonClient redisson){ RLock lock = redisson.getLock("anyLock"); try{ // 1. 最多见的使用方法 //lock.lock(); // 2. 支持过时解锁功能,10秒钟之后自动解锁, 无需调用unlock方法手动解锁 //lock.lock(10, TimeUnit.SECONDS); // 3. 尝试加锁,最多等待3秒,上锁之后10秒自动解锁 boolean res = lock.tryLock(3, 10, TimeUnit.SECONDS); if(res){ //成功 // do your business } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
Redisson同时还为分布式锁提供了异步执行的相关方法:
public void testAsyncReentrantLock(RedissonClient redisson){ RLock lock = redisson.getLock("anyLock"); try{ lock.lockAsync(); lock.lockAsync(10, TimeUnit.SECONDS); Future<Boolean> res = lock.tryLockAsync(3, 10, TimeUnit.SECONDS); if(res.get()){ // do your business } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } finally { lock.unlock(); } }
Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。在提供了自动过时解锁功能的同时,保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。
public void testFairLock(RedissonClient redisson){ RLock fairLock = redisson.getFairLock("anyLock"); try{ // 最多见的使用方法 fairLock.lock(); // 支持过时解锁功能, 10秒钟之后自动解锁,无需调用unlock方法手动解锁 fairLock.lock(10, TimeUnit.SECONDS); // 尝试加锁,最多等待100秒,上锁之后10秒自动解锁 boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } finally { fairLock.unlock(); } }
Redisson同时还为分布式可重入公平锁提供了异步执行的相关方法:
RLock fairLock = redisson.getFairLock("anyLock"); fairLock.lockAsync(); fairLock.lockAsync(10, TimeUnit.SECONDS); Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
Redisson的RedissonMultiLock对象能够将多个RLock对象关联为一个联锁,每一个RLock对象实例能够来自于不一样的Redisson实例。
public void testMultiLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){ RLock lock1 = redisson1.getLock("lock1"); RLock lock2 = redisson2.getLock("lock2"); RLock lock3 = redisson3.getLock("lock3"); RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); try { // 同时加锁:lock1 lock2 lock3, 全部的锁都上锁成功才算成功。 lock.lock(); // 尝试加锁,最多等待100秒,上锁之后10秒自动解锁 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
Redisson的RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也能够用来将多个RLock对象关联为一个红锁,每一个RLock对象实例能够来自于不一样的Redisson实例。
public void testRedLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){ RLock lock1 = redisson1.getLock("lock1"); RLock lock2 = redisson2.getLock("lock2"); RLock lock3 = redisson3.getLock("lock3"); RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); try { // 同时加锁:lock1 lock2 lock3, 红锁在大部分节点上加锁成功就算成功。 lock.lock(); // 尝试加锁,最多等待100秒,上锁之后10秒自动解锁 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
Redisson的分布式可重入读写锁RReadWriteLock,Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。同时还支持自动过时解锁。该对象容许同时有多个读取锁,可是最多只能有一个写入锁。
RReadWriteLock rwlock = redisson.getLock("anyRWLock"); // 最多见的使用方法 rwlock.readLock().lock(); // 或 rwlock.writeLock().lock(); // 支持过时解锁功能 // 10秒钟之后自动解锁 // 无需调用unlock方法手动解锁 rwlock.readLock().lock(10, TimeUnit.SECONDS); // 或 rwlock.writeLock().lock(10, TimeUnit.SECONDS); // 尝试加锁,最多等待100秒,上锁之后10秒自动解锁 boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS); // 或 boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();
Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore类似的接口和用法。
RSemaphore semaphore = redisson.getSemaphore("semaphore"); semaphore.acquire(); //或 semaphore.acquireAsync(); semaphore.acquire(23); semaphore.tryAcquire(); //或 semaphore.tryAcquireAsync(); semaphore.tryAcquire(23, TimeUnit.SECONDS); //或 semaphore.tryAcquireAsync(23, TimeUnit.SECONDS); semaphore.release(10); semaphore.release(); //或 semaphore.releaseAsync();
Redisson的可过时性信号量(PermitExpirableSemaphore)实在RSemaphore对象的基础上,为每一个信号增长了一个过时时间。每一个信号能够经过独立的ID来辨识,释放时只能经过提交这个ID才能释放。
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore"); String permitId = semaphore.acquire(); // 获取一个信号,有效期只有2秒钟。 String permitId = semaphore.acquire(2, TimeUnit.SECONDS); // ... semaphore.release(permitId);
Redisson的分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch类似的接口和用法。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.trySetCount(1); latch.await(); // 在其余线程或其余JVM里 RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.countDown();
关于Redis分布式锁的高可用问题,大体以下:
在master- slave的集群架构中,就是若是你对某个redis master实例,写入了DISLOCK这种锁key的value,此时会异步复制给对应的master slave实例。
可是,这个过程当中一旦发生redis master宕机,主备切换,redis slave变为了redis master。而此时的主从复制没有完全完成.....
接着就会致使,客户端2来尝试加锁的时候,在新的redis master上完成了加锁,而客户端1也觉得本身成功加了锁。
此时就会致使多个客户端对一个分布式锁完成了加锁。
这时系统在业务语义上必定会出现问题,致使脏数据的产生。
因此这个是是redis master-slave架构的主从异步复制致使的redis分布式锁的最大缺陷:
在redis master实例宕机的时候,可能致使多个客户端同时完成加锁。
RedLock算法思想:
不能只在一个redis实例上建立锁,应该是在多个redis实例上建立锁,n / 2 + 1,必须在大多数redis节点上都成功建立锁,才能算这个总体的RedLock加锁成功,避免说仅仅在一个redis实例上加锁而带来的问题。
这个场景是假设有一个 redis cluster,有 5 个 redis master 实例。而后执行以下步骤获取一把红锁:
RedLock是基于redis实现的分布式锁,它可以保证如下特性:
互斥性:在任什么时候候,只能有一个客户端可以持有锁;避免死锁:
当客户端拿到锁后,即便发生了网络分区或者客户端宕机,也不会发生死锁;(利用key的存活时间)
容错性:只要多数节点的redis实例正常运行,就可以对外提供服务,加锁或者释放锁;
以sentinel模式架构为例,以下图所示,有sentinel-1,sentinel-2,sentinel-3总计3个sentinel模式集群,若是要获取分布式锁,那么须要向这3个sentinel集群经过EVAL命令执行LUA脚本,须要3/2+1=2,即至少2个sentinel集群响应成功,才算成功的以Redlock算法获取到分布式锁:
提早说明,使用redis分布式锁,是追求高性能, 在cap理论中,追求的是 ap 而不是cp。
因此,若是追求高可用,建议使用 zookeeper分布式锁。
redis分布式锁可能致使的数据不一致性,建议使用业务补偿的方式去弥补。因此,不太建议使用红锁,可是从学习的层面来讲,你们仍是必定要掌握的。
Redisson中有一个MultiLock
的概念,能够将多个锁合并为一个大锁,对一个大锁进行统一的申请加锁以及释放锁
而Redisson中实现RedLock就是基于MultiLock
去作的,接下来就具体看看对应的实现吧
先看下官方的代码使用:
(https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#84-redlock)
RLock lock1 = redisson1.getLock("lock1"); RLock lock2 = redisson2.getLock("lock2"); RLock lock3 = redisson3.getLock("lock3"); RLock redLock = anyRedisson.getRedLock(lock1, lock2, lock3); // traditional lock method redLock.lock(); // or acquire lock and automatically unlock it after 10 seconds redLock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { redLock.unlock(); } }
这里是分别对3个redis实例加锁,而后获取一个最后的加锁结果。
上面示例中使用redLock.lock()或者tryLock()最终都是执行RedissonRedLock
中方法。
RedissonRedLock
继承自RedissonMultiLock
, 实现了其中的一些方法:
public class RedissonRedLock extends RedissonMultiLock { public RedissonRedLock(RLock... locks) { super(locks); } /** * 锁能够失败的次数,锁的数量-锁成功客户端最小的数量 */ @Override protected int failedLocksLimit() { return locks.size() - minLocksAmount(locks); } /** * 锁的数量 / 2 + 1,例若有3个客户端加锁,那么最少须要2个客户端加锁成功 */ protected int minLocksAmount(final List<RLock> locks) { return locks.size()/2 + 1; } /** * 计算多个客户端一块儿加锁的超时时间,每一个客户端的等待时间 * remainTime默认为4.5s */ @Override protected long calcLockWaitTime(long remainTime) { return Math.max(remainTime / locks.size(), 1); } @Override public void unlock() { unlockInner(locks); } }
看到locks.size()/2 + 1
,例如咱们有3个客户端实例,那么最少2个实例加锁成功才算分布式锁加锁成功。
接着咱们看下lock()
的具体实现
public class RedissonMultiLock implements Lock { final List<RLock> locks = new ArrayList<RLock>(); public RedissonMultiLock(RLock... locks) { if (locks.length == 0) { throw new IllegalArgumentException("Lock objects are not defined"); } this.locks.addAll(Arrays.asList(locks)); } public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long newLeaseTime = -1; if (leaseTime != -1) { // 若是等待时间设置了,那么将等待时间 * 2 newLeaseTime = unit.toMillis(waitTime)*2; } // time为当前时间戳 long time = System.currentTimeMillis(); long remainTime = -1; if (waitTime != -1) { remainTime = unit.toMillis(waitTime); } // 计算锁的等待时间,RedLock中:若是remainTime=-1,那么lockWaitTime为1 long lockWaitTime = calcLockWaitTime(remainTime); // RedLock中failedLocksLimit即为n/2 + 1 int failedLocksLimit = failedLocksLimit(); List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size()); // 循环每一个redis客户端,去获取锁 for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) { RLock lock = iterator.next(); boolean lockAcquired; try { // 调用tryLock方法去获取锁,若是获取锁成功,则lockAcquired=true if (waitTime == -1 && leaseTime == -1) { lockAcquired = lock.tryLock(); } else { long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (Exception e) { lockAcquired = false; } // 若是获取锁成功,将锁加入到list集合中 if (lockAcquired) { acquiredLocks.add(lock); } else { // 若是获取锁失败,判断失败次数是否等于失败的限制次数 // 好比,3个redis客户端,最多只能失败1次 // 这里locks.size = 3, 3-x=1,说明只要成功了2次就能够直接break掉循环 if (locks.size() - acquiredLocks.size() == failedLocksLimit()) { break; } // 若是最大失败次数等于0 if (failedLocksLimit == 0) { // 释放全部的锁,RedLock加锁失败 unlockInner(acquiredLocks); if (waitTime == -1 && leaseTime == -1) { return false; } failedLocksLimit = failedLocksLimit(); acquiredLocks.clear(); // 重置迭代器 重试再次获取锁 while (iterator.hasPrevious()) { iterator.previous(); } } else { // 失败的限制次数减一 // 好比3个redis实例,最大的限制次数是1,若是遍历第一个redis实例,失败了,那么failedLocksLimit会减成0 // 若是failedLocksLimit就会走上面的if逻辑,释放全部的锁,而后返回false failedLocksLimit--; } } if (remainTime != -1) { remainTime -= (System.currentTimeMillis() - time); time = System.currentTimeMillis(); if (remainTime <= 0) { unlockInner(acquiredLocks); return false; } } } if (leaseTime != -1) { List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size()); for (RLock rLock : acquiredLocks) { RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); futures.add(future); } for (RFuture<Boolean> rFuture : futures) { rFuture.syncUninterruptibly(); } } return true; } }
核心代码都已经加了注释,实现原理其实很简单,基于RedLock思想,遍历全部的Redis客户端,而后依次加锁,最后统计成功的次数来判断是否加锁成功。
分布式锁一旦加了以后,对同一个商品的下单请求,会致使全部下单操做,都必须对同一个商品key加分布式锁。
假设一个商品1分钟6000订单,每秒的 600个下单操做,假设加锁以后,释放锁以前,查库存 -> 建立订单 -> 扣减库存,每一个IO操做100ms,大概300毫秒。
具体以下图:
能够再进行一下优化,将 建立订单 + 扣减库存 并发执行,将两个100ms 减小为一个100ms,这既是空间换时间的思想,大概200毫秒。
将 建立订单 + 扣减库存 批量执行,减小一次IO,也是大概200毫秒。
这个优化方案,有个重要的前提,就是 订单表和库存表在相同的库中,可是,这个前提条件,在数据量大+高并发的场景下,够呛。
那么,一秒内,只能完成多少个商品的秒杀订单的下单操做呢?
1000毫秒 / 200 =5 个订单
如何达到每秒600个下单呢? 仍是要从基础知识里边寻找答案?
分段加锁的思想来源与基础知识。
我常常在疯狂创客圈社群里边,对小伙伴们强调 基础知识的重要性,反复强调, 《Java 高并发三部曲》 必定要多刷,最好刷三遍。
中 《Java 高并发核心编程 卷2》 介绍了 JUC的 LongAdder 和 ConcurrentHashMap的源码和底层原理,他们提高性能的办法是:
空间换时间, 分段加锁
尤为是 LongAdder 的实现思想,能够用于 Redis分布式锁 做为性能提高的手段,将 Redis分布式锁 优化为 Redis分段锁。
有关LongAdder 的系统化学习,请参见 《Java 高并发核心编程 卷2》
回到前面的场景:
假设一个商品1分钟6000订单,每秒的 600个下单操做,假设加锁以后,释放锁以前,查库存 -> 建立订单 -> 扣减库存,通过优化,每一个IO操做100ms,大概200毫秒,一秒钟5个订单。
为了达到每秒600个订单,能够将锁分红 600 /5 =120 个段, 每一次使用随机算法,随机到一个分段, 若是不行,就轮询下一个分段,具体的流程,大体以下:
缺点:
这个是一个理论的时间预估,没有扣除 尝试下一个分段的 时间, 另外,实际上的性能, 会比理论上差,从我们实操案例的测试结果,也能够证实这点。
尼恩的忠实建议:
理论水平的提高,看看视频、看看书,只有两个字,就是须要:多看。
实战水平的提高,只有两个字,就是须要:多干。
参照 LongAdder ,手写一个Redis分段锁, 仍是有点复杂,可是很重要,建议你们动手干一票.
手写一个Redis分段锁的实操,是高并发实战的重要动手实操之一。
有关Redis分段锁的实操的具体材料、源码、问题,欢迎来 疯狂创客圈社群交流。
高并发Java发烧友社群 - 疯狂创客圈 总入口 点击了解详情:
图书:《Netty Zookeeper Redis 高并发实战》 图书简介 - 疯狂创...
图书:《Netty Zookeeper Redis 高并发实战》 图书简介 - 疯狂创...
基于Zookeeper 的分布式锁实现 - SegmentFault 思否