Redis Stream 消息队列基础命令笔记

Published: 2023-09-28

Tags: MQ Redis

本文总阅读量

发送消息

使用 XADD 命令向队列发送消息

127.0.0.1:6379> XADD mystream * user david msg Hello
"1695864755027-0"
  • mystream 为消息队列名称,XADD 命令发送消息时没有自动创建队列
  • 此处的 * 表示由 Redis 自动生成消息 ID(推荐用法),也可手动指定

Redis 6.2 版本支持 NOMKSTREAM 参数,无消息队列时不自动创建

接收消息

使用 XREAD 命令从消息队列中读取消息

127.0.0.1:6379> XREAD streams mystream 0
1) 1) "mystream"
   2) 1) 1) "1695864755027-0"
         2) 1) "user"
            2) "david"
            3) "msg"
            4) "Hello"
  • 此处的 0 值代表从队列中第一条消息开始获取,同时也可以设置为消息ID,表示消息读取的起点

获取指定数量的消息

127.0.0.1:6379> XREAD COUNT 1 streams mystream 0
1) 1) "mystream"
   2) 1) 1) "1695864755027-0"
         2) 1) "user"
            2) "david"
            3) "msg"
            4) "Hello"

读取模式也分为组合和非阻塞

127.0.0.1:6379> XREAD BLOCK 1000 streams mystream $
(nil)
(1.08s)

设置 BLOCK 参数后,XREAD 变为阻塞模式(默认是非阻塞模式),当处于阻塞模式,队列中有消息则立即返回,否则阻塞等待消息

在阻塞模式下,$ 可以用来表示最新的消息 ID,在非阻塞模式下,$ 没有任何意义

消费者组原理

当多个消费者同时消费一个消息队列时,它们可以重复消费同一条消息,换句话说,消息队列中有十条消息,三个消费者都可以读取到这十条消息。

有时,我们需要多个消费者合作消费同一个消息队列,即消息队列中有十条消息,三个消费者分别消费其中的一部分。

例如,消费者 A 消费信息 1,2,5,8,消费者 B 消费信息 4,9,10,消费者 C 消费信息 3,6,7,三个消费者合作完成消息消费,当消费能力不足时,可以采用这种消费者组模式。

准备一些测试数据

127.0.0.1:6379> MULTI
127.0.0.1:6379> XADD mystream * msg 1
127.0.0.1:6379> XADD mystream * msg 2
127.0.0.1:6379> XADD mystream * msg 3
127.0.0.1:6379> XADD mystream * msg 4
127.0.0.1:6379> XADD mystream * msg 5
127.0.0.1:6379> EXEC
1) "1695865694880-0"
2) "1695865694880-1"
3) "1695865694880-2"
4) "1695865694880-3"
5) "1695865694880-4"

注:使用普通 XREAD 读取 Stream 中的消息不会使其进入 Pending 状态,稍后提到的 Pending 状态和 ACK 只适用于消费者组中的消息,消息被 XREAD 读取后还在 Stream 中,所以多个消费者都可以读取。

创建消费者组

创建消费者组

127.0.0.1:6379> XGROUP CREATE mystream mqGroup 0
OK
  • 这里的 0 根 XREAD 中的 0 语义一致,均表示从队列起始处获取消息

创建消费者组时自动创建消息队列

127.0.0.1:6379> XGROUP CREATE mystream2 mqGroup2 0 MKSTREAM
OK

查询当前消息队列下所有的消费者组

127.0.0.1:6379> XINFO GROUPS mystream
1) 1) "name"
   2) "mqGroup"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "0-0"

查询组内消费者

127.0.0.1:6379> XINFO CONSUMERS mystream mqGroup
(empty array)

暂时还未消费,所以本例中返回空数组。

消费者组获取消息

获取消息

127.0.0.1:6379> XREADGROUP group mqGroup consuerA count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1695864755027-0"
         2) 1) "user"
            2) "david"
            3) "msg"
            4) "Hello"

127.0.0.1:6379> XREADGROUP group mqGroup consuerA count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1695865694880-0"
         2) 1) "msg"
            2) "1"

127.0.0.1:6379> XREADGROUP group mqGroup consuerB count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1695865694880-1"
         2) 1) "msg"
            2) "2"

本例借助 XREADGROUP 命令从队列获取消息,组内的消费者无需提前创建,可直接使用,参数 > 表示组中尚未消费的起始消息,参数 count 1 表示获取一条,语法与 XREAD 基本相同,但增加了组的概念。

组内消耗的基本原理

STREAM 类型将为每个消费者组记录最后处理(交付)的报文 ID(last_delivered_id),以便在组内消耗时,可以从该值的后面开始读取,确保不会重复消耗,这就是消费者组的基本运作方式。

此外,当消费者组消费时,还必须考虑另一个问题,即如果一个消费者消费了一条信息,但没有成功处理(例如消费者进程宕机),那么这条信息就可能丢失,因为群组中的其他消费者无法再次消费这条信息。

下面讨论消息被消费者获取,但消费者处理失败的解决方案。

待处理消息列表

为解决在组内读取消息过程中因消费者崩溃而导致报文丢失的问题,STREAM 设计了一个待处理列表来记录已读取但未处理的消息。

查询消费者组信息

127.0.0.1:6379> XPENDING mystream mqGroup
1) (integer) 3
2) "1695864755027-0" # 起始消息ID
3) "1695865694880-1" # 节数消息ID
4) 1) 1) "consuerA"
      2) "2"
   2) 1) "consuerB"
      2) "1"

可以看到 PENDING 队列中 consuerA 有两条正在处理的消息,consuerB 有一条正在处理消息。

查看 PENDING 队列消息详情

127.0.0.1:6379> XPENDING mystream mqGroup - + 10
1) 1) "1695864755027-0"
   2) "consuerA"
   3) (integer) 588491
   4) (integer) 1
2) 1) "1695865694880-0"
   2) "consuerA"
   3) (integer) 579214
   4) (integer) 1
3) 1) "1695865694880-1"
   2) "consuerB"
   3) (integer) 237014
   4) (integer) 1

输出三条正在处理消息的详情,字段含义分别是:消息ID、消费者名称、消息被读取所经过的时间(毫秒)、消息被读取的次数

从上面的结果可以看出,之前读取的消息都记录在待处理列表中,这表明被读取的消息没有被处理,只是被读取了。 如何表明消费者已完成消息处理呢?使用 XACK 命令告知报文处理已完成,示例如下

确认消息处理完成

使用 XACK 确认消息

127.0.0.1:6379> XACK mystream mqGroup 1695864755027-0
(integer) 1

再次运行查询 PENDING 消息详情的命令,输出如下,消息 ID 为 1695864755027-0 的消息已被成功消费

127.0.0.1:6379> XPENDING mystream mqGroup - + 10
1) 1) "1695865694880-0"
   2) "consuerA"
   3) (integer) 728248
   4) (integer) 1
2) 1) "1695865694880-1"
   2) "consuerB"
   3) (integer) 386048
   4) (integer) 1

这种机制确保消费者读取了但不处理消息时,消息也不会丢失

等待消费者再次上线后,可以从待处理列表获取并继续处理消息,以确保消息是有序的且不会丢失

此时又出现一个问题,如果消费者宕机后没有办法继续工作,可以把消费者的待处理信息转给其他消费者处理,这就是消息传输。

消息传输

在消息传送操作过程中,消息会被传送到自己的 Pending 列表中。

要使用 XCLAIM 命令,需要设置组、目标消费者和传输的信息 ID,还需要提供 IDLE(已读取的时间长度)。只有过了这段时间,才能进行传输。

使用 XCLAIM 进行消息传输

127.0.0.1:6379> XCLAIM mystream mqGroup consumerB 360000 1695865694880-0
1) 1) "1695865694880-0"
   2) 1) "msg"
      2) "1"

以下命令传输消息 ID 为 1695865694880-0,且时间超过 360s 的消息到 consumerB 的 PENDING 列表,指定 IDLE 参数可以确保传输不会花费太长时间(注:过滤出等待时间较长的消息,同时也能避免所有的 Pending 都被传输)。

执行后,再次查看可见 consumerA 的 PENDING 队列已经没有消息,1695865694880-0 出现在 consumerB 的 PENDING 队列

127.0.0.1:6379> XPENDING mystream mqGroup - + 10
1) 1) "1695865694880-0"
   2) "consumerB"
   3) (integer) 11479
   4) (integer) 2
2) 1) "1695865694880-1"
   2) "consuerB"
   3) (integer) 680393
   4) (integer) 1

可以观察到消息被读取的次数变为了 2,同时 IDLE 时间进行了重置,重置 IDLE 的意义是确保它不会被重复传输

重复传输无影响

例如,在接下来的连续两次传输中,第二次传输不会成功

127.0.0.1:6379> XCLAIM mystream mqGroup consumerB 360000 1695865694880-0
127.0.0.1:6379> XCLAIM mystream mqGroup consumerB 360000 1695865694880-0

这就是消息传输,到目前为止,我们已经使用了 Pending 消息的 ID、消费者属性和其 IDLE 属性,还有一个属性是消息被读取的次数,即传输计数器,该属性的功能是计算消息被读取的次数,包括传输。

补充:XCLAIM 命令不仅可以传输消息,当离线的原消费者重新上线后,可以通过 XCLAIM 命令来获取 Pending 消息。

即 XCLAIM 有两个场景:1. 重新获取 Pending 消息,2. 消息所有权转移

死信消息删除

如上所述,当同一个消息被读取的次数过多,我们就可以假定它是消费者无法处理消息,那么即使消息被反复传送给不同的消费者,也不会被消费掉,它也会在待处理列表中停留很长时间。

此时,不断转移消息会让投递计数器会被累积(从上一节的例子中可以看出),当累积到我们预设的某个阈值时,我们就认为它是坏消息(也叫死信、DeadLetter、无法投递的消息),可以直接处理坏消息并将其删除。

使用 XRANGE 命令可以查询所有队列消息

127.0.0.1:6379> XRANGE mystream - +
1) 1) "1695864755027-0"
   2) 1) "user"
      2) "david"
      3) "msg"
      4) "Hello"
2) 1) "1695865694880-0"
   2) 1) "msg"
      2) "1"
3) 1) "1695865694880-1"
   2) 1) "msg"
      2) "2"
4) 1) "1695865694880-2"
   2) 1) "msg"
      2) "3"
5) 1) "1695865694880-3"
   2) 1) "msg"
      2) "4"
6) 1) "1695865694880-4"
   2) 1) "msg"
      2) "5"

删除消息,可以使用 XDEL 命令,以 1695864755027-01695865694880-0 消息为例,首先从 mystream 消息队列中删除

127.0.0.1:6379> XDEL mystream 1695864755027-0
(integer) 1

127.0.0.1:6379> XDEL mystream 1695865694880-0
(integer) 1

再次查看 mystream 队列可以看到它被删除,但是 mqGroup 消费者组的 PENDING 列表中消息依然存在,可以执行 XACK 来标记其完成

127.0.0.1:6379> XRANGE mystream - +
1) 1) "1695865694880-1"
   2) 1) "msg"
      2) "2"
2) 1) "1695865694880-2"
   2) 1) "msg"
      2) "3"
3) 1) "1695865694880-3"
   2) 1) "msg"
      2) "4"
4) 1) "1695865694880-4"
   2) 1) "msg"
      2) "5"

127.0.0.1:6379> XPENDING mystream mqGroup - + 10
1) 1) "1695865694880-0"  # <--- 依然存在
   2) "consumerB"
   3) (integer) 947553
   4) (integer) 2
2) 1) "1695865694880-1"
   2) "consuerB"
   3) (integer) 1616467
   4) (integer) 1

通过 ACK 确认的方式 “删除” 待处理消息列表中的消息

127.0.0.1:6379> XPENDING mystream mqGroup - + 10
1) 1) "1695865694880-0"
   2) "consumerB"
   3) (integer) 1048972
   4) (integer) 2
2) 1) "1695865694880-1"
   2) "consuerB"
   3) (integer) 1717886
   4) (integer) 1

127.0.0.1:6379> XACK mystream mqGroup 1695865694880-0
(integer) 1

127.0.0.1:6379> XPENDING mystream mqGroup - + 10
1) 1) "1695865694880-1"
   2) "consuerB"
   3) (integer) 1840638
   4) (integer) 1

消息队列的创建与查询

当前没有专门的命令来创建「空」消息队列,除通过 XADD 命令写消息时自动创建消息队列,还可以在创建消费者组的时创建

通过消费者组创建消息队列

127.0.0.1:6379> xgroup create s g $ mkstream
OK
127.0.0.1:6379> xgroup destroy s g
(integer) 1

创建后,删除掉消费者组

查询队列基本信息

127.0.0.1:6379> XINFO stream mystream
 1) "length"
 2) (integer) 4
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1695865694880-4"
 9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1695865694880-1"
    2) 1) "msg"
       2) "2"
13) "last-entry"
14) 1) "1695865694880-4"
    2) 1) "msg"
       2) "5"

限制消息队列长度

Stream 没有参数来自动限制队列长度,有一些折中的方式如下

定长删除策略

127.0.0.1:6379> xadd s * msg 1
"1695871306010-0"
127.0.0.1:6379> xadd s * msg 2
"1695871307960-0"
127.0.0.1:6379> xadd s * msg 3
"1695871310232-0"
127.0.0.1:6379> xadd s * msg 4
"1695871311338-0"
127.0.0.1:6379> xadd s * msg 5
"1695871312698-0"
127.0.0.1:6379> xadd s * msg 6
"1695871314097-0"

127.0.0.1:6379> XTRIM s MAXLEN 5
(integer) 1

127.0.0.1:6379> XRANGE s - +
1) 1) "1695871307960-0"
   2) 1) "msg"
      2) "2"
2) 1) "1695871310232-0"
   2) 1) "msg"
      2) "3"
3) 1) "1695871311338-0"
   2) 1) "msg"
      2) "4"
4) 1) "1695871312698-0"
   2) 1) "msg"
      2) "5"
5) 1) "1695871314097-0"
   2) 1) "msg"
      2) "6"

执行后会保留最新的 MAXLEN 长度的消息,这个命令每次执行生效,非持久化的限制,如果继续添加消息,队列长度依然会增长。

最小消息ID删除策略

可以逐出消息ID小于指定ID的条目(>=6.2版本),所有ID低于 1695871312698-0 的条目都将被删除

127.0.0.1:6379> XTRIM s MINID 1695871312698
(integer) 3

127.0.0.1:6379> XRANGE s - +
1) 1) "1695871312698-0"
   2) 1) "msg"
      2) "5"
2) 1) "1695871314097-0"
   2) 1) "msg"
      2) "6"