发送消息
使用 XADD 命令向队列发送消息
127.0.0.1:6379> XADD mystream * user david msg Hello
"1695864755027-0"
- mystream 为消息队列名称,XADD 命令发送消息时没有自动创建队列
- 此处的 * 表示由 Redis 自动生成消息 ID(推荐用法),也可手动指定
Redis 6.2 版本支持
NOMKSTREAM
参数,无消息队列时不自动创建
接收消息
使用 XREAD 命令从消息队列中读取消息
127.0.0.1:6379> XREAD streams mystream 0
1) 1) "mystream"
2) 1) 1) "1695864755027-0"
2) 1) "user"
2) "david"
3) "msg"
4) "Hello"
- 此处的 0 值代表从队列中第一条消息开始获取,同时也可以设置为消息ID,表示消息读取的起点
获取指定数量的消息
127.0.0.1:6379> XREAD COUNT 1 streams mystream 0
1) 1) "mystream"
2) 1) 1) "1695864755027-0"
2) 1) "user"
2) "david"
3) "msg"
4) "Hello"
读取模式也分为组合和非阻塞
127.0.0.1:6379> XREAD BLOCK 1000 streams mystream $
(nil)
(1.08s)
设置 BLOCK 参数后,XREAD 变为阻塞模式(默认是非阻塞模式),当处于阻塞模式,队列中有消息则立即返回,否则阻塞等待消息
在阻塞模式下,$ 可以用来表示最新的消息 ID,在非阻塞模式下,$ 没有任何意义
消费者组原理
当多个消费者同时消费一个消息队列时,它们可以重复消费同一条消息,换句话说,消息队列中有十条消息,三个消费者都可以读取到这十条消息。
有时,我们需要多个消费者合作消费同一个消息队列,即消息队列中有十条消息,三个消费者分别消费其中的一部分。
例如,消费者 A 消费信息 1,2,5,8,消费者 B 消费信息 4,9,10,消费者 C 消费信息 3,6,7,三个消费者合作完成消息消费,当消费能力不足时,可以采用这种消费者组模式。
准备一些测试数据
127.0.0.1:6379> MULTI
127.0.0.1:6379> XADD mystream * msg 1
127.0.0.1:6379> XADD mystream * msg 2
127.0.0.1:6379> XADD mystream * msg 3
127.0.0.1:6379> XADD mystream * msg 4
127.0.0.1:6379> XADD mystream * msg 5
127.0.0.1:6379> EXEC
1) "1695865694880-0"
2) "1695865694880-1"
3) "1695865694880-2"
4) "1695865694880-3"
5) "1695865694880-4"
注:使用普通 XREAD 读取 Stream 中的消息不会使其进入 Pending 状态,稍后提到的 Pending 状态和 ACK 只适用于消费者组中的消息,消息被 XREAD 读取后还在 Stream 中,所以多个消费者都可以读取。
创建消费者组
创建消费者组
127.0.0.1:6379> XGROUP CREATE mystream mqGroup 0
OK
- 这里的 0 根 XREAD 中的 0 语义一致,均表示从队列起始处获取消息
创建消费者组时自动创建消息队列
127.0.0.1:6379> XGROUP CREATE mystream2 mqGroup2 0 MKSTREAM
OK
查询当前消息队列下所有的消费者组
127.0.0.1:6379> XINFO GROUPS mystream
1) 1) "name"
2) "mqGroup"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "0-0"
查询组内消费者
127.0.0.1:6379> XINFO CONSUMERS mystream mqGroup
(empty array)
暂时还未消费,所以本例中返回空数组。
消费者组获取消息
获取消息
127.0.0.1:6379> XREADGROUP group mqGroup consuerA count 1 streams mystream >
1) 1) "mystream"
2) 1) 1) "1695864755027-0"
2) 1) "user"
2) "david"
3) "msg"
4) "Hello"
127.0.0.1:6379> XREADGROUP group mqGroup consuerA count 1 streams mystream >
1) 1) "mystream"
2) 1) 1) "1695865694880-0"
2) 1) "msg"
2) "1"
127.0.0.1:6379> XREADGROUP group mqGroup consuerB count 1 streams mystream >
1) 1) "mystream"
2) 1) 1) "1695865694880-1"
2) 1) "msg"
2) "2"
本例借助 XREADGROUP 命令从队列获取消息,组内的消费者无需提前创建,可直接使用,参数 > 表示组中尚未消费的起始消息,参数 count 1 表示获取一条,语法与 XREAD 基本相同,但增加了组的概念。
组内消耗的基本原理
STREAM 类型将为每个消费者组记录最后处理(交付)的报文 ID(last_delivered_id),以便在组内消耗时,可以从该值的后面开始读取,确保不会重复消耗,这就是消费者组的基本运作方式。
此外,当消费者组消费时,还必须考虑另一个问题,即如果一个消费者消费了一条信息,但没有成功处理(例如消费者进程宕机),那么这条信息就可能丢失,因为群组中的其他消费者无法再次消费这条信息。
下面讨论消息被消费者获取,但消费者处理失败的解决方案。
待处理消息列表
为解决在组内读取消息过程中因消费者崩溃而导致报文丢失的问题,STREAM 设计了一个待处理列表来记录已读取但未处理的消息。
查询消费者组信息
127.0.0.1:6379> XPENDING mystream mqGroup
1) (integer) 3
2) "1695864755027-0" # 起始消息ID
3) "1695865694880-1" # 节数消息ID
4) 1) 1) "consuerA"
2) "2"
2) 1) "consuerB"
2) "1"
可以看到 PENDING 队列中 consuerA 有两条正在处理的消息,consuerB 有一条正在处理消息。
查看 PENDING 队列消息详情
127.0.0.1:6379> XPENDING mystream mqGroup - + 10
1) 1) "1695864755027-0"
2) "consuerA"
3) (integer) 588491
4) (integer) 1
2) 1) "1695865694880-0"
2) "consuerA"
3) (integer) 579214
4) (integer) 1
3) 1) "1695865694880-1"
2) "consuerB"
3) (integer) 237014
4) (integer) 1
输出三条正在处理消息的详情,字段含义分别是:消息ID、消费者名称、消息被读取所经过的时间(毫秒)、消息被读取的次数
从上面的结果可以看出,之前读取的消息都记录在待处理列表中,这表明被读取的消息没有被处理,只是被读取了。 如何表明消费者已完成消息处理呢?使用 XACK 命令告知报文处理已完成,示例如下
确认消息处理完成
使用 XACK 确认消息
127.0.0.1:6379> XACK mystream mqGroup 1695864755027-0
(integer) 1
再次运行查询 PENDING 消息详情的命令,输出如下,消息 ID 为 1695864755027-0 的消息已被成功消费
127.0.0.1:6379> XPENDING mystream mqGroup - + 10
1) 1) "1695865694880-0"
2) "consuerA"
3) (integer) 728248
4) (integer) 1
2) 1) "1695865694880-1"
2) "consuerB"
3) (integer) 386048
4) (integer) 1
这种机制确保消费者读取了但不处理消息时,消息也不会丢失
等待消费者再次上线后,可以从待处理列表获取并继续处理消息,以确保消息是有序的且不会丢失
此时又出现一个问题,如果消费者宕机后没有办法继续工作,可以把消费者的待处理信息转给其他消费者处理,这就是消息传输。
消息传输
在消息传送操作过程中,消息会被传送到自己的 Pending 列表中。
要使用 XCLAIM 命令,需要设置组、目标消费者和传输的信息 ID,还需要提供 IDLE(已读取的时间长度)。只有过了这段时间,才能进行传输。
使用 XCLAIM 进行消息传输
127.0.0.1:6379> XCLAIM mystream mqGroup consumerB 360000 1695865694880-0
1) 1) "1695865694880-0"
2) 1) "msg"
2) "1"
以下命令传输消息 ID 为 1695865694880-0,且时间超过 360s 的消息到 consumerB 的 PENDING 列表,指定 IDLE 参数可以确保传输不会花费太长时间(注:过滤出等待时间较长的消息,同时也能避免所有的 Pending 都被传输)。
执行后,再次查看可见 consumerA 的 PENDING 队列已经没有消息,1695865694880-0 出现在 consumerB 的 PENDING 队列
127.0.0.1:6379> XPENDING mystream mqGroup - + 10
1) 1) "1695865694880-0"
2) "consumerB"
3) (integer) 11479
4) (integer) 2
2) 1) "1695865694880-1"
2) "consuerB"
3) (integer) 680393
4) (integer) 1
可以观察到消息被读取的次数变为了 2,同时 IDLE 时间进行了重置,重置 IDLE 的意义是确保它不会被重复传输
重复传输无影响
例如,在接下来的连续两次传输中,第二次传输不会成功
127.0.0.1:6379> XCLAIM mystream mqGroup consumerB 360000 1695865694880-0
127.0.0.1:6379> XCLAIM mystream mqGroup consumerB 360000 1695865694880-0
这就是消息传输,到目前为止,我们已经使用了 Pending 消息的 ID、消费者属性和其 IDLE 属性,还有一个属性是消息被读取的次数,即传输计数器,该属性的功能是计算消息被读取的次数,包括传输。
补充:XCLAIM 命令不仅可以传输消息,当离线的原消费者重新上线后,可以通过 XCLAIM 命令来获取 Pending 消息。
即 XCLAIM 有两个场景:1. 重新获取 Pending 消息,2. 消息所有权转移
死信消息删除
如上所述,当同一个消息被读取的次数过多,我们就可以假定它是消费者无法处理消息,那么即使消息被反复传送给不同的消费者,也不会被消费掉,它也会在待处理列表中停留很长时间。
此时,不断转移消息会让投递计数器会被累积(从上一节的例子中可以看出),当累积到我们预设的某个阈值时,我们就认为它是坏消息(也叫死信、DeadLetter、无法投递的消息),可以直接处理坏消息并将其删除。
使用 XRANGE 命令可以查询所有队列消息
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1695864755027-0"
2) 1) "user"
2) "david"
3) "msg"
4) "Hello"
2) 1) "1695865694880-0"
2) 1) "msg"
2) "1"
3) 1) "1695865694880-1"
2) 1) "msg"
2) "2"
4) 1) "1695865694880-2"
2) 1) "msg"
2) "3"
5) 1) "1695865694880-3"
2) 1) "msg"
2) "4"
6) 1) "1695865694880-4"
2) 1) "msg"
2) "5"
删除消息,可以使用 XDEL 命令,以 1695864755027-0 和 1695865694880-0 消息为例,首先从 mystream 消息队列中删除
127.0.0.1:6379> XDEL mystream 1695864755027-0
(integer) 1
127.0.0.1:6379> XDEL mystream 1695865694880-0
(integer) 1
再次查看 mystream 队列可以看到它被删除,但是 mqGroup 消费者组的 PENDING 列表中消息依然存在,可以执行 XACK 来标记其完成
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1695865694880-1"
2) 1) "msg"
2) "2"
2) 1) "1695865694880-2"
2) 1) "msg"
2) "3"
3) 1) "1695865694880-3"
2) 1) "msg"
2) "4"
4) 1) "1695865694880-4"
2) 1) "msg"
2) "5"
127.0.0.1:6379> XPENDING mystream mqGroup - + 10
1) 1) "1695865694880-0" # <--- 依然存在
2) "consumerB"
3) (integer) 947553
4) (integer) 2
2) 1) "1695865694880-1"
2) "consuerB"
3) (integer) 1616467
4) (integer) 1
通过 ACK 确认的方式 “删除” 待处理消息列表中的消息
127.0.0.1:6379> XPENDING mystream mqGroup - + 10
1) 1) "1695865694880-0"
2) "consumerB"
3) (integer) 1048972
4) (integer) 2
2) 1) "1695865694880-1"
2) "consuerB"
3) (integer) 1717886
4) (integer) 1
127.0.0.1:6379> XACK mystream mqGroup 1695865694880-0
(integer) 1
127.0.0.1:6379> XPENDING mystream mqGroup - + 10
1) 1) "1695865694880-1"
2) "consuerB"
3) (integer) 1840638
4) (integer) 1
消息队列的创建与查询
当前没有专门的命令来创建「空」消息队列,除通过 XADD 命令写消息时自动创建消息队列,还可以在创建消费者组的时创建
通过消费者组创建消息队列
127.0.0.1:6379> xgroup create s g $ mkstream
OK
127.0.0.1:6379> xgroup destroy s g
(integer) 1
创建后,删除掉消费者组
查询队列基本信息
127.0.0.1:6379> XINFO stream mystream
1) "length"
2) (integer) 4
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1695865694880-4"
9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1695865694880-1"
2) 1) "msg"
2) "2"
13) "last-entry"
14) 1) "1695865694880-4"
2) 1) "msg"
2) "5"
限制消息队列长度
Stream 没有参数来自动限制队列长度,有一些折中的方式如下
定长删除策略
127.0.0.1:6379> xadd s * msg 1
"1695871306010-0"
127.0.0.1:6379> xadd s * msg 2
"1695871307960-0"
127.0.0.1:6379> xadd s * msg 3
"1695871310232-0"
127.0.0.1:6379> xadd s * msg 4
"1695871311338-0"
127.0.0.1:6379> xadd s * msg 5
"1695871312698-0"
127.0.0.1:6379> xadd s * msg 6
"1695871314097-0"
127.0.0.1:6379> XTRIM s MAXLEN 5
(integer) 1
127.0.0.1:6379> XRANGE s - +
1) 1) "1695871307960-0"
2) 1) "msg"
2) "2"
2) 1) "1695871310232-0"
2) 1) "msg"
2) "3"
3) 1) "1695871311338-0"
2) 1) "msg"
2) "4"
4) 1) "1695871312698-0"
2) 1) "msg"
2) "5"
5) 1) "1695871314097-0"
2) 1) "msg"
2) "6"
执行后会保留最新的 MAXLEN 长度的消息,这个命令每次执行生效,非持久化的限制,如果继续添加消息,队列长度依然会增长。
最小消息ID删除策略
可以逐出消息ID小于指定ID的条目(>=6.2版本),所有ID低于 1695871312698-0 的条目都将被删除
127.0.0.1:6379> XTRIM s MINID 1695871312698
(integer) 3
127.0.0.1:6379> XRANGE s - +
1) 1) "1695871312698-0"
2) 1) "msg"
2) "5"
2) 1) "1695871314097-0"
2) 1) "msg"
2) "6"