相较于Redis4.0,Redis5.0增长了不少新的特性,而streams是其中最重要的特性之一。streams是redis 的一种基本数据结构,它是一个新的强大的支持多播的可持久化的消息队列,在设计上借鉴了kafaka。javascript
streams的数据类型自己很是简单,有点相似于hash结构,可是它的额外特性异常强大且复杂:java
pub/sub
机制和list
消息被消费后就会被删除,streams消费过的数据会被持久化的保存在历史中。pub/sub
有些相似。streams 提供了默认的id模式用来惟一标识streams中的每一条数据,由两部分组成:<millisecondsTime>-<sequenceNumber>
millisecondsTime是redis服务所在机器的时间,sequenceNumber用于同一毫秒建立的数据。须要注意的一点是streams的id老是单调增加的,即便redis服务所在的服务器时间异常。若是当前的毫秒数小于之前的毫秒数,就会使用历史记录中最大的毫秒数,而后序列号递增。而这样作的缘由是由于streams的机制容许根据时间区间或者某一个时间节点或者某一id查找数据。node
streams 的基础写命令为XADD
,其语法为XADD key ID field value [field value ...]
redis
127.0.0.1:6379> XADD mystream * name dwj age 18 "1574925508730-0" 127.0.0.1:6379>
上面的例子使用XADD
向名为mystream
的streams中添加了一条数据,ID使用*表示使用streams使用默认的ID,在本例中redis返回的1574925508730-0
就是redis为咱们插入的数据生成的ID。数据库
另外streams 查看streams长度的命令为XLEN
segmentfault
127.0.0.1:6379> XLEN mystream (integer) 3 127.0.0.1:6379>
从streams中读取数据会比写数据复杂不少,用日志文件进行对比,咱们能够查看历史日志,能够根据范围查询日志,咱们能够经过unix的命令tail -f
来监听日志,能够多个用户查看到同一份日志,也能够多个用户只能查看到本身有权限查看的那一部分日志。安全
首先来介绍一下 根据范围查询,这两种操做都比较简单,以XRANGE
为例,它的语法格式为XRANGE key start end [COUNT count]
, 咱们只须要提供两个id,start
和end
,返回的将是一个包含start
和end
的闭区间。两个特殊的ID-
和+
分别表示可能的最小ID和最大ID。ruby
127.0.0.1:6379> XRANGE mystream - + 1) 1) "1574835253335-0" 2) 1) "name" 2) "bob" 3) "age" 4) "23" 2) 1) "1574925508730-0" 2) 1) "name" 2) "dwj" 3) "age" 4) "18" 127.0.0.1:6379>
咱们前边提到过数据id中包含了建立数据的时间信息,这意味着咱们能够根据时间范围查询数据,为了根据时间范围查询,咱们省略掉ID的序列号部分,若是省略,对于start ID会使用0做为默认的序列号,对于end ID会使用最大序列号做为默认值,这样的话咱们使用两个unix时间戳去查询数据就能够获得那个时间区间内全部的数据。性能优化
1) 1) "1574835253335-0" 2) 1) "name" 2) "bob" 3) "age" 4) "23" 127.0.0.1:6379>
可能还会有同窗注意到语法的最后边还有count
参数,这个参数容许咱们一次只返回固定数量的数据,而后根据返回数据的last_id,做为下一次查询的start,这样就容许咱们在一个量很是大的streams里批量返回数据。
XREVRANGE命令与XRANGE相同,可是以相反的顺序返回元素,就不重复介绍了。bash
XREAD容许咱们从某一结点开始从streams中读取数据,它的语法为XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
,咱们在这里主要将的是经过XREAD
来订阅到达streams新的数据。这种操做可能跟REDIS中原有的pub/sub
机制或者阻塞队列
的概念有些相似,都是等待一个key而后获取到新的数据,可是跟这两种有着本质的差异:
pub/sub
和阻塞队列
容许多个客户端一块儿等待数据,默认状况下,streams会把消息推送给全部等待streams数据的客户端,这个能力跟pub/sub
有点相似,可是streams也容许把消息经过竞争机制推送给其中的一个客户端(这种模式须要用到消费者组的概念,会在后边讲到)。pub/sub
的消息是fire and forget而且从不存储,你只能够订阅到在你订阅时间以后产生的消息,而且消息只会推送给客户端一次,不能查看历史记录。以及使用阻塞队列
时,当客户端收到消息时,这个元素会从队列中弹出,换句话说,不能查看某个消费者消费消息的历史。而在streams中全部的消息会被无限期的加入到streams中(消息能够被显式的删除而且存在淘汰机制),客户端须要记住收到的最后一条消息,用于获取到节点以后的新消息。从streams中读取数据
127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) "1574835253335-0" 2) 1) "name" 2) "bob" 3) "age" 4) "23" 2) 1) "1574925508730-0" 2) 1) "name" 2) "dwj" 3) "age" 4) "18" 127.0.0.1:6379>
同list结构同样,streams也提供了阻塞读取的命令
XREAD BLOCK 0 STREAMS mystream
在上边的命令中指定了BLOCK选项,超时时间为0毫秒(意味着永不会过时)。此外,这个地方使用了特殊的id $
,这个特殊的id表明着当前streams中最大的id,这就意味着你只会读取streams中在你监听时间之后的消息。有点相似于Unix的tail -f
。另外XREAD能够同时监听多个流中的数据。
若是咱们想要的不是多个客户端处理相同的消息,而是多个客户端从streams中获取到不一样的消息进行处理。也就是咱们经常使用的生产者-消费者模型。假如想象咱们具备两个生产者p1,p2,三个消费者c1,c2,c3以及7个商品。咱们想按照下面的效果进行处理
p1 =>item1 => c1 p2 =>item2 => c2 p1 =>item3 => c3 p2 =>item4 => c1 p1 =>item5 => c2 p2 =>item6 => c3 p1 =>item7 => c1
为了解决这种场景,redis使用了一个名为消费者的概念,有点相似于kafka,但只是表现上。消费者组就像是一个伪消费者,它从流内读取数据,而后分发给组内的消费者,并记录该消费者组消费了哪些数据,处理了那些数据,并提供了一系列功能。
它的模型相似于以下
| consumer_group_name: mygroup | | consumer_group_stream: somekey | | last_delivered_id: 1292309234234-92 | | | | consumers: | | "consumer-1" with pending messages | | 1292309234234-4 | | 1292309234232-8 | | "consumer-42" with pending messages | | ... (and so forth) |
从上边的模型中咱们能够看出消费者组记录处理的最后一条消息,将消息分发给不一样的消费者,每一个消费者只能看到本身的消息。若是把消费者组看作streams的辅助数据结构,咱们能够看出一个streams能够拥有多个消费者组,一个消费者组内能够拥有多个消费者。实际上,一个streams容许客户端使用XREAD读取的同时另外一个客户端经过消费者群组读取数据。
咱们首先建立一个包含了一些数据的streams
127.0.0.1:6379> XADD fruit * message apple "1574935311149-0" 127.0.0.1:6379> XADD fruit * message banada "1574935315886-0" 127.0.0.1:6379> XADD fruit * message pomelo "1574935323628-0"
而后建立一个消费者组
127.0.0.1:6379> XGROUP CREATE fruit mygroup $ OK
注意咱们须要指定一个id,这里咱们使用的是特殊id$
,咱们也可使用0或者一个unix时间戳,这样,消费者组只会读取这个节点以后的消息。
如今消费者组建立好了,咱们可使用XREADGROUP命令当即开始尝试经过消费者组读取消息。XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
,与XREAD
相似,提供了BLOCK选项。假设指定消费者分别是Alice和Bob,来看看系统会怎样返回不一样消息给Alice和Bob。
127.0.0.1:6379> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS fruit > 1) 1) "fruit" 2) 1) 1) "1574936034258-0" 2) 1) "message" 2) "apple" 127.0.0.1:6379>
上边命令表明的信息是:我要经过mygroup
读取streams fruit
中的数据,我在群组中的身份是Alice
,请给我一条数据。 >
操做符只在消费者组的上线文中有效,表明消息到目前为止没有交给其它消费者处理过。
咱们也可使用一个有效的id,在这种状况下,消费者组会告诉咱们的历史待处理消息,而不会告诉咱们新的消息。这个特性也是颇有用的,当消费者由于某些缘由从新启动后,咱们能够查看本身的历史待处理消息,处理完待处理消息后再去处理新的消息。
咱们能够经过XACK
命令告诉消费者组某条消息已经被正确处理,不要显示在个人历史待处理消息列表中。XACK
的语法为XACK key group ID [ID ...]
127.0.0.1:6379> XACK fruit mygroup 1574936034258-0 (integer) 1
有几件事须要记住:
在一个消费者群组中可能存在多个消费者消费消息,可是也可能会存在某一个消费者永久退出消费者群组的状况,这样咱们就须要一种机制,把该消费者的待处理消息分配给消费者群组的另外一个消费者。这就须要咱们具备查看待处理消息的能力以及把某个消息分配给指定消费者的能力。前者是经过一个叫XPENDING
的命令,它的语法为XPENDING key group [start end count] [consumer]
127.0.0.1:6379> XPENDING fruit mygroup 1) (integer) 1 2) "1574936042937-0" 3) "1574936042937-0" 4) 1) 1) "Alice" 2) "1"
上述返回结果表明的是消费者群组有1条待处理命令,待处理消息的起始id为1574936042937-0
,结束id为1574936042937-0
,名为Alice
的消费者有一个待处理命令,可能有人会好奇咱们在前边往fruit
放入了3个水果,使用XACK
处理了一个水果,消费者待处理列表中应该有两个水果,而事实上消费者群组的待处理列表为该群组下消费者待处理消息的合集,当有消费者经过群组获取消息的时候会改变消费者群组的状态,这也是前边提到的为何XREADGROUP
必须在master节点进行调用。
咱们可使用start end count 参数来查看某个范围内消息的状态
127.0.0.1:6379> XPENDING fruit mygroup - + 10 Alice 1) 1) "1574936042937-0" 2) "Alice" 3) (integer) 903655 4) (integer) 1 2) 1) "1574936052018-0" 2) "Alice" 3) (integer) 491035 4) (integer) 1
这样咱们就看到了一条消息的详细信息,id为1574936042937-0
的消息的消费者为Alice
,它的pending时间为903655
,这个消息被分配了1次。
咱们会发现第一条消息的处理时间有点长,咱们怀疑Alice
已经不能处理这条消息了,因而咱们想把这条消息分配给Bob
,这种场景下就须要用到了XCLAIM
命令,它的语法为XCLAIM ...
,其中min-idle-time为消息的最小空闲时间,只有消息的空闲时间大于这个值消息才会被分配,由于消息被分配的时候会重置消息的空闲时间,若是有同时把一条消息分配给两个客户端,只会第一条命令生效,由于当消息分配给第一个客户端的时候重置空闲时间,第二条命令则会失效。
咱们也可使用一个独立的进程来不断寻找超时的消息,并把它分配给活跃的消费者,不过须要注意的是,若是消息的分配次数达到某个阙值,不该该把消息再分配出去,而是应该放到别的地方。
streams具备不错的可观察性,前边的XPENDING
命令容许咱们查看streams在某个消费者群组内待处理消息的状态。可是咱们想看的更多,好比在这个streams下有多少个group, 在这个group下有多少消费者。这就要用到XINFO
命令:
查看streams
信息:
127.0.0.1:6379> XINFO STREAM mystream 1) "length" 2) (integer) 2 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "groups" 8) (integer) 1 9) "last-generated-id" 10) "1574925508730-0" 11) "first-entry" 12) 1) "1574835253335-0" 2) 1) "name" 2) "bob" 3) "age" 4) "23" 13) "last-entry" 14) 1) "1574925508730-0" 2) 1) "name" 2) "dwj" 3) "age" 4) "18"
输出中会告诉咱们streams的长度,群组数量,第一条和最后一条信息的详情。下面看一下streams下群组的信息:
127.0.0.1:6379> XINFO GROUPS fruit 1) 1) "name" 2) "mygroup" 3) "consumers" 4) (integer) 1 5) "pending" 6) (integer) 2 7) "last-delivered-id" 8) "1574936052018-0" 2) 1) "name" 2) "mygroup-1" 3) "consumers" 4) (integer) 0 5) "pending" 6) (integer) 0 7) "last-delivered-id" 8) "0-0"
咱们能够从输出中看到fruit
下有两个群组,群组的名称以及待处理消息的数量,处理的最后一条消息。咱们能够在详细的查看下消费者群组内消费者的状态。
127.0.0.1:6379> XINFO CONSUMERS fruit mygroup 1) 1) "name" 2) "Alice" 3) "pending" 4) (integer) 2 5) "idle" 6) (integer) 1990242 2) 1) "name" 2) "Bob" 3) "pending" 4) (integer) 1 5) "idle" 6) (integer) 9178
从输出中能够看到消费者待处理消息的数量以及消费者的闲置时间。
若是从streams能够查看到历史记录,咱们可能会有疑惑,若是streams无限期的加入内存会不会够用,一旦消息数量达到上限,将消息永久删除或者持久化到数据库都是有必要的,redis也提供了诸如此类场景的支持。
一种方法是咱们使用XADD
的时候指定streams的最大长度,XADD mystream MAXLEN ~ 1000
其中的数值前能够加上~
标识不须要精确的将长度保持在1000,比1000多一些也能够接受。若是不使用该标识,性能会差一些。另外一种方法是使用XTRIM
,该命令也是使用MAXLEN
选项,> XTRIM mystream MAXLEN ~ 10
前面提到了在streams API里边存在一些特殊的id。
首先是-
和+
,这两个ID在XRANGE
命令中使用,分别表明最小的id和最大的id。-
表明0-1
,+
表明18446744073709551615-18446744073709551615
,从使用上方便了不少。在XPENDING
等范围查询中均可以使用。$
表明streams中当前存在的最大的id,在XREAD
和XGROUP
中表明只获取新到的消息。须要注意的是$
跟+
的含义并不一致。
还有一个特殊的id是>
,这个id只可以在XREADGROUP
命令中使用,意味着在这个消费者群组中,历来没有分配给其余的消费者,因此老是使用>
做为群组中的last delivered ID
。
与redis的其它数据结构同样,streams会异步复制到从节点,并持久化到AOF和RDB文件中,而且消费者群组的状态也会按照此机制进行持久化。
须要注意的几点是:
appendfsync always
这样会严重下降Redis的速度)WAIT
命令能够用于强制将更改传输到一组从节点上。虽然这使得数据不太可能会丢失,可是redis的Sentinel和cluster在进行故障转移的时候不必定会使用具备最新数据的从节点,在一些特殊故障下,反而会使用缺乏一些数据的从节点。所以在使用redis streams和消费者群组在设计程序的时候,确保了解你的应用程序在故障期间的应对策略,并进行相应地配置,评估它对你的程序是否足够安全。
删除streams中的数据使用XDEL
命令,其语法为XDEL key ID [ID ...]
,须要注意的是在当前的实现中,在宏节点彻底为空以前,内存并无真正回收,因此你不该该滥用这个特性。
streams的不阻塞命令,好比XRANGE
或者不使用BLOCK选项的XREAD
和XREADGROUP
跟redis普通命令一致,因此没有必要讨论。若是有兴趣的话能够在redis的文档中查看到对应命令的时间复杂度。streams命令的速度在必定范围内跟set
是一致的,XADD
命令的速度很是快,在一个普通的机器上,一秒钟能够插入50w~100w条数据。
咱们感兴趣的是在消费者群组的阻塞场景下,从经过XADD
命令向streams中插入一条数据,到消费者经过群组读取到这条消息的性能。
为了测试消息从产生到消费间的延迟,咱们使用ruby程序进行测试,将消息的产生时间做为消息的一个字段,而后把消息推送到streams中,客户端收到消息后使用当前时间跟生产时间进行对比,从而计算出消息的延迟时间。这个程序未进行性能优化,运行在一个双核的机器上,同时redis也运行在这台机器上,以此来模拟不是理想条件下的场景。消息每秒钟产生1w条,群组内有10个消费者消费数据。测试结果以下:
Processed between 0 and 1 ms -> 74.11% Processed between 1 and 2 ms -> 25.80% Processed between 2 and 3 ms -> 0.06% Processed between 3 and 4 ms -> 0.01% Processed between 4 and 5 ms -> 0.02%
99.9%的请求的延迟小于等于2毫秒,并且异常值很是接近平均值。另外须要注意的两点:
原文连接: https://redis.io/topics/strea...
本文做者:Worktile工程师 杜文杰
文章来源:Worktile技术博客
欢迎访问交流更多关于技术及协做的问题。
文章转载请注明出处。