Streams:深刻理解Redis5.0新特性

概述

相较于Redis4.0,Redis5.0增长了不少新的特性,而streams是其中最重要的特性之一。streams是redis 的一种基本数据结构,它是一个新的强大的支持多播的可持久化的消息队列,在设计上借鉴了kafaka。javascript

streams的数据类型自己很是简单,有点相似于hash结构,可是它的额外特性异常强大且复杂:java

  • 支持持久化。streams能持久化存储数据,不一样于pub/sub机制和list 消息被消费后就会被删除,streams消费过的数据会被持久化的保存在历史中。
  • 支持多播。 这一点跟 pub/sub有些相似。
  • 支持消费者组。streams 容许同一消费组内的消费者竞争消息,并提供了一系列机制容许消费者查看本身的历史消费消息。并容许监控streams的消费者组信息,消费者组内消费者信息,也能够监控streams内消息的状态。

基础内容

数据 ID

streams 提供了默认的id模式用来惟一标识streams中的每一条数据,由两部分组成:
<millisecondsTime>-<sequenceNumber>
millisecondsTime是redis服务所在机器的时间,sequenceNumber用于同一毫秒建立的数据。须要注意的一点是streams的id老是单调增加的,即便redis服务所在的服务器时间异常。若是当前的毫秒数小于之前的毫秒数,就会使用历史记录中最大的毫秒数,而后序列号递增。而这样作的缘由是由于streams的机制容许根据时间区间或者某一个时间节点或者某一id查找数据。node

向streams插入数据

streams 的基础写命令为XADD,其语法为XADD key ID field value [field value ...]redis

127.0.0.1:6379> XADD mystream * name dwj age 18
"1574925508730-0"
127.0.0.1:6379>

上面的例子使用XADD向名为mystream的streams中添加了一条数据,ID使用*表示使用streams使用默认的ID,在本例中redis返回的1574925508730-0就是redis为咱们插入的数据生成的ID。数据库

另外streams 查看streams长度的命令为XLENsegmentfault

127.0.0.1:6379> XLEN mystream
(integer) 3
127.0.0.1:6379>

从streams中读取数据

从streams中读取数据会比写数据复杂不少,用日志文件进行对比,咱们能够查看历史日志,能够根据范围查询日志,咱们能够经过unix的命令tail -f来监听日志,能够多个用户查看到同一份日志,也能够多个用户只能查看到本身有权限查看的那一部分日志。安全

按范围查询: XRANGE 和 XREVRANGE

首先来介绍一下 根据范围查询,这两种操做都比较简单,以XRANGE为例,它的语法格式为XRANGE key start end [COUNT count], 咱们只须要提供两个id,startend,返回的将是一个包含startend的闭区间。两个特殊的ID-+分别表示可能的最小ID和最大ID。ruby

127.0.0.1:6379> XRANGE mystream - +
1) 1) "1574835253335-0"
   2) 1) "name"
      2) "bob"
      3) "age"
      4) "23"
2) 1) "1574925508730-0"
   2) 1) "name"
      2) "dwj"
      3) "age"
      4) "18"
127.0.0.1:6379>

咱们前边提到过数据id中包含了建立数据的时间信息,这意味着咱们能够根据时间范围查询数据,为了根据时间范围查询,咱们省略掉ID的序列号部分,若是省略,对于start ID会使用0做为默认的序列号,对于end ID会使用最大序列号做为默认值,这样的话咱们使用两个unix时间戳去查询数据就能够获得那个时间区间内全部的数据。性能优化

1) 1) "1574835253335-0"
   2) 1) "name"
      2) "bob"
      3) "age"
      4) "23"
127.0.0.1:6379>

可能还会有同窗注意到语法的最后边还有count参数,这个参数容许咱们一次只返回固定数量的数据,而后根据返回数据的last_id,做为下一次查询的start,这样就容许咱们在一个量很是大的streams里批量返回数据。
XREVRANGE命令与XRANGE相同,可是以相反的顺序返回元素,就不重复介绍了。bash

经过XREAD读取数据

XREAD容许咱们从某一结点开始从streams中读取数据,它的语法为XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...],咱们在这里主要将的是经过XREAD来订阅到达streams新的数据。这种操做可能跟REDIS中原有的pub/sub机制或者阻塞队列的概念有些相似,都是等待一个key而后获取到新的数据,可是跟这两种有着本质的差异:

  • streams跟pub/sub阻塞队列容许多个客户端一块儿等待数据,默认状况下,streams会把消息推送给全部等待streams数据的客户端,这个能力跟pub/sub有点相似,可是streams也容许把消息经过竞争机制推送给其中的一个客户端(这种模式须要用到消费者组的概念,会在后边讲到)。
  • pub/sub的消息是fire and forget而且从不存储,你只能够订阅到在你订阅时间以后产生的消息,而且消息只会推送给客户端一次,不能查看历史记录。以及使用阻塞队列时,当客户端收到消息时,这个元素会从队列中弹出,换句话说,不能查看某个消费者消费消息的历史。而在streams中全部的消息会被无限期的加入到streams中(消息能够被显式的删除而且存在淘汰机制),客户端须要记住收到的最后一条消息,用于获取到节点以后的新消息。
  • Streams 消费者组提供了一种Pub/Sub或者阻塞列表都不能实现的控制级别,同一个Stream不一样的群组,显式地确认已经处理的项目,检查待处理的项目的能力,申明未处理的消息,以及每一个消费者拥有连贯历史可见性,单个客户端只能查看本身过去的消息历史记录。

从streams中读取数据

127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1574835253335-0"
         2) 1) "name"
            2) "bob"
            3) "age"
            4) "23"
      2) 1) "1574925508730-0"
         2) 1) "name"
            2) "dwj"
            3) "age"
            4) "18"
127.0.0.1:6379>

同list结构同样,streams也提供了阻塞读取的命令

XREAD BLOCK 0 STREAMS mystream

在上边的命令中指定了BLOCK选项,超时时间为0毫秒(意味着永不会过时)。此外,这个地方使用了特殊的id $,这个特殊的id表明着当前streams中最大的id,这就意味着你只会读取streams中在你监听时间之后的消息。有点相似于Unix的tail -f。另外XREAD能够同时监听多个流中的数据。

消费者组

若是咱们想要的不是多个客户端处理相同的消息,而是多个客户端从streams中获取到不一样的消息进行处理。也就是咱们经常使用的生产者-消费者模型。假如想象咱们具备两个生产者p1,p2,三个消费者c1,c2,c3以及7个商品。咱们想按照下面的效果进行处理

p1 =>item1 => c1
p2 =>item2 => c2
p1 =>item3 => c3
p2 =>item4 => c1
p1 =>item5 => c2
p2 =>item6 => c3
p1 =>item7 => c1

为了解决这种场景,redis使用了一个名为消费者的概念,有点相似于kafka,但只是表现上。消费者组就像是一个伪消费者,它从流内读取数据,而后分发给组内的消费者,并记录该消费者组消费了哪些数据,处理了那些数据,并提供了一系列功能。

  1. 每条消息都提供给不一样的消费者,所以不可能将相同的消息传递给多个消费者。
  2. 消费者在消费者组中经过名称来识别,该名称是实施消费者的客户必须选择的区分大小写的字符串。这意味着即使断开链接事后,消费者组仍然保留了全部的状态,由于客户端会从新申请成为相同的消费者。 然而,这也意味着由客户端提供惟一的标识符。
  3. 每个消费者组都有一个第一个ID永远不会被消费的概念,这样一来,当消费者请求新消息时,它能提供之前从未传递过的消息。
  4. 消费消息须要使用特定的命令进行显式确认,表示:这条消息已经被正确处理了,因此能够从消费者组中逐出。
  5. 消费者组跟踪全部当前全部待处理的消息,也就是,消息被传递到消费者组的一些消费者,可是尚未被确认为已处理。因为这个特性,当访问一个Stream的历史消息的时候,每一个消费者将只能看到传递给它的消息。

它的模型相似于以下

| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |

从上边的模型中咱们能够看出消费者组记录处理的最后一条消息,将消息分发给不一样的消费者,每一个消费者只能看到本身的消息。若是把消费者组看作streams的辅助数据结构,咱们能够看出一个streams能够拥有多个消费者组,一个消费者组内能够拥有多个消费者。实际上,一个streams容许客户端使用XREAD读取的同时另外一个客户端经过消费者群组读取数据。

建立一个消费者群组

咱们首先建立一个包含了一些数据的streams

127.0.0.1:6379> XADD fruit * message apple
"1574935311149-0"
127.0.0.1:6379> XADD fruit * message banada
"1574935315886-0"
127.0.0.1:6379> XADD fruit * message pomelo
"1574935323628-0"

而后建立一个消费者组

127.0.0.1:6379> XGROUP CREATE fruit mygroup $
OK

注意咱们须要指定一个id,这里咱们使用的是特殊id$,咱们也可使用0或者一个unix时间戳,这样,消费者组只会读取这个节点以后的消息。

如今消费者组建立好了,咱们可使用XREADGROUP命令当即开始尝试经过消费者组读取消息。
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...],与XREAD相似,提供了BLOCK选项。假设指定消费者分别是Alice和Bob,来看看系统会怎样返回不一样消息给Alice和Bob。

127.0.0.1:6379> XREADGROUP GROUP  mygroup Alice COUNT 1 STREAMS fruit >
1) 1) "fruit"
   2) 1) 1) "1574936034258-0"
         2) 1) "message"
            2) "apple"
127.0.0.1:6379>

上边命令表明的信息是:我要经过mygroup读取streams fruit中的数据,我在群组中的身份是Alice,请给我一条数据。 >操做符只在消费者组的上线文中有效,表明消息到目前为止没有交给其它消费者处理过。
咱们也可使用一个有效的id,在这种状况下,消费者组会告诉咱们的历史待处理消息,而不会告诉咱们新的消息。这个特性也是颇有用的,当消费者由于某些缘由从新启动后,咱们能够查看本身的历史待处理消息,处理完待处理消息后再去处理新的消息。
咱们能够经过XACK命令告诉消费者组某条消息已经被正确处理,不要显示在个人历史待处理消息列表中。XACK的语法为XACK key group ID [ID ...]

127.0.0.1:6379> XACK fruit mygroup 1574936034258-0
(integer) 1

有几件事须要记住:

  1. 消费者是在他们第一次被说起的时候自动建立的,不须要显式建立。
  2. 即便使用XREADGROUP,你也能够同时从多个key中读取,可是要让其工做,你须要给每个Stream建立一个名称相同的消费者组。这并非一个常见的需求,可是须要说明的是,这个功能在技术上是能够实现的。
  3. XREADGROUP命令是一个写命令,由于当它从Stream中读取消息时,消费者组被修改了,因此这个命令只能在master节点调用。

从永久失败中恢复

在一个消费者群组中可能存在多个消费者消费消息,可是也可能会存在某一个消费者永久退出消费者群组的状况,这样咱们就须要一种机制,把该消费者的待处理消息分配给消费者群组的另外一个消费者。这就须要咱们具备查看待处理消息的能力以及把某个消息分配给指定消费者的能力。前者是经过一个叫XPENDING的命令,它的语法为XPENDING key group [start end count] [consumer]

127.0.0.1:6379> XPENDING fruit mygroup
1) (integer) 1
2) "1574936042937-0"
3) "1574936042937-0"
4) 1) 1) "Alice"
      2) "1"

上述返回结果表明的是消费者群组有1条待处理命令,待处理消息的起始id为1574936042937-0,结束id为1574936042937-0,名为Alice的消费者有一个待处理命令,可能有人会好奇咱们在前边往fruit放入了3个水果,使用XACK处理了一个水果,消费者待处理列表中应该有两个水果,而事实上消费者群组的待处理列表为该群组下消费者待处理消息的合集,当有消费者经过群组获取消息的时候会改变消费者群组的状态,这也是前边提到的为何XREADGROUP必须在master节点进行调用。
咱们可使用start end count 参数来查看某个范围内消息的状态

127.0.0.1:6379> XPENDING fruit mygroup - + 10 Alice
1) 1) "1574936042937-0"
   2) "Alice"
   3) (integer) 903655
   4) (integer) 1
2) 1) "1574936052018-0"
   2) "Alice"
   3) (integer) 491035
   4) (integer) 1

这样咱们就看到了一条消息的详细信息,id为1574936042937-0的消息的消费者为Alice,它的pending时间为903655,这个消息被分配了1次。
咱们会发现第一条消息的处理时间有点长,咱们怀疑Alice已经不能处理这条消息了,因而咱们想把这条消息分配给Bob,这种场景下就须要用到了XCLAIM命令,它的语法为XCLAIM ... ,其中min-idle-time为消息的最小空闲时间,只有消息的空闲时间大于这个值消息才会被分配,由于消息被分配的时候会重置消息的空闲时间,若是有同时把一条消息分配给两个客户端,只会第一条命令生效,由于当消息分配给第一个客户端的时候重置空闲时间,第二条命令则会失效。
咱们也可使用一个独立的进程来不断寻找超时的消息,并把它分配给活跃的消费者,不过须要注意的是,若是消息的分配次数达到某个阙值,不该该把消息再分配出去,而是应该放到别的地方。

streams的可观察性

streams具备不错的可观察性,前边的XPENDING命令容许咱们查看streams在某个消费者群组内待处理消息的状态。可是咱们想看的更多,好比在这个streams下有多少个group, 在这个group下有多少消费者。这就要用到XINFO命令:
查看streams信息:

127.0.0.1:6379> XINFO STREAM mystream
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 1
 9) "last-generated-id"
10) "1574925508730-0"
11) "first-entry"
12) 1) "1574835253335-0"
    2) 1) "name"
       2) "bob"
       3) "age"
       4) "23"
13) "last-entry"
14) 1) "1574925508730-0"
    2) 1) "name"
       2) "dwj"
       3) "age"
       4) "18"

输出中会告诉咱们streams的长度,群组数量,第一条和最后一条信息的详情。下面看一下streams下群组的信息:

127.0.0.1:6379> XINFO GROUPS fruit
1) 1) "name"
   2) "mygroup"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 2
   7) "last-delivered-id"
   8) "1574936052018-0"
2) 1) "name"
   2) "mygroup-1"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "0-0"

咱们能够从输出中看到fruit下有两个群组,群组的名称以及待处理消息的数量,处理的最后一条消息。咱们能够在详细的查看下消费者群组内消费者的状态。

127.0.0.1:6379> XINFO CONSUMERS fruit mygroup
1) 1) "name"
   2) "Alice"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 1990242
2) 1) "name"
   2) "Bob"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 9178

从输出中能够看到消费者待处理消息的数量以及消费者的闲置时间。

设置streams上限

若是从streams能够查看到历史记录,咱们可能会有疑惑,若是streams无限期的加入内存会不会够用,一旦消息数量达到上限,将消息永久删除或者持久化到数据库都是有必要的,redis也提供了诸如此类场景的支持。
一种方法是咱们使用XADD的时候指定streams的最大长度,XADD mystream MAXLEN ~ 1000 其中的数值前能够加上~标识不须要精确的将长度保持在1000,比1000多一些也能够接受。若是不使用该标识,性能会差一些。另外一种方法是使用XTRIM,该命令也是使用MAXLEN选项,> XTRIM mystream MAXLEN ~ 10

一些特殊的id

前面提到了在streams API里边存在一些特殊的id。
首先是-+,这两个ID在XRANGE命令中使用,分别表明最小的id和最大的id。-表明0-1+表明18446744073709551615-18446744073709551615,从使用上方便了不少。在XPENDING等范围查询中均可以使用。
$表明streams中当前存在的最大的id,在XREADXGROUP中表明只获取新到的消息。须要注意的是$+的含义并不一致。
还有一个特殊的id是>,这个id只可以在XREADGROUP命令中使用,意味着在这个消费者群组中,历来没有分配给其余的消费者,因此老是使用>做为群组中的last delivered ID

持久化,复制和消息安全性

与redis的其它数据结构同样,streams会异步复制到从节点,并持久化到AOF和RDB文件中,而且消费者群组的状态也会按照此机制进行持久化。
须要注意的几点是:

  • 若是消息的持久化以及状态很重要,则AOF必须使用强fsync配合(AOF记录每一条更改redis数据的命令,有不少种持久化机制,在这个地方要用到的是appendfsync always 这样会严重下降Redis的速度)
  • 默认状况下,异步复制不能保证从节点的数据与主节点保持一致,在故障转移之后可能会丢失一些内容,这跟从节点从主节点接受数据的能力有关。
  • WAIT命令能够用于强制将更改传输到一组从节点上。虽然这使得数据不太可能会丢失,可是redis的Sentinel和cluster在进行故障转移的时候不必定会使用具备最新数据的从节点,在一些特殊故障下,反而会使用缺乏一些数据的从节点。

所以在使用redis streams和消费者群组在设计程序的时候,确保了解你的应用程序在故障期间的应对策略,并进行相应地配置,评估它对你的程序是否足够安全。

从streams中删除数据

删除streams中的数据使用XDEL命令,其语法为XDEL key ID [ID ...],须要注意的是在当前的实现中,在宏节点彻底为空以前,内存并无真正回收,因此你不该该滥用这个特性。

streams的性能

streams的不阻塞命令,好比XRANGE或者不使用BLOCK选项的XREADXREADGROUP跟redis普通命令一致,因此没有必要讨论。若是有兴趣的话能够在redis的文档中查看到对应命令的时间复杂度。streams命令的速度在必定范围内跟set是一致的,XADD命令的速度很是快,在一个普通的机器上,一秒钟能够插入50w~100w条数据。
咱们感兴趣的是在消费者群组的阻塞场景下,从经过XADD命令向streams中插入一条数据,到消费者经过群组读取到这条消息的性能。
为了测试消息从产生到消费间的延迟,咱们使用ruby程序进行测试,将消息的产生时间做为消息的一个字段,而后把消息推送到streams中,客户端收到消息后使用当前时间跟生产时间进行对比,从而计算出消息的延迟时间。这个程序未进行性能优化,运行在一个双核的机器上,同时redis也运行在这台机器上,以此来模拟不是理想条件下的场景。消息每秒钟产生1w条,群组内有10个消费者消费数据。测试结果以下:

Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%

99.9%的请求的延迟小于等于2毫秒,并且异常值很是接近平均值。另外须要注意的两点:

  1. 消费者每次处理1w条消息,这样增长了一些延迟,这样作是为了消费速度较慢的消费者可以保持保持消息流。
  2. 用来作测试的系统相比于如今的系统很是慢。

原文连接: https://redis.io/topics/strea...

本文做者:Worktile工程师 杜文杰

文章来源:Worktile技术博客

欢迎访问交流更多关于技术及协做的问题。

文章转载请注明出处。

相关文章
相关标签/搜索