又长又细,万字长文带你解读Redisson分布式锁的源码

前言

上一篇文章写了Redis分布式锁的原理和缺陷,以为有些不过瘾,只是简单的介绍了下Redisson这个框架,具体的原理什么的还没说过呢。趁年前项目忙的差很少了,反正闲着也是闲着,不如把Redisson的源码也学习一遍好了。node

虽然说是一时兴起,但仔细研究以后发现Redisson的源码解读工做量仍是挺大的,其中用到了大量的Java并发类,而且引用了Netty做为通讯工具,实现与Redis组件的远程调用,这些知识点若是要所有讲解的话不太现实,本文的重点主要是关于Redisson分布式锁的实现原理,因此网络通讯和并发原理这块的代码解读不会太仔细,有不足之处还望见谅!web

Redis 发布订阅

以前说过,分布式锁的核心功能其实就三个:加锁、解锁、设置锁超时。这三个功能也是咱们研究Redisson分布式锁原理的方向。redis

在学习以前,咱们有必要先了解一个知识点,就是有关Redis的发布订阅功能。算法

Redis 发布订阅 (pub/sub) 是一种消息通讯模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息,发布者能够向指定的渠道 (channel) 发送消息,订阅者若是订阅了该频道的话就能收到消息,从而实现多个客户端的通讯效果。后端

订阅的命令是SUBSCRIBE channel[channel ...],能够订阅一个或多个频道,当有新消息经过PUBLISH命令发送给频道时,订阅者就能收到消息,就好像这样安全

开启两个客户端,一个订阅了频道channel1,另外一个经过PUBLISH发送消息后,订阅的那个就能收到了,靠这种模式就能实现不一样客户端之间的通讯。微信

固然,关于这种通讯模式有哪些妙用场景咱们就不展开了,你们能够本身去网上查阅一下,咱们的主角仍是Redisson,热身完毕,该上主菜了。网络

Redisson源码

在使用Redisson加锁以前,须要先获取一个RLock实例对象,有了这个对象就能够调用lock、tryLock方法来完成加锁的功能并发

Config config = new Config();
config.useSingleServer()
  .setPassword("")
  .setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
// RLock对象
RLock lock = redisson.getLock("myLock");

配置好对应的host,而后就能够建立一个RLock对象。RLock是一个接口,具体的同步器须要实现该接口,当咱们调用redisson.getLock()时,程序会初始化一个默认的同步执行器RedissonLock框架

这里面初始化了几个参数,

commandExecutor:异步的Executor执行器,Redisson中全部的命令都是经过...Executor 执行的 ;

id:惟一ID,初始化的时候是用UUID建立的;

internalLockLeaseTime:等待获取锁时间,这里读的是配置类中默认定义的,时间为30秒;

同时,图片里我还标注了一个方法getEntryName,返回的是 “ID :锁名称” 的字符串,表明的是当前线程持有对应锁的一个标识,这些参数有必要留个印象,后面的源码解析中常常会出现。

说完了初始化的东西,咱们就能够开始学习加锁和解锁的源码了。

加锁

Redisson的加锁方法有两个,tryLocklock,使用上的区别在于tryLock能够设置锁的过时时长leaseTime和等待时长waitTime,核心处理的逻辑都差很少,咱们先从tryLock讲起。

tryLock

代码有点长啊。。。整成图片不太方便,直接贴上来吧,

/**
 * @param waitTime 等待锁的时长 
 * @param leaseTime 锁的持有时间 
 * @param unit 时间单位
 * @return
 * @throws InterruptedException
 */
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {    
        // 剩余的等待锁的时间
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        
        final long threadId = Thread.currentThread().getId();
        // 尝试获取锁,若是没取到锁,则返回锁的剩余超时时间
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // ttl为null,说明能够抢到锁了,返回true
        if (ttl == null) {
            return true;
        }
        
        // 若是waitTime已经超时了,就返回false,表明申请锁失败
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        // 订阅分布式锁, 解锁时进行通知,看,这里就用到了咱们上面说的发布-订阅了吧
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // 阻塞等待锁释放,await()返回false,说明等待超时了
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                         // 等待都超时了,直接取消订阅
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
         // 进入死循环,反复去调用tryAcquire尝试获取锁,跟上面那一段拿锁的逻辑同样
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

代码仍是挺长的,不过流程也就两步,要么线程拿到锁返回成功;要么没拿到锁而且等待时间还没过就继续循环拿锁,同时监听锁是否被释放。

拿锁的方法是tryAcquire,传入的参数分别是锁的持有时间,时间单位以及表明当前线程的ID,跟进代码查看调用栈,它会调到一个叫作tryAcquireAsync的方法:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        // 若是有设置锁的等待时长的话,就直接调用tryLockInnerAsync方法获取锁
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        // 没有设置等待锁的时长的话,加多一个监听器,也就是调用lock.lock()会跑的逻辑,后面会说
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

咱们继续跟,看看tryLockInnerAsync方法的源码:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
String getLockName(long threadId) {
    return id + ":" + threadId;
}

这里就是底层的调用栈了,直接操做命令,整合成lua脚本后,调用netty的工具类跟redis进行通讯,从而实现获取锁的功能。

这段脚本命令仍是有点意思的,简单解读一下:

  • 先用 exists key命令判断是否锁是否被占据了,没有的话就用 hset命令写入,key为锁的名称,field为“客户端惟一ID:线程ID”,value为1;
  • 锁被占据了,判断是不是当前线程占据的,是的话value值加1;
  • 锁不是被当前线程占据,返回锁剩下的过时时长;

命令的逻辑并不复杂,但不得不说,做者的设计仍是颇有心的,用了redis的Hash结构存储数据,若是发现当前线程已经持有锁了,就用hincrby命令将value值加1,value的值将决定释放锁的时候调用解锁命令的次数,达到实现锁的可重入性效果。

每一步命令对应的逻辑我都在下面的图中标注了,你们能够读一下:

咱们继续跟代码吧,根据上面的命令能够看出,若是线程拿到锁的话,tryLock方法会直接返回true,万事大吉。

拿不到的话,就会返回锁的剩余过时时长,这个时长有什么做用呢?咱们回到tryLock方法中死循环的那个地方:

这里有一个针对waitTime和key的剩余过时时间大小的比较,取到两者中比较小的那个值,而后用Java的Semaphore信号量的tryAcquire方法来阻塞线程。

那么Semaphore信号量又是由谁控制呢,什么时候才能release呢。这里又须要回到上面来看,各位看官应该还记得,咱们上面贴的tryLock代码中还有这一段:

current = System.currentTimeMillis();
// 订阅分布式锁, 解锁时进行通知
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);

订阅的逻辑显然是在subscribe方法里,跟着方法的调用链,它会进入到PublishSubscribe.Java中:

这段代码的做用在于将当前线程的threadId添加到一个AsyncSemaphore中,而且设置一个redis的监听器,这个监听器是经过redis的发布、订阅功能实现的。

一旦监听器收到redis发来的消息,就从中获取与当前thread相关的,若是是锁被释放的消息,就立马经过操做Semaphore(也就是调用release方法)来让刚才阻塞的地方释放。

释放后线程继续执行,仍旧是判断是否已经超时。若是还没超时,就进入下一次循环再次去获取锁,拿到就返回true,没有拿到的话就继续流程。

这里说明一下,之因此要循环,是由于锁可能会被多个客户端同时争抢,线程阻塞被释放以后的那一瞬间极可能仍是拿不到锁,可是线程的等待时间又还没过,这个时候就须要从新跑循环去拿锁。

这就是tryLock获取锁的整个过程了,画一张流程图的话表示大概是这样:

lock

除了tryLock,通常咱们还常常直接调用lock来获取锁,lock的拿锁过程跟tryLock基本是一致的,区别在于lock没有手动设置锁过时时长的参数,该方法的调用链也是跑到tryAcquire方法来获取锁的,不一样的是,它会跑到这部分的逻辑:

这段代码作了两件事:

一、预设30秒的过时时长,而后去获取锁

二、开启一个监听器,若是发现拿到锁了,就开启定时任务不断去刷新该锁的过时时长

刷新过时时长的方法是scheduleExpirationRenewal,贴一下源码吧:

private void scheduleExpirationRenewal(final long threadId) {
 // expirationRenewalMap是一个ConcurrentMap,存储标志为"当前线程ID:key名称"的任务
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                // 检测锁是否存在的lua脚本,存在的话就用pexpire命令刷新过时时长
                RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            task.cancel();
        }
    }

代码的流程比较简单,大概就是开启一个定时任务,每隔internalLockLeaseTime / 3的时间(这个时间是10秒)就去检测锁是否还被当前线程持有,是的话就从新设置过时时长internalLockLeaseTime,也就是30秒的时间。

而这些定时任务会存储在一个ConcurrentHashMap对象expirationRenewalMap中,存储的key就为“线程ID:key名称”,若是发现expirationRenewalMap中不存在对应当前线程key的话,定时任务就不会跑,这也是后面解锁中的一步重要操做。

上面这段代码就是Redisson中所谓的”看门狗“程序,用一个异步线程来定时检测并执行的,以防手动解锁以前就过时了。

其余的逻辑就跟tryLock()基本没什么两样啦,你们看一下就知道了

解锁

有拿锁的方法,天然也就有解锁。Redisson分布式锁解锁的上层调用方法是unlock(),默认不用传任何参数

@Override
    public void unlock() {
     // 发起释放锁的命令请求
        Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
         // 成功释放锁,取消"看门狗"的续时线程
            cancelExpirationRenewal();
        }
    }

解锁相关的命令操做在unlockInnerAsync方法中定义,

又是一大串的lua脚本,比起前面加锁那段脚本的命令稍微复杂了点,不过不要紧,咱们简单梳理一下,命令的逻辑大概是这么几步:

一、判断锁是否存在,不存在的话用publish命令发布释放锁的消息,订阅者收到后就能作下一步的拿锁处理;

二、锁存在但不是当前线程持有,返回空置nil;

三、当前线程持有锁,用hincrby命令将锁的可重入次数-1,而后判断重入次数是否大于0,是的话就从新刷新锁的过时时长,返回0,不然就删除锁,并发布释放锁的消息,返回1;

当线程彻底释放锁后,就会调用cancelExpirationRenewal()方法取消"看门狗"的续时线程

void cancelExpirationRenewal() {
 // expirationRenewalMap移除对应的key,就不会执行当前线程对应的"看门狗"程序了
    Timeout task = expirationRenewalMap.remove(getEntryName());
    if (task != null) {
        task.cancel();
    }
}

这就是释放锁的过程了,怎么样,是否是仍是比较简单的,阅读起来比加锁那份代码舒服多了,固然啦,简单归简单,为了方便大家理清整个分布式锁的过程,我固然仍是费心费力的给大家画流程图展现下啦(就冲这点,是否是该给我来个三连啊,哈哈):

RedLock

以上就是Redisson分布式锁的原理讲解,总的来讲,就是简单的用lua脚本整合基本的set命令实现锁的功能,这也是不少Redis分布式锁工具的设计原理。除此以外,Redisson还支持用"RedLock算法"来实现锁的效果,这个工具类就是RedissonRedLock

用法也很简单,建立多个Redisson Node, 由这些无关联的Node就能够组成一个完整的分布式锁

RLock lock1 = Redisson.create(config1).getLock(lockKey);
RLock lock2 = Redisson.create(config2).getLock(lockKey);
RLock lock3 = Redisson.create(config3).getLock(lockKey);

RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
try {
   redLock.lock();
} finally {
   redLock.unlock();
}

RedLock算法原理方面我就不细说了,你们有兴趣能够看我以前的文章,或者是网上搜一下,简单的说就是能必定程度上能有效防止Redis实例单点故障的问题,但并不彻底可靠,不论是哪一种设计,光靠Redis自己都是没法保证锁的强一致性的。

仍是那句话,鱼和熊掌不可兼得,性能和安全方面也每每如此,Redis强大的性能和使用的方便足以知足平常的分布式锁需求,若是业务场景对锁的安全隐患没法忍受的话,最保底的方式就是在业务层作幂等处理。

总结

看了本文的源码解析,相信各位看官对Redisson分布式锁的设计也有了足够的了解,固然啦,虽然是讲解源码,咱们的主要精力仍是放在分布式锁的原理上,一些无关流程的代码就没有带你们字斟酌句的解读了,你们有兴趣的话能够本身去阅读看看,源码中不少地方都展现了一些基础并发工具和网络通讯的妙用之处,学习一下仍是挺有收获的。

最后我仍是想吐槽一下,Redisson的注释是真的少啊。。。。。。

若是您以为文章有用的话,欢迎点个赞支持一下,这将是对我创做的最好鼓励!

做者:鄙人薛某,一个不拘于技术的互联网人,喜欢用通俗易懂的语言来解构后端技术的知识点,想看更多精彩文章的能够关注个人公众号,微信搜索【鄙人薛某】便可关注

相关文章
相关标签/搜索