翻译:基于 Redis Stream 类型的轻量消息队列解决方案

Published: 2023-08-17

Tags: MQ Redis 翻译

本文总阅读量

原文标题为 “基于 Redis 流类型的完美消息队列解决方案”,在众多专业的 MQ 当中,Redis Stream 称得上优秀、轻量,但称呼完美有些夸张,遂修改「完美」为「轻量」,使得表达更加准确合理。

原文地址:The Perfect Message Queue Solution Based on the Redis Stream Type


Redis 在 5.0 版本带来了 Stream 类型,从字面上看它是一种流类型,从功能角度看,它实际上应该是 Redis 对消息队列(MQ,Message Queue)的完美实现。

使用过 Redis 作为消息队列的人都知道,有许多基于 Reids 的消息队列实现

  • 通过 PUB/SUB, subscribe/publish 模型实现
  • 基于 List-based LPUSH+BRPOP 的实现
  • 基于 Sorted-Set 的实现

每种方式都有典型的特点和问题。

Redis 5.0 中发布的 Stream 类型也用于实现典型的消息队列,流类型的出现几乎满足了信息队列的所有内容,包括但不限于

  • Serialization generation of the message ID. 生成序列化消息 ID
  • Message traversal. 消息遍历
  • Blocking and non-blocking reads of messages. 阻塞及非阻塞获取消息
  • Packet consumption of messages. 消息的数据包消耗
  • Processing of outstanding messages. 处理未回复的信息
  • Message queue monitoring. 消息队列监控

消息队列有生产者和消费者,让我们来体验流类型的奇妙之处。

推送消息

通过 XADD 命令可以向 Stream 发送消息,如下所示

127.0.0.1:6379> XADD memberMessage * user reggie msg Hello
"1553439850328-0"
127.0.0.1:6379> XADD memberMessage * user dwen msg World
"1553439858868-0"

语法格式为

XADD key ID field string [field string ...]

你需要提供 key, 消息 id 的 schema,消息内容,消息的内容是 key-value 格式的

  • ID 通常指定为 *,这表明消息的 ID 是通过 Redis 自动生成分配的,这也是非常推荐的 schema 使用方式
  • field string [field string] 要发送的消息,键值对类型,支持可以发送多个 key-value 消息。

在以上的示例,user 值为 reggie,msg 值为 Hello 的消息被发送到 memberMessage 队列。

Redis 使用毫秒时间戳和序列号生成消息 ID,此时,消息队列存在了一条消息。

消费消息

通过 XREAD 命令可以从 Stream 读取消息,示例如下:

127.0.0.1:6379> XREAD streams memberMessage 0
1) 1) "memberMessage"
   2) 1) 1) "1553439850328-0"
         2) 1) "user"
            2) "reggie"
            3) "msg"
            4) "Hello"
      2) 1) "1553439858868-0"
         2) 1) "user"
            2) "dwen"
            3) "msg"
            4) "World"

上面的命令从消息队列 memberMessage 中读取所有信息。

XREAD 支持很多参数,它的语法如下

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
  • COUNT count 用于限制获取消息的数量
  • BLOCK milliseconds 设置 XREAD 为阻塞模式,默认是非阻塞模式
  • ID 用于设置从哪个消息 ID 开始读取,指定 0 则从第一条信息开始。本例中使用的是 0,这里需要注意的是,消息队列 ID 是单调递增的,因此通过设置起点,可以向后读取,在阻塞模式下,$ 可以用来表示最新的消息 ID。在非阻塞模式下,$ 没有任何意义。
  • XREAD 读取消息时分为阻塞和非阻塞模式,BLOCK 选项可用于指示阻塞模式,并需要设置阻塞持续时间,在非阻塞模式下,读取后会立即返回(即使没有消息),而在阻塞模式下,如果无法读取内容,就会阻塞等待。

典型的阻塞模式用法如下

127.0.0.1:6379> XREAD block 1000 streams memberMessage $
(nil)
(1.07s)

我们使用阻塞模式(以 $ 作为 ID)来读取最新的信息,如果没有信息,命令就会阻塞。在等待过程中,其他客户端会向队列添加信息,这些信息将立即被读取。

因此,一个典型的队列是由 XADD 和 XREAD 组成的,XADD 负责生成信息,而 XREAD 负责消费信息。

消息ID描述

由 Redis XADD 命令生成的 1553439850328-0 是消息 ID,它由两部分组成:时间戳-序列号。

时间戳以毫秒为单位,是生成信息时 Redis 服务器的时间,它是一个 64 位整数 (int64)。

序列号是该毫秒时间点的报文序列号,也是一个 64 位整数。

认真地说,序列号可能会溢出,但这真的可能吗?序列号的增量可以通过多批次处理来验证:

127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> XADD memberMessage * msg one
QUEUED
127.0.0.1:6379> XADD memberMessage * msg two
QUEUED
127.0.0.1:6379> XADD memberMessage * msg three
QUEUED
127.0.0.1:6379> XADD memberMessage * msg four
QUEUED
127.0.0.1:6379> XADD memberMessage * msg five
QUEUED
127.0.0.1:6379> EXEC
1) "1553441006884-0"
2) "1553441006884-1"
3) "1553441006884-2"
4) "1553441006884-3"
5) "1553441006884-4"

由于 Redis 命令执行的速度非常快,因此可以看出,在同一时间戳内,信息是以递增的序列号表示的。

为了确保信息有序,Redis 生成的 ID 按顺序单调递增,由于 ID 包含时间戳部分,为了避免服务器时间错误(如服务器时间延迟)造成的问题,Redis 的每个 Stream 类型数据都维护了一个 latest_generated_id 属性,用于记录最后一条消息的 ID。

如果发现当前时间戳向后(小于 latest_generated_id 所记录的时间戳),则使用时间戳不变、序列号递增的方案作为新的消息 ID(这也是序列号使用 int64 以确保有足够序列号的原因),从而确保 ID 的单调递增性质。

强烈建议使用 Redis 方案生成消息 ID,因为这种由时间戳 + 序列号构成的单调递增 ID 方案几乎可以满足所有需求。

但同时要记住,别忘了 ID 是可以定制的!

消费者组模式

当多个消费者同时消费一个消息队列时,他们可以重复消费同一条消息,也就是说,消息队列中有十条消息,三个消费者都可以消费这十条消息。

注:使用普通 XREAD 读取 Stream 中的消息不会使其进入 Pending 状态,稍后提到的 Pending 状态和 ACK 只适用于消费者组中的消息,消息被 XREAD 读取后还在 Stream 中,所以多个消费者都使用。

另外 XREAD 读取会受到消费者组读取影响,消费者组成功消费消息后,消息会从 Stream 中移除,XREAD 就获取不到了。 (勘误,这里的解释不正确,Stream 中消息不会被移除,XREAD 依然可以读取)

但有时,我们需要多个消费者合作消费同一个消息队列,即消息队列中有十条消息,三个消费者分别消费其中的一部分。

例如,消费者 A 消费信息 1,2,5,8,消费者 B 消费信息 4,9,10,消费者 C 消费信息 3,6,7

即三个消费者合作完成消息消费,当消费能力不足时,也就是消息处理程序效率不高时,可以采用这种模式。

这种模式就是消费者组模式。如下图所示:

消费者组模式的支持主要由两条命令实现:

  • XGROUP 用于管理消费者群组,提供创建群组、销毁群组和更新群组启动信息 ID 等操作
  • XREADGROUP 群组消费消息操作

演示中使用了五条消息,我们的想法是创建一个 Stream 消息队列,由生产者生成五条消息。

在消息队列上创建一个消费者组,该组中的三个消费者消费消息

# Producer generates 5 messages
127.0.0.1:6379> MULTI
127.0.0.1:6379> XADD mq * msg 1
127.0.0.1:6379> XADD mq * msg 2
127.0.0.1:6379> XADD mq * msg 3
127.0.0.1:6379> XADD mq * msg 4
127.0.0.1:6379> XADD mq * msg 5
127.0.0.1:6379> EXEC
 1) "1553585533795-0"
 2) "1553585533795-1"
 3) "1553585533795-2"
 4) "1553585533795-3"
 5) "1553585533795-4"

# Create a consumer group mqGroup
127.0.0.1:6379> XGROUP CREATE mq mqGroup 0 # Create a consumer group mgGroup for message queue mq
OK

# Consumer A, Consumption Article 1
127.0.0.1:6379> XREADGROUP group mqGroup consumerA count 1 streams mq > #Consumer A in the consumer group reads a message from the message queue mq
1) 1) "mq"
   2) 1) 1) "1553585533795-0"
         2) 1) "msg"
            2) "1"
# Consumer A, Consumption Article 2
127.0.0.1:6379> XREADGROUP group mqGroup consumerA COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-1"
         2) 1) "msg"
            2) "2"
# Consumer B, Consumption Article 3
127.0.0.1:6379> XREADGROUP group mqGroup consumerB COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-2"
         2) 1) "msg"
            2) "3"
# Consumer A, Consumption Article 4
127.0.0.1:6379> XREADGROUP group mqGroup consumerA count 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-3"
         2) 1) "msg"
            2) "4"
# Consumer C, Consumption Article 5
127.0.0.1:6379> XREADGROUP group mqGroup consumerC COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-4"
         2) 1) "msg"
            2) "5"

在上例中,当三个消费者 A、B 和 C 在同一个 mqGroup 组中消费消息时(消费者可以在消费时指定,无需提前创建),他们具有互斥原则,消费计划为:A->1 , A->2,B->3,A->4,C->5

XGROUP create mq mqGroup 0 用于在消息队列 mq 上创建消费分组 mqGroup。最后一个参数是 0,表示该组从第一条消息开始消费。其含义与 XREAD 的 0 一致)

除支持 CREATE 外,它还支持 SETID 设置起始 ID、DESTROY 销毁分组、DELCONSUMER 删除分组中的消费者等操作。

XREADGROUP group mqGroup consumerA count 1 streams mq >,用于消费者组 mqGroup 中的消费者 A 在队列 mq 中消费,参数 > 表示组中尚未消费的起始消息,参数 count 1 表示获取一条。

语法与 XREAD 基本相同,但增加了组的概念。

组内消耗的基本原理是,STREAM 类型将为每个消费者组记录最后处理(交付)的报文 ID(last_delivered_id),以便在组内消耗时,可以从该值的后面开始读取,确保不会重复消耗。

以上就是消费者组的基本运作。

此外,当消费者组消费时,还必须考虑另一个问题,即如果一个消费者消费了一条信息,但没有成功处理(例如消费者进程宕机),那么这条信息就可能丢失,因为群组中的其他消费者无法再次消费这条信息。

下面将继续讨论解决方案。

待处理消息列表

为了解决在组内读取消息过程中因消费者崩溃而导致报文丢失的问题,STREAM 设计了一个待处理列表来记录已读取但未处理的报文。

XPENDING 命令用于获取消费者组或消费者中消费者的未处理报文,示例如下

127.0.0.1:6379> XPENDING mq mqGroup
1) (integer) 5 # 5 messages read but not processed
2) "1553585533795-0" # begin ID
3) "1553585533795-4" # end ID
4) 1) 1) "consumerA" # consumer A have 3 messages
      2) "3"
   2) 1) "consumerB" # consumer B have 1message
      2) "1"
   3) 1) "consumerC" # consumer C have 1message
      2) "1"

127.0.0.1:6379> XPENDING mq mqGroup - + 10 # Use the start end count option for details
1) 1) "1553585533795-0" # Message ID
   2) "consumerA" # consumer
   3) (integer) 1654355 # It has been 1654355ms from reading to now, IDLE
   4) (integer) 5 # The message was read 5 times,delivery counter
2) 1) "1553585533795-1"
   2) "consumerA"
   3) (integer) 1654355
   4) (integer) 4
# A total of 5, the remaining 3 omitted ...

127.0.0.1:6379> XPENDING mq mqGroup - + 10 consumerA # Add the consumer parameter to get the Pending list of a specific consumer
1) 1) "1553585533795-0"
   2) "consumerA"
   3) (integer) 1641083
   4) (integer) 5
# A total of 3, the remaining 2 omitted ...

每个待处理信息都有四个属性:

  1. Message-ID 消息 ID
  2. The consumer 消费者
  3. IDLE, the time elapsed 已过时间
  4. Delivery counter 计数器,即消息被阅读的次数

从上面的结果可以看出,我们之前读取的报文都记录在待处理列表中,这表明所有读取的报文都没有被处理,而只是被读取了。

那么,如何表明消费者已完成信息处理呢?

使用 XACK 命令告知报文处理已完成,示例如下

127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # Notify message processing end, identified by message ID
(integer) 1

127.0.0.1:6379> XPENDING mq mqGroup # Check the Pending list again
1) (integer) 4 # The messages read but not processed have become 4
2) "1553585533795-1"
3) "1553585533795-4"
4) 1) 1) "consumerA" # Consumer A, there are 2 message processing
      2) "2"
   2) 1) "consumerB"
      2) "1"
   3) 1) "consumerC"
      2) "1"
127.0.0.1:6379>

有了这种待处理机制,就意味着消费者读取信息但不处理信息后,信息也不会丢失。

在等待消费者再次上线后,您可以阅读 "待处理列表 "并继续处理信息,以确保信息是有序的,不会丢失。

此时还有一个问题,就是如果消费者宕机后没有办法上网,就需要把消费者的待处理信息转给其他消费者处理,这就是信息传递。

消息传递

在信息传送操作过程中,信息会被传送到自己的 Pending 列表中。

要使用 XCLAIM 命令,需要设置组、目标消费者和传输的信息 ID,还需要提供 IDLE(已读取的时间长度)。只有过了这段时间,才能进行传输。

# The message currently belonging to consumer A is 1553585533795-1, which has been unprocessed for 15907,787ms
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
   2) "consumerA"
   3) (integer) 15907787
   4) (integer) 4

# Transfer message 1553585533795-1 over 3600s to consumer B's Pending list
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
1) 1) "1553585533795-1"
   2) 1) "msg"
      2) "2"

# Message 1553585533795-1 has been transferred to Consumer B's Pending.
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
   2) "consumerB"
   3) (integer) 84404 # IDLE, it's reset
   4) (integer) 5 # The number of reads is also accumulated by 1

上述代码完成了一次消息传送,除了指定 ID 外,传输还需要指定本次传输的 IDLE,以确保传输不会被长时间处理(注:过滤出等待时间较长的消息,同时也能避免所有的 Pending 都被传输)。

已传输信息的 IDLE 将被重置,以确保它不会被重复传输,我们认为,可能会有同时向多个消费者传输过期消息的并发操作,如果设置了 IDLE,就可以避免后续传输。

例如,在接下来的连续两次传输中,第二次传输不会成功。

127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1

这就是消息传输,到目前为止,我们已经使用了 Pending 消息的 ID、消费者属性和其 IDLE 属性,还有一个属性是信息被读取的次数,即传输计数器,该属性的功能是计算消息被读取的次数,包括传输。

该属性主要用于判断是否为错误数据。

注:XCLAIM 命令不仅可以传输消息,当离线的原消费者重新上线后,可以通过 XCLAIM 命令来获取 Pending 消息。

即 XCLAIM 有两个场景:1. 重新获取 Pending 消息,2. 消息所有权转移

死信问题

如上所述,如果消费者无法处理报文,也就是无法 XACK,那么即使报文被反复传送给不同的消费者,它也会在待处理列表中停留很长时间。

此时,消息的投递计数器会被累积(从上一节的例子中可以看出),当累积到我们预设的某个阈值时,我们就认为它是坏消息(也叫死信、DeadLetter、无法投递的消息),因为有了判断条件,我们就可以直接处理坏消息并将其删除。

要删除信息,请使用 XDEL 命令,如下所示

# delete message from queue
127.0.0.1:6379> XDEL mq 1553585533795-1
(integer) 1
# Check that there is no more message in the queue
127.0.0.1:6379> XRANGE mq - +
1) 1) "1553585533795-0"
   2) 1) "msg"
      2) "1"
2) 1) "1553585533795-2"
   2) 1) "msg"
      2) "3"

请注意,在此示例中,Pending 中的消息未被删除,因此如果您查看 Pending ,消息仍然存在。可以执行 XACK 来标记其完成!

信息监控

Stream 提供 XINFO 来监控服务器信息,并可对其进行查询:

# View queue information
127.0.0.1:6379> xinfo stream mq
...# Consumer group information
127.0.0.1:6379> xinfo groups mq
...# Consumer Group Member Information
127.0.0.1:6379> xinfo consumers mq mqGroup
...

至此,对消息队列的操作描述基本结束,让我们用 Golang 来实现 Redis Stream 消息队列。

Golang 代码示例

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/go-redis/redis/v8"
)

func main() {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{
      Addr: "localhost:6381",
    })

    // Add a message to the stream, the result returns the message ID
    xAddArgs := &redis.XAddArgs{
        Stream: "example:stream",
        MaxLen: 10,
        Values: map[string]interface{}{"foodId": "10001", "foodName": "steak"},
    }
    strCmd := rdb.XAdd(ctx, xAddArgs)
    fmt.Println(strCmd.Result())
    // 1609083771429-0 <nil>

    // insert second message
    strCmd = rdb.XAdd(ctx, &redis.XAddArgs{Stream: "example:stream", Values: []string{"foodId", "10002", "foodName", "Hamburger"}})
    fmt.Println(strCmd.Result())
    // 1609083771430-0 <nil>

    // Limit the length of streams to about 10
    intCmd := rdb.XTrimApprox(ctx, "example:stream", 10)
    fmt.Println(intCmd.Result())
    // 0 <nil>

    // Get the length of the stream
    intCmd = rdb.XLen(ctx, "example:stream")
    fmt.Println(intCmd.Result())
    // 2 <nil>

    // Get message list
    xMessageSliceCmd := rdb.XRange(ctx, "example:stream", "-", "+")
    fmt.Println(xMessageSliceCmd.Result())
    // [{1609083771429-0 map[foodId:10001 foodName:steak]} {1609083771430-0 map[foodId:10002 foodName:Hamburger]}] <nil>

    // Get the message list in reverse, the first one is the latest message
    xMessageSliceCmd = rdb.XRevRange(ctx, "example:stream", "+", "-")
    fmt.Println(xMessageSliceCmd.Result())
    // [{1609083771430-0 map[foodId:10002 foodName:Hamburger]} {1609083771429-0 map[foodId:10001 foodName:steak]}] <nil>

    // Read the next message with the given id
    xReadArgs := &redis.XReadArgs{
        Streams: []string{"example:stream", "1609083771429-0"},
        Count:   1,
        Block:   5 * time.Second,
    }
    xStreamSliceCmd := rdb.XRead(ctx, xReadArgs)
    fmt.Println(xStreamSliceCmd.Result())
    // [{example:stream [{1609083771430-0 map[foodId:10002 foodName:Hamburger]}]}] <nil>

    // delete message, delete both messages
    intCmd = rdb.XDel(ctx, "example:stream", "1609083771430-0", "1609083771429-0")
    fmt.Println(intCmd.Result())
    // 2 <nil>

    // Create a consumer group eater on the stream, and start consuming from the latest message (represented by $)
    statusCmd := rdb.XGroupCreate(ctx, "example:stream", "eater", "$")
    fmt.Println(statusCmd.Result())
    // OK <nil>

    // Read messages in the consumer group eater that have not been read by other consumers>
    // Will block after running, name the input on the redis client  XADD "example:stream" * foodId 1003 foodName Coca-Cola will get the result
    xReadGroupArgs := &redis.XReadGroupArgs{
        Group:    "eater",                         // consumer group
        Consumer: "eater01",                       // Consumer, created on-the-fly
        Streams:  []string{"example:stream", ">"}, // stream
        Block:    0,                               // infinite waiting
        NoAck:    false,                           // Confirmation required
    }
    xStreamSliceCmd = rdb.XReadGroup(ctx, xReadGroupArgs)
    xStreamSlice, err := xStreamSliceCmd.Result()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(xStreamSlice)
    // [{example:stream [{1609086089189-0 map[foodId:1003 foodName:Coca-Cola]}]}]

    // Confirm that the message is processed
    intCmd = rdb.XAck(ctx, "example:stream", "eater", "1609086089189-0")
    fmt.Println(intCmd.Result())
    // 1 <nil>

    // Set the last delivered id to 1609086089189-0
    statusCmd = rdb.XGroupSetID(ctx, "example:stream", "eater", "1609086089189-0")
    fmt.Println(statusCmd.Result())
    // OK <nil>

    // View pending messages
    // type XPending struct {
        //  Count     int64
        //  Lower     string
        //  Higher    string
        //  Consumers map[string]int64
    // }
    xPendingCmd := rdb.XPending(ctx, "example:stream", "eater")
    fmt.Println(xPendingCmd.Result())
    // &{1 1609086342551-0 1609086342551-0 map[eater01:1]} <nil>

    // Transfer the ownership of the message, and transfer the message that has not been processed for more than two minutes to the consumer eater02
    xClaimArgs := &redis.XClaimArgs{
      Stream:   "example:stream",
      Group:    "eater",
      Consumer: "eater02",
      MinIdle:  2 * time.Minute,
      Messages: []string{"1609086342551-0"},
    }
    xMessageSliceCmd = rdb.XClaim(ctx, xClaimArgs)
    fmt.Println(xMessageSliceCmd.Result())
    // [] <nil> // There are no messages that meet the requirements

    xInfoStreamCmd := rdb.XInfoStream(ctx, "example:stream")
    fmt.Println(xInfoStreamCmd.Result())
    // &{3 1 2 1 1609086342551-0 {1609082364313-0 map[foodId:10001 foodName:steak]} {1609086342551-0 map[foodId:1003 foodName:Coca-Cola]}} <nil>

    // View consumer group messages
    xInfoGroupCmd := rdb.XInfoGroups(ctx, "example:stream")
    fmt.Println(xInfoGroupCmd.Result())
    // [{eater 0 0 1609086089189-0}] <nil>

    //delete consumer
    intCmd = rdb.XGroupDelConsumer(ctx, "example:stream", "eater", "eater01")
    fmt.Println(intCmd.Result())
    // 1 <nil>

    // delete consumer group
    intCmd = rdb.XGroupDestroy(ctx, "example:stream", "eater")
    fmt.Println(intCmd.Result())
    // 1 <nil>
}

感谢您阅读本文,更多精彩,敬请期待。