关于redis分布式锁, 查了不少资料, 发现不少只是实现了最基础的功能, 可是, 并无解决当锁已超时而业务逻辑还未执行完的问题, 这样会致使: A线程超时时间设为10s(为了解决死锁问题), 但代码执行时间可能须要30s, 而后redis服务端10s后将锁删除, 此时, B线程刚好申请锁, redis服务端不存在该锁, 能够申请, 也执行了代码, 那么问题来了, A、B线程都同时获取到锁并执行业务逻辑, 这与分布式锁最基本的性质相违背: 在任意一个时刻, 只有一个客户端持有锁, 即独享java
为了解决这个问题, 本文将用完整的代码和测试用例进行验证, 但愿能给小伙伴带来一点帮助git
压测工具jmeter
下载连接
提取码: 8f2agithub
redis-desktop-manager客户端
下载连接
提取码: 9bhfredis
postman
下载连接
提取码: vfu7算法
也能够直接官网下载, 我这边都整理到网盘了spring
须要postman是由于我还没找到jmeter多开窗口的办法, 哈哈缓存
springmvc项目安全
maven依赖mvc
<!--redis--> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> <version>1.6.5.RELEASE</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.7.3</version> </dependency>
分布式锁工具类: DistributedLockdom
测试接口类: PcInformationServiceImpl
锁延时守护线程类: PostponeTask
先测试在不开启锁延时线程的状况下, A线程超时时间设为10s, 执行业务逻辑时间设为30s, 10s后, 调用接口, 查看是否可以获取到锁, 若是获取到, 说明存在线程安全性问题
同上, 在加锁的同时, 开启锁延时线程, 调用接口, 查看是否可以获取到锁, 若是获取不到, 说明延时成功, 安全性问题解决
1)、DistributedLock
package com.cn.pinliang.common.util; import com.cn.pinliang.common.thread.PostponeTask; import com.google.common.collect.Lists; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import java.io.Serializable; import java.util.Collections; @Component public class DistributedLock { @Autowired private RedisTemplate<Serializable, Object> redisTemplate; private static final Long RELEASE_SUCCESS = 1L; private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "EX"; // 解锁脚本(lua) private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; /** * 分布式锁 * @param key * @param value * @param expireTime 单位: 秒 * @return */ public boolean lock(String key, String value, long expireTime) { return redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> { Jedis jedis = (Jedis) redisConnection.getNativeConnection(); String result = jedis.set(key, value, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return Boolean.TRUE; } return Boolean.FALSE; }); } /** * 解锁 * @param key * @param value * @return */ public Boolean unLock(String key, String value) { return redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> { Jedis jedis = (Jedis) redisConnection.getNativeConnection(); Object result = jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(key), Collections.singletonList(value)); if (RELEASE_SUCCESS.equals(result)) { return Boolean.TRUE; } return Boolean.FALSE; }); } }
说明: 就2个方法, 加锁解锁, 加锁使用jedis setnx方法, 解锁执行lua脚本, 都是原子性操做
2)、PcInformationServiceImpl
public JsonResult add() throws Exception { String key = "add_information_lock"; String value = RandomUtil.produceStringAndNumber(10); long expireTime = 10L; boolean lock = distributedLock.lock(key, value, expireTime); String threadName = Thread.currentThread().getName(); if (lock) { System.out.println(threadName + " 得到锁..............................."); Thread.sleep(30000); distributedLock.unLock(key, value); System.out.println(threadName + " 解锁了..............................."); } else { System.out.println(threadName + " 未获取到锁..............................."); return JsonResult.fail("未获取到锁"); } return JsonResult.succeed(); }
说明: 测试类很简单, value随机生成, 保证惟一, 不会在超时状况下解锁其余客户端持有的锁
3)、打开redis-desktop-manager客户端, 刷新缓存, 能够看到, 此时是没有add_information_lock
的key的
4)、启动jmeter, 调用接口测试
设置5个线程同时访问, 在10s的超时时间内查看redis, add_information_lock
存在, 屡次调接口, 只有一个线程可以获取到锁
redis
1-4个请求, 都未获取到锁
第5个请求, 获取到锁
OK, 目前为止, 一切正常, 接下来测试10s以后, A仍在执行业务逻辑, 看别的线程是否能获取到锁
能够看到, 操做成功, 说明A和B同时执行了这段本应该独享的代码, 须要优化
package com.cn.pinliang.common.util; import com.cn.pinliang.common.thread.PostponeTask; import com.google.common.collect.Lists; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import java.io.Serializable; import java.util.Collections; @Component public class DistributedLock { @Autowired private RedisTemplate<Serializable, Object> redisTemplate; private static final Long RELEASE_SUCCESS = 1L; private static final Long POSTPONE_SUCCESS = 1L; private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "EX"; // 解锁脚本(lua) private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; // 延时脚本 private static final String POSTPONE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return '0' end"; /** * 分布式锁 * @param key * @param value * @param expireTime 单位: 秒 * @return */ public boolean lock(String key, String value, long expireTime) { // 加锁 Boolean locked = redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> { Jedis jedis = (Jedis) redisConnection.getNativeConnection(); String result = jedis.set(key, value, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return Boolean.TRUE; } return Boolean.FALSE; }); if (locked) { // 加锁成功, 启动一个延时线程, 防止业务逻辑未执行完毕就因锁超时而使锁释放 PostponeTask postponeTask = new PostponeTask(key, value, expireTime, this); Thread thread = new Thread(postponeTask); thread.setDaemon(Boolean.TRUE); thread.start(); } return locked; } /** * 解锁 * @param key * @param value * @return */ public Boolean unLock(String key, String value) { return redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> { Jedis jedis = (Jedis) redisConnection.getNativeConnection(); Object result = jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(key), Collections.singletonList(value)); if (RELEASE_SUCCESS.equals(result)) { return Boolean.TRUE; } return Boolean.FALSE; }); } /** * 锁延时 * @param key * @param value * @param expireTime * @return */ public Boolean postpone(String key, String value, long expireTime) { return redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> { Jedis jedis = (Jedis) redisConnection.getNativeConnection(); Object result = jedis.eval(POSTPONE_LOCK_SCRIPT, Lists.newArrayList(key), Lists.newArrayList(value, String.valueOf(expireTime))); if (POSTPONE_SUCCESS.equals(result)) { return Boolean.TRUE; } return Boolean.FALSE; }); } }
说明: 新增了锁延时方法, lua脚本, 自行脑补相关语法
2)、PcInformationServiceImpl不须要改动
3)、PostponeTask
package com.cn.pinliang.common.thread; import com.cn.pinliang.common.util.DistributedLock; public class PostponeTask implements Runnable { private String key; private String value; private long expireTime; private boolean isRunning; private DistributedLock distributedLock; public PostponeTask() { } public PostponeTask(String key, String value, long expireTime, DistributedLock distributedLock) { this.key = key; this.value = value; this.expireTime = expireTime; this.isRunning = Boolean.TRUE; this.distributedLock = distributedLock; } @Override public void run() { long waitTime = expireTime * 1000 * 2 / 3;// 线程等待多长时间后执行 while (isRunning) { try { Thread.sleep(waitTime); if (distributedLock.postpone(key, value, expireTime)) { System.out.println("延时成功..........................................................."); } else { this.stop(); } } catch (Exception e) { e.printStackTrace(); } } } private void stop() { this.isRunning = Boolean.FALSE; } }
说明: 调用lock同时, 当即开启PostponeTask线程, 线程等待超时时间的2/3时间后, 开始执行锁延时代码, 若是延时成功, add_information_lock
这个key会一直存在于redis服务端, 直到业务逻辑执行完毕, 所以在此过程当中, 其余线程没法获取到锁, 也即保证了线程安全性
下面是测试结果
10s后, 查看redis服务端, add_information_lock
仍存在, 说明延时成功
此时用postman再次请求, 发现获取不到锁
看一下控制台打印
A线程在19:09:11获取到锁, 在10 * 2 / 3 = 6s后进行延时, 成功, 保证了业务逻辑未执行完毕的状况下不会释放锁
A线程执行完毕, 锁释放, 其余线程又能够竞争锁
OK, 目前为止, 解决了锁超时而业务逻辑仍在执行的锁冲突问题, 还很简陋, 而最严谨的方式仍是使用官方的 Redlock 算法实现, 其中 Java 包推荐使用 redisson, 思路差很少其实, 都是在快要超时时续期, 以保证业务逻辑未执行完毕不会有其余客户端持有锁
后面学习redisson, 看一下大神是怎么实现的
若是有什么不对的或者能够优化的但愿小伙伴多多指教, 留言评论什么的, 谢谢