我大学的时候英语6级没过,所以但凡懂点英语的同窗,若是你进到此页面,尽可能去阅读原文,连接在下方原文地址.最次也要对照着原文阅读,以避免我出了什么差错(这是不可避免的),坑了别的小伙伴.node
若是您发现任何翻译的有歧义的地方,欢迎评论或者发邮件至huyanshi2580@gmail.com
程序员
本文翻译自Reids官网对Stream的介绍.redis
最近工做须要,须要学一下Redis的新数据结构Stream
.因为算是比较新一些的技术,中文资料比较少.就找到了Redis官网上做者对Stream的介绍.读完受益不浅.数组
同时,为了记录以及加深理解,决定将原文翻译过来记录在博客里.安全
如下内容为原文,标题《Introduction to Redis Streams》ruby
Stream是Redis 5.0引入的一种新数据类型,它以更抽象的方式模拟日志数据结构,然而日志的本质仍然无缺无损:像日志文件同样,一般实现为仅追加模式打开的文件.Redis stream主要是仅追加的数据结构。至少在概念上是这样,由于Redis Streams是一种在内存中的抽象数据类型,因此它实现了更强大的操做,以克服日志文件自己的限制。bash
让Redis Streams变得很是复杂的是,尽管Stream数据结构自己很是简单,可是它实现了额外的非强制性功能:容许消费者等待生产者添加到流中的新数据的一组阻塞操做,此外还有一个名为Consumer Groups(消费者组)的概念。服务器
消费者组最初由Kafka(TM)(一个很受欢迎的的消息系统)引入。Redis以彻底不一样的方式从新实现了相似的想法,但目标是相同的:容许一组客户端合做消费同一消息流的不一样部分。数据结构
为了理解Redis Streams是什么以及如何使用它们,咱们将忽略全部高级功能,而是根据用于操做和访问它的命令来关注数据结构自己。这基本上是大多数其余Redis数据类型共有的部分,如列表,集合,排序集等。可是,请注意,列表还有一个可选的更复杂的阻塞API,相似于BLPOP
等。所以,Streams 在这方面与列表没有太大的不一样,只是附加的API更复杂,更强大。app
因为Stream是仅追加的数据结构,所以基本写入命令(称为XADD)会将新条目附加到指定的流中。Stream的条目不只仅是一个字符串,而是由一个或多个列-值
对组成。这样,Stream的每一个条目都已经结构化,就像仅以CSV格式追加式写入的文件,每行中存在多个分离的字段。
XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0
复制代码
上面对XADD
命令的调用,在键为mystream
的Stream中添加了值为sensor-id: 123, temperature: 19.8
的一个条目,它使用的条目ID为1518951480106-0
,是自动生成且由XADD
命令返回的.它将键名mystream
做为第一个参数,第二个参数是标识Stream中每一个条目的条目ID。然而,在上面的例子中,咱们使用了*
,由于咱们但愿服务器为咱们生成新的ID。每一个新的ID都会单调递增,更简单地说,添加的每一个新条目都会有比过去的全部条目更高的ID。服务器自动生成ID几乎老是您想要的,而且明确指定ID的缘由很是少见。咱们稍后会详细讨论这个问题。就像日志文件拥有行号或者文件内的字节偏移量同样,每一个条目拥有ID是Stream与日志文件类似的另外一个特征.回到咱们的XADD示例,在键名和ID以后,下一个参数是组成咱们Stream条目的列-值对。
只需使用XLEN命令就能够获取Stream中的项目数:
> XLEN mystream
(integer) 1
复制代码
条目ID由XADD
命令返回,在给定的Stream中明确地标识每个条目.它由两部分组成.
<millisecondsTime>-<sequenceNumber> | 毫秒时间-序列号
毫秒时间部分是正在生成 Stream ID的Redis节点的本地时间,然而,若是当前毫秒时间刚好小于前一个条目时间,则使用前一个条目时间,所以若是时钟回拨,ID的单调递增属性仍然存在。序列号用于在相同毫秒内建立的条目。因为序列号是64位的,因此在相同的毫秒内能够生成的条目数是没有限制的。
这些ID的格式最初看起来可能很奇怪,善意的读者可能想知道为何时间是ID的一部分。缘由是Redis Stream支持根据ID进行范围查询。因为ID与生成条目的时间相关,这使得根据时间范围进行查询基本上是无消耗的.==原文中为free==。咱们即将在使用XRANGE
命令时了解到这一点,
若是因为某种缘由,用户须要与时间无关但实际上与另外一个外部系统ID关联的增量ID,如前所述,XADD
命令能够采用明确的ID而不是使用*
通配符来触发自动生成ID,就像下面的例子这样:
XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2
复制代码
请注意,在这种状况下,最小ID为0-1,而且命令将不接受等于或小于前一个ID的ID:
> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
复制代码
如今咱们终于能够经过XADD
在咱们的Stream中添加条目了。然而,将数据附加到Stream中很是明确,可是为了提取数据而查询Stream的方式并不像这样明确。若是咱们继续类比日志文件,一种显而易见的方法是模仿咱们一般使用Unix命令tail -f
作的事情,也就是说,咱们可能会开始监听以获取附加到Stream的新消息。注意,与Redis 列表的阻塞操做不一样.在列表中,对于给定的元素,BLPOP
等流行风格的操做会阻塞其到达单个客户端,而在Stream中,咱们但愿多个消费者能够看到追加到Stream的新消息,就像多个tail -f
进程能够查看添加到日志的内容那样。使用传统术语,咱们但愿Stream可以将消息扇==fan out==出到多个客户端。
可是,这只是一种潜在的访问模式。咱们还能够以彻底不一样的方式看待Stream:不是做为消息传递系统,而是做为时间序列存储。在这种状况下,获取新追加的信息也颇有用,但另外一种天然查询模式是按时间范围获取消息,或者使用游标遍历消息以逐步检查全部历史记录。这绝对是另外一种有用的访问模式。
最后,若是咱们从消费者的角度看Stream,咱们可能但愿以另外一种方式访问流,即,做为一个能够将多个消费者分隔开来处理这些消息的消息流.以便于消费者组只能看到到达流的信息的一个子集.经过这种方式,能够跨不一样的消费者进行消息处理,而不须要单个消费者处理全部消息:每一个消费者只须要处理不一样的消息。这基本上是Kafka(TM)中的消费者群体。经过消费者组阅读消息是另外一种从Redis Stream中读取的有趣模式。
Redis Stream经过不一样的命令支持上述三种查询模式。接下来的部分将展现它们,从最简单直接的使用开始:范围查询。
范围查询:XRANGE
和 XREVRANGE
.
要按范围查询Stream,咱们只须要指定两个ID,即开始和结束。返回的范围将包括开始和结束ID的元素,所以范围是包含首项与末项的。这两种特殊ID-
和+
分别意味着可能的最小和最大的ID。
> XRANGE mystream - +
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
2) 1) 1518951482479-0
2) 1) "sensor-id"
2) "9999"
3) "temperature"
4) "18.2"
复制代码
返回的每一个条目都是两个项目的数组:ID和列-值对的列表。咱们已经说过条目ID与时间有关,由于-
左边的部分是建立Stream条目的本地节点的Unix时间(以毫秒为单位)(但请注意使用彻底指定的XADD命令复制Stream,所以从属服务器将具备与主服务器相同的ID)。这意味着我可使用XRANGE
查询一个范围内的时间。可是,为了作到这一点,我可能想要省略ID的序列部分:若是省略,则将范围的最小值假设为0,最大值将被假定为最大值可用序列号。这样,仅使用两个Unix毫秒时间查询,咱们以就能够得到在该时间范围内生成的全部条目。例如,我可能想查询:
> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
复制代码
我在这个时间范围内只有一个条目,可是在实际数据集中,我能够查询小时数范围,或者在两毫秒内可能有不少项目,因此返回的结果可能很大。所以,XRANGE
最后支持可选的COUNT选项。经过指定数量,我能够仅得到前N个项目。若是我想要更多,我能够得到最后一个ID,序列号增长一,而后再次查询。让咱们在下面的例子中了解这一点,咱们开始用XADD
添加10个项目(我没有列出这个,假设 Stream mystream中已经填充了10个项目)。要开始个人遍历,每一个命令得到2个项目,我从全范围开始开始查找,但指定数量为2。
> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"
复制代码
为了继续遍历接下来的两个项目,我必须拿到返回的最后一个ID,即1519073279157-0并将其序列号部分加1。注意,序列号数字为64位,所以无需检查溢出。生成的Id,1519073279157-1如今能够用做下一个XRANGE
调用的新起始参数:
> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
2) 1) "foo"
2) "value_3"
2) 1) 1519073281432-0
2) 1) "foo"
2) "value_4"
复制代码
就像上面这样。因为XRANGE
查找的时间复杂度为O(log(N)),而后使用O(M)的时间返回M个元素,因此此命令具备对数时间复杂度,这意味着遍历的每一步都很快。所以XRANGE
也是实际上的流迭代器== the de facto 不会翻译==,不须要XSCAN
命令。
命令XREVRANGE
与XRANGE
类似,只是以反转顺序返回元素,所以XREVRANGE
的实际用途是检查Stream中的最后一项是什么:
> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
2) 1) "foo"
2) "value_10"
复制代码
注意,XREVRANGE
命令以相反的顺序获取start和stop参数。
SREAD
监听新项目当咱们不想按范围范文Stream中的项目时,一般咱们想要的是订阅到达Stream的新项目。这个概念可能出如今与Redis 发布/订阅有关的地方,你订阅一个频道,或者一个Reids的阻塞列表,而后等待某个key,已得到到达的最新元素.可是这与您消费一个Stream有根本上的不一样:
Stream能够有多个客户端(消费者)等待数据。默认状况下,每一个新项目都将传递给等待指定Stream中的数据的每一个消费者。这个行为与阻止列表不一样,其中每一个消费者将得到不一样的元素。可是,扇出到多个消费者的能力相似于发布/订阅。
在发布/订阅中消息是自主引导而且永远不会存储的,在阻塞列表中,当客户端收到消息时,它会从列表中弹出(有效删除),Stream以彻底不一样的方式工做.全部消息都无限期地追加在Stream中(除非用户明确要求删除条目):不一样的消费者经过记住收到的最后一条消息的ID,来判断什么是新消息。
Streams Consumer Groups(==Stream的消费者组==)提供发布/订阅或阻塞列表没法实现的控制级别,同一Stream中的不一样组,已处理项目的明确确认,检查待处理项目的能力,未处理消息的声明以及单个客户端的连贯历史可见性,只能查看其私人的历史消息消费记录。
提供监听到达Stream的新消息的能力的命令称为XREAD
。它比XRANGE
复杂一点,因此咱们将开始展现简单的形式,稍后将提供整个命令布局。
> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"
复制代码
以上是XREAD
的非阻塞形式。注意,COUNT
选项不是必需的,实际上该命令的惟一强制选项是STREAMS
选项,它指定一个键列表以及消费者已经看过指定Stream的最大ID,因此命令将仅向客户端提供ID大于咱们指定ID的消息。
在上述命令中,咱们编写了STREAMS mystream 0
,咱们但愿得到名为mystream的Stream中的全部ID大于的0-0的消息。正如您在上面的示例中所看到的,该命令返回键名,由于实际上可使用多个键调用此命令以同时从不一样的Stream中读取。我能够写,STREAMS mystream otherstream 0 0
.注意在STREAMS选项以后咱们须要提供key,以及以后的ID。所以,STREAMS
选项必须始终是最后一个。
除了XREAD
能够同时访问多个流,以及咱们可以指定咱们拥有的最后一个ID以获取更新的消息以外,在这个简单的形式中,没有作与XRANGE
不一样的一些事情。可是,有趣的部分是咱们能够经过指定BLOCK
参数轻松地在阻塞命令中使用XREAD
:
> XREAD BLOCK 0 STREAMS mystream $
复制代码
注意,在上面的示例中,除了删除COUN选项以外,我指定了新的BLOCK选项,其超时时间为0毫秒(这意味着永不超时)。并且,mystream我没有使用Stream的普通ID,而是使用了特殊ID$
。这种特殊的ID意味着XREAD
应该使用Stream mystream中已经存储的最大的ID.,因此咱们从开始监听开始,咱们将只收到新的消息。这在某种程度上相似于Unix命令tail -f
。
注意,使用BLOCK选项时,咱们没必要使用特殊ID $
。咱们可使用任何有效的ID。若是命令可以当即服务咱们的请求而不会阻塞,它将执行此操做,不然它将阻塞。一般,若是咱们想要重新条目开始消费Stream,咱们从ID$
开始,以后咱们继续使用收到的最后一条消息的ID来进行下一次调用,依此类推。
XREAD
的阻塞形式也能够经过指定多个键名来监听多个Streams。若是请求能够同步提供,由于至少有一个Stream拥有比咱们指定的ID更大的元素,则返回结果。不然,该命令将阻塞并将返回第一个获取到新数据的Stream的元素(根据指定的ID)。
与阻塞列表操做相似,从等待读取数据的客户端的角度来看,阻塞式的Stream是公正的.由于策略是FIFO。给定Stream的第一个阻塞的客户端也是第一个获取到新元素的客户端.
XREAD
没有除COUNT和BLOCK以外的其余选项,所以它是一个很是基础的命令,具备将消费者链接到一个或多个Stream的特殊功能.消费Stream的更增强大的功能是使用消费者组API。可是使用消费者组来读取信息,要使用另外一个不一样的命令,XREADGROUP
.本指南的下一部分将对此进行介绍。
当手头的任务是使用不一样客户端来消费同一个Stream时,XREAD
已经提供了扇出到N个客户端的方法,还使用从属服务器以提供更强的读取扩展性。然而,有一个明确的问题,咱们想要作的不是向许多客户端提供相同的消息Stream,而是从同一Stream向许多客户端提供不一样的消息子集。一个明显的例子就是处理消息的速度很慢:可以让N个不一样的工做人员接收流的不一样部分,经过将不一样的消息路由到能够作更多工做的(==处理能力强或者当前空闲==)不一样工做人员来扩展消息处理工做。
实际上,若是咱们想象有三个消费者C1,C2,C3,以及包含消息1,2,3,4,5,6,7的Stream,那么咱们想要的是以下图所示的消息服务:
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
复制代码
为了实现这种效果,Redis使用了一个名为消费者组的概念。了解Redis消费组与Kafka(TM)消费者组的实现方法无关,这一点很是重要,仅从实现的概念来看,它们只是类似.因此我决定与最初普及这种想法的软件产品相比较,不要改变术语。
消费者组就像一个伪消费者,从Stream中获取数据,实际上为多个消费者提供服务,提供这些保证:
在某种程度上,消费者组能够被想象为关于流的一些状态:
+----------------------------------------+
| consumer_group_name: mygroup |
| consumer_group_stream: somekey |
| last_delivered_id: 1292309234234-92 |
| |
| consumers: |
| "consumer-1" with pending messages |
| 1292309234234-4 |
| 1292309234232-8 |
| "consumer-42" with pending messages |
| ... (and so forth) |
+----------------------------------------+
复制代码
若是从这个角度看到这一点,就能够很是简单地理解消费者组能够作什么,如何向消费者提供他们的未决历史记录,以及如何仅处理消费者对新消息的请求,仅当消息ID大于last_delivered_id。同时,若是将消费者组视为Redis Stream的辅助数据结构,很明显单个流能够拥有多个消费者组,拥有消费者的不一样集合。实际上,同一个Stream甚至可让客户端经过XREAD
读取没有消费者组的客户端,以及客户端经过XREADGROUP
来从不一样的消费者组读取.
如今是时候详细的查看使用消费者组的基本命令,以下所示:
XGROUP
用于建立,销毁和管理消费者组。XREADGROUP
用于经过消费者组组从Stream中读取。XACK
是容许消费者将待处理消息标记为正确处理的命令。假设我已经有一个名为mystream的Stream,为了建立一个消费者组,我须要执行如下操做:
> XGROUP CREATE mystream mygroup $
OK
复制代码
注意:目前没法为不存在的Stream建立消费者组,可是在短时间内咱们可能会在XGROUP
命令中添加一个选项,以便在这种状况下建立一个空的Stream。
正如您上面的命令中看到的,在建立消费者组时,咱们必须指定一个ID,在示例中是$
。这是必需的,由于消费者组在其余状态中必须知道在链接后处理哪些消息,即刚刚建立该组时的最后消息ID是什么?若是按照咱们提供的$
,那么只有从如今开始到达Stream的新消息才会提供给该组中的消费者。若是咱们指定0
,消费者组将消费全部Stream历史中的消息记录。固然,您能够指定任何其余有效ID。您所知道的是,消费者组将开始消费ID大于您指定的ID的消息。由于$
表示Stream中当前最大的ID,因此指定$
将仅消费新消息。
如今建立了消费者组,咱们可使用XREADGROUP
命令当即开始尝试经过消费者组读取消息。咱们将从消费者那里读到,消费者名为Alice和Bob,看看系统将如何向Alice和Bob返回不一样的消息。
XREADGROUP
很是相似于XREAD
,也提供相同的BLOCK选项,不然它是一个同步指令。可是,必须始终指定一个强制选项GROUP
,它拥有两个参数:消费者组的名称以及尝试读取的消费者的名称。还支持选项COUNT
,它与XREAD
中相同。
在从Stream中读取信息以前,让咱们在里面放一些消息:
> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0
复制代码
注意: 在这里,message是列的名称,水果是值.记住,Stream的项目是一个小字典.
如今是时候尝试使用消费者组读取一些东西了.
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"
复制代码
XREADGROUP
的回复就像XREAD
回复同样。可是请注意上面提供的GROUP
中的 <group-name> <consumer-name>
,它代表我想使用消费者组从mystream中读取消息而且我是消费者Alice。每次消费者使用消费者组执行操做时,它必须指定其名称,惟一地标识该组内的此使用者。
在上面的命令中还有另外一个很是重要的细节,在强制选项STREAMS
以后的,请求的ID是一个特殊ID>
。此特殊ID仅在消费者组的上下文中有效,它意味着:到目前为止,消息从未传递给其余消费者。
这几乎老是你想要的,可是也能够指定一个真实的ID,例如0
或任何其余有效的ID.可是在这个案例中,咱们要求XREADGROUP
向咱们提供未决消息的历史记录,永远不会在组中看到新消息。因此基本上XREADGROUP
基于咱们指定的ID具备如下行为:
>
,那么该命令将仅返回到目前为止从未传递给其余消费者的新消息,而且将更新消费者组的最后一个消息ID。XACK
确认过。咱们能够当即测试此行为,指定ID为0
,没有任何COUNT选项:咱们只会看到惟一的待处理消息,即关于apple的消息:
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"
复制代码
然而,若是咱们确认处理过的消息,它将不会被分到未决消息历史记录中,因此系统将不会报告任何东西了.
> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) (empty list or set)
复制代码
别为你还不知道XACK
怎怎么工做而担忧,这个概念只是已处理的消息再也不是咱们能够访问的历史记录中的一部分.
如今轮到Bob读取一些信息了.
> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
2) 1) 1526569506935-0
2) 1) "message"
2) "strawberry"
复制代码
Bob要求最多两条消息,而且正在经过同一个组,mygroup阅读。那么,Redis将只报告新的消息。正如您所看到的那样,apple消息未被传递,由于它已经传递给Alice,所以Bob得到了橘子和草莓等等。
这样,Alice,Bob和该组中的任何其余消费者可以从相同的Stream中读取不一样的消息,读取他们还没有处理消息的历史,或者将消息标记为已处理。这容许建立不一样的拓扑和语义来消费Stream的消息。
有几点须要注意:
XREADGROUP
,您也能够同时读取多个键,可是要使其工做,您须要在每一个Stream中建立一个具备相同名称的消费者组。这不是常见的需求,但值得一提的是该功能在技术上可用。XREADGROUP
是一个写命令,由于即便它从Stream中读取,他的反作用也会修改消费者组,所以只能在主实例中调用它。使用Ruby语言编写的使用消费者组的消费者实现示例以下。Ruby代码的编写方式几乎可让任何有经验而不了解Ruby的程序员阅读:
require 'redis'
if ARGV.length == 0
puts "Please specify a consumer name"
exit 1
end
ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new
def process_message(id,msg)
puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end
$lastid = '0-0'
puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true
# Pick the ID based on the iteration: the first time we want to
# read our pending messages, in case we crashed and are recovering.
# Once we consumer our history, we can start getting new messages.
if check_backlog
myid = $lastid
else
myid = '>'
end
items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)
if items == nil
puts "Timeout!"
next
end
# If we receive an empty reply, it means we were consuming our history
# and that the history is now empty. Let's start to consume new messages.
check_backlog = false if items[0][1].length == 0
items[0][1].each{|i|
id,fields = i
# Process the message
process_message(id,fields)
# Acknowledge the message as processed
r.xack(:my_stream_key,GroupName,id)
$lastid = id
}
end
复制代码
正如您所看到的,这里的想法是开始消费历史记录,即咱们的待处理消息列表。这颇有用由于消费者以前可能已经崩溃,因此在从新启动的状况下,咱们但愿再次读取在发送给咱们可是没有获得确认的消息。经过这种方式,咱们能够屡次或一次处理消息(在消费者失败的状况下,但Redis也有持久性和复制的限制,请参阅有关此主题的特定部分)。
消费完历史记录后,咱们会获得一个空的消息列表,咱们能够切换到使用特殊ID>
来消费新消息。
上面的示例容许咱们编写参与同一个消费者组的消费者,处理消息的每一个子集,并从故障中恢复。然而,在现实世界中,消费者可能永远失败并永远没法恢复.因为任何缘由中止且没法恢复后,消费者的待处理消息会发生什么样呢?
Redis消费者组提供了一种在这种状况下正好使用的功能,声明给定消费者的未处理消息,以便此类消息更改全部权并从新分配给其余消费者。该功能很是明确,消费者必须检查待处理消息列表,而且必须使用特殊命令声明特定消息,不然服务器将把待处理的消息永久分配给旧消费者,这样不一样的应用程序就能够选择是否使用这样的功能,以及使用它的方式。
此过程的第一步是提供消费者组中待处理条目的可观察性的命令,称为XPENDING
。这只是一个只读命令,它始终能够安全地调用,不会更改任何消息的全部权。在最简单的形式中,只使用两个参数调用该命令,这两个参数是Stream的名称和消费者者组的名称。
> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
2) "2"
复制代码
以这种方式调用时,命令只输出消费者组中的待处理消息总数,在当前案例下只有两个消息,待处理消息中的较低和较高消息ID,最后是消费者列表和他们的待处理消息数。咱们只有Bob有两个待处理的消息,由于Alice请求的惟一消息是使用XACK
确认的。
咱们能够经过给XPENDING
提供更多参数来询问更多信息,由于完整的命令签名以下:
XPENDING <key> <groupname> [<start-id> <end-id> <count> [<conusmer-name>]]
复制代码
经过提供一个开始和结束ID(也能够像在XRANGE
中同样只是-
与+
)和数量控制的命令返回的信息量,咱们可以更多地了解未处理消息。若是咱们想要将输出限制为仅针对给定消费者组的待处理消息,则使用可选的最终参数(消费者组名称),但咱们不会在下面的示例中使用此功能。
> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1
2) 1) 1526569506935-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1
复制代码
如今咱们有每条消息的详细信息:ID,消费者名称,以毫秒为单位的空闲时间(即自上次将消息传递给某个消费者以来通过了多少毫秒),最后是给定消息的被发送过的次数。咱们有来自Bob的两条消息,它们闲置74170458毫秒,大约20小时。
注意,没有人阻止咱们检查第一个消息内容是什么,使用XRANGE
就能够。
> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
复制代码
咱们只须要在参数中重复两次相同的ID。如今咱们已经有了一些想法,Alice可能会决定在20小时不处理消息后,Bob可能没法及时恢复,而且是时候声明这些消息并继续代替Bob处理。为此,咱们使用XCLAIM
命令。
这个命令的完整选项的形式很是复杂,由于它用于复制消费者组的更改,但咱们将只使用咱们一般须要的参数。在这种状况下,就像下面同样简单的调用他:
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
复制代码
基本上,对于这个给定的键和组,我但愿更改指定的ID的消息的全部权,并将其分配给指定的名称为<consumer>
的消费者。可是,咱们还提供了最小空闲时间,所以只有在上述消息的空闲时间大于指定的空闲时间时,操做才会起做用。这颇有用,由于可能有两个客户端正在重试同时认领一条消息:
Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0
复制代码
然而声称一条消息,反作用将重置其空闲时间!而且会增长其接受消息数量的计数器,所以第二个客户端将没法声明它。经过这种方式,咱们能够避免对消息进行不须要的从新处理(即便在通常状况下,您没法得到一次处理)。
这是命令执行的结果:
> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
复制代码
Alice成功认领了该消息,如今能够处理消息并确认消息,而且即便原始消费者没有恢复,也能够向前移动。
从上面的示例中能够清楚地看出,做为成功认领给定消息的反作用,XCLAIM
命令也会返回它。但这不是强制性的。JUSTID选项用于返回认领成功的消息。这个选项颇有用,若是要减小客户端和服务器之间使用的带宽,以及提升命令的性能,而且您对该消息不感兴趣,由于稍后您的消费者的实现方式将从新扫描待处理的历史记录消息。
认领也能够经过一个单独的进程来实现:一个只检查待处理消息列表,并将空闲消息分配给看似活跃的消费者。可使用Redis Stream的一个可观察性功能得到活跃的消费者。这是下一节的主题。
您在XPENDING
命令的输出中观察到的计数器是每条消息的交付数量。这个计数器在两种状况下递增:当经过XCLAIM
成功认领消息时,或者当使用XREADGROUP
调用来访问未处理消息的历史时。
当出现故障时,屡次传递消息是正常的,但最终它们一般会获得处理。可是,处理给定的消息有时会出现问题,由于它会以触发处理代码中的错误的方式被破坏或制做(==感受不太OK==)。在这种状况下,会发生的是消费者将连续失败的处理此特定消息。由于咱们有交付尝试的计数器,因此咱们可使用该计数器来检测没法处理消息的缘由。所以,一旦发送计数器达到您选择的数字,将这些消息放入另外一个Stream并将发送通知给系统管理员可能更明智。这基本上是Redis流实现死掉的信息概念的方式。
缺少可观察性的消息系统很难处理。不知道谁在消费消息,哪些消息正在等待,在给定Stream中有哪些活跃的消费者组使得一切都不透明。出于这个缘由,Redis Stream和消费者组有不一样的方式来观察正在发生的事情。咱们已经介绍了XPENDING
,它容许咱们检查在给定时刻正在被处理的消息列表,以及它们的空闲时间和交付数量。
可是,咱们可能但愿作更多的事情,XINFO
命令是一个可观察性接口,能够与子命令一块儿使用,以获取有关Stream或消费者组的信息。
此命令使用子命令以显示有关Stream及其消费者组的状态的不一样信息。例如使用XINFO STREAM
报告有关Stream自己的信息。
> XINFO STREAM mystream
1) length
2) (integer) 13
3) radix-tree-keys
4) (integer) 1
5) radix-tree-nodes
6) (integer) 2
7) groups
8) (integer) 2
9) first-entry
10) 1) 1524494395530-0
2) 1) "a"
2) "1"
3) "b"
4) "2"
11) last-entry
12) 1) 1526569544280-0
2) 1) "message"
2) "banana"
复制代码
输出显示有关如何在Stream内部编码的信息,还显示Stream中的第一条和最后一条消息。另外一个可用信息是与该Stream相关联的消费者组的数量。咱们能够进一步挖掘有关消费者组的更多信息
> XINFO GROUPS mystream
1) 1) name
2) "mygroup"
3) consumers
4) (integer) 2
5) pending
6) (integer) 2
2) 1) name
2) "some-other-group"
3) consumers
4) (integer) 1
5) pending
6) (integer) 0
复制代码
正如您在此输出和上一个输出中所看到的,XINFO
命令输出一系列列-值项。由于它是一个可观察性命令,因此它容许人类用户当即了解报告的信息,并容许命令经过添加更多字段来报告更多信息,而不会破坏与旧客户端的兼容性。其余必须提升带宽效率的命令,如XPENDING
,只报告没有字段名称的信息。
上面示例的输出(使用GROUPS
子命令)应该能够清楚地观察字段名称。咱们能够经过检查在该组中注册的消费者来更详细地检查特定消费者组的状态。
> XINFO CONSUMERS mystream mygroup
1) 1) name
2) "Alice"
3) pending
4) (integer) 1
5) idle
6) (integer) 9104628
2) 1) name
2) "Bob"
3) pending
4) (integer) 1
5) idle
6) (integer) 83841983
复制代码
若是你不记得命令的语法,只须要调用命令自己的帮助:
> XINFO HELP
1) XINFO <subcommand> arg arg ... arg. Subcommands are:
2) CONSUMERS <key> <groupname> -- Show consumer groups of group <groupname>.
3) GROUPS <key> -- Show the stream consumer groups.
4) STREAM <key> -- Show information about the stream.
5) HELP -- Print this help.
复制代码
Redis Stream中的消费者组可能在某种程度上相似于Kafka(TM)基于分区的消费者组,但请注意Redis Stream实际上很是不一样。分区只是逻辑分区,消息只是放在一个Redis键中,所以不一样客户端的服务方式取决于谁能够处理新消息,而不是从哪一个分区客户端读取。例如,若是消费者C3在某个时刻永久失效了,Redis将继续服务C1和C2,全部新消息会就像如今只有两个逻辑分区同样到达。
相似地,若是给定的某个消费者在处理消息方面比其余消费者快得多,则该消费者将相应地在相同的时间单位中接收更多消息。这是可能的,由于Redis明确跟踪全部未确认的消息,并记住谁收到了哪条消息以及从未传递给任何消费者的第一条消息的ID。
可是,这也意味着在Redis中,若是您确实要将有关同一Stream的消息分区为多个Redis实例,则必须使用多个键和一些分片系统(如Redis Cluster或其余特定于某些应用程序的分片系统)。单个Redis Stream不会自动分区到多个实例。
咱们能够说下面的图表是真的:
所以,基本上Kafka分区更相似于使用N个不一样的Redis 键。Redis消费者组是一个从给定Stream负载均衡到N个不一样消费者消息系统。
许多应用程序不但愿永远将数据收集到Stream中。有时在Stream中最多具备给定数量的项是有用的,有时一旦达到给定的大小,将数据从Redis移动到不在内存中且不是那么快但适合储存历史消息的存储介质是有用的。Redis Stream对此有一些支持。一个是XADD
命令的MAXLEN选项。这个选项使用起来很是简单.
> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
复制代码
使用MAXLEN在达到指定长度时,将自动逐出旧条目,以便Stream有一个恒定的大小。目前没有选项能够告诉Stream只保留不超过给定数量的项目,由于为了一致地运行,这样的命令必须在很长一段时间内阻塞以驱逐项目。想象一下,例如,若是存在插入尖峰,而后是长暂停,以及另外一次插入,则他们都具备相同的最大时间。Stream将阻塞以驱逐暂停期间变得太旧的数据。所以,用户须要进行一些规划并了解所需的最大Stream长度。此外,虽然Stream的长度与使用的内存成比例,可是按时间修剪不太容易控制和预测:它取决于插入速率,这是一个常常随时间变化的变量(当他没有变化是,那么只是按照大小进行调整是微不足道的).
然而,使用MAXLEN
进行修整是花销很大的:Stream由宏节点表示为基数树,以便很是节省内存。改变由几十个元素组成的单个宏节点不是最佳的。所以可使用如下特殊形式提供命令:
XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
复制代码
在MAXLEN选项个实际技术之间的~
参数意味着:我并不真的须要这刚好1000个项目,它能够是1000或1010或1030,只需确保至少保存1000个项目。使用此参数,仅在咱们能够删除整个节点时执行修剪。这使它更有效率,一般是你想要的。
还有可用的XTRIM
命令,它执行与上面的MAXLEN
选项很是类似的操做,可是此命令不须要添加任何内容,能够以独立方式对任何Stream运行。
> XTRIM mystream MAXLEN 10
复制代码
或者,使用XADD
:
> XTRIM mystream MAXLEN ~ 10
复制代码
可是,即便目前只实现了MAXLEN
,XTRIM
被设计为能够接受不一样的修剪策略。鉴于这是一个明确的命令,未来有可能容许指定时间修剪,由于用户以独立的方式调用此命令时应该知道她或他在作什么。
XTRIM
应该具有的一个有用的驱逐策略多是经过一个ID范围删除的能力。目前这是不可能的,但未来可能会实施,以便更轻松地将XRANGE
和XTRIM
一块儿用于将数据从Redis移动到其余存储系统(若是须要)。
您可能已经注意到Redis API中可使用多个特殊ID。这是一个简短的回顾,以便他未来能更加有意义.
前两个特殊ID是-
和+
,在XRANGE
命令的范围查询中使用。这两个ID分别表示可能的最小ID(基本上是0-1)和可能的最大ID(即18446744073709551615-18446744073709551615)。正如你所看到的那样,-
和+
写起来更清晰,而不是那些数字。
而后是咱们想要说的API,即Stream中具备最大ID的项的ID。这就是`,仅使用该群组向消费者提供新的内容。
正如您所看到的$
并不意味着+
,它们是两个不一样的东西,+
是在每一个可能的Stream中可能的最大的ID ,而$
是在给定Stream中已经包含的最大ID。另外的API一般只认识+
或$
,由于它颇有用,能够避免以多个含义加载一个给定的符号。
另外一个特殊ID是>
,仅在消费者组的上下文中且仅使用XREADGROUP
命令时才具备特殊含义。这种特殊ID意味着咱们只想要到目前为止从未提供给其余消费者的条目。因此基本上>
是消费者组的最后交付ID。
最后是特殊ID*
,只能与XADD
命令一块儿使用,意味着为咱们要建立的新条目自动选择ID。
所以,咱们有-
,+
,$
,>
和*
,他们拥有不一样的含义,大多数时候,只能在不一样的环境中使用。
与其余Redis数据结构同样,Stream被异步复制到从属并持久存储到AOF和RDB文件中。然而,可能不那么明显的是,消费者组的完整状态也传播到AOF,RDB和从属中,所以若是主服务器中的消息未处理,则从服务器也将具备相同的信息。一样,重启后,AOF将恢复消费者者组的状态。
可是请注意,Redis Stream和消费者组使用Redis默认复制进行持久化和复制,所以:
XADD
命令形成的消费者组状态更改:在故障转移以后,可能会丢失某些内容,具体取决于从服务器从主服务器接收数据的能力。WAIT
命令能够强行让这些变化传播至一系列丛书服务器上。但请注意,虽然这使得数据不太可能丢失,但由Sentinel或Redis Cluster操做的Redis故障转移过程仅执行尽力检查以转移到最新的从站,而且在某些特定故障下可能会使从站丢失一些数据。所以,在使用Redis Stream和消费者者组设计应用程序时,请确保了解应用程序在故障期间应具备的语义属性,并相应地配置,评估它是否足够安全用于您的案例。
Streams还有一个特殊命令,能够经过ID从流中间删除项目。一般,对于仅附加数据结构,这可能看起来像一个奇怪的特征,但它实际上对涉及例如隐私法规的应用程序有用。该命令名为XDE
L,只须要获取Stream的名称,以及要删除的ID:
> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
2) 1) "value"
2) "3"
复制代码
可是在当前实现中,在宏节点彻底为空以前,内存不会被回收,所以您不该滥用此功能。
流和其余Redis数据结构的一个区别在于,当其余数据结构再也不具备元素时,删除元素的命令也会将键自己删除。例如,当对ZREM
的调用将删除有序集合中的最后一个元素时,将彻底删除有序集合。Stream容许保留零元素,当使用MAXLEN
选项且数量为为零(XADD
和XTRIM
命令),或者由于调用了XDEL
.
存在这种不对称的缘由是由于Streams可能具备关联的消费者组,而且咱们不但愿由于Stream中没有元素就丢失消费者组定义的状态.目前,即便没有关联的消费者组,也不会删除该Stream,但这可能在未来发生变化。
没有BLOCK选项的非阻塞Stream命令(如XRANGE
和XREAD
或XREADGROUP
)与任何其余Redis命令同样是同步提供服务,所以讨论此类命令的延迟是没有意义的:更有趣的是检查Redis文档中命令的时间复杂度。能够说,在提取范围时,Stream的XADD
命令很是快,而且若是使用流水线操做,则能够在普通机器中轻松地每秒插入50万到100万个项目。
然而,若是咱们想要理解处理消息的延迟,在阻塞消费者组中的消费者的上下文中,从经过XADD
生成消息的那一刻起,到消费者得到消息的那一刻,延迟就变成了一个有趣的参数。由于XREADGROUP
返回这些信息。
在提供执行测试的结果以前,有必要了解Redis使用什么模型来路由Stream消息(其实是如何管理等待数据的任何阻塞操做)。
阻塞的客户端在哈希表中被引用,该哈希表将至少有一个阻塞消费者的键映射到等待这个键的消费者列表。这样,给定一个接收数据的key,咱们就能够解析全部等待这些数据的客户端。
当发生写入时,在这种状况下,当调用XADD命
令时,它会调用signalKeyAsReady()
函数。这个函数会将键放入须要处理的键列表中,由于这些键可能会为阻止的消费者提供新数据。请注意,稍后将处理此类就绪键,所以在相同的事件循环周期中,键可能会接收其余写入。
最后,在事件循环结束以前,处理就绪键。对于每一个键,运行等待数据的客户端列表,若是适用,这些客户端将接收到达的新数据。在Stream中,数据是消费者请求的适用范围内的消息。
正如您所看到的,基本上,在返回事件循环以前,全部调用XADD
的客户端阻塞地等待消费消息,所以XADD
的调用者应该同时收到Redis的回复,消费者将收到新的消息。
此模型基于推送,将数据添加到使用者缓冲区将直接经过调用XADD
的操做执行,所以延迟每每是可预测的。
为了检查这种延迟特性,咱们使用多个Ruby程序实例进行测试,推送电脑时间做为附加消息的做信息,Ruby程序读取消费者组的消息并处理它们。消息处理步骤包括将当前计算机时间与消息时间戳进行比较,以便理解总延迟。
此类程序未通过优化,而且运行在小型两核的Redis实例中,以便尝试提供在非最佳条件下可能出现的延迟数字。消息以每秒10k的速率生成,同时有10个消费者消费并确认来自同一Redis Stream和消费者组的消息。
得到的结果:
Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%
复制代码
99.9%的请求的延迟小于等于2毫秒,异常值仍然很是接近平均值。
在Stream中添加数百万条未确认的消息不会改变基准测试的要点,大多数查询仍然以很是短的延迟进行处理。
几条评论:
XREADGROUP
的count参数设置为10000.这增长了不少延迟,可是为了让慢速消费者可以与消息流保持一致,这是必需的。因此你能够期待一个更小的真实世界延迟。完。
以上皆为我的所思所得,若有错误欢迎评论区指正。
欢迎转载,烦请署名并保留原文连接。
联系邮箱:huyanshi2580@gmail.com
更多学习笔记见我的博客------>呼延十