采用 redis 和 zookeeper 的方式实现html
采用 redis 实现分布式锁的基本思路经过一个单节点的 redis 服务器,而后全部客户端线程经过设置同一个 key 来实现锁的获取,这里的关键是要区分开不一样节点机器的不一样线程(例如每一个 JVM 里面的不一样线程都经过一个单例对象来获取锁,该单例对象中对应一个 UUID,这样就区分开了不一样的 JVM,而后一个 JVM 的不一样线程经过线程 ID 来区分,UUID+线程ID 就区分开了不一样进程的不一样线程),对应缓存值须要保存两部分信息,一个就是线程的惟一标识(UUID+线程ID);另一个就是该线程获取锁的次数,用来支持重入锁的特性,能够将这两个信息保存到一个 json 对象中,这样 key 和 value 就都有了,若是线程设置 key 成功,则表示锁获取成功,若是当前 key 已经被设置,则说明锁已经被人占用,若是当前 key 对应的 value 里面的线程标识正好好当前线程标识相同,则锁重入java
采用 zookeeper 方式实现分布式锁的基本思路是利用,ZooKeeper机制规定:同一个目录下只能有一个惟一的文件名。例如:咱们在Zookeeper目录 /test 目录下建立,两个客户端建立一个名为 Lock 节点,只有一个可以成功。利用名称惟一性,加锁操做时,只须要全部客户端一块儿建立 /test/Lock 节点,只有一个建立成功,成功者得到锁。解锁时,只需删除 /test/Lock 节点,其他客户端再次进入竞争建立节点,直到全部客户端都得到锁。node
参考 redisson 项目 git
RLock 接口介绍github
注:这里的 lockInterruptibly 和 tryLock 方法都和 jdk 中的 Lock 接口有区别redis
lockInterruptibly 方法中多了 leaseTime 参数,表示锁最大持有时间,即客户申请到锁以后不论是否手动 unlock 了,超过 leaseTime 设定的时间后都将自动释放锁,防止客户程序异常致使锁没法释放的问题算法
tryLock 方法除了 waitTime 参数外,也多了一个 leaseTime 时间,其原理也是同样的json
下面是 RedissonLock 里面的部分源码分析缓存
lockInterruptibly 获取锁的主要入口方法安全
@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { Long ttl; if (leaseTime != -1) { ttl = tryLockInner(leaseTime, unit); } else { ttl = tryLockInner(); } // 锁获取成功,直接返回 if (ttl == null) { return; } // redis 消息机制 // "redisson_lock__channel__{" + getName() + "}"; 在该 channel 上订阅消息,当 unlock 发生时,将 channel 上全部监听者将收到通知 Future<RedissonLockEntry> future = subscribe(); future.sync(); try { while (true) { // 再尝试一次获取锁,若是获取到了就直接返回 if (leaseTime != -1) { ttl = tryLockInner(leaseTime, unit); } else { ttl = tryLockInner(); } // lock acquired if (ttl == null) { break; } // 阻塞等待,当其余线程调用 unlock 方法时被唤醒,或者 ttl 时间超时 // 唤醒以后须要从新竞争锁,由于可能多个线程被同时唤醒,而每次只会有一个线程成功获取锁 RedissonLockEntry entry = getEntry(); if (ttl >= 0) { entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { entry.getLatch().acquire(); } } } finally { unsubscribe(future); } }
tryLockInner 方法的内部实现
Long tryLockInner(long leaseTime, TimeUnit unit) { internalLockLeaseTime = unit.toMillis(leaseTime); // 这里经过一条 redis 的批处理命令来设置 key(这里由 redis 的特性来保证整条批处理命令的原子性) // 第一个 if 若是 key 不存在,则设置 uuid+线ID 对应的值为 1,并设值 key 对应的超时时间 // 第二个 if 为重入锁特性的支持,而且刷新 key 的超时时间 // 不然返回 key 对应超时时间,即锁获取失败 return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "if (redis.call('exists', KEYS[1]) == 0) then " + // KEYS[1] == Collections.<Object>singletonList(getName()) "redis.call('hset', KEYS[1], ARGV[2], 1); " + // ARGV[2] == getLockName() "redis.call('pexpire', KEYS[1], ARGV[1]); " + // ARGV[1] == internalLockLeaseTime "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName()); }
unlock 方法的内部实现
@Override public void unlock() { // 这里须要先理解 redis 中缓存的 key 和 value 的结构,key 对应的就是锁的名称,全部节点的全部线程都是采用同一个 key (同一把锁) // value 对应的是一个对象的结构,其中有两个属性 {UUID+线程ID, 获取锁的次数} Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + // 若是对应的 key 已经不存在,说明 key 已经超时,redis 自动删除了该 key,getChannelName() 发布 unlockMessage 通知其余在阻塞等待获取锁的线程 "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // key 存在,可是已经不是被当前线程占有 "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // getLockName() 其实等于 UUID+线程ID,将这个值-1,若是结果大于0,说明锁存在重入,则从新刷新锁超时时间 "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + // -1以后=0,则正常删除 key,而且发布 unlockMessage 事件 "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName()); if (opStatus == null) { // IllegalMonitorStateException 表示当前调用 unlock 方法的线程不是持有 lock 的线程 throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } if (opStatus) { cancelExpirationRenewal(); } }
RedissonLock 使用起来很是简单,若是须要详细了解 RedissonLock 的使用,能够看看 Redisson 项目中相关的测试用例
public void testLock() { RLock lock = redisson.getLock("lockName"); try { // ..... } finally { if(lock.isHeldByCurrentThread()) lock.unlock(); } } // redisson 建立示例代码,通常实现成单例模式 Config config = new Config(); config.useSingleServer().setAddress("127.0.0.1:6379"); RedissonClient redisson = Redisson.create(config);
本文详细介绍了采用 RedissonLock 方式实现分布式锁的相关源码和使用方式,可是这种方式也存在一个问题是,用于实现分布式锁的获取和释放是一个单实例的 redis 实例,若是该实例宕机,系统中全部的分布式锁获取程序都没法正常工做,那么第一个问题:
如何经过一个集群环境的 redis 实例来实现分布式锁的管理,以实现分布式锁的高可用性
一个可行的解决思路以下(参考:http://www.open-open.com/lib/view/open1415107259996.html):
咱们假设有 N 个 Redis 主节点。这些节点是相互独立的,所以咱们不使用复制或其余隐式同步机制。咱们已经描述过在单实例状况下如何安全地获取锁。咱们也指出此算法将使用这种方法从单实例获取和释放锁。在如下示例中,咱们设置N=5(这是个比较适中的值),这样咱们须要在不一样物理机或虚拟机上运行 5 个 Redis 主节点,以确保它们的出错是尽量独立的。
为了获取锁,客户端执行如下操做:
获取当前时间,以毫秒为单位。
以串行的方式尝试从全部的N个实例中获取锁,使用的是相同的key值和相同的随机value值。在从每一个实例获取锁时,客户端会设置一个链接超时,其时长相比锁的自动释放时间要短得多。例如,若锁的自动释放时间是10秒,那么链接超时大概设在5到50毫秒之间。这能够避免当Redis节点挂掉时,会长时间堵住客户端:若是某个节点没及时响应,就应该尽快转到下个节点。
客户端计算获取全部锁耗费的时长,方法是使用当前时间减去步骤1中的时间戳。当且仅当客户端能从多数节点(至少3个)中得到锁,而且耗费的时长小于锁的有效期时,可认为锁已经得到了。
若是锁得到了,它的最终有效时长将从新计算为其原时长减去步骤3中获取锁耗费的时长。
若是锁获取失败了(要么是没有锁住N/2+1个节点,要么是锁的最终有效时长为负数),客户端会对全部实例进行解锁操做(即便对那些没有加锁成功的实例也同样)。