Skip to content

Stream类型相关命令

Redis Stream 主要用于消息队列(MQ,Message Queue)

1. Redis的消息队列历史

1.1 Redis5.0之前

  1. 方案1-list类型实现
    List实现方式其实就是点对点的模式,按照插入顺序排序,你可以添加一个元素到列表的头部(左边)或者尾部(右边)。
    所以常用来做异步队列使用,将需要延后处理的任务结构体序列化成字符串塞进Redis的列表,另一个线程从这个列表中轮询数据进行处理。
    Alt text 缺点是:不能重复消费消息,因为之前的消息已经弹出列表。
  2. 方案2-list类型实现
    Alt text Redis 发布订阅 (pub/sub) 有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。而且也没有 Ack 机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了。

1.2 Redis5.0之后

Redis新增了数据类型:Stream,它支持消息的持久化、支持自动生成全局唯-ID、支持ack确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠。

2. Stream底层结构

Stream结构是一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容,如下图: streammq 图中名词概念说明:

名词含义
Message Content消息内容
Consumer group消费组,通过XGROUP CREATE命令创建,同一个消费组可以有多个消费者
Last_delivered_id游标,每个消费组会有个游标last_delivered_id,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。
Consumer消费者,消费组中的消费者
Pending_ids消费者会有一个状态变量,用于记录被当前消费已读取但未ack的消息Id,如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack它就开始减少。这个pending_ids变量在Redis官方被称之为PEL(Pending Entries List),记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符),它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理

3. 消息队列相关命令

3.1 XADD命令

语法: XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]
用于向消息队列中添加消息,其中key指的是消息队列名称,如果指定的消息队列不存在,默认会新建一个消息队列。一条消息包含消息ID和属性-属性值(field-value)组,图中的Message Content指的就是属性值(field-value)组,消息ID必须在当前消息队列中是唯一的, 消息ID由两部分组成:毫秒时间戳和序列号, 序列号是64位长度,序列化在相同时间戳上会从0开始自增,理论上序列号在极端条件下精度足够使用。如需手动设置消息ID, 格式为:毫秒数-序列号, 可以不指定序列化,格式为<ms>-*, 其中<ms>指的是毫秒数。

  1. 队列是否自动创建参数说明:
  • NOMKSTREAM: 指定的消息队列不存在, 不会自动创建队列。
  1. 队列长度参数说明:
  • MAXLEN: 指定消息队列的长度的最大值,超过最大长度会将队列头部的消息弹出。
  • MINID: 限制消息队列中消息ID的最小值。
  • =: 配合MAXLEN使用,表示队列长度等于threshold,配合MINID使用表示消息ID等于threshold
  • ~: 配合MAXLEN使用, 表示队列长度可能实际需要比threshold大一点,默认大一点是100*当前队列长度。配合MINID使用表示弹出比消费IDthreshold小的消息。
  • threshold: 配合MAXLEN表示消息队列的长度,配合 MINID则表示消息ID。
  • LIMIT: 表示限制消息队列溢出时中弹出的消息个数,若设置count为0,则表示禁止弹出消息。
  1. 消息ID的生成策略:
  • id: 手动设置消息ID, 消息ID的格式为毫秒时间戳-序列号
  • *: 表示由Redis自动生成消息ID。
sh
192.168.101.105:6379> xadd order_msg maxlen = 2 * id 1 name iphone16 price 12000
"1709912020272-0"
192.168.101.105:6379> xadd order_msg maxlen = 2 * id 3 name notebook price 8999
"1709912078600-0"
192.168.101.105:6379> xadd order_msg maxlen = 2 * id 4 name car price 23999
"1709912116371-0"
192.168.101.105:6379> xlen order_msg
(integer) 2
192.168.101.105:6379> type order_msg
stream

3.2 XRANGE命令

语法: XRANGE key start end [COUNT count]
返回消息队列中满足给定消息ID范围的消息。范围由start(最小ID)和end(最大ID)指定。消息队列中所有ID在指定的startend之间或与其中一个消息ID相等(闭合区间)的消息将会被返回。特殊的消息ID: -+分别表示消息队列中的最小消息ID和最大消息ID,startend也可以用消息ID中的毫秒时间戳表示, XRANGE执行中会自动将startend补全加上-0, 可以用来在指定时间内获取条目。加上COUNT选项,表示返回指定数量的消息。

sh
## 获取单个消息
127.0.0.1:6379> xrange data_msg 1709954018692-0 1709954018692-0
1709954018692-0
f2
v6
## 迭代获取消息队列消息 
## 步骤1. 获取前两条
127.0.0.1:6379> xrange data_msg - + count 2
1709948247495-0
f2
v8
1709948259950-0
f2
v8
## 步骤2. 将已经获取的消息最后一条ID:1709948259950-0加1作为start
127.0.0.1:6379> xrange data_msg 1709948259950-1  + count 2
1709949201528-0
f2
v8
1709949202031-0
f2
v8
## 获取指定时间范围的消息
127.0.0.1:6379> xrange data_msg 1709948259950  1709948269950
1709948259950-0
f2
v8

3.3 XREVRANGE命令

语法: XREVRANGE key end start [COUNT count]
XRANGE相似,区别在于获取消息列表元素的方向是相反的,end在前,start在后。

sh
192.168.101.105:6379> XREVRANGE data_msg + - count 2
1) 1) "1709954018692-0"
   2) 1) "f2"
      2) "v6"
2) 1) "1709949203557-0"
   2) 1) "f2"
      2) "v8"
## 获取最新的消息
192.168.101.105:6379> XREVRANGE data_msg + - count 1
1) 1) "1709954018692-0"
   2) 1) "f2"
      2) "v6"

3.4 XDEL命令

语法: XDEL key id [id ...]
用于删除指定消息ID的消息,返回删除消息的条数。

sh
## 其中消息ID为111111111111111-001不存在
192.168.101.105:6379> xdel it_test 232434324234-0 232434324234-1 111111111111111-001
(integer) 2

3.5 XLEN命令

语法: XLEN key
返回指定消息队列的消息数目,如果消息队列不存在或者消息队列为空,则返回0。判定消息队列是否存在可以使用TYPEEXISTS命令。

sh
192.168.101.105:6379> xlen it_test
(integer) 61
192.168.101.105:6379> xlen it_test_not_exist
(integer) 0

3.6 XTRIM命令

语法: XTRIM key <MAXLEN | MINID> [= | ~] threshold [LIMIT count]
通过弹出老旧消息,对消息队列进行修剪限制长度。截取长度有两种策略:

  • MAXLEN: 弹出旧消息,使得消息队列长度在threshold范围内,其中threshold为整数。
  • MINID: 弹出比指定消息IDthreshold还小的消息, 其中threshold为消息ID。
    其他参数说明:
  • =: 配合MAXLEN使用,表示队列长度等于threshold,配合MINID使用表示消息ID等于threshold
  • ~: 配合MAXLEN使用, 表示队列长度可能实际需要比threshold大一点,默认大一点是100*当前队列长度。配合MINID使用表示弹出比消费IDthreshold小的消息。
  • threshold: 配合MAXLEN表示消息队列的长度,配合 MINID则表示消息ID。
  • LIMIT count: 指定弹出消息的条数count,需要配合~使用。
sh
192.168.101.105:6379> xlen it_test
(integer) 61
192.168.101.105:6379> xtrim it_test maxlen = 10
(integer) 51
192.168.101.105:6379> xlen it_test
(integer) 10
192.168.101.105:6379> xtrim it_test minid 232434324234-58
(integer) 5
192.168.101.105:6379> xlen it_test
(integer) 5

3.7 XREAD命令

语法: XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
从一个或者多个消息队列中读取消息,返回比指定消息ID大的消息。$代表特殊ID,表示以当前消息队列已经存储的最大的ID作为最后一个ID,当前消息队列中不存在大于当前最大ID的消息,因此此时返回nil, 0-0代表从最小的ID开始获取消息队列中的消息,当不指定count,将会返回Stream中的所有消息,注意也可以使用0(00/000也都是可以的……)。

  • BLOCK milliseconds: 表示以阻塞的方式读取消息,milliseconds为阻塞毫秒数。如果milliseconds设置为0,表示永远阻塞。
  • COUNT count: 指定每一个消息队列返回消息数量。
sh
## 得到下一条消息, 此时阻塞等待生产者发出消息
192.168.101.105:6379> xread count 1 block 0 streams it_test $ 

## 新开一个redis客户端,利用xadd添加消息
127.0.0.1:6379> xadd it_test * name jack
1709983273356-0
## 回到第一个客户端
192.168.101.105:6379> xread count 1 block 0 streams it_test $
1) 1) "it_test"
   2) 1) 1) "1709983273356-0"
         2) 1) "name"
            2) "jack"
(390.41s)
192.168.101.105:6379> xread count 2 block 0 streams it_test data_msg 0-0 0-0
1) 1) "it_test"
   2) 1) 1) "232434324234-58"
         2) 1) "f1"
            2) "v2"
      2) 1) "232434324234-59"
         2) 1) "f1"
            2) "v2"
2) 1) "data_msg"
   2) 1) 1) "1709948247495-0"
         2) 1) "f2"
            2) "v8"
      2) 1) "1709948259950-0"
         2) 1) "f2"
            2) "v8"

4. 消费者组相关命令

4.1 XGROUP CREATE命令

语法:XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]
用于创建消费组。对应同一个消息队列,消费组名是唯一的, 若指定的消息队列不存在默认会直接返回报错。 参数说明:

  • key: 消息队列名称。
  • group: 消费者组名称。
  • id: 表示从指定的消息ID开始消费消息,特殊的,0表示从头开始消费消息。
  • $: 表示从消息尾部开始消费消息,也就是只消费最新消息。
  • MKSTREAM: 表示消息队列不存在自动创建。
  • ENTRIESREAD entries-read: 启用消费者组的滞后跟踪,其中entries-read表示已经被消费者读取条目数。
sh
192.168.101.105:6379> XGROUP CREATE mystream_not_exist mygroup 0
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
192.168.101.105:6379> XGROUP CREATE mystream_not_exist mygroup $ MKSTREAM
OK
192.168.101.105:6379> XGROUP CREATE mystream mygroup 0
OK
192.168.101.105:6379> XGROUP CREATE mystream mygroup 0
(error) BUSYGROUP Consumer Group name already exists

4.2 XREADGROUP命令

语法:XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
用来支持消费组消费消息,和XREAD命令的功能类似,使用消费者组来消费消息的目的是需要多个消费者进行负载均衡地分担消费同一个消息队列消息,如果你需要所有消费者各自消费所有的消息队列消息,可以直接使用XREAD命令。所以建议使用XREADGROUP命令加上COUNT参数,若不带上COUNT参数默认消费者消费所有消息,后果是其他消费者在同一个消息队列无法消费消息。消费过后的消息默认会进入Pending Entries List(等待确认队列)中。 参数说明:

  • group: 消费者组名称。
  • consumer: 消费者组中的消费者名称。
  • COUNT count: 指定消费者读取多少消息。
  • BLOCK milliseconds: 表示以阻塞的方式读取消息,milliseconds为阻塞毫秒数。如果milliseconds设置为0,表示永远阻塞。
  • NOACK: 消息被消费后消息不进入等待确认队列,已消费消息从消息队列中移除,用于消费消息非常可靠或者能够容忍丢失消息的场景,很少用。
  • id: 使用>表示从第一条尚未被消费的消息开始读取,若使用消息ID(比如某个具体的消息ID,比如0表示从头开始,比如毫秒时间戳),此时只会返回进入等待确认队列中比id大的消息,在使用消息ID的情形,命令将忽略BLOCKNOACK参数。
sh
## 将mystream队列中消息分给consumer1,consumer2,consumer3, consumer4消费
192.168.101.105:6379> xreadgroup group mygroup consumer1 count 2 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1709952513951-0"
         2) 1) "id"
            2) "12345"
            3) "LIMIT"
            4) "MIN"
      2) 1) "1709992605237-0"
         2) 1) "f1"
            2) "v1"
192.168.101.105:6379> xreadgroup group mygroup consumer2 count 2 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1709992616419-0"
         2) 1) "f2"
            2) "v2"
      2) 1) "1709992621238-0"
         2) 1) "f3"
            2) "v3"
192.168.101.105:6379> xreadgroup group mygroup consumer3 count 2 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1709992625070-0"
         2) 1) "f4"
            2) "v4"
      2) 1) "1709992628314-0"
         2) 1) "f5"
            2) "v5"
192.168.101.105:6379> xreadgroup group mygroup consumer4 count 2 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1709992631921-0"
         2) 1) "f6"
            2) "v6"
## 查看等待队列中待确认的消息            
127.0.0.1:6379> xpending mystream mygroup
7
1709952513951-0
1709992631921-0
consumer1
2
consumer2
2
consumer3
2
consumer4
1

4.3 XGROUP CREATECONSUMER命令

语法:XGROUP CREATECONSUMER key group consumer
创建消费者组中的消费者,XREADGROUP可以自动创建消费者,但是它不能在消息队列没有消息的情形下使用,这时需要使用XGROUP CREATECONSUMER命令。

sh
127.0.0.1:6379> xgroup createconsumer mystream mygroup comsumer999
1

4.4 XGROUP DELCONSUMER命令

语法:XGROUP DELCONSUMER key group consumer
用于删除指定消费者组中的消费者。当老旧的消费者不再使用的常见有用。

警告

消费者若拥有的任何挂起消息在删除后将变得不可转移。因此,强烈建议在从组中删除消费者之前声明或确认任何挂起的消息。

sh
127.0.0.1:6379> xgroup delconsumer mystream mygroup comsumer999
0

4.5 XGROUP DESTROY命令

语法:XGROUP DESTROY key group
用于删除消费者组,无视组里面是否还有消费者、待确认的消息,命令执行直接删除,需要谨慎使用。

sh
127.0.0.1:6379> xgroup create  mystream mm_test 0
OK
127.0.0.1:6379> xgroup destroy mystream mm_test
1
127.0.0.1:6379> xgroup destroy mystream mm_test
0

4.6 XGROUP SETID命令

语法:XGROUP SETID key group <id | $> [ENTRIESREAD entries-read]
用来设置消费者组起始消费消息的消息ID, 避免删除消费者组再使用XGROUP CREATE命令重新创建消费者组,ENTRIESREAD表示启用消费者组的滞后跟踪,其中entries-read表示已经被消费者读取条目数。

sh
## 设置消费者组重新开始消费
127.0.0.1:6379> xgroup setid mystream mygroup 0

4.4 XACK命令

语法:XACK key group id [id ...]
向消息队列确认消息处理已完成,其中指定的一个或者多个消息将被从等待确认队列中移除,等待确认队列的消息来自于XREADGROUP命令或者XCLAIM命令执行,一旦消息被消费者成功消费,就应该调用ACK命令说明消费不需要重复消费,从而释放占用的Redis内存资源。

sh
## 消息不存在,返回0
127.0.0.1:6379> XACK mystream mygroup 1526569495631-0
0
127.0.0.1:6379> XACK mystream mygroup 1709952513951-0
1
## 可见等待确认队列中只剩一条消息
127.0.0.1:6379> xpending mystream mygroup - + 10 consumer1
1709992605237-0
consumer1
1215224
1

4.5 XPENDING命令

语法:XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
查询每个消费组内所有消费者已读取、但尚未确认的消息。这些消息都存放在等待确认队列中,一旦这些消息被XACK命令执行,会立即从等待确认队列中移除并被标记消息的归属。XPENDING命令常和XCLAIM命令使用处理一直失败的消息,将此消息转交给其他消费者处理。 参数说明,用来起过滤作用:

  • IDLE min-idle-time: 消息的没有被消费的累计时间
  • start: 起始消息ID
  • count: 返回的消息条数
  • end: 结束消息ID
  • consumer: 指定的消费者
  1. 默认返回值说明:
  • 第一行: 等待确认队列中相关消息的条数
  • 第二行~第三行: 最小和最大消息ID
  • 第四行之后: 遍历消费者名称和已经处理但未确认的消息条数
  1. 配合startendcount可以返回具体处理消息的细节信息:
  • 第一行: 消息的ID
  • 第二行: 消费者名称
  • 第三行: 自上次将此消息传递给此消费者以来所经过的毫秒数
  • 第四行:此消息被传递的次数
sh
127.0.0.1:6379> xpending mystream mygroup
7
1709952513951-0
1709992631921-0
consumer1
2
consumer2
2
consumer3
2
consumer4
1    
127.0.0.1:6379> xpending mystream mygroup - + 10 consumer1
1709992605237-0
consumer1
1215224
1

5. XINFO相关命令

5.1 XINFO STREAM命令

语法:XINFO STREAM key [FULL [COUNT count]]
打印消息队列相关信息,FULL参数表示额外打印消息信息和消费者组以及里面的消费者的信息,COUNT参数指定打印消息的条数和相关等待确认队列的消息条数,默认是10条,设置0表示打印所有消息。

  1. 默认信息包含如下内容:
  • length: 消息队列的长度
  • radix-tree-keys: 底层radix数据结构中键的数量
  • radix-tree-nodes: 底层radix数据结构中的节点数
  • groups: 为消息队列定义的消费者组的数量
  • last-generated-id: 被添加到流中的最近的消息ID
  • max-deleted-entry-id: 从流中删除的最大消息ID
  • entries-added: 在消息队列的累计加入消息的数量。
  • first-entry: 消息队列中最早一个消息的ID和字段值
  • last-entry: 消息队列中最近一个消息的ID和字段值
  1. 倘若加上FULL参数,则除了以上默认信息还额外遍历打印消费者组以下内容:
  • name: 消费者组的名称
  • last-delivered-id: 最近消费的消息ID
  • entries-read: 消费者消费的最后一个消息的逻辑计数器的值
  • lag: 消息队列中未被消费的消息的条数,当无法确定时为NULL
  • pel-count: 在等待确认队列中和消费者组相关的消息条数
  • pending: 已经处理但尚未确认的消息数量
  • consumers: 一个数组,包含消费者组中每个消费者的信息
sh
127.0.0.1:6379> xinfo stream mystream
length
7
radix-tree-keys
1
radix-tree-nodes
2
last-generated-id
1709992631921-0
max-deleted-entry-id
0-0
entries-added
7
recorded-first-entry-id
1709952513951-0
groups
1
first-entry
1709952513951-0
id
12345
LIMIT
MIN
last-entry
1709992631921-0
f6
v6

5.2 XINFO GROUPS命令

语法:XINFO GROUPS key
打印指定消息队列下面的消费者组信息。 默认遍历打印消费者组以下内容:

  • name: 消费者组的名称
  • consumers: 该组中消费者的数量
  • pending: 该组的等待确认队列(PEL)的长度
  • last-delivered-id: 该组消费者的最后消费的一个消息ID,
  • entries-read: 最后一个被传递给群组消费者的消息的逻辑"读取计数器"的值
  • lag: 消息队列中未被消费的消息的条数,当无法确定时为NULL
sh
127.0.0.1:6379> xinfo groups mystream
name
mygroup
consumers
4
pending
6
last-delivered-id
1709992631921-0
entries-read
7
lag
0

5.3 XINFO CONSUMERS命令

语法:XINFO CONSUMERS key group
打印指定消息队列中指定分组的消费者信息。

  • name: 消费者名称
  • pending: 已经处理但尚未确认的消息数量
  • idle: 消费者最后一次尝试互动后的毫秒数(例如:XREADGROUPXCLAIMXAUTOCLAIM)
  • inactive: 从消费者最后一次成功的互动以来已经过去的毫秒数(例如: XREADGROUP将消息实际传递到PEL,XCLAIM/XAUTOCLAIM实际索取了消息)
sh
127.0.0.1:6379> xinfo consumers mystream mygroup
name
consumer1
pending
1
idle
8425863
inactive
8425863
name
consumer2
pending
2
idle
8419127
inactive
8419127
name
consumer3
pending
2
idle
6945162
inactive
8411196
name
consumer4
pending
1
idle
8402963
inactive
8402963