分布式存储-Redisson&分布式锁&源码分析

分布式存储-Redisson&分布式锁&源码分析

前面讲了redis的使用,本篇聊聊如何使用利用redis的客户端Redisson去实现分布式锁,而且分析他的源码,剖析如何实现,源码中包含一些点,咱们也会聊到html

  • Lua脚本
  • Redis的Pub&Sub
  • 时间轮 

分布式锁

实际上分布式锁和咱们以前讲的排它锁同样(同一时间只能有一个线程/进程访问一个资源),只不是以前的Synchronize和ReentrantLock是一种线程之间的,而分布式锁是进程之间的,咱们看他们两种实现锁的方式的时候发现,他们都是有一个标识去存储是否能够访问,MarkWord以及AQS中锁的状态,那咱们是否是也能够把把锁的状态存储在一个地方,当咱们要访问共享资源的时候去查询是否有进程正在占用锁?是的Redis就是这样一个第三方的东西,因此咱们能够用它来作分布式锁。一样能够作分布式锁的第三方还有:MySQL、ZK、Etcd 等。以前讲到setNx能够实现分布式锁,setNx咱们说能够知足共享互斥(当咱们设置的时候,若是有其余进程正在更改他则返回0),而且知足是原子性,只要知足这个两个特征就能实现锁的机制,前面咱们聊到CAS的时候也是同样的原理特性。redis

 Redisson实现分布式锁

public class RedisLockMain {

    private static RedissonClient redissonClient;

    static{
        Config config=new Config();
        config.useSingleServer().setAddress("redis://ip:6379");
        redissonClient= Redisson.create(config);
    }

    public static void main(String[] args) throws InterruptedException {
       RLock rLock=redissonClient.getLock("updateRepo");
        for (int i = 0; i < 10; i++) {
            if(rLock.tryLock()){ //返回true,表示得到锁成功
                System.out.println("得到锁成功");
            }else{
                System.out.println("得到锁失败");
            }
            Thread.sleep(2000);
            rLock.unlock();
        }

    }
}

总体梳理

加锁:数据库

  • 使用脚本进行判断咱们要加锁的那个 key,
  • 不存在的话,进行加锁,
  • 加锁的的时候会存储当前的线程id,默认 这个锁 key 的生存时间是 30 秒。

互斥锁(若是此时有第二个客户端请求加锁):数组

  • 由于他们执行的都是同一个Lua,首先仍是判断key,
  • 发现已经已经有人加锁了,因此他就会执行Lua的下一行,会返回一个当前想操做锁的过时时间。
  • 若是得到锁是失败,那就对使用channel订阅释放锁的事件,
  • 若是得到了锁的通知,则开始对锁进行不断的循环获取
  • 循环中尝试得到锁,而且得到锁的剩余时间,
  • 若是拿到了锁,就直接返回,没有拿到锁,那就继续等待

锁的续期机制:

由于怕产生死锁,因此每个锁都有过时时间,可是程序若是执行的时间,比过时的时间还要长,简而言之就是,好比过时时间是30s,而程序执行了32s,这个时候可能别的进程就会抢到锁,那就有可能两个进程同时执行一个逻辑,那就有问题,这里就有一个续约机制,只要咱们抢到锁那就有会启动一个续约机制,叫作看门狗(Watch Dog )底层是时间轮,下面会讲,他其实就是一个定时任务,当咱们的得到锁后,他会把会将持有锁的线程放入到一个 RedissonLock.EXPIRATION_RENEWAL_MAP里面,每过10s看门狗就去咱们存储得到锁的线程id的map中进行遍历,而后拿他去redis中查询,看他是否还在持有(持有的话就证实程序还在运行),若是持有,看门狗就会延长锁的时间app

释放锁:分布式

  • 删除锁
  • 广播释放锁的消息
  • 取消 Watch Dog 机制

源码分析

尝试抢占锁ide

//带有超时时间的逻辑
@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
      //尝试得到锁
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        //下面逻辑竞争锁的时候再看
   }

抢占锁逻辑源码分析

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    //leaseTime就是租约时间,就是redis key的过时时间。
    //若是设置了过时时间
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        //若是没设置了过时时间,则从配置中获取key超时时间,默认是30s过时
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    //当tryLockInnerAsync执行结束后,触发下面回调
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        //表示第一次设置锁键
        if (ttlRemaining == null) {
            //表示设置过超时时间,更新internalLockLeaseTime, 并返回
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                //leaseTime=-1,启动Watch Dog
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

LUA脚本实现加锁的操做ui

  • 判断lock键是否存在,不存在直接调用hset存储当前线程信息而且设置过时时间,返回nil,告诉客户端直接获取到锁。
  • 判断lock键是否存在,存在则将重入次数加1,并从新设置过时时间,返回nil,告诉客户端直接获取到锁。 
  • 被其它线程已经锁定,返回锁有效期的剩余时间,告诉客户端须要等待。
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

锁的释放流程this

  • 若是lock键不存在,经过 publish 指令发送一个消息表示锁已经可用。
  • 若是锁不是被当前线程锁定,则返回nil 
  • 因为支持可重入,在解锁时将重入次数须要减1 
  • 若是计算后的重入次数>0,则从新设置过时时间 
  • 若是计算后的重入次数<=0,则发消息说锁已经可用 
  •     <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "return redis.call('pttl', KEYS[1]);",
                    Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
        }
    View Code
Redis中的Pub/Sub机制
Redis提供了一组命令可让开发者实现“发布/订阅”模式(publish/subscribe) . 该模式一样能够实现进程间的消息传递
发布/订阅模式包含两种角色, 分别是发布者和订阅者。订阅者能够订阅一个或多个频道,而发布者能够向指定的频道发送消息,全部订阅此频道的订阅者都会收到该消息 
发布者发布消息的命令:PUBLISH, 用法是 :PUBLISH channel message 
好比向channel.1发一条消息:hello  ->PUBLISH channel.1 “hello” 这样就实现了消息的发送,该命令的返回值表示接收到这条消息的订阅者数量
订阅者订阅消息的命令是:SUBSCRIBE channel [channel …] 
好比订阅channel.1 SUBSCRIBE channel.1
 RedissonLock有竞争的状况 有竞争的状况在redis端的lua脚本是相同的,只是不一样的条件执行不一样的redis命令。当经过tryAcquire,发现锁被其它线程申请时,须要进入等待竞争逻辑中
  • this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败
  • this.await返回true,进入循环尝试获取锁。
//带有超时时间的逻辑
@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
   
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        //去订阅,若是抢占分布式锁的线程释放了锁,这边就会收到这个消息
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // 阻塞等待subscribe的future的结果对象,若是subscribe方法调用超过了time,说明已经超 过了客户端设置的最大wait time,则直接返回false,取消订阅,再也不继续申请锁了。
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
             //取消订阅
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            //表示抢占锁失败
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        //收到订阅的消息后,走这里的逻辑
        try {
            //判断是否超时,若是等待超时,返回获的锁失败
            time -= System.currentTimeMillis() - current;
            //若是time小于零咱们竞争锁是失败的
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        
            //这里不断的循环是抢占锁
            while (true) {
                long currentTime = System.currentTimeMillis();
                //抢占锁的时候会返回一个过时时间
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                //若是是空则表示得到锁
                if (ttl == null) {
                    return true;
                }
                //判断是否超时,若是超时,表示获取锁失败
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // 经过信号量(共享锁)阻塞,等待解锁消息. (减小申请锁调用的频率) // 若是剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取 一个许可(除非被中断或者一直没有可用的许可)。
                // 不然就在wait time 时间范围内等待能够经过信号量
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
// 更新等待时间(最大等待时间-已经消耗的阻塞时间)
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            //取消订阅
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }
锁过时了怎么办?
通常来讲,咱们去得到分布式锁时,为了不死锁的状况,咱们会对锁设置一个超时时间,可是有一种状况是,若是在指定时间内当前线程没有执行完,因为锁超时致使锁被释放,那么其余线程就会拿到这
把锁,从而致使一些故障。为了不这种状况, Redisson引入了一个Watch Dog机制,这个机制是针对分布式锁来实现锁的自动续约,简单来讲,若是当前得到锁的线程没有执行完,那么Redisson会自动给Redis中目标key延长超时时间。默认状况下,看门狗的续期时间是30s,也能够经过修改Confifig.lockWatchdogTimeout来另行指定。
 实际上,当咱们经过tryLock方法没有传递超时时间时, 默认会设置一个30s的超时时间,避免出现死锁的问题。 
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
  
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        //若是没设置了过时时间,则从配置中获取key超时时间,默认是30s过时
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
   
    return ttlRemainingFuture;
}
因为默认设置了一个30s的过时时间,为了防止过时以后当前线程还未执行完, 因此经过定时任务对过时时间进行续约
  • 会先判断在expirationRenewalMap中是否存在了entryName,这是个map结构,主要仍是判断在这个服务实例中的加锁客户端的锁key是否存在,
  • 若是已经存在了,就直接返回;主要是考虑到RedissonLock是可重入锁
//这里指的是续约机制
protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    //EXPIRATION_RENEWAL_MAP表示的是须要续约的key
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        // 第一次加锁的时候会调用,内部会启动WatchDog
        entry.addThreadId(threadId);
        renewExpiration();
    }
}
定义一个定时任务(时间轮),该任务中调用 renewExpirationAsync 方法进行续约。
//真正续约
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    
    //这里使用的是一个时间轮的机制
    //至关于咱们去添加一个过时时间的任务,延迟10s钟去执行一个这个任务
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            //从这个map中获取一个过时的entry
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                
                if (res) {
                    //当方法执行完成后,再次调用,这样就成了一个周期执行了
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}
执行Lua脚本,对指定的key进行续约。 
//若是这个key不为空,则给他增长一个时间
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

LUA脚本

他至关于JavaScript同样,是一种脚本语言,在Redis中咱们能够把多个Redis操做封装成一个LUA指令,从而能够保证原子性。 

EVAL命令-执行脚本 :EVAL] [脚本内容] [key参数的数量] [key …] [arg …]

好比咱们要给redis上存储一个key为lua value为hello 那语句就是:eval "return redis.call('set',KEYS[1],ARGV[1])" 1 lua hello

咱们经过lua脚原本实现一个访问频率限制功能:【tonumber 只是将传递进来的数值转换为数字而已
@RestController
public class LuaController {

    @Autowired
    RedissonClient redissonClient;

    private final String LIMIT_LUA="local times=redis.call('incr',KEYS[1])\n" +
            "if times==1 then\n" +
            "    redis.call('expire',KEYS[1],ARGV[1])\n" +
            "end\n" +
            "if times > tonumber(ARGV[2]) then\n" +
            "    return 0\n" +
            "end \n" +
            "return 1";


    @GetMapping("/lua/{id}")
    public String lua(@PathVariable("id")Integer id) throws ExecutionException, InterruptedException {
        RScript rScript=redissonClient.getScript();
        List<Object> keys= Arrays.asList("LIMIT:"+id);
        //设置10s即为过时  能够访问三次 【脚本类型-读写类型】  【LUA脚本】 【返回类型】 【keys】 【value】 
        RFuture<Object> future=rScript.evalAsync(RScript.Mode.READ_WRITE,LIMIT_LUA, RScript.ReturnType.INTEGER,keys,10,3);
        return future.get().toString();
    }
}

这样每隔10s就能够从新访问

LUA的原子性:他真的太暴力了!! 当redis正在执行一段lua命令的时候,咱们不能对Redis进行任何操做!! 这就是原子性的缘由。Redis提供了lua-time-limit参数限制脚本的最长运行时间,默认是5秒钟。咱们能够配置这个来防止没法访问redis.当脚本运行时间超过这个限制后,Redis将开始接受其余命令但不会执行(以确保脚本的原子性),而是返回BUSY的错误,咱们可使用script kill 的命令终止当前执行的脚本,而后redis即恢复正常。

 时间轮

咱们上面说到的看门狗就用到了时间轮,不断的查看时间。时间轮是由一个环形数组组成,能够想象它是一个时间表盘,每一个数字时间的间隔就是时间槽,这个槽叫作bucket,槽里面就是我们须要执行的任务。咱们能够设置他的时间槽,和每一个槽的时间单位,好比设置他为1s,设置8个时间槽。而后这个时间轮就开始执行,那整个流程执行完成就是8s。执行的时候,依次访问每一个数组元素,而后执行元素中我们的任务。咱们可使用他去实现定时关单,由于有时候订单交易量特别多,直接去轮询数据的效率有点低,这里直接使用时间轮,就省去了数据库的压力。图是copy的。

使用:

先构建一个HashedWheelTimer时间轮
  • tickDuration: 100 ,表示每一个时间格表明当前时间轮的基本时间跨度,这里是100ms,也就是指针100ms跳动一次,每次跳动一个窗格
  • ticksPerWheel:1024,表示时间轮上一共有多少个窗格,分配的窗格越多,占用内存空间就越大
  • leakDetection:是否开启内存泄漏检测。
  • maxPendingTimeouts[可选参数],最大容许等待的任务数,默认没有限制
  • 经过newTimeout()把须要延迟执行的任务添加到时间轮中
  • 下面的任务就会根据我们传递进来的数据进行延迟执行
@RestController
@RequestMapping("/timer")
public class HashedWheelController {

    //时间轮的定义
    HashedWheelTimer hashedWheelTimer=new HashedWheelTimer(
            new DefaultThreadFactory("demo-timer"),
            100, TimeUnit.MILLISECONDS,1024,false);

    @GetMapping("/{delay}")
    public void tick(@PathVariable("delay")Long delay){
        //SCHEDULED(定时执行的线程)
        //Timer(Java原生定时任务执行)
        //订单关单
        System.out.println("CurrentTime:"+new Date());
        hashedWheelTimer.newTimeout(timeout -> {
            System.out.println("Begin Execute:"+new Date());
        },delay,TimeUnit.SECONDS);
    }
}
时间轮的原理解析
建立时间轮:时间轮本质上是一个环状数组,好比咱们初始化时间轮时:ticksPerWheel=8,那么意味着这个环状数组的长度是8。
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
添加任务
当经过newTimeout()方法添加一个延迟任务时,该任务首先会加入到一个阻塞队列中中。而后会有一个定时任务从该队列获取任务,
添加到时间轮的指定位置:
  •   根据任务的延迟时间计算要多少个tick才能执行
  •   计算当前任务须要转动多少圈才能执行
  •        经过ticks取模mask,获得一个下标
  •   把任务添加到指定数组下标位置
//当前任务的开始执行时间除以每一个窗口的时间间隔,获得一个calculated值(表示须要通过多少 tick,指针没跳动一个窗格,tick会递增),单位为nanos(微毫秒) 
long calculated = timeout.deadline / tickDuration;
//计算当前任务须要在时间轮中经历的圈数,由于当前任务执行时间有可能大于完整一圈的时间,因此 须要计算通过几圈以后才能执行该任务。
timeout.remainingRounds = (calculated - tick) / wheel.length; //取最大的一个tick,有可能当前任务在队列中已通过了执行时间,这种状况下直接用calculated这 个值就没意义了。 
final long ticks = Math.max(calculated, tick);
// Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask); //经过ticks取模mask,获得一个下标 
HashedWheelBucket bucket = wheel[stopIndex]; //把任务添加到指定数组下标位置
任务执行:
  •   Worker线程按照每次间隔时间转动后,获得该时间窗格中的任务链表,而后从链表的head开始逐个取出任务,有两个判断条件:
  •   当前任务须要转动的圈数为0,表示任务是当前圈开始执行
  •   当前任务达到了delay时间,也就是 timeout.deadline <= deadline
  •   最终调用timeout.expire()方法执行任务。