Stream
是Redis 5.0
引入的一种新数据类型,容许消费者等待生产者发送的新数据,数组
每一个Stream都有惟一的名称,它就是Redis的key
,在咱们首次使用xadd
指令追加消息时自动建立。bash
每一个Stream均可以挂多个消费组,每一个消费组会有个游标last_delivered_id
在Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了。每一个消费组都有一个Stream内惟一的名称,消费组不会自动建立,它须要单独的指令xgroup create
进行建立,须要指定从Stream的某个消息ID开始消费,这个ID用来初始化last_delivered_id
变量。服务器
每一个消费组(Consumer Group)的状态都是独立的,相互不受影响。也就是说同一份Stream内部的消息会被每一个消费组都消费到。学习
同一个消费组(Consumer Group)能够挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id
往前移动。每一个消费者者有一个组内惟一名称。ui
消费者(Consumer)内部会有个状态变量pending_ids
,它记录了当前已经被客户端读取的消息,可是尚未ack
。若是客户端没有ack,这个变量里面的消息ID会愈来愈多,一旦某个消息被ack
,它就开始减小。这个pending_ids
变量在Redis官方被称之为PEL
,也就是Pending Entries List
,它用来确保客户端至少消费了消息一次spa
1,xadd
追加消息code
2,xdel
删除消息cdn
3,xrange
获取消息列表,会自动过滤已经删除的消息blog
4,xlen
消息长度string
5,del
删除Stream
# xadd key ID field string [field string ...] *号表示服务器自动生成ID
127.0.0.1:6379> xadd mystream * name zfh age 25
"1578845024424-0"
127.0.0.1:6379> xadd mystream * name tom age 10
"1578845052908-0"
127.0.0.1:6379> xadd mystream * name jerry age 9
"1578845085696-0"
127.0.0.1:6379>
复制代码
# xlen key 消息长度
127.0.0.1:6379> xlen mystream
(integer) 3
# 消息列表 -表示最小值, +表示最大值
# xrange key start end [COUNT count]
127.0.0.1:6379> xrange mystream - +
1) 1) "1578845024424-0"
2) 1) "name"
2) "zfh"
3) "age"
4) "25"
2) 1) "1578845052908-0"
2) 1) "name"
2) "tom"
3) "age"
4) "10"
3) 1) "1578845085696-0"
2) 1) "name"
2) "jerry"
3) "age"
4) "9"
127.0.0.1:6379> xrange mystream 1578845052908-0 + # 指定最小消息ID的列表
1) 1) "1578845052908-0"
2) 1) "name"
2) "tom"
3) "age"
4) "10"
2) 1) "1578845085696-0"
2) 1) "name"
2) "jerry"
3) "age"
4) "9"
127.0.0.1:6379> xrange mystream - 1578845052908-0 # 指定最大消息ID的列表
1) 1) "1578845024424-0"
2) 1) "name"
2) "zfh"
3) "age"
4) "25"
2) 1) "1578845052908-0"
2) 1) "name"
2) "tom"
3) "age"
4) "10"
复制代码
# xdel key ID [ID ...]
127.0.0.1:6379> xdel mystream 1578845052908-0
(integer) 1
127.0.0.1:6379> xlen mystream
(integer) 2
127.0.0.1:6379> xrange mystream - +
1) 1) "1578845024424-0"
2) 1) "name"
2) "zfh"
3) "age"
4) "25"
2) 1) "1578845085696-0"
2) 1) "name"
2) "jerry"
3) "age"
4) "9"
127.0.0.1:6379> del mystream # 删除整个Stream
(integer) 1
127.0.0.1:6379> xrange mystream - +
(empty list or set)
复制代码
消息ID的形式是timestampInMillis-sequence
,例如1578845052908-0
,它表示当前的消息在毫米时间戳1578845052908
时产生,而且是该毫秒内产生的第0
条消息。消息ID能够由服务器自动生成,也能够由客户端本身指定,可是形式必须是整数-整数,并且必须是后面加入的消息的ID要大于前面的消息ID。
未完待续~下一篇学习消息的用法