文章主旨
本文主要说明使用redis(codis)实现分布式锁的方法和要常见的问题及解决办法java
主要原理
/** * Set the string value as value of the key. The string can't be longer than 1073741824 bytes (1 * GB). * @param key * @param value * @param nxxx NX|XX, NX -- Only set the key if it does not already exist. XX -- Only set the key * if it already exist. * @param expx EX|PX, expire time units: EX = seconds; PX = milliseconds * @param time expire time in the units of <code>expx</code> * @return Status code reply */ public String set(final String key, final String value, final String nxxx, final String expx, final long time) { checkIsInMultiOrPipeline(); client.set(key, value, nxxx, expx, time); return client.getStatusCodeReply(); }
参数nxxx为NX时,当key不存在的时候才会执行set操做,结合redis单线程执行命令的特色能够实现操做互斥。git
常见问题及解决办法
问题1: 在哪里释放锁
正常过程分三部分:加锁,执行任务,释放锁。正常执行没有问题,可是若是执行任务阶段抛出异常,就会致使锁没有主动释放。github
解决办法
正确的写法是放到finally块中释放锁,这样能够解决抛异常的问题。伪代码大概是这样:redis
try { lock(); doTask(); } finally { releaseLock(); }
问题2: 锁过时时间长短选择问题
- 锁过时时间过短: 容易发生任务还没执行完,锁就自动释放了,就会致使发生并发,分布式锁失效。
- 锁时间太长: 虽然绝大多数状况下finally都能释放锁,可是也有例外,好比程序非正常终止,好比执行释放操做时redis服务不响应了。就会致使锁长期空置,没法被任何程序得到,过时时间越长,影响时间越长,通常都须要人工处理,很烦。
解决办法
要是能自动给锁续过时时间就行了。因此方法是获取锁的时候,锁的时间设置稍短一些,好比30秒,在获取锁成功后,起一个按期执行的后台任务,每隔10秒,设置一下过时时间为30秒。spring
问题3: 如何防止释放了别人的锁
假设碰到了问题2中锁过时时间过短的问题了,A任务获取了锁,过时时间是5秒,在第10秒的时候,A任务还在执行中,可是显然锁已经失效了,这时候B任务获取到了锁,在第11秒的时候,A任务执行完成了,执行释放锁操做,若是只是根据key判断的话,两个key是相同的,因此A会把B刚获取到的锁给释放掉,会形成一连串的不良反应,锁都乱套了。springboot
解决办法
不能简单根据key知道锁的归属,因此在获取锁的时候,应该生成一个惟一的value,用来标识。能够在set的时候,生成一个惟一的guid作为vlaue,在释放锁的时候,先判断获取redis里的value看看是否是和set的一致,只有一致的状况下,才执行del操做释放锁。这样就能够解决。伪代码以下:网络
val = redis.get(key); if val == setValue redis.del(key); endif
可是伪代码里获取值和删除key并非原子,因此仍是可能产生问题。改用lua脚原本确保操做的原子性。并发
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
问题4: 偶发性网络抖动致使加锁,或锁释放失败
网络抖动仍是比较常见的问题,解决办法也很简单,就是加锁和释放锁操做加一个重试机制,合理的参数设置能够达到很大程度的容错目的。分布式
其实到目前为止,redis分布式锁依然不是彻底可靠的,由于redis服务的问题致使数据不一致的问题没有被考虑到,不过要求不是那么严格的场景够用了。fetch
代码实现
handy-lock是一个redis分布式锁java类库,解决了上面提到的问题,简单易用。
特性
- 兼容codis
- setnx原子加锁,lua脚本原子释放锁
- 每次加锁value用guid,不会存在释放别人的锁的问题
- 加锁和释放锁自带重试机制
- 提供了自动延期的锁,避免过时时间长度选择的困境
引用
<dependency> <groupId>com.github.free-jungle</groupId> <artifactId>handy-lock-starter</artifactId> <version>1.0.0</version> </dependency>
配置
redis操做使用springboot的RedisTemplate实现,因此配置和spring.redis配置是同样的
spring: redis: host: 127.0.0.1 port: 6379
使用举例
例1: 使用RedisLockManager加锁[推荐]
@Resource private RedisLockManager redisLockManager; public void lockUseLockManager(String id) { String lockKey = String.format("%s.%s#%s", this.getClass().getCanonicalName(), "lockUseLockManager", id); try (RedisLock redisLock = redisLockManager.fetchAndTryLock(lockKey, 5000, 1000, 2)) { TimeUnit.SECONDS.sleep(2L); } catch (LockFailException | IOException | InterruptedException ex) { LOGGER.error("error when lockUseLockManager", ex); } }
例2: 注解方式加锁-静态key
@RedisDistributedLockable(key = "com.github.free.jungle.lock.examples.service.impl.lockUseAnnotation", expireInMilliseconds = 10000, waitInMilliseconds = 10, tryCount = 1) public void lockUseAnnotation() { LOGGER.info("start lockUseAnnotation"); try { TimeUnit.MILLISECONDS.sleep(100L); } catch (Exception ex) { LOGGER.error("error when lockUseAnnotation", ex); } LOGGER.info("end lockUseAnnotation"); }
虽然提供了注解的使用方式,仍是推荐直接用例1的方式,代码易读,易懂,用起来也很简单,没有比 注解麻烦。
例3: 注解方式加锁-动态key
@RedisDistributedLockable(keySpel = "'com.github.free.jungle.lock.examples.service.impl.lockUserAnnotationWithSpel#'+#id", expireInMilliseconds = 10000, waitInMilliseconds = 1000, tryCount = 3) public void lockUserAnnotationWithSpel(String id) { LOGGER.info("start lockUserAnnotationWithSpel:{}", id); try { TimeUnit.SECONDS.sleep(2L); } catch (Exception ex) { LOGGER.error("error when lockUseAnnotation", ex); } LOGGER.info("end lockUserAnnotationWithSpel:{}", id); }
适用于key须要根据入参动态拼装的状况,其中keySpel是spel表达式
例4: 自动延长过时时间的分布式锁[推荐]
上面的用法有一个困难的问题,就是过时时间参数(expireInMilliseconds)的配置可能很难,由于:
- 锁有效时间过短,任务还没执行完,redis就过时了,这样分布式执行就会产生并发的问题
- 锁有效期太长,极端状况当程序异常退出,没有正确释放锁,锁长时间没法获取,致使任务没法进行的问题
因此实现了一个可以自动延长锁有效时间的加锁方法,使用方法以下:
@Resource private RedisLockManager redisLockManager; public void lockWithScheduleUseLockManager(String id) { String lockKey = String.format("%s.%s#%s", this.getClass().getCanonicalName(), "lockUseLockManager", id); try (RedisLock redisLock = redisLockManager.fetchAndTryLockWithSchedule(lockKey)) { TimeUnit.SECONDS.sleep(30L); } catch (LockFailException | IOException | InterruptedException ex) { LOGGER.error("error when lockUseLockManager", ex); } }
详细方法说明
类RedisLockManager 的方法有较为详尽的注释,请直接查看源码。
样例项目
handy-lock-examples是专门的使用样例项目,做为参考使用
已知问题
只是实现了客户端层面的简易分布式锁,因此没法处理因为redis服务端故障形成数据不一致的问题,须要 更严谨的分布式锁的状况建议用Redssion