新姿式!Redis中调用Lua脚本以实现原子性操做

背景:有一服务提供者Leader,有多个消息订阅者Workers。Leader是一个排队程序,维护了一个用户队列,当某个资源空闲下来并被分配至队列中的用户时,Leader会向订阅者推送消息(消息带有惟一标识ID),订阅者在接收到消息后会进行特殊处理并再次推往前端。前端

问题:前端只须要接收到一条由Worker推送的消息便可,可是若是Workers不作消息重复推送判断的话,会致使前端收到多条消息推送,从而影响正常业务逻辑。java


方案一(未经过)

在Worker接收到消息时,尝试先从redis缓存中根据消息的ID获取值,有如下两种状况:redis

  • 若是值不存在,则表示当前这条消息是第一次被推送,能够执行继续执行推送程序,固然,不要忘了将当前消息ID做为键插入缓存中,并设置一个过时时间,标记这条消息已经被推送过了。shell

  • 若是值存在,则表示当前这条消息是被推送过的,跳过推送程序。缓存

代码能够这么写:bash

public void waitingForMsg() {
    // Message Received.
    String value = redisTemplate.opsForValue().get("msg_pushed_" + msgId);
    if (!StringUtils.hasText(value)) {
        // 当不能从缓存中读取到数据时,表示消息是第一次被推送
        // 赶忙往缓存中插入一个标识,表示当前消息已经被推送过了
        redisTemplate.opsForValue().set("msg_pushed_" + msgId, "1");
        // 再设置一个过时时间,防止数据无限制保留
        redisTemplate.expire("msg_pushed_" + msgId, 20, TimeUnit.SECONDS);
        // 接下来就能够执行推送操做啦
        this.pushMsgToFrontEnd();
    }
}
复制代码

看起来彷佛是没啥问题,可是咱们从redis的角度分析一下请求,看看是否是真的没问题。ide

> get msg_pushed_1 # 此时Worker1尝试获取值
> get msg_pushed_1 # Worker2也没闲着,执行了这句话,而且时间找得刚恰好,就在Worker1准备插入值以前
> set msg_pushed_1 "1" # Worker1以为消息没有被推送,插入了一个值
> set msg_pushed_1 "1" # Worker2也这么以为,作了一样的一件事复制代码

你看,仍是有可能会往前端推送屡次消息,因此这个方案不经过。ui

再仔细想想,出现这个问题的缘由是啥?———— 就是在执行get和set命令时,没有保持原子性操做,致使其余命令有机可趁,那是否是能够把get和set命令当成一整个部分执行,不让其余命令插入执行呢?this

有不少方案能够实现,例如给键加锁或者添加事务可能能够完成这个操做。可是咱们今天讨论一下另一种方案,在Redis中执行Lua脚本。atom


方案二

咱们能够看一下Redis官方文档对Lua脚本原子性的解释

Atomicity of scripts

Redis uses the same Lua interpreter to run all the commands. Also Redis guarantees that a script is executed in an atomic way: no other script or Redis command will be executed while a script is being executed. This semantic is similar to the one of MULTI / EXEC. From the point of view of all the other clients the effects of a script are either still not visible or already completed.

大体意思是说:咱们Redis采用相同的Lua解释器去运行全部命令,咱们能够保证,脚本的执行是原子性的。做用就相似于加了MULTI/EXEC。


好,原子性有保证了,那么咱们再看看编写语法。

> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second1) "key1"2) "key2"3) "first"4) "second"复制代码

由前至后的命令解释(Arg 表示参数的意思 argument):

    eval: Redis执行Lua脚本的命令,后接脚本内容及各参数。这个命令是从2.6.0版本才开始支持的。

    1st. Arg : Lua脚本,其中的KEYS[]和ARGV[]是传入script的参数 。

    2nd. Arg: 后面跟着的KEY个数n,从第三个参数开始的总共n个参数会被做为KEYS传入script中,在script中能够经过KEYS[1], KEYS[2]…格式读取,下标从1开始 。

    Remain Arg: 剩余的参数能够在脚本中经过ARGV[1], ARGV[2]…格式读取 ,下标从1开始 。

咱们执行脚本内容是return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}表示返回传入的参数,因此咱们能够看到参数被原封不动的返回了。


接着,咱们再来实战一下,在Lua脚本中调用Redis方法吧。

咱们能够在Lua脚本中经过如下两个命令调用redis的命令程序

  • redis.call()

  • redis.pcall()

二者的做用是同样的,可是程序出错时的返回结果略有不一样。

使用方法,命令和在Redis中执行如出一辙:

> eval "return redis.call('set', KEYS[1], ARGV[1])" 1 foo bar
OK
> eval "return redis.call('get', KEYS[1])" 1 foo
"bar"复制代码


是否是很简单,说了这么多,咱们赶忙来现学现卖,写一个脚本应用在咱们的场景中吧。

> eval "if redis.call('get', KEYS[1]) == false then redis.call('set', KEYS[1], ARGV[1]) redis.call('expire', KEYS[1], ARGV[2]) return 0 else return 1 end" 1 msg_push_1 "1" 10复制代码

脚本的意思和咱们以前在方案一中写的程序逻辑同样,先判断缓存中是否存在键,若是不存在则存入键和其值,而且设置失效时间,最后返回0;若是存在则返回1。PS: 若是对if redis.call('get', KEYS[1]) == false这里为何获得的结果要与false比较的话,能够看最后的Tip。

  • 执行第一次:咱们发现返回值0,而且咱们看到缓存中插入了一条数据,键为msg_push_1、值为"1"

  • 在失效前,执行屡次:咱们发现返回值一直为1。而且在执行第一次后的10秒,该键被自动删除。


将以上逻辑迁入咱们java代码后,就是下面这个样子啦

public boolean isMessagePushed(String messageId) {
    Assert.hasText(messageId, "消息ID不能为空");

    // 使用lua脚本检测值是否存在
    String script = "if redis.call('get', KEYS[1]) == false then redis.call('set', KEYS[1], ARGV[1]) redis.call('expire', KEYS[1], ARGV[2]) return 0 else return 1 end";

    // 这里使用Long类型,查看源码可知脚本返回值类型只支持Long, Boolean, List, or deserialized value type.
    DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
    redisScript.setScriptText(script);
    redisScript.setResultType(Long.class);

    // 设置key
    List<String> keyList = new ArrayList<>();
    // key为消息ID
    keyList.add(messageId);

    // 每一个键的失效时间为20秒
    Long result = redisTemplate.execute(redisScript, keyList, 1, 20);

    // 返回true: 已读、false: 未读
    return result != null && result != 0L;
}

public void waitingForMsg() {
    // Message Received.
    if (!this.isMessagePushed(msgId)) {
        // 返回false表示未读,接下来就能够执行推送操做啦
        this.pushMsgToFrontEnd();
    }
}复制代码

Tip

这里只是简单的Redis中使用Lua脚本介绍,详细的使用方法能够参考官方文档,并且还有其余不少用法介绍。

对了,上面还有一个须要注意一下,就是关于Redis和Lua中变量的相互转换,由于提及来啰哩啰嗦的,因此没放在上文中,最后能够简单说一下。

Redis to Lua conversion table.

  • Redis integer reply -> Lua number

  • Redis bulk reply -> Lua string

  • Redis multi bulk reply -> Lua table (may have other Redis data types nested)

  • Redis status reply -> Lua table with a single ok field containing the status

  • Redis error reply -> Lua table with a single err field containing the error

  • Redis Nil bulk reply and Nil multi bulk reply -> Lua false boolean type // 这里就是上面咱们在脚本中作是否为空判断的时候if redis.call('get', KEYS[1]) == false,采用与false比较的缘由。Redis的nil(相似null)会被转换为Lua的false

Lua to Redis conversion table.

  • Lua number -> Redis integer reply (the number is converted into an integer)

  • Lua string -> Redis bulk reply

  • Lua table (array) -> Redis multi bulk reply (truncated to the first nil inside the Lua array if any)

  • Lua table with a single ok field -> Redis status reply

  • Lua table with a single err field -> Redis error reply

  • Lua boolean false -> Redis Nil bulk reply.

注意点:

Lua的Number类型会被转为Redis的Integer类型,所以若是但愿获得小数时,须要由Lua返回String类型的数字。 

相关文章
相关标签/搜索