在并发编程中最经常使用的手段就是锁了,好比基于JVM底层的synchronized、基于AQS的ReentrantLock;但是这些锁都只是局限于单机,本篇给你们介绍常见的分布式锁。redis
基于redis实现的分布式锁咱们可使用setnx命令,这个命令的做用是若是指定的key不存在时则set一对k-v,这样同一时刻就只能有一个请求能够set这个key达到加锁的目的,可是这个命令不能同时设置过时时间,这样可能会致使死锁。如图一,请求A和请求B在T1时刻同时发起setnx命令,请求A成功了,而后在T2设置key的过时时间,若是在这以前请求A所在的服务忽然挂了,那这个key就一直存在,这个时候其余请求就没法加锁。算法
使用这个命令能够在设置一个key的同时设置key的过时时间,NX是当key不存在时进行操做,XX是当key存在时进行操做。业务执行完成以后,这个时候须要手动释放这个锁;那么如何保证释放锁的安全性呢?首先要确保释放的锁是本身的,咱们能够利用key对应的value来判断当前这个key是否是本身设置的,这样就能保证释放的锁是本身的;编程
private Boolean lock(String key, String value) {
return stringRedisTemplate.opsForValue().setIfAbsent(key, value, 10L, TimeUnit.SECONDS);
}
private Boolean unLock(String key, String value) {
String cacheValue = stringRedisTemplate.opsForValue().get(key);
if (!value.equals(cacheValue)) {
return false;
}
return stringRedisTemplate.delete(key);
}
复制代码
那上面这段代码就能保证释放锁的安全性吗?这个方法存在的问题在于在判断了key对应的value与本身的value相等以后,若是这个时候key不争气的恰好到期失效了,其余线程获取了这个锁,那么下面的delete key操做就将其余线程的锁释放掉了。怎么就那么多幺蛾子…… 那么如何保证释放锁的原子性呢?安全
Lua脚本我不过多的介绍,有兴趣的同窗能够去了解,直接上代码bash
private Boolean luaUnLock(String key, String value) {
ScriptSource lua = new ResourceScriptSource(new ClassPathResource("redisUnLock.lua"));
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(lua);
redisScript.setResultType(Boolean.class);
return stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value);
}
复制代码
redisUnLock.lua数据结构
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
复制代码
这段脚本比较简单,就是比较参数key对应的value是否与参数value相等,相等则删除这个key,在redis中lua脚本可以保证原子性。那么问题叒来了!这样就保证了这个分布式锁的安全性吗?如今这个分布式锁的问题在于存在业务时间过长致使锁过时被其余线程获取的状况,此时须要检测续租锁来避免这个问题。并发
那么如何续租呢,主要思路就是用一个线程检测当前这个业务是否执行完,锁还有多久过时;若是锁即将失效时业务尚未执行完那么就给这个锁从新设置过时时间。这里咱们使用redisson的实现,毕竟本身实现的轮子没那么靠谱😅。异步
public class RedissonLockerImpl implements RedissonLocker {
@Resource
private RedissonClient redissonClient;
@Override
public void lock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock();
}
@Override
public void unlock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.unlock();
}
}
public void test() {
CountDownLatch count = new CountDownLatch(2);
String lockKey = "LOCK_KEY";
CustomizeThreadPool.threadPool.execute(() -> {
try {
count.await();
redissonLocker.lock(lockKey);
log.info("线程1获取锁");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
redissonLocker.unlock(lockKey);
log.info("线程1释放锁");
}
});
count.countDown();
CustomizeThreadPool.threadPool.execute(() -> {
try {
count.await();
redissonLocker.lock(lockKey);
log.info("线程2获取锁");
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
redissonLocker.unlock(lockKey);
log.info("线程2释放锁");
}
});
count.countDown();
}
复制代码
public void test2() {
String lockKey = "LOCK_KEY";
CustomizeThreadPool.threadPool.execute(() -> {
redissonLocker.lock(lockKey);
redissonLocker.lock(lockKey);
try {
TimeUnit.SECONDS.sleep(25);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
redissonLocker.unlock(lockKey);
}
});
}
复制代码
怎么样,这个API是否是至关简洁呢,可能有同窗看到以前的代码会问你这里加锁的时候为何没有给锁设置过时时间呢?你加锁key对应的value呢?经过这两个例子咱们能够看到加锁key对应的value是一个hash结构,第一个属性对应的就是咱们所说的value,用来判断是不是本身加的锁;第二个属性对应的实际上是加锁的次数,这和Java中的ReentrantLock同样是可重入锁,因此第二个例子里只作了一次unLock没办法释放锁。至于这个key对应的value和锁过时时间在下面的源码分析介绍。分布式
package org.redisson.config;
import ..........
public class Config {
// 其余源码省略
private long lockWatchdogTimeout;
public Config() {
// 默认的锁过时时间
this.lockWatchdogTimeout = 30000L;
}
public Config(Config oldConf) {
this.lockWatchdogTimeout = 30000L;
// 若是有读取配置文件修改的锁过时时间
this.setLockWatchdogTimeout(oldConf.getLockWatchdogTimeout());
}
}
// 不带过时时间加锁
public void lock() {
try {
this.lockInterruptibly();
} catch (InterruptedException var2) {
Thread.currentThread().interrupt();
}
}
public void lockInterruptibly() throws InterruptedException {
this.lockInterruptibly(-1L, (TimeUnit)null);
}
public void lock(long leaseTime, TimeUnit unit) {
try {
this.lockInterruptibly(leaseTime, unit);
} catch (InterruptedException var5) {
Thread.currentThread().interrupt();
}
}
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 获取当前线程的ID
long threadId = Thread.currentThread().getId();
// 尝试获取锁返回过时时间
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
// 若是加锁失败
if (ttl != null) {
// 订阅解锁队列
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.commandExecutor.syncSubscription(future);
try {
while(true) {
// 尝试加锁
ttl = this.tryAcquire(leaseTime, unit, threadId);
// 加锁成功则返回
if (ttl == null) {
return;
}
if (ttl >= 0L) {
// 加锁失败阻塞
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
this.getEntry(threadId).getLatch().acquire();
}
}
} finally {
// 取消订阅解锁消息
this.unsubscribe(future, threadId);
}
}
}
//尝试加锁
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
// 设置了默认的过时时间
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 若是调用的是没有过时时间的lock,则默认时间为lockWatchdogTimeout
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
public void operationComplete(Future<Long> future) throws Exception {
if (future.isSuccess()) {
Long ttlRemaining = (Long)future.getNow();
// 加锁成功以后开始一个调度任务
if (ttlRemaining == null) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
return ttlRemainingFuture;
}
}
// 调用lua脚本异步加锁,value由getLockName()生成,uuid+threadId
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
// 若是当前不存在key则加锁,若是当前存在而且是本身的则加锁次数加一
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
// 锁续租定时任务
private void scheduleExpirationRenewal(final long threadId) {
if (!expirationRenewalMap.containsKey(this.getEntryName())) {
// 添加一个回调任务,每1/3锁过时时间执行一次
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
// 异步重置锁过时时间
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
// 从加锁集合移除 RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
if (!future.isSuccess()) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
} else {
// 成功重置锁时间以后再次调用
if ((Boolean)future.getNow()) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
// 保证任务不会被重复建立,取消任务
if (expirationRenewalMap.putIfAbsent(this.getEntryName(), new RedissonLock.ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}
}
复制代码
从源码中咱们能够了解redisson实现分布式锁的大体流程;当咱们没有设置锁过时时间的时候,redisson会使用lockWatchdogTimeout时间(默认为30s)设置为锁过时时间;redisson设置的redis数据结构是一个hash,其中一个属性是锁的值,由uuid和当前线程id组成,另外一个属性是加锁次数用来实现可重入性;当没有设置锁过时时间的时候,redisson会每隔1/3锁过时时间将锁过时时间重置为初始值(默认30s时,当过时时间还有20s就会从新设置过时时间为30s)直到释放锁;若是设置了过时时间则不会有锁续租的功能。加锁的时候若是当前key不存在则直接设置key,若是存在而且是本身的则将加锁次数加一。加锁失败则订阅释放锁redis channel,线程进入阻塞。释放锁先判断当前是不是本身的锁,若是是则将当前加锁次数减一,若是减一以后为0则删除key,若是有续租任务则取消续租任务,向redis channel中发一条消息唤醒被阻塞的线程获取锁。 ide