前言:在分布式环境中,咱们常用锁来进行并发控制,锁可分为乐观锁和悲观锁,基于数据库版本戳的实现是乐观锁,基于或zookeeper的实现可认为是悲观锁了。乐观锁和悲观锁最根本的区别在于线程之间是否相互阻塞。html
从2.6.12版本开始,redis为SET命令增长了一系列选项(set [key] NX/XX EX/PX [expiration]):java
EX seconds – 设置键key的过时时间,单位时秒node
PX milliseconds – 设置键key的过时时间,单位时毫秒git
NX – 只有键key不存在的时候才会设置key的值github
XX – 只有键key存在的时候才会设置key的值web
原文地址:https://redis.io/commands/set
中文地址:http://redis.cn/commands/set.html面试
注意: 因为SET命令加上选项已经能够彻底取代SETNX, SETEX, PSETEX的功能,因此在未来的版本中,redis可能会不推荐使用而且最终抛弃这几个命令。redis
小编分类整理了许多java进阶学习材料和BAT面试题,须要资料的请加JAVA高阶学习Q群:8515318105;就能领取2019年java架构师进阶学习资料和BAT面试题。算法
这里简单提一下,在旧版本的redis中(指2.6.12版本以前),使用redis实现分布式锁通常须要setNX、expire、getSet、del等命令。并且会发现这种实现有不少逻辑判断的原子操做以及本地时间等并无控制好。数据库
而在旧版本的redis中,redis的超时时间很难控制,用户迫切须要把setNX和expiration结合为一体的命令,把他们做为一个原子操做,这样新版本的多选项set命令诞生了。然而这并无彻底解决复杂的超时控制带来的问题。
接下来,咱们的一切讨论都基于新版redis。
在这里,我先提出几个在实现redis分布式锁中须要考虑的关键问题
1.一、为了防止死锁,redis至少须要设置一个超时时间;
1.二、由1.1引伸出来,当锁自动释放了,可是程序并无执行完毕,这时候其余线程又获取到锁执行一样的程序,可能会形成并发问题,这个问题咱们须要考虑一下是否归属于分布式锁带来问题的范畴。
2.一、每一个获取redis锁的线程应该释放本身获取到的锁,而不是其余线程的,因此咱们须要在每一个线程获取锁的时候给锁作上不一样的标记以示区分;
2.二、由2.1带来的问题是线程在释放锁的时候须要判断当前锁是否属于本身,若是属于本身才释放,这里涉及到逻辑判断语句,至少是两个操做在进行,那么咱们须要考虑这两个操做要在一个原子内执行,否者在两个行为之间可能会有其余线程插入执行,致使程序紊乱。
单实例的redis(这里指只有一个master节点)每每是不可靠的,虽然实现起来相对简单一些,可是会面临着宕机等不可用的场景,即便在主从复制的时候也显得并不可靠(由于redis的主从复制每每是异步的)。
关于Martin Kleppmann的Redlock的分析
原文地址:https://redis.io/topics/distlock
中文地址:http://redis.cn/topics/distlock.html
文章分析得出,这种算法只需具有3个特性就能够实现一个最低保障的分布式锁。
安全属性(Safety property): 独享(相互排斥)。在任意一个时刻,只有一个客户端持有锁。
活性A(Liveness property A): 无死锁。即使持有锁的客户端崩溃(crashed)或者网络被分裂(gets partitioned),锁仍然能够被获取。
活性B(Liveness property B): 容错。 只要大部分Redis节点都活着,客户端就能够获取和释放锁.
第一点安全属性意味着悲观锁(互斥锁)是咱们作redis分布式锁的前提,否者将可能形成并发;
第二点代表为了不死锁,咱们须要设置锁超时时间,保证在必定的时间事后,锁能够从新被利用;
第三点是说对于客户端来讲,获取锁和手动释放锁能够有更高的可靠性。
怎么才能合理判断程序真正处理的有效时间范围?(这里有个时间偏移的问题)
redis Master节点宕机后恢复(可能尚未持久化到磁盘)、主从节点切换,(N/2)+1这里的N应该怎么动态计算更合理?
接下来再看,redis之父antirez对Redlock的评价
原文地址:http://antirez.com/news/101
文中主要提到了网络延迟和本地时钟的修改(不论是时间服务器或人为修改)对这种算法可能形成的影响。
获取锁(含自动释放锁):
SET resource_name my_random_value NX PX 30000
手动删除锁(Lua脚本):
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
复制代码
为了保证在尽量短的时间内获取到(N/2)+1个节点的锁,能够并行去获取各个节点的锁(固然,并行可能须要消耗更多的资源,由于串行只须要count到足够数量的锁就能够中止获取了);
另外,怎么动态实时统一获取redis master nodes须要更进一步去思考了。
小编分类整理了许多java进阶学习材料和BAT面试题,须要资料的请加JAVA高阶学习Q群:8515318105;就能领取2019年java架构师进阶学习资料和BAT面试题。
QA,补充一下说明(如下为我与朋友沟通的状况,以说明文中你们可能不够明白的地方):
一、在关键问题2.1中,删除就删除了,会形成什么问题?
线程A超时,准备删除锁;但此时的锁属于线程B;线程B还没执行完,线程A把锁删除了,这时线程C获取到锁,同时执行程序;因此不能乱删。
二、在关键问题2.2中,只要在key生成时,跟线程相关就不用考虑这个问题了吗?
不一样的线程执行程序,线程之间肯虽然有差别呀,而后在redis锁的value设置有线程信息,好比线程id或线程名称,是分布式环境的话加个机器id前缀咯(相似于twitter的snowflake算法!),可是在del命令只会涉及到key,不会再次检查value,因此仍是须要lua脚本控制if(condition){xxx}的原子性。
三、那要不要考虑锁的重入性?
不须要重入;try…finally 没得重入的场景;对于单个线程来讲,执行是串行的,获取锁以后一定会释放,由于finally的代码一定会执行啊(只要进入了try块,finally一定会执行)。
四、为何两个线程都会去删除锁?(貌似重复的问题。无论怎样,仍是耐心解答吧)
每一个线程只能管理本身的锁,不能管理别人线程的锁啊。这里能够联想一下ThreadLocal。
五、若是加锁的线程挂了怎么办?只能等待自动超时?
看你怎么写程序的了,一种是问题3的回答;另外,那就自动超时咯。这种状况也适用于网络over了。
六、时间太长,程序异常就会蛋疼,时间过短,就会出现程序尚未处理完就超时了,这岂不是很尴尬?
是呀,因此须要更好的衡量这个超时时间的设置。
实践部分主要代码:
package com.caiya.cms.web.component;
import com.caiya.cache.CacheException;
import com.caiya.cache.redis.JedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* redis实现分布式锁
* 可实现特性:
* 一、使多线程无序排队获取和释放锁;
* 二、丢弃未成功得到锁的线程处理;
* 三、只释放线程自己加持的锁;
* 四、避免死锁
*
* @author wangnan
* @since 1.0
*/
public final class RedisLock {
private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
/**
* 尝试加锁(仅一次)
*
* @param lockKey 锁key
* @param lockValue 锁value
* @param expireSeconds 锁超时时间(秒)
* @return 是否加锁成功
* @throws CacheException
*/
public static boolean tryLock(String lockKey, String lockValue, long expireSeconds) throws CacheException {
JedisCache jedisCache = JedisCacheFactory.getInstance().getJedisCache();
try {
String response = jedisCache.set(lockKey, lockValue, "nx", "ex", expireSeconds);
return Objects.equals(response, "OK");
} finally {
jedisCache.close();
}
}
/**
* 加锁(指定最大尝试次数范围内)
*
* @param lockKey 锁key
* @param lockValue 锁value
* @param expireSeconds 锁超时时间(秒)
* @param tryTimes 最大尝试次数
* @param sleepMillis 每两次尝试之间休眠时间(毫秒)
* @return 是否加锁成功
* @throws CacheException
*/
public static boolean lock(String lockKey, String lockValue, long expireSeconds, int tryTimes, long sleepMillis) throws CacheException {
boolean result;
int count = 0;
do {
count++;
result = tryLock(lockKey, lockValue, expireSeconds);
try {
TimeUnit.MILLISECONDS.sleep(sleepMillis);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
} while (!result && count <= tryTimes);
return result;
}
/**
* 释放锁
*
* @param lockKey 锁key
* @param lockValue 锁value
*/
public static void unlock(String lockKey, String lockValue) {
JedisCache jedisCache = JedisCacheFactory.getInstance().getJedisCache();
try {
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
Object result = jedisCache.eval(luaScript, 1, lockKey, lockValue);
// Objects.equals(result, 1L);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
jedisCache.close();
}
// return false;
}
private RedisLock() {
}
}
复制代码
...
String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();// 跟业务相关的惟一拼接键
String lockValue = Constant.DEFAULT_CACHE_NAME + ":" + System.getProperty("JvmId") + ":" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();// 生成集群环境中的惟一值
boolean locked = RedisLock.tryLock(lockKey, lockValue, 100);// 只尝试一次,在本次处理过程当中直接拒绝其余线程的请求
if (!locked) {
throw new IllegalAccessException("您的操做太频繁了,休息一下再来吧~");
}
try {
// 开始处理核心业务逻辑
Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());
...
...
} finally {
RedisLock.unlock(lockKey, lockValue);// 在finally块中释放锁
}
复制代码
...
String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();
String lockValue = Constant.DEFAULT_CACHE_NAME + ":机器编号:" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();
boolean locked = RedisLock.lock(lockKey, lockValue, 100, 20, 100);// 非公平锁,无序竞争(这里须要合理根据业务处理状况设置最大尝试次数和每次休眠时间)
if (!locked) {
throw new IllegalAccessException("系统太忙,本次操做失败");// 通常来讲,不会走到这一步;若是真的有这种状况,而且在合理设置锁尝试次数和等待响应时间以后仍然处理不过来,可能须要考虑优化程序响应时间或者用消息队列排队执行了
}
try {
// 开始处理核心业务逻辑
Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());
...
...
} finally {
RedisLock.unlock(lockKey, lockValue);
}
...
复制代码
基于redis的分布式锁实现客户端Redisson:
https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers
基于zookeeper的分布式锁实现:
http://curator.apache.org/curator-recipes/shared-reentrant-lock.html
小编分类整理了许多java进阶学习材料和BAT面试题,须要资料的请加JAVA高阶学习Q群:8515318105;就能领取2019年java架构师进阶学习资料和BAT面试题。