前提, 应用服务是分布式或多服务, 而这些"多"有共同的"redis";
(2017-12-04) 笑哭, 写这篇以前一直以为应该有大神已经写好了, 但未找到. 其实redis官网已经给出了实现(百度、阿里都是用的这套): Redis分布式锁、Distributed locks with Redis
java版本的名字叫redisson, 其github: https://github.com/redisson/redisson
GitHub: https://github.com/vergilyn/SpringBootDemo
代码结构:
html
(具体实现思路参考: 分布式锁的实现、如何用消息系统避免分布式事务?)java
一、基于数据库
能够用数据库的行锁for update
, 或专门新建一张锁控制表
来实现.
过于依赖数据库, 且健壮性也不是特别好, 彻底能够把此种方案舍弃.
(话说都涉及到分布式或多服务器,基本主要仍是用redis、memcached或其余缓存服务实现并发锁)
二、基于ZooKeeper实现分布式锁
并未去研究, 参考上面的博客连接.
三、基于redis实现
redis实现的复杂度不算高, 只是须要注意一些实现细节. 健壮性貌似只比zookeeper差点, 但彻底可接受.git
一、主要的redis核心命令: 利用redis是单线程的特性, 用setnx、getset、time来实现.
二、思路: redis的key-value就表明一个对象锁, 当此key存在说明锁已被获取, 其他相同对象操做则须要等待获取锁.
三、须要注意的细节:
1) 锁的释放, 要特别避免死锁出现, 主要是特殊状况下如何释放锁.
2) 等待获取锁的线程, 最好有超时机制.
3) 注意多服务器之间的时间是否同步.
4) 注意获取锁操做别占用或建立太多的链接(即便及时关闭了链接), 很影响系统的性能.github
/** * 锁的策略参考: <a href="http://blog.csdn.net/u010359884/article/details/50310387">基于redis分布式锁实现“秒杀”</a> * FIXME 此方式加锁策略存在必定缺陷: 在setIfAbsent()以后expire()执行以前程序异常 锁不会被释放. 虽然出现概率极低 * * @param timeout timeout的时间范围内轮询锁, 单位: 秒 * @param expire 设置锁超时时间 * @return true, 获取锁成功; false, 获取锁失败. */ public boolean lock(long timeout, long expire, final TimeUnit unit) { long beginTime = System.nanoTime(); // 用nanos、mills具体看需求. timeout = TimeUnit.SECONDS.toNanos(timeout); try { // 在timeout的时间范围内不断轮询锁 while (System.nanoTime() - beginTime < timeout) { // 锁不存在的话,设置锁并设置锁过时时间,即加锁 if (this.redisClient.opsForValue().setIfAbsent(this.key, "1")) { this.redisClient.expire(key, expire, unit);//设置锁失效时间, 防止永久阻塞 this.lock = true; return true; } // 短暂休眠后轮询,避免可能的活锁 System.out.println("get lock waiting..."); Thread.sleep(30, RANDOM.nextInt(30)); } } catch (Exception e) { throw new RuntimeException("locking error", e); } return false; }
以上锁策略已经很完美, 1) 指定了获取锁的超时时间; 2) 设置了锁的失效, 防止永久阻塞;
但可能有极端状况, 即setIfAbsent()
成功, expire()
执行以前, 若是出现异常状况, 致使expire()
没有执行, 因此此时会出现永久阻塞. (道理是很难遇到这状况)redis
/** * 特别注意: 若是多服务器之间存在时间差, 并不建议用System.nanoTime()、System.currentTimeMillis(). * 更好的是统一用redis-server的时间, 但只能获取到milliseconds. * 锁的策略参考: <a href="http://www.jeffkit.info/2011/07/1000/?spm=5176.100239.blogcont60663.7.9f4d4a8h4IOxe">用Redis实现分布式锁</a> * * @param timeout 获取锁超时, 单位: 毫秒 * @param expire 锁失效时常, 单位: 毫秒 * @return true, 获取锁成功; false, 获取锁失败. */ public boolean lockB(long timeout, long expire) { long bt = System.currentTimeMillis(); long lockVal; String lockExpireTime; try { while (!this.lock) { if(System.currentTimeMillis() - bt > timeout){ throw new RedisLockException("get lock timeout!"); } // 锁的键值: {当前时间} + {失效时常} = {锁失效时间} lockVal = getRedisTime() + expire; // 1. 尝试获取锁 boolean ifAbsent = this.redisClient.opsForValue().setIfAbsent(this.key, lockVal + ""); if (ifAbsent) { // 设置成功, 表示得到锁 // 这种策略下, 是否设置key失效不过重要. 由于, 正常流程中最后会释放锁(del-key); 若是是异常状况下未释放锁, 后面的代码也会判断锁是否失效. // 设置的好处: 能减小redis的内存消耗, 及时清理无效的key(暂时只想到这) // this.redisClient.expire(key, timeout, TimeUnit.SECONDS); this.lock = true; return true; } lockExpireTime = this.redisClient.opsForValue().get(this.key); long curTime = getRedisTime(); // curTime > expireVal: 表示此锁已无效 /* 在锁无效的前提下, 尝试获取锁: (必定要用)getAndSet() * * 假设锁已失效, 且未正常expire. 此时C一、C2同时执行到此, C2先执行getAndSet(key, time-02), C2获取到锁 * 此时C1.getAndSet(key, time-01)返回的是time-02, 显然curTime > time-02: false. * 因此, C1并未获取到锁. 但C1修改了key的值为: time-01. * 但由于C一、C2是同时执行到此, 因此time-0一、time-02的值近视相等. * (若多服务器存在时间差, 那这个差值有问题, 因此服务器时间若是不一样步则不能用System.nanoTime()、System.currentTimeMillis(), 该用redis-server time.) */ if (curTime > NumberUtils.toLong(lockExpireTime, 0)) { // getset必须在{curTime > expireVal} 判断以后; 不然, 可能出现死循环 lockExpireTime = this.redisClient.opsForValue().getAndSet(this.key, lockVal + ""); if (curTime > NumberUtils.toLong(lockExpireTime, 0)) { // this.redisClient.expire(key, timeout, TimeUnit.SECONDS); // 是否设置失效不重要, 理由同上. this.lock = true; return true; } } // 锁被占用, 短暂休眠等待轮询 System.out.println(this + ": get lock waiting..."); Thread.sleep(40); } } catch (Exception e) { e.printStackTrace(); throw new RedisLockException("locking error", e); } System.out.println(this + ": get lock error."); return false; }
此种锁策略特别要注意:
1) 若是多服务器之间时间不一样步, 那么能够用redis-server的时间.
2) getset的调用必须在curTime > lockExpireTime
的前提下, 不然会出现死循环.
3) 并发时getset产生的偏差, 彻底可忽略.
4) 特别要注意redis链接的释放, 不然很容易占用过多的redis链接数.spring
public class RedisLock { private String key; private boolean lock = false; private final StringRedisTemplate redisClient; private final RedisConnection redisConnection; /** * @param purpose 锁前缀 * @param key 锁定的ID等东西 */ public RedisLock(String purpose, String key, StringRedisTemplate redisClient) { if (redisClient == null) { throw new IllegalArgumentException("redisClient 不能为null!"); } this.key = purpose + "_" + key + "_redis_lock"; this.redisClient = redisClient; this.redisConnection = redisClient.getConnectionFactory().getConnection(); } /** * 锁的策略参考: <a href="http://blog.csdn.net/u010359884/article/details/50310387">基于redis分布式锁实现“秒杀”</a> * FIXME 此方式加锁策略存在必定缺陷: 在setIfAbsent()以后expire()执行以前程序异常 锁不会被释放. 虽然出现概率极低 * * @param timeout timeout的时间范围内轮询锁, 单位: 秒 * @param expire 设置锁超时时间 * @return true, 获取锁成功; false, 获取锁失败. */ public boolean lockA(long timeout, long expire, final TimeUnit unit) { long beginTime = System.nanoTime(); // 用nanos、mills具体看需求. timeout = unit.toNanos(timeout); try { // 在timeout的时间范围内不断轮询锁 while (System.nanoTime() - beginTime < timeout) { // 锁不存在的话,设置锁并设置锁过时时间,即加锁 if (this.redisClient.opsForValue().setIfAbsent(this.key, "1")) { this.redisClient.expire(key, expire, unit);//设置锁失效时间, 防止永久阻塞 this.lock = true; return true; } // 短暂休眠后轮询,避免可能的活锁 System.out.println("get lock waiting..."); Thread.sleep(30); } } catch (Exception e) { throw new RedisLockException("locking error", e); } return false; } /** * 特别注意: 若是多服务器之间存在时间差, 并不建议用System.nanoTime()、System.currentTimeMillis(). * 更好的是统一用redis-server的时间, 但只能获取到milliseconds. * 锁的策略参考: <a href="http://www.jeffkit.info/2011/07/1000/?spm=5176.100239.blogcont60663.7.9f4d4a8h4IOxe">用Redis实现分布式锁</a> * * @param timeout 获取锁超时, 单位: 毫秒 * @param expire 锁失效时常, 单位: 毫秒 * @return true, 获取锁成功; false, 获取锁失败. */ public boolean lockB(long timeout, long expire) { long bt = System.currentTimeMillis(); long lockVal; String lockExpireTime; try { while (!this.lock) { if(System.currentTimeMillis() - bt > timeout){ throw new RedisLockException("get lock timeout!"); } // 锁的键值: {当前时间} + {失效时常} = {锁失效时间} lockVal = getRedisTime() + expire; // 1. 尝试获取锁 boolean ifAbsent = this.redisClient.opsForValue().setIfAbsent(this.key, lockVal + ""); if (ifAbsent) { // 设置成功, 表示得到锁 this.lock = true; return true; } lockExpireTime = this.redisClient.opsForValue().get(this.key); long curTime = getRedisTime(); if (curTime > NumberUtils.toLong(lockExpireTime, 0)) { lockExpireTime = this.redisClient.opsForValue().getAndSet(this.key, lockVal + ""); if (curTime > NumberUtils.toLong(lockExpireTime, 0)) { this.lock = true; return true; } } // 锁被占用, 短暂休眠等待轮询 System.out.println(this + ": get lock waiting..."); Thread.sleep(40); } } catch (Exception e) { e.printStackTrace(); throw new RedisLockException("locking error", e); } System.out.println(this + ": get lock error."); return false; } /** * @return current redis-server time in milliseconds. */ private long getRedisTime() { return this.redisConnection.time(); } private void closeConnection(){ if(!this.redisConnection.isClosed()){ this.redisConnection.close(); } } /** 释放锁 */ public void unlock() { if (this.lock) { redisClient.delete(key); } } public boolean isLock() { return lock; } }
@Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RedisLockedKey { /** * 复杂对象中须要加锁的成员变量 */ String field() default ""; }
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RedisDistributedLock { /** 锁key的前缀 */ String lockedPrefix() default ""; /** 轮询锁的时间超时时常, 单位: ms */ long timeout() default 2000; /** redis-key失效时常, 单位: ms */ int expireTime() default 1000; }
@Component @Aspect public class RedisDistributedLockAop { @Autowired private StringRedisTemplate redisTemplate; /** * 定义缓存逻辑 */ @Around("@annotation(com.vergilyn.demo.springboot.distributed.lock.annotation.RedisDistributedLock)") public void cache(ProceedingJoinPoint pjp) { Method method = getMethod(pjp); RedisDistributedLock cacheLock = method.getAnnotation(RedisDistributedLock.class); String key = getRedisKey(method.getParameterAnnotations(), pjp.getArgs()); RedisLock redisLock = new RedisLock(cacheLock.lockedPrefix(), key, redisTemplate); // boolean isLock = redisLock.lockB(cacheLock.timeout(), cacheLock.expireTime()); boolean isLock = redisLock.lockA(cacheLock.timeout(), cacheLock.expireTime(), TimeUnit.MILLISECONDS); if (isLock) { try { pjp.proceed(); return; } catch (Throwable e) { e.printStackTrace(); } finally { redisLock.unlock(); } } System.out.println("执行方法失败"); } /** * 获取被拦截的方法对象 */ private Method getMethod(ProceedingJoinPoint pjp) { Object[] args = pjp.getArgs(); Class[] argTypes = new Class[pjp.getArgs().length]; for (int i = 0; i < args.length; i++) { argTypes[i] = args[i].getClass(); } Method method = null; try { method = pjp.getTarget().getClass().getMethod(pjp.getSignature().getName(), argTypes); } catch (NoSuchMethodException | SecurityException e) { e.printStackTrace(); } return method; } private String getRedisKey(Annotation[][] annotations, Object[] args){ if (null == args || args.length == 0) { throw new RedisLockException("方法参数为空,没有被锁定的对象"); } if (null == annotations || annotations.length == 0) { throw new RedisLockException("没有被注解的参数"); } // 只支持第一个注解为RedisLockedKey的参数 for (int i = 0; i < annotations.length; i++) { for (int j = 0; j < annotations[i].length; j++) { if (annotations[i][j] instanceof RedisLockedKey) { //注解为LockedComplexObject RedisLockedKey redisLockedKey = (RedisLockedKey) annotations[i][j]; String field = redisLockedKey.field(); try { // field存在, 表示取参数对象的相应field; if(StringUtils.isBlank(field)){ return args[i].toString(); }else { return args[i].getClass().getDeclaredField(redisLockedKey.field()).toString(); } } catch (NoSuchFieldException | SecurityException e) { e.printStackTrace(); throw new RedisLockException("注解对象中不存在属性: " + redisLockedKey.field()); } } } } throw new RedisLockException("未找到注解对象!"); } }
public class RedisLockException extends RuntimeException{ public RedisLockException(String msg, Throwable throwable) { super(msg, throwable); } public RedisLockException(String msg) { super(msg); } }
#### 视状况调整 # 部分redis配置 spring.redis.database=0 spring.redis.host=127.0.0.1 # spring.redis.password= spring.redis.port=6379 # 链接池最大链接数(使用负值表示没有限制) spring.redis.pool.max-active=1 spring.redis.pool.max-wait=-1 # 链接池中的最大空闲链接 spring.redis.pool.max-idle=4 # 链接池中的最小空闲链接 spring.redis.pool.min-idle=0 spring.redis.timeout=2000
@SpringBootApplication @EnableCaching public class DistributedLockApplication implements CommandLineRunner{ @Autowired StringRedisTemplate redisTemplate; @Autowired LockService lockService; @Autowired ThreadPoolTaskExecutor executor; @Bean public ThreadPoolTaskExecutor myExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心线程数 executor.setCorePoolSize(8); // 最大线程数 executor.setMaxPoolSize(12); // 运行线程满时,等待队列的大小 executor.setQueueCapacity(1000); executor.setThreadNamePrefix("vl-thread-"); // 池和队列满的策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 空闲线程清除时间 executor.setKeepAliveSeconds(60); // 是否容许释放核心线程 executor.setAllowCoreThreadTimeOut(true); executor.initialize(); return executor; } public static void main(String[] args) { SpringApplication application = new SpringApplication(DistributedLockApplication.class); application.setAdditionalProfiles("redis"); application.run(args); } @Override public void run(String... args) throws Exception { System.out.println("run...."); for (int i = 0; i < 2; i++) { executor.execute(new Runnable() { @Override public void run() { // lockService.lockMethod(new LockBean(1L)); lockService.lockMethod("arg1", 1L); } }); } System.out.println(executor.getThreadPoolExecutor().getTaskCount()); } }
public interface LockService { public void lockMethod(String arg1,Long arg2); public void lockMethod(LockBean lockBean); }
@Service public class LockServiceImpl implements LockService { public static Map<Long, Integer> goods; static{ goods = new HashMap<>(); goods.put(1L, 100); goods.put(2L, 200); } @Override @RedisDistributedLock(lockedPrefix="TEST_PREFIX") public void lockMethod(String arg1, @RedisLockedKey Long arg2) { //最简单的秒杀,这里仅做为demo示例 System.out.println("lockMethod, goods: " + reduceInventory(arg2)); } @Override @RedisDistributedLock(lockedPrefix="TEST_PREFIX") public void lockMethod(@RedisLockedKey(field = "idic")LockBean lockBean) { System.out.println("lockMethod bean, goods: " + reduceInventory(lockBean.getIdic())); } // 模拟秒杀操做,姑且认为一个秒杀就是将库存减一 private Integer reduceInventory(Long commodityId){ goods.put(commodityId, goods.get(commodityId) - 1); return goods.get(commodityId); } }
public class LockBean { private Long idic; public LockBean(){} public LockBean(long idic) { this.idic = idic; } public Long getIdic() { return idic; } public void setIdic(Long idic) { this.idic = idic; } }
以上只是简单实现代码, 若是用于实际项目中, 以上代码存在不少性能问题, 具体性能问题:
1) 太频繁的获取redis链接、关闭链接.
lockA
: 每次while一定有一次setIfAbsent
, 可能会有expire
, 而后释放锁有delete
. 因此一次正常的流程就须要3个链接. 若是是并发同时竞争等待获取锁, 那么性能影响也蛮大的.
lockB
: 这种策略要用到的链接更多, 而且若是是this.redisClient.getConnectionFactory().getConnection().time()
还要注意要手动释放这个链接.
针对此问题, (我的)想到的可能的代码改进方案, 每一个RedisLock
中用一个redisConnection
, 把全部的StringRedisTemplate
命令换成更底层的redisConnection
命令:数据库
public class RedisLock { private String key; private boolean lock = false; private final RedisConnection redisConnection; public RedisLock(String purpose, String key, RedisConnection redisConnection) { if (redisConnection == null) { throw new IllegalArgumentException("redisConnection 不能为null!"); } this.key = purpose + "_" + key + "_redis_lock"; this.redisConnection = redisConnection; } public boolean lockAc(long timeout, long expire, final TimeUnit unit) { long beginTime = System.nanoTime(); timeout = unit.toNanos(timeout); try { while (System.nanoTime() - beginTime < timeout) { if (this.redisConnection.setNX(this.key.getBytes(), "1".getBytes())) { this.redisConnection.expire(key.getBytes(), unit.toSeconds(expire)); this.lock = true; return true; } System.out.println("lockAc get lock waiting..."); Thread.sleep(30); } } catch (Exception e) { throw new RedisLockException("locking error", e); } return false; } private long getRedisTime() { return this.redisConnection.time(); } private void closeConnection(){ if(!this.redisConnection.isClosed()){ this.redisConnection.close(); } } public void unlock() { if (this.lock) { this.redisConnection.delete(key); } closeConnection(); // 用完必定要关闭, 这个位置不必定好, 可能在Aop调用unlock的finally处更好 } public boolean isLock() { return lock; } }
以上改进代码依然可能存在的问题:
1) 链接极可能没有正常关闭.
2) 链接依然过多, 假设并发有1000个, 那同样会产生1000个链接, 且这些链接只会在竞争获取锁完后才会释放.(且产生了1000个RedisLock对象)
3) 是否能够缓存注解对象?缓存
针对问题2)
, 主要想达到怎么尽量减小redis链接?
好比: 有1000个并发, 其中200个是兑换商品A, 其中300个是兑换商品B, 其中500个是兑换商品C.springboot
一、是否能够用单例模式
来实现RedisLock
?
对单例
、多线程
仍是很混乱, 很差说. 但若是可行, 会否太影响获取锁的性能?
好比兑换商品A的200个并发共用一个redisConnection, 感受仍是合理的, 毕竟互相之间是竞争关系.
但商品A、商品B、商品C若是也共用一个redisConnection, 是否是彻底不合理?
他们之间根本是"并行"的, 相互之间没有一点联系.
二、因此, 是否更进一步的实现是: 同一个锁竞争用相同的RedisLock
对象和RedisConnection
链接.
即竞争商品A的200个并发用同一个"redisConnection_A"、"redisLock_A", 商品B的300个并发用同一个"redisConnection_B"、"redisLock_B"?服务器
针对问题3)
, 在代码RedisDistributedLockAop
中, 每次都会:
1) getMethod(pjp)
: 获取拦截方法.
2) 经过拦截方法解析出getRedisKey
.
是否是能够这么实现, 相同的拦截方法只有第一次须要经过反射获取. 以后直接从缓存(如map)中获取到method
, 且由于同一个方法, 所能取field
也是同样的.
好比, 有一下几个方法都须要用到分布式并发锁:
@RedisDistributedLock(lockedPrefix="TEST_PREFIX") public void a(String arg1, @RedisLockedKey Long arg2) { // ... } @RedisDistributedLock(lockedPrefix="TEST_PREFIX") public void b(@RedisLockedKey(field = "idic")LockBean lockBean) { // ... } @RedisDistributedLock(lockedPrefix="TEST_PREFIX") public void c(@RedisLockedKey(field = "xx")LockBean lockBean) { // ... }
// key: 完整方法名, 要惟一正确找到; value: 缓存的method Map<String, Method> methodCache = new HashMap<>; methodCache.put("com.service.aa.a()", method); methodCache.put("com.service.aa.b()", method); methodCache.put("com.service.bb.b()", method); // 而后, 同一个方法的注解内容相同, 因此彻底能够直接调用, 省略RedisLockedKey的逻辑判断 if(StringUtils.isBlank(field)){ return args[i].toString(); }else { return args[i].getClass().getDeclaredField(redisLockedKey.field()).toString(); }
以上只是本身的构想, 这些构想的可行性, 代码的具体实现还很难说...
(2017-12-04) 有空分析看下源码redisson的实现思路, 对比下本身的不足之处.
多个线程之间不能共享链接, 参考: REDIS实践之请勿踩多进程共用一个实例链接的坑