单机环境下咱们能够经过JAVA的Synchronized和Lock来实现进程内部的锁,可是随着分布式应用和集群环境的出现,系统资源的竞争从单进程多线程的竞争变成了多进程的竞争,这时候就须要分布式锁来保证。
实现分布式锁如今主流的方式大体有如下三种
1. 基于数据库的索引和行锁
2. 基于Redis的单线程原子操做:setNX
3. 基于Zookeeper的临时有序节点
这篇文章咱们用Redis来实现,会基于现有的各类锁实现来分析,最后分享Redission的锁源码分析来看下分布式锁的开源实现
复制代码
1、 经过setNx和getSet来实现redis
这是如今网上大部分版本的实现方式,笔者以前项目里面用到分布式锁也是经过这样的方式实现数据库
public boolean lock(Jedis jedis, String lockName, Integer expire) {
//返回是否设置成功
//setNx加锁
long now = System.currentTimeMillis();
boolean result = jedis.setnx(lockName, String.valueOf(now + expire * 1000)) == 1;
if (!result) {
//防止死锁的容错
String timestamp = jedis.get(lockName);
if (timestamp != null && Long.parseLong(timestamp) < now) {
//不经过del方法来删除锁。而是经过同步的getSet
String oldValue = jedis.getSet(lockName, String.valueOf(now + expire));
if (oldValue != null && oldValue.equals(timestamp)) {
result = true;
jedis.expire(lockName, expire);
}
}
}
if (result) {
jedis.expire(lockName, expire);
}
return result;
}
复制代码
代码分析:缓存
经过setNx命令老保证操做的原子性,获取到锁,而且把过时时间设置到value里面安全
经过expire方法设置过时时间,若是设置过时时间失败的话,再经过value的时间戳来和当前时间戳比较,防止出现死锁bash
经过getSet命令在发现锁过时未被释放的状况下,避免删除了在这个过程当中有可能被其他的线程获取到了锁服务器
存在问题多线程
2、 经过Redis高版本的原子命令并发
jedis的set命令能够自带复杂参数,经过这些参数能够实现原子的分布式锁命令异步
jedis.set(lockName, "", "NX", "PX", expireTime);
复制代码
代码分析分布式
redis的set命令能够携带复杂参数,第一个是锁的key,第二个是value,能够存放获取锁的客户端ID,经过这个校验是否当前客户端获取到了锁,第三个参数取值NX/XX,第四个参数 EX|PX,第五个就是时间
NX:若是不存在就设置这个key XX:若是存在就设置这个key
EX:单位为秒,PX:单位为毫秒
这个命令实质上就是把咱们以前的setNx和expire命令合并成一个原子操做命令,不须要咱们考虑set失败或者expire失败的状况
1、 经过Redis的del命令
public boolean unlock(Jedis jedis, String lockName) {
jedis.del(lockName);
return true;
}
复制代码
代码分析
经过redis的del命令能够直接删除锁,可能会出现误删其余线程已经存在的锁的状况
2、 Redis的del检查
public static void unlock2(Jedis jedis, String lockKey, String requestId) {
// 判断加锁与解锁是否是同一个客户端
if (requestId.equals(jedis.get(lockKey))) {
// 若在此时,这把锁忽然不是这个客户端的,则会误解锁
jedis.del(lockKey);
}
}
复制代码
代码分析
新增了requestId客户端ID的判断,但因为不是原子操做,在多个进程下面的并发竞争状况下,没法保证安全
3、 Redis的LUA脚本
public static boolean unlock3(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(""));
if (1L == (long) result) {
return true;
}
return false;
}
复制代码
代码分析
经过Lua脚原本保证操做的原子性,其实就是把以前的先判断再删除合并成一个原子性的脚本命令,逻辑就是,先经过get判断value是否是相等,若相等就删除,不然就直接return
Redission是redis官网推荐的一个redis客户端,除了基于redis的基础的CURD命令之外,重要的是就是Redission提供了方便好用的分布式锁API
复制代码
1、 基本用法
RedissonClient redissonClient = RedissonTool.getInstance();
RLock distribute_lock = redissonClient.getLock("distribute_lock");
try {
boolean result = distribute_lock.tryLock(3, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (distribute_lock.isLocked()) {
distribute_lock.unlock();
}
}
复制代码
代码流程
2、 具体实现
咱们经过tryLock来分析redission分布式的实现,lock方法跟tryLock差很少,只不过没有最长等待时间的设置,会自旋循环等待锁的释放,直到获取锁为止
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
//获取当前线程ID,用于实现可重入锁
final long threadId = Thread.currentThread().getId();
//尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
//等待时间结束,返回获取失败
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
//订阅锁的队列,等待锁被其他线程释放后通知
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (subscribeFuture.isSuccess()) {
unsubscribe(subscribeFuture, threadId);
}
}
});
}
acquireFailed(threadId);
return false;
}
try {
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message,等待订阅的队列消息
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
复制代码
代码分析
首先tryAcquire尝试获取锁,若返回ttl为null,说明获取到锁了
判断等待时间是否过时,若是过时,直接返回获取锁失败
经过Redis的Channel订阅监听队列,subscribe内部经过信号量semaphore,再经过await方法阻塞,内部实际上是用CountDownLatch来实现阻塞,获取subscribe异步执行的结果,来保证订阅成功,再判断是否到了等待时间
再次尝试申请锁和等待时间的判断,循环阻塞在这里等待锁释放的消息RedissonLockEntry也维护了一个semaphore的信号量
不管是否释放锁,最终都要取消订阅这个队列消息
redission内部的getEntryName是客户端实例ID+锁名称来保证多个实例下的锁可重入
tryAcquire获取锁
redisssion获取锁的核心代码,内部实际上是异步调用,可是用get方法阻塞了
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
复制代码
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
复制代码
tryLockInnerAsync方法内部是基于Lua脚原本获取锁的
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
复制代码
Redission避免死锁的解决方案:
Redission为了不锁未被释放,采用了一个特殊的解决方案,若未设置过时时间的话,redission默认的过时时间是30s,同时未避免锁在业务未处理完成以前被提早释放,Redisson在获取到锁且默认过时时间的时候,会在当前客户端内部启动一个定时任务,每隔internalLockLeaseTime/3的时间去刷新key的过时时间,这样既避免了锁提早释放,同时若是客户端宕机的话,这个锁最多存活30s的时间就会自动释放(刷新过时时间的定时任务进程也宕机)
// lock acquired,获取到锁的时候设置按期更新时间的任务
if (ttlRemaining) {
scheduleExpirationRenewal(threadId);
}
//expirationRenewalMap的并发安全MAP记录设置过的缓存,避免并发状况下重复设置任务,internalLockLeaseTime / 3的时间后从新设置过时时间
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
复制代码
unlock解锁
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
复制代码
Redission的unlock解锁也是基于Lua脚本实现的,内部逻辑是先判断锁是否存在,不存在说明已经被释放了,发布锁释放消息后返回,锁存在再判断当前线程是否锁拥有者,不是的话,无权释放返回,解锁的话,会减去重入的次数,从新更新过时时间,若重入数捡完,删除当前key,发布锁释放消息
主要基于Redis来设计和实现分布式锁,经过经常使用的设计思路引伸到Redission的实现,不管是设计思路仍是代码健壮性Redission的设计都是优秀的,值得学习,下一步会讲解关于Zookeeper的分布式锁实现和相关开源源码分析。