Redis 5 新特性中,Streams 数据结构的引入,能够说它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 做为消息队列使用时,获得更完善,更强大的原生支持,其中尤其明显的是持久化消息队列。同时,stream 借鉴了 kafka 的消费组模型概念和设计,使消费消息处理上更加高效快速。本文就 Streams 数据结构中经常使用 API 进行分析。bash
本文所使用 Redis 版本为 5.0.5 。若是使用更早的 5.x 版本,有些 API 使用效果,与本文中描述略有不一样。服务器
Streams 添加数据使用 XADD 指令进行添加,消息中的数据以 K-V 键值对的形式进行操做。一条消息能够存在多个键值对,添加命令格式:数据结构
XADD key ID field string [field string ...]
复制代码
其中 key 为 Streams 的名称,ID 为消息的惟一标志,不可重复,field string 就为键值对。下面咱们就添加以 person 为名称的流,进行操做。优化
XADD person * name ytao des https://ytao.top
复制代码
上面添加案例中,ID 使用 * 号复制,这里表明着服务端自动生成 Id,添加后返回数据 "1578238486193-0"
ui
这里自动生成的 Id 格式为 <millisecondsTime>-<sequenceNumber>
Id 是由两部分组成:spa
好比:1578238486193-3 表示在 1578238486193 毫秒的时间戳时,添加的第 4 条消息。设计
除了服务端自动生成 Id 方式外,也支持指定 Id 的生成,可是指定 Id 有如下条件限制:指针
不然,当不知足上述条件时,添加后会抛出异常:code
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
复制代码
实际上,当添加一条消息时,会进行两部操做。第一步,先判断若是不存在 Streams,则建立 Streams 的名称,再添加消息到 Streams 中。即便添加消息时,因为 Id 异常,也能够在 Redis 中存在以当前 Streams 的名称。 Streams 中 Id 也可做为指针使用,由于它是一个有序的标记。cdn
生产中,若是这样使用添加消息,会存在一个问题,那就是消息数量太大时,会使服务宕机。这里 Streams 的设计初期也有考虑到这个问题,那就是能够指定 Streams 的容量。若是容量操做这个设定的值,就会对调旧的消息。在添加消息时,设置 MAXLEN
参数。
XADD person MAXLEN 5 * name ytao des https://ytao.top
复制代码
这样就指定该了 Streams 中的容量为 5 条消息。也可以使用 XTRIM 截取消息,从小到大剔除多余的消息:
XTRIM person MAXLEN 8
复制代码
查看消息数量使用 XLEN 指令进行操做。
XLEN key
复制代码
例:查看 person 流中的消息数量:
> XLEN person
(integer) 5
复制代码
查询 Streams 中的消息使用 XRANGE 和 XREVRANGE 指令。
查询数据时,能够按照指定 Id 范围进行查询,XRANGE 查询指令格式:
XRANGE key start end [COUNT count]
复制代码
参数说明:
这里 start 和 end 有-
和+
两个非指定值,他们分别表示无穷小和无穷大,因此当使用这个两个值时,会查询出所有的消息。
> XRANGE person - +
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
3) 1) "2-0"
2) 1) "name"
2) "gaga"
3) "des"
4) "fishion!"
复制代码
上面查询的消息数据,能够看到是按照先进先出的顺序查询出来的。
使用 COUNT 指定查询返回的数量:
# 查询全部的消息,而且返回一条数据
> XRANGE person - + COUNT 1
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
复制代码
在范围查询中,Id 的后半部分可省略,后半部分中的数据会所有查询到。
XREVRANGE 的查询和 XRANGE 指令中的使用相似,但查询的 start 和 end 参数顺序进行了调换:
XREVRANGE key end start [COUNT count]
复制代码
使用案例:
> XREVRANGE person + -
1) 1) "2-0"
2) 1) "name"
2) "gaga"
3) "des"
4) "fishion!"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
3) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
复制代码
查询后的结果与 XRANGE 的结果顺序恰好相反,其余都同样,这两个指令可进行消息的升序和降序的返回。
删除消息使用 XDEL 指令操做,只需指定将要删除的 Streams 名称和 Id 便可,支持一次删除多个消息 。
XDEL key ID [ID ...]
复制代码
删除案例:
# 查询全部消息
> XRANGE person - +
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
3) 1) "2-0"
2) 1) "name"
2) "gaga"
3) "des"
4) "fishion!"
# 删除消息
> XDEL person 2-0
(integer) 1
# 再次查询删除后的全部消息
> XRANGE person - +
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
# 查询删除后的长度
> XLEN person
(integer) 2
复制代码
从上面能够看到,删除消息后,长度也会减小相应的数量。
在 Redis 的 PUB/SUB 中,咱们是经过订阅来消费消息,在 Streams 数据结构中,一样也能实现同等功能,当没有新的消息时,可进行阻塞等待。不只支持单独消费,并且还能够支持群组消费。
单独消费使用 XREAD 指令。能够看到,下面命令中,STREAMS,key, 以及 ID 为必填项。ID 表示将要读取大于该 ID 的消息。当 ID 值使用 $
赋予时,表示已存在消息的最大 Id 值。
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
复制代码
上面的 COUNT
参数用来指定读取的最大数量,与 XRANGE 的用法同样。
> XREAD COUNT 1 STREAMS person 0
1) 1) "person"
2) 1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
> XREAD COUNT 2 STREAMS person 0
1) 1) "person"
2) 1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
复制代码
在 XREAD 里面还有个 BLOCK
参数,这个是用来阻塞订阅消息的,BLOCK
携带的参数为阻塞时间,单位为毫秒,若是在这个时间内没有新的消息消费,那么就会释放该阻塞。当这里的时间指定为 0 时,会一直阻塞,直到有新的消息来消费到。
# 窗口 1 开启阻塞,等待新消息的到来
> XREAD BLOCK 0 STREAMS person $
# 另开一个链接窗口 2,添加一条新的消息
> XADD person 2-2 name tao des coder
"2-2"
# 窗口 1,获取到有新的消息来消费,而且带有阻塞的时间
> XREAD BLOCK 0 STREAMS person $
1) 1) "person"
2) 1) 1) "2-2"
2) 1) "name"
2) "tao"
3) "des"
4) "coder"
(60.81s)
复制代码
当使用 XREAD 进行顺序消费时,须要额外记录下读取到位置的 Id,方便下次继续消费。
群组消费的主要目的也就是为了分流消息给不一样的客户端处理,以更高效的速率处理消息。为达到这一肝功能需求,咱们须要作三件事:建立群组,群组读取消息,向服务端确认消息以处理。
操做群组使用 XGROUP 指令:
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
复制代码
上面命令中,包含操做有:
咱们当前须要使用的是建立消费组:
# 以当前存在的最大 Id 做为消费起始
> XGROUP CREATE person group1 $
OK
复制代码
群组读取使用 XREADGROUP 指令,COUNT
和BLOCK
的使用相似 XREAD 的操做,只是多了个群组和消费者的指定:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
复制代码
因为群组消费和单独消费相似,这里只进行个阻塞分析,这里 Id 也有个特殊值>
,表示还未进行消费的消息:
# 窗口 1,消费群组中,taotao 消费者创建阻塞监听
XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >
# 窗口 2,消费群组中,yangyang 消费者创建阻塞监听
XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >
# 窗口 3,添加消费消息
> XADD person 3-1 name tony des 666
"3-1"
# 窗口 1,读取到新消息,此时 窗口 2 没有任何反应
> XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >
1) 1) "person"
2) 1) 1) "3-1"
2) 1) "name"
2) "tony"
3) "des"
4) "666"
(77.54s)
# 窗口 3,再次添加消费消息
> XADD person 3-2 name james des abc!
"3-2"
# 窗口 2,读取到新消息,此时 窗口 1 没有任何反应
> XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >
1) 1) "person"
2) 1) 1) "3-2"
2) 1) "name"
2) "james"
3) "des"
4) "abc!"
(76.36s)
复制代码
以上执行流程中,group1 群组中有两个消费者,当添加两条消息后,这两个消费者轮流消费。
消息消费后,为避免再次重复消费,这是须要向服务端发送 ACK,确保消息被消费后的标记。 例以下列状况,咱们上面咱们将最新两条消息已进行了消费,可是当咱们再次读取消息时,仍是被读到:
> XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
2) 1) 1) "3-2"
2) 1) "name"
2) "james"
3) "des"
4) "abc!"
复制代码
这时,咱们使用 XACK 指令告诉服务器,咱们已处理的消息:
XACK key group ID [ID ...]0
复制代码
让服务器标记 3-2 已处理:
> XACK person group1 3-2
(integer) 1
复制代码
再次获取群组读取消息:
> XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
2) (empty list or set)
复制代码
队列中没有了可读消息。 除了上面以讲解到的 API 外,查看消费群组信息可以使用 XINFO 指令查看,本文不作分析。
上面对 Streams 经常使用 API 进行了分析,咱们能够感觉到 Redis 在消息队列支持的道路上,也愈来愈强大。若是使用过它的 PUB/SUB 功能的话,就会感觉到 5.x 迭代正是将你的一些痛点进行了优化。
我的博客: ytao.top
关注公众号 【ytao】,更多原创好文