咱们知道分布式锁的特性是排他、避免死锁、高可用。分布式锁的实现能够经过数据库的乐观锁(经过版本号)或者悲观锁(经过for update)、Redis的setnx()命令、Zookeeper(在某个持久节点添加临时有序节点,判断当前节点是不是序列中最小的节点,若是不是则监听比当前节点还要小的节点。若是是,获取锁成功。当被监听的节点释放了锁(也就是被删除),会通知当前节点。而后当前节点再尝试获取锁,如此反复)redis
本篇文章,主要讲如何用Redis的形式实现分布式锁。后续文章会讲解热点KEY读取,缓存穿透和缓存雪崩的场景和解决方案、缓存更新策略等等知识点,理论知识点较多。spring
个人redis配置以下数据库
spring.redis.host=
spring.redis.port=6379
#reids超时链接时间
spring.redis.timeout=100000
spring.redis.password=
#链接池最大链接数
spring.redis.pool.max-active=10000
#链接池最大空闲数
spring.redis.pool.max-idle=1000
#链接池最大等待时间
spring.redis.pool.max-wait=10000
复制代码
@Component
@Getter
@Setter
@ConfigurationProperties(prefix = "spring.redis")
public class RedisConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private int port;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.timeout}")
private int timeout;
@Value("${spring.redis.pool.max-active}")
private int poolMaxActive;
@Value("${spring.redis.pool.max-idle}")
private int poolMaxIdle;
@Value("${spring.redis.pool.max-wait}")
private int poolMaxWait;
}
复制代码
@Component
public class RedisPoolFactory {
@Autowired
private RedisConfig redisConfig;
@Bean
public JedisPool jedisPoolFactory() {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxIdle(redisConfig.getPoolMaxIdle());
poolConfig.setMaxTotal(redisConfig.getPoolMaxActive());
poolConfig.setTestOnBorrow(true);
poolConfig.setMaxWaitMillis(redisConfig.getPoolMaxWait());
JedisPool jp = new JedisPool(poolConfig, redisConfig.getHost(), redisConfig.getPort(),
redisConfig.getTimeout(), redisConfig.getPassword(), 0);
return jp;
}
}
复制代码
为了区分不一样模块的key,我抽象出了一个KeyPrefix接口和BasePrefix类。apache
public interface KeyPrefix {
int expireSeconds();
String getPrefix();
}
复制代码
/**
* @author cmazxiaoma
* @version V1.0
* @Description: TODO
* @date 2018/5/10 12:35
*/
public abstract class BasePrefix implements KeyPrefix {
private int expireSeconds;
private String prefix;
public BasePrefix(int expireSeconds, String prefix) {
this.expireSeconds = expireSeconds;
this.prefix = prefix;
}
public BasePrefix(String prefix) {
this(0, prefix);
}
@Override
public int expireSeconds() {
return expireSeconds;
}
@Override
public String getPrefix() {
String className = getClass().getSimpleName();
return className + ":" + prefix;
}
}
复制代码
下面进入正文。由于分布式系统之间是不一样进程的,单机版的锁没法知足要求。因此咱们能够借助中间件Redis的setnx()命令实现分布式锁。setnx()命令只会对不存在的key设值,返回1表明获取锁成功。对存在的key设值,会返回0表明获取锁失败。这里的value是System.currentTimeMillis() (获取锁的时间)+锁持有的时间。我这里设置锁持有的时间是200ms,实际业务执行的时间远比这200ms要多的多,持有锁的客户端应该检查锁是否过时,保证锁在释放以前不会过时。由于客户端故障的状况多是很复杂的。好比如今有A,B俩个客户端。A客户端获取了锁,执行业务中作了骚操做致使阻塞了好久,时间应该远远超过200ms,当A客户端从阻塞状态下恢复继续执行业务代码时,A客户端持有的锁因为过时已经被其余客户端占有。这时候A客户端执行释放锁的操做,那么有可能释放掉其余客户端的锁。缓存
我这里设置的客户端等待锁的时间是200ms。这里经过轮询的方式去让客户端获取锁。若是客户端在200ms以内没有锁的话,直接返回false。实际场景要设置合适的客户端等待锁的时间,避免消耗CPU资源。bash
若是获取锁的逻辑只有这三行代码的话,会形成死循环,明显不符合分布式锁的特性。并发
if (jedis.setnx(realKey, value) == 1) {
return true;
}
复制代码
因此,咱们要加上锁过时,而后获取锁的策略。经过realKey获取当前的currentValue。currentValue也就是获取锁的时间 + 锁持有的时间。 若是currentValue不等于null 且 currentValue 小于当前时间,说明锁已通过期。这时候若是忽然来了C,D两个客户端获取锁的请求,不就让C,D两个客户端都获取锁了吗。若是防止这种现象发生,咱们采用getSet()命令来解决。getSet(key,value)的命令会返回key对应的value,而后再把key原来的值更新为value。也就是说getSet()返回的是已过时的时间戳。若是这个已过时的时间戳等于currentValue,说明获取锁成功。app
假设客户端A一开始持有锁,保存在redis中的value(时间戳)等于T1。 这时候客户端A的锁已通过期,那么C,D客户端就能够开始争抢锁了。currentValue是T1,C客户端的value是T2,D客户端的value是T3。首先C客户端进入到String oldValue = jedis.getSet(realKey, value);
这行代码,得到的oldValue是T1,同时也会把realKey对应的value更新为T2。再执行后续的代码,oldValue等于currentValue,那么客户端C获取锁成功。接着D客户端也执行到了String oldValue = jedis.getSet(realKey, value);
这行代码,获取的oldValue是T2,同时也会把realKey对应的value更新为T3。因为oldValue不等于currentValue,那么客户端D获取锁失败。分布式
public boolean lock(KeyPrefix prefix, String key, String value) {
Jedis jedis = null;
Long lockWaitTimeOut = 200L;
Long deadTimeLine = System.currentTimeMillis() + lockWaitTimeOut;
try {
jedis = jedisPool.getResource();
String realKey = prefix.getPrefix() + key;
for (;;) {
if (jedis.setnx(realKey, value) == 1) {
return true;
}
String currentValue = jedis.get(realKey);
// if lock is expired
if (!StringUtils.isEmpty(currentValue) &&
Long.valueOf(currentValue) < System.currentTimeMillis()) {
// gets last lock time
String oldValue = jedis.getSet(realKey, value);
if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
return true;
}
}
lockWaitTimeOut = deadTimeLine - System.currentTimeMillis();
if (lockWaitTimeOut <= 0L) {
return false;
}
}
} finally {
returnToPool(jedis);
}
}
复制代码
咱们讲解了获取的逻辑,接着讲讲释放锁的逻辑。咱们在这里加上!StringUtils.isEmpty(currentValue) && value.equals(currentValue)
判断是为了防止释放了不属于当前客户端的锁。仍是举个例子,若是没有这个逻辑,A客户端调用unlock()方法以前,锁忽然就过时了。这时候B客户端发现锁过时了,立马获取了锁。而后A客户端接着调用unlock()方法,却释放了本来属于B客户端的锁。ide
public void unlock(KeyPrefix prefix, String key, String value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
String realKey = prefix.getPrefix() + key;
String currentValue = jedis.get(realKey);
if (!StringUtils.isEmpty(currentValue)
&& value.equals(currentValue)) {
jedis.del(realKey);
}
} catch (Exception ex) {
log.info("unlock error");
} finally {
returnToPool(jedis);
}
}
复制代码
编码RedisController,模拟商品秒杀操做。测试分布式锁是否可行。(强调:这里只是举一个例子,更直观的判断分布式锁可行,不适合实际场景!!!!!实际上抢购,是直接将库存放入到redis,是否结束标记放入到内存中,经过内存标记和redis中的decr()预减库存,而后将秒杀消息入队到消息队列中,最后消费消息并落地到DB中)
/**
* @author cmazxiaoma
* @version V1.0
* @Description: TODO
* @date 2018/8/28 9:27
*/
@RestController
@RequestMapping("/redis")
public class RedisController {
private static LongAdder longAdder = new LongAdder();
private static Long LOCK_EXPIRE_TIME = 200L;
private static Long stock = 10000L;
@Autowired
private RedisService redisService;
static {
longAdder.add(10000L);
}
@GetMapping("/v1/seckill")
public String seckillV1() {
Long time = System.currentTimeMillis() + LOCK_EXPIRE_TIME;
if (!redisService.lock(SeckillKeyPrefix.seckillKeyPrefix, "redis-seckill", String.valueOf(time))) {
return "人太多了,换个姿式操做一下";
}
if (longAdder.longValue() == 0L) {
return "已抢光";
}
doSomeThing();
if (longAdder.longValue() == 0L) {
return "已抢光";
}
longAdder.decrement();
redisService.unlock(SeckillKeyPrefix.seckillKeyPrefix, "redis-seckill", String.valueOf(time));
Long stock = longAdder.longValue();
Long bought = 10000L - stock;
return "已抢" + bought + ", 还剩下" + stock;
}
@GetMapping("/detail")
public String detail() {
Long stock = longAdder.longValue();
Long bought = 10000L - stock;
return "已抢" + bought + ", 还剩下" + stock;
}
@GetMapping("/v2/seckill")
public String seckillV2() {
if (longAdder.longValue() == 0L) {
return "已抢光";
}
doSomeThing();
if (longAdder.longValue() == 0L) {
return "已抢光";
}
longAdder.decrement();
Long stock = longAdder.longValue();
Long bought = 10000L - stock;
return "已抢" + bought + ", 还剩下" + stock;
}
@GetMapping("/v3/seckill")
public String seckillV3() {
if (stock == 0) {
return "已抢光";
}
doSomeThing();
stock--;
Long bought = 10000L - stock;
return "已抢" + bought + ", 还剩下" + stock;
}
public void doSomeThing() {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
复制代码
对http://localhost:8081/redis/v1/seckill
进行压测,我使用的压测工具是ab测试工具。这里用10000个并发用户,20000个请求来进行压测。
ab -c 10000 -n 20000 http://localhost:8081/redis/v1/seckill
复制代码
压测结果以下:
E:\cmazxiaoma_download\httpd-2.4.34-o102o-x64-vc14\Apache24\bin>ab -c 10000 -n 2
0000 http://localhost:8081/redis/v1/seckill
This is ApacheBench, Version 2.3 <$Revision: 1826891 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking localhost (be patient)
Completed 2000 requests
Completed 4000 requests
Completed 6000 requests
Completed 8000 requests
Completed 10000 requests
Completed 12000 requests
Completed 14000 requests
Completed 16000 requests
Completed 18000 requests
Completed 20000 requests
Finished 20000 requests
Server Software:
Server Hostname: localhost
Server Port: 8081
Document Path: /redis/v1/seckill
Document Length: 22 bytes
Concurrency Level: 10000
Time taken for tests: 108.426 seconds
Complete requests: 20000
Failed requests: 19991
(Connect: 0, Receive: 0, Length: 19991, Exceptions: 0)
Total transferred: 3420218 bytes
HTML transferred: 760218 bytes
Requests per second: 184.46 [#/sec] (mean)
Time per request: 54213.000 [ms] (mean)
Time per request: 5.421 [ms] (mean, across all concurrent requests)
Transfer rate: 30.80 [Kbytes/sec] received
Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 0 6.3 0 549
Processing: 2393 36477 16329.1 45101 90269
Waiting: 182 36435 16351.4 45046 90267
Total: 2393 36477 16329.0 45101 90269
Percentage of the requests served within a certain time (ms)
50% 45101
66% 47680
75% 49136
80% 50392
90% 53200
95% 53743
98% 54510
99% 56014
100% 90269 (longest request)
复制代码
咱们再来看看是否有超卖现象,貌似仍是正常。
我打开RedisDesktopManager查看db0的key信息时,发现还有一个key没有删除掉。说明咱们写的unlock()方法在1w并发用户,2w请求下仍是存在问题。
仔细推敲本身以前写的代码发现(仍是拿上面的例子说事),客户端D虽然获取锁失败,可是以前进行了String oldValue = jedis.getSet(realKey, value);
操做,仍是成功的更新了realKey对应的value。咱们进行unlock()操做时,释放客户端的锁是根据value来标识当前客户端的。一开始客户端C的value是T2,因为客户端D的getSet()操做,覆盖掉了客户端C的value,让其更新成T3。因为value.equals(currentValue)
条件不成立,因此不会执行到jedis.del(realKey)
其实lock()方法也经不起推敲: 1.分布式各个系统时间不一致,若是要这样作,只能进行时间同步。 2.当某个客户端锁过时时,多个客户端开始争抢锁。虽然最后只有一个客户端能成功锁,可是获取锁失败的客户端能覆盖获取锁成功客户端的过时时间。 3.当客户端的锁过时时间被覆盖,会形成锁不具备标识性,会形成客户端没有释放锁。
因此咱们要重写lock与unlock()的逻辑,看到网上已经有不少的解决方案。(不过也有不少错误案例)
咱们能够经过redis的set(key,value,NX,EX,timeout)合并普通的set()和expire()操做,使其具备原子性。
/**
* Set the string value as value of the key. The string can't be longer than 1073741824 bytes (1 * GB). * @param key * @param value * @param nxxx NX|XX, NX -- Only set the key if it does not already exist. XX -- Only set the key * if it already exist. * @param expx EX|PX, expire time units: EX = seconds; PX = milliseconds * @param time expire time in the units of <code>expx</code> * @return Status code reply */ public String set(final String key, final String value, final String nxxx, final String expx, final long time) { checkIsInMultiOrPipeline(); client.set(key, value, nxxx, expx, time); return client.getStatusCodeReply(); } 复制代码
经过set(key,value,NX,EX,timeout)方法,咱们就能够轻松实现分布式锁。值得注意的是这里的value做为客户端锁的惟一标识,不能重复。
public boolean lock1(KeyPrefix prefix, String key, String value, Long lockExpireTimeOut,
Long lockWaitTimeOut) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
String realKey = prefix.getPrefix() + key;
Long deadTimeLine = System.currentTimeMillis() + lockWaitTimeOut;
for (;;) {
String result = jedis.set(realKey, value, "NX", "PX", lockExpireTimeOut);
if ("OK".equals(result)) {
return true;
}
lockWaitTimeOut = deadTimeLine - System.currentTimeMillis();
if (lockWaitTimeOut <= 0L) {
return false;
}
}
} catch (Exception ex) {
log.info("lock error");
} finally {
returnToPool(jedis);
}
return false;
}
复制代码
咱们可使用lua脚本合并get()和del()操做,使其具备原子性。一切大功告成。
public boolean unlock1(KeyPrefix prefix, String key, String value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
String realKey = prefix.getPrefix() + key;
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(luaScript, Collections.singletonList(realKey),
Collections.singletonList(value));
if ("1".equals(result)) {
return true;
}
} catch (Exception ex) {
log.info("unlock error");
} finally {
returnToPool(jedis);
}
return false;
}
复制代码
刚才看了评论,看到了各位大佬提出的一系列问题。我作出如下解释:
2.请耐心读完本篇文章。第一个案例代码是错误的,我后续讲解了如何发现和分析错误案例代码的思路。 在此基础下,推导出正确的代码。
3.经过评论,我看到有一篇文章做者的思路是这样的: 获取锁以后,经过标志位和开启新线程的方式轮询去刷新当前客户端持有锁的时间,以保证在释放锁以前锁不会过时,而后锁释放后,将标志位置为false,线程中止循环。可是这样有一个问题:假如执行了lock()操做以后,客户端因为一些缘由阻塞了,那么unlock()方法一直得不到执行,那么标志位一直为true,开启刷新过时时间的线程一直死循环,会形成资源的严重浪费。并且线程一直增长当前客户端持有锁的时间,会形成其余客户端一直拿不到锁,并且形成死锁。
你们好,我是cmazxiaoma(寓意是沉梦昂志的小马),感谢各位阅读本文章。 小弟不才。 若是您对这篇文章有什么意见或者错误须要改进的地方,欢迎与我讨论。 若是您以为还不错的话,但愿大家能够点个赞。 但愿个人文章对你能有所帮助。 有什么意见、看法或疑惑,欢迎留言讨论。
最后送上:心之所向,素履以往。生如逆旅,一苇以航。