Redis 5 新特性中,Streams 数据结构的引入,能够说它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 做为消息队列使用时,获得更完善,更强大的原生支持,其中尤其明显的是持久化消息队列。同时,stream 借鉴了 kafka 的消费组模型概念和设计,使消费消息处理上更加高效快速。本文就 Streams 数据结构中经常使用 API 进行分析。
本文所使用 Redis 版本为 5.0.5 。若是使用更早的 5.x 版本,有些 API 使用效果,与本文中描述略有不一样。segmentfault
Streams 添加数据使用 XADD 指令进行添加,消息中的数据以 K-V 键值对的形式进行操做。一条消息能够存在多个键值对,添加命令格式:bash
XADD key ID field string [field string ...]
其中 key 为 Streams 的名称,ID 为消息的惟一标志,不可重复,field string 就为键值对。下面咱们就添加以 person 为名称的流,进行操做。服务器
XADD person * name ytao des https://ytao.top
上面添加案例中,ID 使用 * 号复制,这里表明着服务端自动生成 Id,添加后返回数据 "1578238486193-0"
数据结构
这里自动生成的 Id 格式为 <millisecondsTime>-<sequenceNumber>
Id 是由两部分组成:优化
好比:1578238486193-3 表示在 1578238486193 毫秒的时间戳时,添加的第 4 条消息。spa
除了服务端自动生成 Id 方式外,也支持指定 Id 的生成,可是指定 Id 有如下条件限制:设计
不然,当不知足上述条件时,添加后会抛出异常:指针
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
实际上,当添加一条消息时,会进行两部操做。第一步,先判断若是不存在 Streams,则建立 Streams 的名称,再添加消息到 Streams 中。即便添加消息时,因为 Id 异常,也能够在 Redis 中存在以当前 Streams 的名称。
Streams 中 Id 也可做为指针使用,由于它是一个有序的标记。code
生产中,若是这样使用添加消息,会存在一个问题,那就是消息数量太大时,会使服务宕机。这里 Streams 的设计初期也有考虑到这个问题,那就是能够指定 Streams 的容量。若是容量操做这个设定的值,就会对调旧的消息。在添加消息时,设置 MAXLEN
参数。blog
XADD person MAXLEN 5 * name ytao des https://ytao.top
这样就指定该了 Streams 中的容量为 5 条消息。也可以使用 XTRIM 截取消息,从小到大剔除多余的消息:
XTRIM person MAXLEN 8
查看消息数量使用 XLEN 指令进行操做。
XLEN key
例:查看 person 流中的消息数量:
> XLEN person (integer) 5
查询 Streams 中的消息使用 XRANGE 和 XREVRANGE 指令。
查询数据时,能够按照指定 Id 范围进行查询,XRANGE 查询指令格式:
XRANGE key start end [COUNT count]
参数说明:
这里 start 和 end 有-
和+
两个非指定值,他们分别表示无穷小和无穷大,因此当使用这个两个值时,会查询出所有的消息。
> XRANGE person - + 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" 3) 1) "2-0" 2) 1) "name" 2) "gaga" 3) "des" 4) "fishion!"
上面查询的消息数据,能够看到是按照先进先出的顺序查询出来的。
使用 COUNT 指定查询返回的数量:
# 查询全部的消息,而且返回一条数据 > XRANGE person - + COUNT 1 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top"
在范围查询中,Id 的后半部分可省略,后半部分中的数据会所有查询到。
XREVRANGE 的查询和 XRANGE 指令中的使用相似,但查询的 start 和 end 参数顺序进行了调换:
XREVRANGE key end start [COUNT count]
使用案例:
> XREVRANGE person + - 1) 1) "2-0" 2) 1) "name" 2) "gaga" 3) "des" 4) "fishion!" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" 3) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top"
查询后的结果与 XRANGE 的结果顺序恰好相反,其余都同样,这两个指令可进行消息的升序和降序的返回。
删除消息使用 XDEL 指令操做,只需指定将要删除的 Streams 名称和 Id 便可,支持一次删除多个消息 。
XDEL key ID [ID ...]
删除案例:
# 查询全部消息 > XRANGE person - + 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" 3) 1) "2-0" 2) 1) "name" 2) "gaga" 3) "des" 4) "fishion!" # 删除消息 > XDEL person 2-0 (integer) 1 # 再次查询删除后的全部消息 > XRANGE person - + 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" # 查询删除后的长度 > XLEN person (integer) 2
从上面能够看到,删除消息后,长度也会减小相应的数量。
在 Redis 的 PUB/SUB 中,咱们是经过订阅来消费消息,在 Streams 数据结构中,一样也能实现同等功能,当没有新的消息时,可进行阻塞等待。不只支持单独消费,并且还能够支持群组消费。
单独消费使用 XREAD 指令。能够看到,下面命令中,STREAMS,key, 以及 ID 为必填项。ID 表示将要读取大于该 ID 的消息。当 ID 值使用 $
赋予时,表示已存在消息的最大 Id 值。
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
上面的 COUNT
参数用来指定读取的最大数量,与 XRANGE 的用法同样。
> XREAD COUNT 1 STREAMS person 0 1) 1) "person" 2) 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" > XREAD COUNT 2 STREAMS person 0 1) 1) "person" 2) 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!"
在 XREAD 里面还有个 BLOCK
参数,这个是用来阻塞订阅消息的,BLOCK
携带的参数为阻塞时间,单位为毫秒,若是在这个时间内没有新的消息消费,那么就会释放该阻塞。当这里的时间指定为 0 时,会一直阻塞,直到有新的消息来消费到。
# 窗口 1 开启阻塞,等待新消息的到来 > XREAD BLOCK 0 STREAMS person $ # 另开一个链接窗口 2,添加一条新的消息 > XADD person 2-2 name tao des coder "2-2" # 窗口 1,获取到有新的消息来消费,而且带有阻塞的时间 > XREAD BLOCK 0 STREAMS person $ 1) 1) "person" 2) 1) 1) "2-2" 2) 1) "name" 2) "tao" 3) "des" 4) "coder" (60.81s)
当使用 XREAD 进行顺序消费时,须要额外记录下读取到位置的 Id,方便下次继续消费。
群组消费的主要目的也就是为了分流消息给不一样的客户端处理,以更高效的速率处理消息。为达到这一肝功能需求,咱们须要作三件事:建立群组,群组读取消息,向服务端确认消息以处理。
操做群组使用 XGROUP 指令:
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
上面命令中,包含操做有:
咱们当前须要使用的是建立消费组:
# 以当前存在的最大 Id 做为消费起始 > XGROUP CREATE person group1 $ OK
群组读取使用 XREADGROUP 指令,COUNT
和BLOCK
的使用相似 XREAD 的操做,只是多了个群组和消费者的指定:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
因为群组消费和单独消费相似,这里只进行个阻塞分析,这里 Id 也有个特殊值>
,表示还未进行消费的消息:
# 窗口 1,消费群组中,taotao 消费者创建阻塞监听 XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person > # 窗口 2,消费群组中,yangyang 消费者创建阻塞监听 XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person > # 窗口 3,添加消费消息 > XADD person 3-1 name tony des 666 "3-1" # 窗口 1,读取到新消息,此时 窗口 2 没有任何反应 > XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person > 1) 1) "person" 2) 1) 1) "3-1" 2) 1) "name" 2) "tony" 3) "des" 4) "666" (77.54s) # 窗口 3,再次添加消费消息 > XADD person 3-2 name james des abc! "3-2" # 窗口 2,读取到新消息,此时 窗口 1 没有任何反应 > XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person > 1) 1) "person" 2) 1) 1) "3-2" 2) 1) "name" 2) "james" 3) "des" 4) "abc!" (76.36s)
以上执行流程中,group1 群组中有两个消费者,当添加两条消息后,这两个消费者轮流消费。
消息消费后,为避免再次重复消费,这是须要向服务端发送 ACK,确保消息被消费后的标记。
例以下列状况,咱们上面咱们将最新两条消息已进行了消费,可是当咱们再次读取消息时,仍是被读到:
> XREADGROUP GROUP group1 yangyang STREAMS person 0 1) 1) "person" 2) 1) 1) "3-2" 2) 1) "name" 2) "james" 3) "des" 4) "abc!"
这时,咱们使用 XACK 指令告诉服务器,咱们已处理的消息:
XACK key group ID [ID ...]0
让服务器标记 3-2 已处理:
> XACK person group1 3-2 (integer) 1
再次获取群组读取消息:
> XREADGROUP GROUP group1 yangyang STREAMS person 0 1) 1) "person" 2) (empty list or set)
队列中没有了可读消息。
除了上面以讲解到的 API 外,查看消费群组信息可以使用 XINFO 指令查看,本文不作分析。
上面对 Streams 经常使用 API 进行了分析,咱们能够感觉到 Redis 在消息队列支持的道路上,也愈来愈强大。若是使用过它的 PUB/SUB 功能的话,就会感觉到 5.x 迭代正是将你的一些痛点进行了优化。
我的博客: https://ytao.top
关注公众号 【ytao】,更多原创好文