Redission锁继承Implements Reentrant Lock,因此具有 Reentrant Lock 锁中的一些特性:超时,重试,可中断等。加上Redission中Redis具有分布式的特性,因此很是适合用来作Java中的分布式锁。 下面咱们对其加锁、解锁过程当中的源码细节进行一一分析。redis
锁的接口定义了一下方法:分布式
分布式锁当中加锁,咱们经常使用的加锁接口:ide
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
下面咱们来看一下方法的具体实现:ui
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() { @Override public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } acquireFailed(threadId); return false; } try { time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
首先咱们看到调用tryAcquire尝试获取锁,在这里是否能获取到锁,是根据锁名称的过时时间TTL来断定的(TTL<=0:则说明该锁不存在或者已经超时,此时获取锁成功。TTL>0:则说明该锁被其余现成持有,此时获取锁失败);线程
下面咱们接着看一下tryAcquire的实现:blog
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); }
能够看到真正获取锁的操做通过一层get操做里面执行的,这里为什么要这么操做,本人也不是太理解,若有理解错误,欢迎指正。继承
get 是由CommandAsyncExecutor(一个线程Executor)封装的一个Executor
设置一个单线程的同步控制器CountDownLatch,用于控制单个线程的中断信息。我的理解通过中间的这么一步:主要是为了支持线程可中断操做。接口
public <V> V get(RFuture<V> future) { if (!future.isDone()) { final CountDownLatch l = new CountDownLatch(1); future.addListener(new FutureListener<V>() { @Override public void operationComplete(Future<V> future) throws Exception { l.countDown(); } }); boolean interrupted = false; while (!future.isDone()) { try { l.await(); } catch (InterruptedException e) { interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } } // commented out due to blocking issues up to 200 ms per minute for each thread:因为每一个线程的阻塞问题,每分钟高达200毫秒 // future.awaitUninterruptibly(); if (future.isSuccess()) { return future.getNow(); } throw convertException(future); }
咱们进一步往下看:get
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
首先判断锁是否有超时时间,有过时时间的话,会在后面获取锁的时候设置进去。没有过时时间的话,则会用默认的同步
private long lockWatchdogTimeout = 30 * 1000;
下面咱们在进一步往下分析真正获取锁的操做:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
我把里面的重点信息作了如下三点总结:
1:真正执行的是一段具备原子性的Lua脚本,而且最终也是由CommandAsynExecutor去执行。
2:锁真正持久化到Redis时,用的hash类型key field value
3:获取锁的三个参数:getName()是逻辑锁名称,例如:分布式锁要锁住的methodName+params;internalLockLeaseTime是毫秒单位的锁过时时间;getLockName则是锁对应的线程级别的名称,由于支持相同线程可重入,不一样线程不可重入,因此这里的锁的生成方式是:UUID+":"threadId。有的同窗可能会问,这样不是很缜密:不一样的JVM可能会生成相同的threadId,因此Redission这里加了一个区分度很高的UUID;
Lua脚本中的执行分为如下三步:
1:exists检查redis中是否存在锁名称;若是不存在,则获取成功;同时把逻辑锁名称KEYS[1],线程级别的锁名称[ARGV[2],value=1,设置到redis。并设置逻辑锁名称的过时时间ARGV[2],返回;
2:若是检查到存在KEYS[1],[ARGV[2],则说明获取成功,此时会自增对应的value值,记录重入次数;并更新锁的过时时间
3:key不存,直接返回key的剩余过时时间(-2)
锁获取失败、解锁过程后在后面的文章继续补充