41. RabbitMQ 绑定器

41.1 用法

要使用 RabbitMQ 绑定器,可以使用以下 maven 坐标将其添加到 Spring Cloud Stream 应用程序中:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

或者,你可以使用 Spring Cloud Stream RabbitMQ Starter,如下所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

41.2 RabbitMQ 绑定器概述

下面的简化图展示了 RabbitMQ 绑定器的操作方式:

图 41.1. RabbitMQ 绑定器

rabbit binder

默认情况下,RabbitMQ 绑定器实现将每个目标映射到 TopicExchange。对于每个消费者组,Queue 都绑定到该 TopicExchange。每个消费者实例组的Queue 都有一个对应的 RabbitMQ Consumer 实例。对于分区的生产者和消费者,队列的后缀是分区索引,并使用分区索引作为路由键。对于匿名使用者(没有 group 属性的使用者),使用自动删除队列(具有随机唯一名称)。

通过使用可选的 autoBindDlq 选项,可以配置绑定器以创建和配置死信队列(DLQ)(以及死信交换 DLX 以及路由基础结构)。默认情况下,死信队列具有目标的名称,并附加 .dlq。如果启用了重试(maxAttempts > 1),则在重试次数用完后,失败的消息将传递到 DLQ。如果禁用了重试(maxAttempts = 1),则应将 requeueRejected 设置为 false(默认值),以便将失败的消息路由到 DLQ,而不是重新排队。此外,republishToDlq 会导致绑定器将失败消息发布到 DLQ(而不是拒绝它)。此功能允许将其他信息(如 x-exception-stacktrace 头中的堆栈跟踪)添加到头中的消息中。此选项不需要启用重试。你可以在一次尝试后重新发布失败的消息。从 1.2 版开始,可以配置重新发布的消息的传递模式。请参阅 republishDeliveryMode 属性。

[Important] 重点

将 requeueRejected 设置为 true(其中 republishToDlq=false)会导致消息重新排队并持续重新传递,除非失败的原因是暂时的,否则这可能不是你想要的。通常,应该通过将 maxAttempts 设置为大于 1 或将 republishToDlq 设置为 true 来在绑定器中启用 retry。

有关这些属性的更多详细信息请参阅 小节 41.3.1, “RabbitMQ 绑定器属性”。

框架不提供任何标准机制来使用死信消息(或将它们重新路由回主队列)。小节 41.6, “死信队列处理” 中描述了一些选项。

[Note] Note

当在 Spring Cloud 流应用程序中使用多个 RabbitMQ 绑定器时,禁用 'RabbitAutoConfiguration' 非常重要,以避免将 RabbitAutoConfiguration 的相同配置应用于两个绑定器。可以使用 @SpringBootApplication 注解排除类。

从 2.0 版开始,RabbitMessageChannelBinder 将 RabbitTemplate.userPublisherConnection 属性设置为 true,以便非事务性生产者避免消费者出现死锁,如果缓存连接由于代理上的内存警报而被阻止,则可能会发生这种情况。

[Note] Note

目前,只有消息驱动的消费者才支持 multiplex 消费者(监听多个队列的单个消费者);被轮询的消费者只能从单个队列中检索消息。

41.3 配置选项

本节包含特定于 RabbitMQ 绑定器和绑定通道的设置。

有关常规绑定配置选项和属性,请参阅 Spring Cloud Stream 核心文档。

41.3.1 RabbitMQ 绑定器属性

默认情况下,RabbitMQ 绑定器使用 Spring Boot 的 ConnectionFactory。因此,它支持 RabbitMQ 的所有 Spring Boot 配置选项。(有关参考,请参阅 Spring Boot 文档)。RabbitMQ 配置选项使用 spring.rabbitmq 前缀。

除了 Spring Boot 选项外,RabbitMQ 绑定器还支持以下属性:

spring.cloud.stream.rabbit.binder.adminAddresses

以逗号分隔的 RabbitMQ 管理插件 URL 列表。仅在节点包含多个条目时使用。此列表中的每个条目必须在 spring.rabbitmq.addresses 中具有相应的条目。仅当使用 RabbitMQ 集群并希望从承载队列的节点使用时才需要。有关详细信息,请参阅队列关联和 LocalizedQueueConnectionFactory。

默认值: 空。

spring.cloud.stream.rabbit.binder.nodes

以逗号分隔的 RabbitMQ 节点名列表。当有多个条目时,用于定位队列所在的服务器地址。此列表中的每个条目必须在 spring.rabbitmq.addresses 中具有相应的条目。仅当使用 RabbitMQ 集群并希望从承载队列的节点使用时才需要。有关详细信息,请参阅 队列关联和 LocalizedQueueConnectionFactory。

默认值: 空。

spring.cloud.stream.rabbit.binder.compressionLevel

压缩绑定的压缩级别。请参阅 java.util.zip.Deflater。

默认值: 1 (BEST_LEVEL)。

spring.cloud.stream.binder.connection-name-prefix

用于命名此绑定器创建的连接的连接名前缀。名称是这个前缀,后跟 #n,其中 n 在每次打开新连接时递增。

默认值: none (Spring AMQP 默认值)。

41.3.2 RabbitMQ 消费者属性

以下属性仅适用于 Rabbit 消费者,必须以 spring.cloud.stream.rabbit.bindings.<channelName>.consumer. 作为前缀。

acknowledgeMode

acknowledge 模式。

默认值: AUTO。

autoBindDlq

是否自动声明 DLQ 并将其绑定到绑定器 DLX。

默认值: false。

bindingRoutingKey

将队列绑定到交换的路由键(如果 bindQueue 为 true)。对于已分区的目的地,将添加 -<instanceIndex>。

默认值: #。

bindQueue

是否将队列绑定到目标交换。如果你已经设置了自己的基础结构,并且以前已经创建并绑定了队列,那么将其设置为 false。

默认值: true。

consumerTagPrefix

用于创建消费者标签;将附加 #n,其中 n 为每个创建的消费者递增。示例:${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}。

默认值: none - 代理将生成随机的消费者标签。

deadLetterQueueName

DLQ 的名称

默认值: prefix+destination.dlq

deadLetterExchange

分配给队列的 DLX。仅当 autoBindDlq 为 true 时才相关。

默认值: 'prefix+DLX'

deadLetterExchangeType

要分配给队列的 DLX 的类型。仅当 autoBindDlq 为 true 时才相关。

默认值: 'direct'

deadLetterRoutingKey

要分配给队列的死信路由键。仅当 autoBindDlq 为 true 时才相关。

默认值: destination

declareDlx

是否申报目的地死信交换。仅当 autoBindDlq 为 true 时才相关。如果你有预配置的 DLX,则设置为 false。

默认值: true。

declareExchange

是否为目的地申报交换。

默认值: true。

delayedExchange

是否将交换声明为延迟的消息交换。需要代理上延迟的消息交换插件。x-delayed-type 参数设置为 exchangeType。

默认值: false。

dlqDeadLetterExchange

如果声明了 DLQ,则分配给该队列的 DLX。

默认值: none

dlqDeadLetterRoutingKey

如果声明了 DLQ,则为该队列分配一个死信路由键。

默认值: none

dlqExpires

删除未使用的死信队列之前的时间(毫秒)。

默认值: no expiration

dlqLazy

用 x-queue-mode=lazy 参数声明死信队列。请参阅“懒加载队列”。考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。

默认值: false。

dlqMaxLength

死信队列中的最大消息数。

默认值: no limit

dlqMaxLengthBytes

所有消息的死信队列中的最大总字节数。

默认值: no limit

dlqMaxPriority

死信队列中消息的最大优先级(0-255)。

默认值: none

dlqOverflowBehavior

超过 dlqMaxLength 或 dlqMaxLengthBytes 时要采取的操作;当前 drop-head 或 reject-publish,但请参阅 RabbitMQ 文档。

默认值: none

dlqTtl

声明死信队列时应用于死信队列的默认生存时间(毫秒)。

默认值: no limit

durableSubscription

订阅是否应持久。仅当 group 也设置时有效。

默认值: true。

exchangeAutoDelete

如果 declareExchange 为 true,则是否应自动删除交换(即,在删除最后一个队列后删除)。

默认值: true。

exchangeDurable

如果 declareExchange 为 true,则说明交换是否应该是持久的(即,它在代理重新启动之后仍然存在)。

默认值: true。

exchangeType

交换类型:direct、fanout 或 topic 用于非分区目的地,direct 或 topic 用于分区目的地。

默认值: topic。

exclusive

是否创建专属消费者。如果这是 true,并发性应该是 1。通常在需要严格排序时使用,但允许热备用实例在失败后接管。请参阅 recoveryInterval,它控制备用实例尝试使用的频率。

默认值: false.

expires

删除未使用的队列之前的时间(毫秒)。

默认值: no expiration

failedDeclarationRetryInterval

如果队列丢失,则尝试从队列中消耗的间隔(以毫秒为单位)。

默认值: 5000

headerPatterns

要从入站消息映射的头的模式。

默认值: ['*'] (所有头)。

lazy

用 x-queue-mode=lazy 参数声明队列。请参阅 “懒加载队列”。考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。

默认值: false。

maxConcurrency

最大消费者数。

默认值: 1。

maxLength

队列中的最大消息数。

默认值: no limit

maxLengthBytes

队列中所有消息的最大总字节数。

默认值: no limit

maxPriority

队列中消息的最大优先级(0-255)。

默认值: none

missingQueuesFatal

当找不到队列时,是否将该条件视为致命状态并停止监听器容器。默认值为 false,以便容器继续尝试从队列中使用 — 例如,当使用集群并且承载非 HA 队列的节点关闭时。

默认值: false

overflowBehavior

超过 maxLength 或 maxLengthBytes 时要采取的操作;当前 drop-head 或 reject-publish,但请参阅 RabbitMQ 文档。

默认值: none

prefetch

预取计数。

默认值: 1.

prefix

要添加到目标和队列名称的前缀。

默认值: "".

queueDeclarationRetries

如果队列丢失,则从该队列重试消费的次数。只有当 missingQueuesFatal 为 true 时才相关。否则,容器会无限期地重试。

默认值: 3

queueNameGroupOnly

如果为 true,则从名称等于 group 的队列中使用。否则,队列名称为 destination.group。例如,当使用 Spring Cloud Stream 从现有的 RabbitMQ 队列消费时,这很有用。

默认值: false。

recoveryInterval

连接恢复尝试之间的间隔(毫秒)。

默认值: 5000。

requeueRejected

当重试被禁用或 republishToDlq 为 false 时,传递失败是否应重新排队。

默认值: false。

republishDeliveryMode

当 republishToDlq 为 true 时,指定重新发布的消息的传递模式。

默认值: DeliveryMode.PERSISTENT

republishToDlq

默认情况下,重试失败的消息将被拒绝。如果配置了死信队列(DLQ),则 RabbitMQ 将失败消息(未更改)路由到 DLQ。如果设置为 true,绑定器会将失败的消息重新发布到具有其他头的 DLQ,包括最终失败原因的异常消息和堆栈跟踪。

默认值: false

transacted

是否使用事务渠道。

默认值: false。

ttl

声明时应用于队列的默认生存时间(毫秒)。

默认值: no limit

txSize

acks 之间的交货数量。

默认值: 1。

41.3.3 高级监听器容器配置

要设置未作为绑定或绑定属性公开的监听器容器属性,请向应用程序上下文添加类型为 ListenerContainerCustomizer 的单个 bean。将设置绑定器和绑定属性,然后调用自定义程序。自定义器(configure() 方法)提供了队列名称以及作为参数的使用者组。

41.3.4 Rabbit 生产者属性

以下属性仅适用于 Rabbit 生产者,并且必须以 spring.cloud.stream.rabbit.bindings.<channelName>.producer. 作为前缀。

autoBindDlq

是否自动声明 DLQ 并将其绑定到绑定器 DLX。

默认值: false。

batchingEnabled

是否启用生产者的消息批处理。消息根据以下属性(此列表中接下来的三个条目中描述)成批处理为一条消息:'batchSize'、batchBufferLimit 和 batchTimeout。有关的详细信息请参阅 Batching。

默认值: false。

batchSize

启用批处理时要缓冲的消息数。

默认值: 100。

batchBufferLimit

启用批处理时的最大缓冲区大小。

默认值: 10000。

batchTimeout

启用批处理时的批处理超时时间。

默认值: 5000。

bindingRoutingKey

将队列绑定到交换的路由键(如果 bindQueue 为 true)。仅适用于非分区目标。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: #。

bindQueue

是否将队列绑定到目标交换。如果你已经设置了自己的基础结构,并且以前已经创建并绑定了队列,那么将其设置为 false。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: true。

compress

发送时是否应压缩数据。

默认值: false。

deadLetterQueueName

只有在提供了 requiredGroups 之后,DLQ 的名称才适用于这些组。

默认值: prefix+destination.dlq

deadLetterExchange

分配给队列的 DLX。仅当 autoBindDlq 为 true 时才相关。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: 'prefix+DLX'

deadLetterExchangeType

要分配给队列的 DLX 的类型。仅当 autoBindDlq 为 true 时才相关。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: 'direct'

deadLetterRoutingKey

要分配给队列的死信路由键。仅当 autoBindDlq 为 true 时才相关。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: destination

declareDlx

是否申报目的地死信交换。仅当 autoBindDlq 为 true 时才相关。如果你有预配置的 DLX,则设置为 false。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: true。

declareExchange

是否为目的地申报交换。

默认值: true。

delayExpression

用于计算要应用于消息的延迟的 SpEL 表达式(x-delay 头)。如果交换不是延迟的消息交换,则不会产生任何影响。

默认值: 未设置 x-delay 头。

delayedExchange

是否将交换声明为延迟的消息交换。需要代理上延迟的消息交换插件。x-delayed-type 参数设置为 exchangeType。

默认值: false.

deliveryMode

投递方式。

默认值: PERSISTENT。

dlqDeadLetterExchange

当一个 DLQ 被声明时,分配给该队列的一个 DLX。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: none

dlqDeadLetterRoutingKey

当声明 DLQ 时,要分配给该队列的死信路由键。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: none

dlqExpires

删除未使用的死信队列之前的时间(毫秒)。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: no expiration

dlqLazy

用 x-queue-mode=lazy 参数声明死信队列。请参阅“懒加载队列”。考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

dlqMaxLength

死信队列中的最大消息数。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: no limit

dlqMaxLengthBytes

所有消息的死信队列中的最大总字节数。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: no limit

dlqMaxPriority

死信队列中消息的最大优先级(0-255)仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: none

dlqTtl

声明时应用于死信队列的默认生存时间(毫秒)。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: no limit

exchangeAutoDelete

如果 declareExchange 为 true,则交换是否应自动删除(在删除最后一个队列后删除)。

默认值: true。

exchangeDurable

如果 declareExchange 为 true,则说明交换是否应持久(在代理重新启动之后仍然有效)。

默认值: true。

exchangeType

交换类型:direct、fanout 或 topic 用于非分区目的地,direct 或 topic 用于分区目的地。

默认值: topic。

expires

删除未使用的队列之前的时间(毫秒)。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: no expiration

headerPatterns

要映射到出站消息的头的模式。

默认值: ['*'] (所有头).

lazy

用 x-queue-mode=lazy 参数声明队列。请参阅“懒加载队列”。考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: false。

maxLength

队列中的最大消息数。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: no limit

maxLengthBytes

队列中所有消息的最大总字节数。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: no limit

maxPriority

队列中消息的最大优先级(0-255)。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: none

prefix

要添加到目标交换名称的前缀。

默认值: ""。

queueNameGroupOnly

如果为 true,则从名称等于 group 的队列中使用。否则,队列名称为 destination.group。例如,当使用 Spring Cloud Stream 从现有的 RabbitMQ 队列消费时,这很有用。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: false。

routingKeyExpression

用于确定发布消息时要使用的路由密钥的 SpEL 表达式。对于固定路由密钥,请使用文本表达式,例如 properties 文件中的 routingKeyExpression='my.routingKey' 或 YAML 文件中的 routingKeyExpression: '''my.routingKey'''。

默认值: destination 或 destination-<partition> 用于分区目的地。

transacted

是否使用事务渠道。

默认值: false。

ttl

声明时应用于队列的默认生存时间(毫秒)。仅在提供了 requiredGroups 时适用,然后仅适用于这些组。

默认值: no limit

[Note] Note

对于 RabbitMQ,可以通过外部应用程序设置内容类型头。Spring Cloud Stream 支持它们作为扩展内部协议的一部分,用于任何类型的传输 — 包括不支持头文件的传输,例如 Kafka(0.11 之前)。

41.4 使用 RabbitMQ 绑定器重试

如果在绑定器中启用了重试,则监听器容器线程将在配置的任何回退期间挂起。当需要对单个消费者进行严格的订购时,这可能很重要。但是,对于其他用例,它会阻止在该线程上处理其他消息。使用绑定器重试的一个替代方法是设置死信,并在死信队列(DLQ)上设置生存时间以及 DLQ 本身的死信配置。有关此处讨论的属性的更多信息,请参阅 Section 41.3.1, “RabbitMQ 绑定器属性”。可以使用以下示例配置启用此功能:

  • 将 autoBindDlq 设置为 true。绑定器创建一个 DLQ。或者,可以在 deadLetterQueueName 中指定名称。
  • 将 dlqTtl 设置为在重新交付之间等待的后退时间。
  • 将 dlqDeadLetterExchange 设置为默认交换。来自 DLQ 的过期消息被路由到原始队列,因为默认的 deadLetterRoutingKey 是队列名称(destination.group)。设置为默认交换是通过设置不带值的属性来实现的,如下一个示例所示。

若要强制将消息设置为死信,请抛出 AmqpRejectAndDontRequeueException,或者将 requeueRejected 设置为 true(默认值)并引发任何异常。

循环继续进行,没有结束,这对于暂时性的问题是很好的,但是你可能希望在一些尝试之后放弃。幸运的是,RabbitMQ 提供了 x-death 头,它允许你确定已经发生了多少个周期。

要在放弃后确认消息,请立即抛出一个 ImmediateAcknowledgeAmqpException。

41.4.1 把它们放在一起

以下配置创建一个交互 myDestination,其中队列 myDestination.consumerGroup 绑定到一个主题交换,并使用通配符路由键 #:

---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---

此配置创建绑定到直接交换(DLX)的 DLQ,路由键为 myDestination.consumerGroup。当消息被拒绝时,它们被路由到 DLQ。5 秒后,消息将过期,并使用队列名称作为路由键路由到原始队列,如下面的示例所示:

Spring Boot application. 

@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {

    public static void main(String[] args) {
        SpringApplication.run(XDeathApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
        if (death != null && death.get("count").equals(3L)) {
            // giving up - don't send to DLX
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
        throw new AmqpRejectAndDontRequeueException("failed");
    }

}

注意,x-death 头中的 count 属性是 Long 的。

41.5 错误通道

从 1.3 版开始,绑定器无条件地将异常发送到每个消费者目标的错误通道,还可以配置为将异步生产者发送失败发送到错误通道。

RabbitMQ 有两种类型的发送失败:

  • 返回的消息
  • 否定确认的推送者确认。

后者是罕见的。根据 RabbitMQ 文档,“只有在负责队列的 Erlang 进程中发生内部错误时,才会传递 [A nack]。”

以及启用生产者错误通道,如果正确配置了连接工厂,RabbitMQ 绑定器只向通道发送消息,如下所示:

  • ccf.setPublisherConfirms(true);
  • ccf.setPublisherReturns(true);

为连接工厂使用 Spring Boot 配置时,请设置以下属性:

  • spring.rabbitmq.publisher-confirms
  • spring.rabbitmq.publisher-returns

返回消息的 ErrorMessage 的有效负载是具有以下属性的 ReturnedAmqpMessageException:

  • failedMessage: 失败时会发送 spring-messaging Message<?>。
  • amqpMessage: 原始 spring-amqp Message。
  • replyCode: 指示失败原因的整数值(例如,312 - 无路由)。
  • replyText: 指示失败原因的文本值(例如,NO_ROUTE)。
  • exchange: 消息发布到的交换。
  • routingKey: 发布消息时使用的路由键。

对于否定确认,有效负载是一个 NackedAmqpMessageException,具有以下属性:

  • failedMessage: 失败时会发送 spring-messaging Message<?>。
  • nackReason: 原因(如果可用 — 你可能需要检查代理日志以了解更多信息)。

这些异常没有自动处理(例如发送到死信队列)。你可以使用自己的 Spring 集成流来使用这些异常。

41.6 死信队列处理

由于你无法预测用户希望如何处理死信消息,因此框架不提供任何标准机制来处理这些死信消息。如果死信的原因是暂时的,你可能希望将消息路由回原始队列。但是,如果问题是一个永久性的问题,那么可能会导致无限循环。下面的 Spring Boot 应用程序展示了如何将这些消息路由回原始队列,但在三次尝试之后将它们移到第三个 “停车场” 队列。第二个示例使用 RabbitMQ 延迟消息交换为重新排队的消息引入延迟。在这个例子中,每次尝试的延迟都会增加。这些示例使用 @RabbitListener 从 DLQ 接收消息。你还可以在批处理过程中使用 RabbitTemplate.receive()。

示例假设原始目的地是 so8400in,而消费者组是 so8400。

41.6.1 非分区目标

前两个示例用于未分区目标的情况:

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

41.6.2 分区的目标

对于分区目标,所有分区都有一个 DLQ。我们从头确定原始队列。

republishToDlq=false

当 republishToDlq 为 false 时,RabbitMQ 使用包含原始目标信息的 x-death 头将消息发布到 DLX/DLQ,如下例所示:

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_DEATH_HEADER = "x-death";

	private static final String X_RETRIES_HEADER = "x-retries";

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@SuppressWarnings("unchecked")
	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
			String exchange = (String) xDeath.get(0).get("exchange");
			List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
			this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

republishToDlq=true

当 republishToDlq 为 true 时,重新发布恢复程序会将原始交换和路由键添加到头中,如下例所示:

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_RETRIES_HEADER = "x-retries";

	private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

	private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
			String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
			this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

41.7 使用 RabbitMQ 绑定器进行分区

RabbitMQ 不支持原生分区。

有时,将数据发送到特定分区是有利的 — 例如,当您想要严格地订购消息处理时,特定客户的所有消息都应该转到同一分区。

RabbitMessageChannelBinder 通过将每个分区的队列绑定到目标交换来提供分区。

下面的 Java 和 YAML 示例展示如何配置生产者:

Producer. 

@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}

application.yml. 

    spring:
      cloud:
        stream:
          bindings:
            output:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

[Note] Note

执行示例中的配置使用默认分区(key.hashCode() % partitionCount)。这可能提供或不提供适当的均衡算法,具体取决于键值。你可以使用 partitionSelectorExpression 或 partitionSelectorClass 属性覆盖此默认值。

只有在部署生产者时需要设置消费者队列时,才需要 required-groups 属性。否则,在部署相应的消费者之前,发送到分区的任何消息都将丢失。

以下配置提供主题交换:

part exchange

下列队列绑定到该交换:

part queues

下列绑定将队列与交换关联:

part bindings

下面的 Java 和 YAML 示例继续前面的示例,并展示如何配置消费者:

Consumer. 

@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in + " received from queue " + queue);
    }

}

application.yml. 

    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0

[Important] 重点

RabbitMessageChannelBinder 不支持动态缩放。每个分区必须至少有一个消费者。消费者的 instanceIndex 用于指示使用了哪个分区。像 CloudFoundry 这样的平台只能有一个实例具有 instanceIndex。