本案例主要讲解
Redis
实现分布式锁的两种实现方式:Jedis
实现、Redisson
实现。网上关于这方面讲解太多了,Van自认为文笔没他们好,仍是用示例代码说明。java
jedis
实现该方案只考虑
Redis
单机部署的场景mysql
jedis.set(String key, String value, String nxxx, String expx, int time)
key
: 使用key
来当锁,由于key
是惟一的;value
: 我传的是惟一值(UUID
),不少童鞋可能不明白,有key
做为锁不就够了吗,为何还要用到value
?缘由是分布式锁要知足解铃还须系铃人:经过给value
赋值为requestId
,咱们就知道这把锁是哪一个请求加的了,在解锁的时候要验证value
值,不能误解锁;nxxx
: 这个参数我填的是NX
,意思是SET IF NOT EXIST
,即当key
不存在时,咱们进行set
操做;若key
已经存在,则不作任何操做;expx
: 这个参数我传的是PX
,意思是咱们要给这个key
加一个过时的设置,具体时间由第五个参数决定;time
: 与第四个参数相呼应,表明key
的过时时间。set()
加入了NX
参数,能够保证若是已有key
存在,则函数不会调用成功,也就是只有一个客户端能持有锁,知足互斥性;key
被删除),不会发生死锁;value
赋值为requestId
,表明加锁的客户端请求标识,那么在客户端在解锁的时候就能够进行校验是不是同一个客户端。释放锁时须要验证
value
值,也就是说咱们在获取锁的时候须要设置一个value
,不能直接用del key
这种粗暴的方式,由于直接del key
任何客户端均可以进行解锁了,因此解锁时,咱们须要判断锁是不是本身的(基于value
值来判断)git
Lua
脚本代码,做用是:获取锁对应的value
值,检查是否与requestId
相等,若是相等则删除锁(解锁);Lua
代码传到jedis.eval()
方法里,并使参数KEYS[1]
赋值为lockKey
,ARGV[1]
赋值为requestId
。eval()
方法是将Lua
代码交给Redis服务端执行。这里放出的是关键代码,详细可运行的代码可至文末地址下载示例代码。github
该案例模拟家庭内多人经过领取一个奖励,可是只能有一我的能领取成功,不能重复领取(以前作过奖励模块的需求)redis
family_reward_record
表CREATE TABLE `family_reward_record` ( `id` bigint(10) NOT NULL AUTO_INCREMENT COMMENT '主键id', `family_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '商品名称', `reward_type` int(10) NOT NULL DEFAULT '1' COMMENT '商品库存数量', `state` int(1) NOT NULL DEFAULT '0' COMMENT '商品状态', `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '入库时间', `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=270 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='家庭领取奖励表(家庭内多人只能有一我的能领取成功,不能重复领取)';
application.yml
spring: datasource: url: jdbc:mysql://47.98.178.84:3306/dev username: dev password: password driver-class-name: com.mysql.jdbc.Driver redis: host: 47.98.178.84 port: 6379 password: password timeout: 2000 # mybatis mybatis: mapper-locations: classpath:mapper/*.xml type-aliases-package: cn.van.mybatis.demo.entity
RedisConfig.java
@Configuration public class RedisConfig extends CachingConfigurerSupport { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private int port; @Value("${spring.redis.password}") private String password; @Value("${spring.redis.timeout}") private int timeout; @Bean public JedisPool redisPoolFactory() { JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); if (StringUtils.isEmpty(password)) { return new JedisPool(jedisPoolConfig, host, port, timeout); } return new JedisPool(jedisPoolConfig, host, port, timeout, password); } @Bean(name = "redisTemplate") public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(redisConnectionFactory); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, Visibility.ANY); objectMapper.enableDefaultTyping(DefaultTyping.NON_FINAL); Jackson2JsonRedisSerializer<Object> jsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); jsonRedisSerializer.setObjectMapper(objectMapper); redisTemplate.setDefaultSerializer(jsonRedisSerializer); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } }
RedisDistributedLock.java
@Component public class RedisDistributedLock { /** * 成功获取锁标示 */ private static final String LOCK_SUCCESS = "OK"; /** * 成功解锁标示 */ private static final Long RELEASE_SUCCESS = 1L; @Autowired private JedisPool jedisPool; /** * redis 数据存储过时时间 */ final int expireTime = 500; /** * 尝试获取分布式锁 * @param lockKey 锁 * @param lockValue 请求标识 * @return 是否获取成功 */ public boolean tryLock(String lockKey, String lockValue) { Jedis jedis = null; try{ jedis = jedisPool.getResource(); String result = jedis.set(lockKey, lockValue, "NX", "PX", expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } } finally { if(jedis != null){ jedis.close(); } } return false; } /** * 释放分布式锁 * @param lockKey 锁 * @param lockValue 请求标识 * @return 是否释放成功 */ public boolean unLock(String lockKey, String lockValue) { Jedis jedis = null; try { jedis = jedisPool.getResource(); String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(lockValue)); if (RELEASE_SUCCESS.equals(result)) { return true; } } finally { if(jedis != null){ jedis.close(); } } return false; } }
familyId = 1
的家庭同时领取奖励@Override public HttpResult receiveAward() { Long familyId = 1L; Map<String, Object> params = new HashMap<String, Object>(16); params.put("familyId", familyId); params.put("rewardType", 1); int count = familyRewardRecordMapper.selectCountByFamilyIdAndRewardType(params); if (count == 0) { FamilyRewardRecordDO recordDO = new FamilyRewardRecordDO(familyId,1,0,LocalDateTime.now()); int num = familyRewardRecordMapper.insert(recordDO); if (num == 1) { return HttpResult.success(); } return HttpResult.failure(-1, "记录插入失败"); } return HttpResult.success("该记录已存在"); }
familyId = 2
的家庭同时领取奖励@Override public HttpResult receiveAwardLock() { Long familyId = 2L; Map<String, Object> params = new HashMap<String, Object>(16); params.put("familyId", familyId); params.put("rewardType", 1); int count = familyRewardRecordMapper.selectCountByFamilyIdAndRewardType(params); if (count == 0) { // 没有记录则建立领取记录 FamilyRewardRecordDO recordDO = new FamilyRewardRecordDO(familyId,1,0,LocalDateTime.now()); // 分布式锁的key(familyId + rewardType) String lockKey = recordDO.getFamilyId() + "_" + recordDO.getRewardType(); // 分布式锁的value(惟一值) String lockValue = createUUID(); boolean lockStatus = redisLock.tryLock(lockKey, lockValue); // 锁被占用 if (!lockStatus) { log.info("锁已经占用了"); return HttpResult.failure(-1,"失败"); } // 无论多个请求,加锁以后,只会有一个请求能拿到锁,进行插入操做 log.info("拿到了锁,当前时刻:{}",System.currentTimeMillis()); int num = familyRewardRecordMapper.insert(recordDO); if (num != 1) { log.info("数据插入失败!"); return HttpResult.failure(-1, "数据插入失败!"); } log.info("数据插入成功!准备解锁..."); boolean unLockState = redisLock.unLock(lockKey,lockValue); if (!unLockState) { log.info("解锁失败!"); return HttpResult.failure(-1, "解锁失败!"); } log.info("解锁成功!"); return HttpResult.success(); } log.info("该记录已存在"); return HttpResult.success("该记录已存在"); } private String createUUID() { UUID uuid = UUID.randomUUID(); String str = uuid.toString().replace("-", "_"); return str; }
我采用的是
JMeter
工具进行测试,加锁和不加锁的状况都设置成:五次并发请求。算法
/** * 家庭成员领取奖励(不加锁) * @return */ @PostMapping("/receiveAward") public HttpResult receiveAward() { return redisLockService.receiveAward(); }
POST
/** * 家庭成员领取奖励(加锁) * @return */ @PostMapping("/receiveAwardLock") public HttpResult receiveAwardLock() { return redisLockService.receiveAwardLock(); }
POST
经过对比,说明分布式锁起做用了。spring
我上家使用的就是这种加锁方式,看上去很OK,实际上在Redis
集群的时候会出现问题,好比:sql
A
客户端在Redis
的master
节点上拿到了锁,可是这个加锁的key
尚未同步到slave
节点,master
故障,发生故障转移,一个slave
节点升级为master
节点,B
客户端也能够获取同个key
的锁,但客户端A
也已经拿到锁了,这就致使多个客户端都拿到锁。json
正由于如此,Redis
做者antirez
基于分布式环境下提出了一种更高级的分布式锁的实现方式:Redlock
。缓存
Redlock
实现antirez
提出的Redlock
算法大概是这样的:
在Redis
的分布式环境中,咱们假设有N
个Redis master
。这些节点彻底互相独立,不存在主从复制或者其余集群协调机制。咱们确保将在N
个实例上使用与在Redis
单实例下相同方法获取和释放锁。如今咱们假设有5
个Redis master
节点,同时咱们须要在5
台服务器上面运行这些Redis
实例,这样保证他们不会同时都宕掉。
为了取到锁,客户端应该执行如下操做(RedLock
算法加锁步骤):
Unix
时间,以毫秒为单位;5
个实例,使用相同的key
和具备惟一性的value
(例如UUID
)获取锁。当向Redis
请求获取锁时,客户端应该设置一个网络链接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10
秒,则超时时间应该在5-50
毫秒之间。这样能够避免服务器端Redis
已经挂掉的状况下,客户端还在死死地等待响应结果。若是服务器端没有在规定时间内响应,客户端应该尽快尝试去另一个Redis
实例请求获取锁;1
记录的时间)就获得获取锁使用的时间。当且仅当从大多数(N/2+1
,这里是3
个节点)的Redis
节点都取到锁,而且使用的时间小于锁失效时间时,锁才算获取成功;key
的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3
计算的结果)。N/2+1
个Redis
实例取到锁或者取锁时间已经超过了有效时间),客户端应该在全部的Redis
实例上进行解锁(即使某些Redis
实例根本就没有加锁成功,防止某些节点获取到锁可是客户端没有获得响应而致使接下来的一段时间不能被从新获取锁)。向全部的Redis
实例发送释放锁命令便可,不用关心以前有没有从Redis
实例成功获取到锁.
这部分以最多见的案例:抢购时的商品超卖(库存数减小为负数)为例
good
表CREATE TABLE `good` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id', `good_name` varchar(255) NOT NULL COMMENT '商品名称', `good_counts` int(255) NOT NULL COMMENT '商品库存', `create_time` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '建立时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT='商品表'; -- 插入两条测试数据 INSERT INTO `good` VALUES (1, '哇哈哈', 5, '2019-09-20 17:39:04'); INSERT INTO `good` VALUES (2, '卫龙', 5, '2019-09-20 17:39:06');
Redisson
配置类 RedissonConfig.java
我这里配置的是单机,更多配置详见https://github.com/redisson/redisson/wiki/配置
@Configuration public class RedissonConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private String port; @Value("${spring.redis.password}") private String password; /** * RedissonClient,单机模式 * @return * @throws IOException */ @Bean public RedissonClient redissonSentinel() { //支持单机,主从,哨兵,集群等模式,此为单机模式 Config config = new Config(); config.useSingleServer() .setAddress("redis://" + host + ":" + port) .setPassword(password); return Redisson.create(config); } }
@Override public HttpResult saleGoods(){ // 以指定goodId = 1:哇哈哈为例 Long goodId = 1L; GoodDO goodDO = goodMapper.selectByPrimaryKey(goodId); int goodStock = goodDO.getGoodCounts(); if (goodStock >= 1) { goodMapper.saleOneGood(goodId); } return HttpResult.success(); }
@Override public HttpResult saleGoodsLock(){ // 以指定goodId = 2:卫龙为例 Long goodId = 2L; GoodDO goodDO = goodMapper.selectByPrimaryKey(goodId); int goodStock = goodDO.getGoodCounts(); String key = goodDO.getGoodName(); log.info("{}剩余总库存,{}件", key,goodStock); // 将商品的实时库存放在redis 中,便于读取 stringRedisTemplate.opsForValue().set(key, Integer.toString(goodStock)); // redisson 锁 的key String lockKey = goodDO.getId() +"_" + key; RLock lock = redissonClient.getLock(lockKey); // 设置60秒自动释放锁 (默认是30秒自动过时) lock.lock(60, TimeUnit.SECONDS); // 此步开始,串行销售 int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key)); // 若是缓存中库存量大于1,能够继续销售 if (stock >= 1) { goodDO.setGoodCounts(stock - 1); int num = goodMapper.saleOneGood(goodId); if (num == 1) { // 减库存成功,将缓存同步 stringRedisTemplate.opsForValue().set(key,Integer.toString((stock-1))); } log.info("{},当前库存,{}件", key,stock); } lock.unlock(); return HttpResult.success(); }
采用的是
JMeter
工具进行测试,初始化的时候两个商品的库存设置都是:5
;因此这里加锁和不加锁的状况都设置成:十次并发请求。
/** * 售卖商品(不加锁) * @return */ @PostMapping("/saleGoods") public HttpResult saleGoods() { return redisLockService.saleGoods(); }
POST
id =1
的商品库存减为-5
/** * 售卖商品(加锁) * @return */ @PostMapping("/saleGoodsLock") public HttpResult saleGoodsLock() { return redisLockService.saleGoodsLock(); }
POST
id =1
的商品库存减为0
经过2.3.1
和2.3.2
的结果对比很明显:前者出现了超卖状况,库存数卖到了-5
,这是决不容许的;而加了锁的状况后,库存只会减小到0
,便再也不销售。
再次说明:以上代码不全,如需尝试,请前往Van 的 Github 查看完整示例代码
第一种基于Redis
的分布式锁并不适合用于生产环境。Redisson
可用于生产环境。固然,分布式的选择还有Zookeeper
的选项,Van后续会整理出来供你们参考。
https://github.com/vanDusty/SpringBoot-Home/tree/master/springboot-demo-lock/redis-lock