java 分布式锁

实现分布式锁的基本思路

采用 redis 和 zookeeper 的方式实现html

  • 采用 redis 实现分布式锁的基本思路经过一个单节点的 redis 服务器,而后全部客户端线程经过设置同一个 key 来实现锁的获取,这里的关键是要区分开不一样节点机器的不一样线程(例如每一个 JVM 里面的不一样线程都经过一个单例对象来获取锁,该单例对象中对应一个 UUID,这样就区分开了不一样的 JVM,而后一个 JVM 的不一样线程经过线程 ID 来区分,UUID+线程ID 就区分开了不一样进程的不一样线程),对应缓存值须要保存两部分信息,一个就是线程的惟一标识(UUID+线程ID);另一个就是该线程获取锁的次数,用来支持重入锁的特性,能够将这两个信息保存到一个 json 对象中,这样 key 和 value 就都有了,若是线程设置 key 成功,则表示锁获取成功,若是当前 key 已经被设置,则说明锁已经被人占用,若是当前 key 对应的 value 里面的线程标识正好好当前线程标识相同,则锁重入java

  • 采用 zookeeper 方式实现分布式锁的基本思路是利用,ZooKeeper机制规定:同一个目录下只能有一个惟一的文件名。例如:咱们在Zookeeper目录 /test 目录下建立,两个客户端建立一个名为 Lock 节点,只有一个可以成功。利用名称惟一性,加锁操做时,只须要全部客户端一块儿建立 /test/Lock 节点,只有一个建立成功,成功者得到锁。解锁时,只需删除 /test/Lock 节点,其他客户端再次进入竞争建立节点,直到全部客户端都得到锁。node


RedissonLock 实现分布式锁部分源码介绍

参考 redisson 项目 git

RLock 接口介绍github

注:这里的 lockInterruptibly 和 tryLock 方法都和 jdk 中的 Lock 接口有区别redis

lockInterruptibly 方法中多了 leaseTime 参数,表示锁最大持有时间,即客户申请到锁以后不论是否手动 unlock 了,超过 leaseTime 设定的时间后都将自动释放锁,防止客户程序异常致使锁没法释放的问题算法

tryLock 方法除了 waitTime 参数外,也多了一个 leaseTime 时间,其原理也是同样的json


下面是 RedissonLock 里面的部分源码分析缓存

lockInterruptibly 获取锁的主要入口方法安全

    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        Long ttl;
        if (leaseTime != -1) {
            ttl = tryLockInner(leaseTime, unit);
        } else {
            ttl = tryLockInner();
        }
        // 锁获取成功,直接返回
        if (ttl == null) {
            return;
        }

        // redis 消息机制
        // "redisson_lock__channel__{" + getName() + "}"; 在该 channel 上订阅消息,当 unlock 发生时,将 channel 上全部监听者将收到通知
        Future<RedissonLockEntry> future = subscribe();
        future.sync();

        try {
            while (true) {
                // 再尝试一次获取锁,若是获取到了就直接返回
                if (leaseTime != -1) {
                    ttl = tryLockInner(leaseTime, unit);
                } else {
                    ttl = tryLockInner();
                }
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // 阻塞等待,当其余线程调用 unlock 方法时被唤醒,或者 ttl 时间超时
                // 唤醒以后须要从新竞争锁,由于可能多个线程被同时唤醒,而每次只会有一个线程成功获取锁
                RedissonLockEntry entry = getEntry();
                if (ttl >= 0) {
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    entry.getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future);
        }
    }

tryLockInner 方法的内部实现

    Long tryLockInner(long leaseTime, TimeUnit unit) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        // 这里经过一条 redis 的批处理命令来设置 key(这里由 redis 的特性来保证整条批处理命令的原子性)
        // 第一个 if 若是 key 不存在,则设置 uuid+线ID 对应的值为 1,并设值 key 对应的超时时间
        // 第二个 if 为重入锁特性的支持,而且刷新 key 的超时时间
        // 不然返回 key 对应超时时间,即锁获取失败
        return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +   // KEYS[1] == Collections.<Object>singletonList(getName())
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +  // ARGV[2] == getLockName()
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +   // ARGV[1] == internalLockLeaseTime 
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName());
    }

unlock 方法的内部实现

    @Override
    public void unlock() {
        // 这里须要先理解 redis 中缓存的 key 和 value 的结构,key 对应的就是锁的名称,全部节点的全部线程都是采用同一个 key (同一把锁)
        // value 对应的是一个对象的结构,其中有两个属性 {UUID+线程ID, 获取锁的次数}
        Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('exists', KEYS[1]) == 0) then " +   // 若是对应的 key 已经不存在,说明 key 已经超时,redis 自动删除了该 key,getChannelName() 发布 unlockMessage 通知其余在阻塞等待获取锁的线程
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; " +
                        "end;" +
                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +  // key 存在,可是已经不是被当前线程占有
                            "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +  // getLockName() 其实等于 UUID+线程ID,将这个值-1,若是结果大于0,说明锁存在重入,则从新刷新锁超时时间
                        "if (counter > 0) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                            "return 0; " +
                        "else " +
                            "redis.call('del', KEYS[1]); " +  // -1以后=0,则正常删除 key,而且发布 unlockMessage 事件
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; "+
                        "end; " +
                        "return nil;",
                        Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName());
        if (opStatus == null) {
            // IllegalMonitorStateException 表示当前调用 unlock 方法的线程不是持有 lock 的线程
            throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }


RedissonLock 分布式锁的基本使用

RedissonLock 使用起来很是简单,若是须要详细了解 RedissonLock 的使用,能够看看 Redisson 项目中相关的测试用例

public void testLock() {
	RLock lock = redisson.getLock("lockName");
	try {
		// .....
	} finally {
		if(lock.isHeldByCurrentThread())
			lock.unlock();
	}
}

// redisson 建立示例代码,通常实现成单例模式
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);


其余问题探讨

本文详细介绍了采用 RedissonLock 方式实现分布式锁的相关源码和使用方式,可是这种方式也存在一个问题是,用于实现分布式锁的获取和释放是一个单实例的 redis 实例,若是该实例宕机,系统中全部的分布式锁获取程序都没法正常工做,那么第一个问题:

如何经过一个集群环境的 redis 实例来实现分布式锁的管理,以实现分布式锁的高可用性

一个可行的解决思路以下(参考:http://www.open-open.com/lib/view/open1415107259996.html):

咱们假设有 N 个 Redis 主节点。这些节点是相互独立的,所以咱们不使用复制或其余隐式同步机制。咱们已经描述过在单实例状况下如何安全地获取锁。咱们也指出此算法将使用这种方法从单实例获取和释放锁。在如下示例中,咱们设置N=5(这是个比较适中的值),这样咱们须要在不一样物理机或虚拟机上运行 5 个 Redis 主节点,以确保它们的出错是尽量独立的。

为了获取锁,客户端执行如下操做:

  1. 获取当前时间,以毫秒为单位。

  2. 以串行的方式尝试从全部的N个实例中获取锁,使用的是相同的key值和相同的随机value值。在从每一个实例获取锁时,客户端会设置一个链接超时,其时长相比锁的自动释放时间要短得多。例如,若锁的自动释放时间是10秒,那么链接超时大概设在5到50毫秒之间。这能够避免当Redis节点挂掉时,会长时间堵住客户端:若是某个节点没及时响应,就应该尽快转到下个节点。

  3. 客户端计算获取全部锁耗费的时长,方法是使用当前时间减去步骤1中的时间戳。当且仅当客户端能从多数节点(至少3个)中得到锁,而且耗费的时长小于锁的有效期时,可认为锁已经得到了。

  4. 若是锁得到了,它的最终有效时长将从新计算为其原时长减去步骤3中获取锁耗费的时长。

  5. 若是锁获取失败了(要么是没有锁住N/2+1个节点,要么是锁的最终有效时长为负数),客户端会对全部实例进行解锁操做(即便对那些没有加锁成功的实例也同样)。

相关文章
相关标签/搜索