上一篇文章主要侧重如何获取锁以及所获取成功的场景,本文将着重对失败以及解锁的状况进行分析,探寻Redisson分布式锁最具艺术的地方。node
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); // 订阅监听redis消息,而且建立RedissonLockEntry,其中RedissonLockEntry中比较关键的是一个 Semaphore属性对象,用来控制本地的锁请求的信号量同步,返回的是netty框架的Future实现。 final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); // 阻塞等待subscribe的future的结果对象,若是subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,再也不继续申请锁了。 if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() { @Override public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } acquireFailed(threadId); return false; } try { time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); // 再次尝试一次申请锁 ttl = tryAcquire(leaseTime, unit, threadId); // if (ttl == null) { return true; } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); // 经过信号量(共享锁)阻塞,等待解锁消息(这一点设计的很是精妙:减小了其余分布式节点的等待或者空转等无效锁申请的操做,总体提升了性能) // 若是剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。 // 不然就在wait time 时间范围内等待能够经过信号量 if (ttl >= 0 && ttl < time) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } } } finally { //不管是否得到锁,都要取消订阅解锁消息 unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
咱们看到当获取锁的时长超过请求等待时间,直接进入acquireFailed(进一步调用acquireFailedAsync),并同步返回false,获取锁失败。接下里咱们直接进入该异步(异步处理IO,提升系统吞吐量)方法,对其进行解析:redis
@Override protected RFuture<Void> acquireFailedAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, "redis.call('zrem', KEYS[2], ARGV[1]); " + "redis.call('lrem', KEYS[1], 0, ARGV[1]); ", Arrays.<Object>asList(getThreadsQueueName(), getTimeoutSetName()), getLockName(threadId)); }
能够看到,这一步就是把该线程从获取锁操做的等待队列中直接删掉;网络
接着往下看,若是未达到请求超时时间,则首先订阅该锁的信息。当其余线程释放锁的时候,会同时根据锁的惟一通道publish一条分布式的解锁信息,接收到分布式消息后, 等待获取锁的Semaphore中的监听队列中的listenser线程可从新申请锁,这个后面会深刻讲解。下面是订阅的具体细节:框架
public RFuture<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) { final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>(); //根据channelName拿到信号量,channelName=UUID+":"+name,对应一个锁。 final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName); final RPromise<E> newPromise = new RedissonPromise<E>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { return semaphore.remove(listenerHolder.get()); } }; Runnable listener = new Runnable() { @Override public void run() { E entry = entries.get(entryName); if (entry != null) { entry.aquire(); semaphore.release(); entry.getPromise().addListener(new TransferListener<E>(newPromise)); return; } E value = createEntry(newPromise); value.aquire(); E oldValue = entries.putIfAbsent(entryName, value); if (oldValue != null) { oldValue.aquire(); semaphore.release(); oldValue.getPromise().addListener(new TransferListener<E>(newPromise)); return; } RedisPubSubListener<Object> listener = createListener(channelName, value); connectionManager.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener); } }; //把生成的监听线程listenser加入到信号量的监听集合中去,后面发布解锁消息的时候,会唤醒 semaphore.acquire(listener); listenerHolder.set(listener); return newPromise; }
接着回到tryLock方法,看到finally里面:不管是否得到锁,都要取消订阅解锁消息,这里不作赘述。异步
接着咱们一并分析一下解锁的过程分布式
public void unlock() { Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } if (opStatus) { // 解锁成功以后取消更新锁expire的时间任务,针对于没有锁过时时间的 cancelExpirationRenewal(); } // Future<Void> future = unlockAsync(); // future.awaitUninterruptibly(); // if (future.isSuccess()) { // return; // } // if (future.cause() instanceof IllegalMonitorStateException) { // throw (IllegalMonitorStateException)future.cause(); // } // throw commandExecutor.convertException(future); }
解锁的逻辑相对简单,具体步骤以下:ide
若是lock键不存在,发消息说锁已经可用性能
若是锁不是被当前线程锁定,则返回nilui
因为支持可重入,在解锁时将重入次数须要减1线程
若是计算后的重入次数>0,则从新设置过时时间
若是计算后的重入次数<=0,则发消息说锁已经可用
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
下面咱们再看一下Redisson是如何处理解锁消息的(LockPubSub.unlockMessage):
/** * * @author Nikita Koksharov * */ public class LockPubSub extends PublishSubscribe<RedissonLockEntry> { public static final Long unlockMessage = 0L; @Override protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) { return new RedissonLockEntry(newPromise); } @Override protected void onMessage(RedissonLockEntry value, Long message) { if (message.equals(unlockMessage)) { // 释放一个许可,唤醒等待的entry.getLatch().tryAcquire去再次尝试获取锁。 value.getLatch().release(); while (true) { Runnable runnableToExecute = null; // 若是entry还有其余Listeners回调,也唤醒执行。 synchronized (value) { Runnable runnable = value.getListeners().poll(); if (runnable != null) { if (value.getLatch().tryAcquire()) { runnableToExecute = runnable; } else { value.addListener(runnable); } } } if (runnableToExecute != null) { runnableToExecute.run(); } else { return; } } } } }
Redisson还有不少东西能够挖掘,不只局限分布式锁(对于分布式锁的一些细节,本文摘抄了网络中比较靠谱的一些片断,方便你们理解)。做者Nikita Koksharov 把原来Conrrent包下不少同步类(好比:CountDownLatch,Semaphore),用分布式的方式实现了一遍,仍是很厉害的。这些加强的实现,之后在工做都将大有用处。这些点,之后有空的时候再慢慢研究。