要使用 RabbitMQ 绑定器,可以使用以下 maven 坐标将其添加到 Spring Cloud Stream 应用程序中: <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> 或者,你可以使用 Spring Cloud Stream RabbitMQ Starter,如下所示: <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> 下面的简化图展示了 RabbitMQ 绑定器的操作方式:
默认情况下,RabbitMQ 绑定器实现将每个目标映射到
通过使用可选的
有关这些属性的更多详细信息请参阅 小节 41.3.1, “RabbitMQ 绑定器属性”。 框架不提供任何标准机制来使用死信消息(或将它们重新路由回主队列)。小节 41.6, “死信队列处理” 中描述了一些选项。
从 2.0 版开始,
本节包含特定于 RabbitMQ 绑定器和绑定通道的设置。 有关常规绑定配置选项和属性,请参阅 Spring Cloud Stream 核心文档。
默认情况下,RabbitMQ 绑定器使用 Spring Boot 的 除了 Spring Boot 选项外,RabbitMQ 绑定器还支持以下属性:
以下属性仅适用于 Rabbit 消费者,必须以
要设置未作为绑定或绑定属性公开的监听器容器属性,请向应用程序上下文添加类型为
以下属性仅适用于 Rabbit 生产者,并且必须以
如果在绑定器中启用了重试,则监听器容器线程将在配置的任何回退期间挂起。当需要对单个消费者进行严格的订购时,这可能很重要。但是,对于其他用例,它会阻止在该线程上处理其他消息。使用绑定器重试的一个替代方法是设置死信,并在死信队列(DLQ)上设置生存时间以及 DLQ 本身的死信配置。有关此处讨论的属性的更多信息,请参阅 Section 41.3.1, “RabbitMQ 绑定器属性”。可以使用以下示例配置启用此功能:
若要强制将消息设置为死信,请抛出
循环继续进行,没有结束,这对于暂时性的问题是很好的,但是你可能希望在一些尝试之后放弃。幸运的是,RabbitMQ 提供了
要在放弃后确认消息,请立即抛出一个
以下配置创建一个交互 --- spring.cloud.stream.bindings.input.destination=myDestination spring.cloud.stream.bindings.input.group=consumerGroup #disable binder retries spring.cloud.stream.bindings.input.consumer.max-attempts=1 #dlx/dlq setup spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000 spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange= ---
此配置创建绑定到直接交换( Spring Boot application. @SpringBootApplication @EnableBinding(Sink.class) public class XDeathApplication { public static void main(String[] args) { SpringApplication.run(XDeathApplication.class, args); } @StreamListener(Sink.INPUT) public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) { if (death != null && death.get("count").equals(3L)) { // giving up - don't send to DLX throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts"); } throw new AmqpRejectAndDontRequeueException("failed"); } }
注意, 从 1.3 版开始,绑定器无条件地将异常发送到每个消费者目标的错误通道,还可以配置为将异步生产者发送失败发送到错误通道。 RabbitMQ 有两种类型的发送失败:
后者是罕见的。根据 RabbitMQ 文档,“只有在负责队列的 Erlang 进程中发生内部错误时,才会传递 [A nack]。” 以及启用生产者错误通道,如果正确配置了连接工厂,RabbitMQ 绑定器只向通道发送消息,如下所示:
为连接工厂使用 Spring Boot 配置时,请设置以下属性:
返回消息的
对于否定确认,有效负载是一个
这些异常没有自动处理(例如发送到死信队列)。你可以使用自己的 Spring 集成流来使用这些异常。
由于你无法预测用户希望如何处理死信消息,因此框架不提供任何标准机制来处理这些死信消息。如果死信的原因是暂时的,你可能希望将消息路由回原始队列。但是,如果问题是一个永久性的问题,那么可能会导致无限循环。下面的 Spring Boot 应用程序展示了如何将这些消息路由回原始队列,但在三次尝试之后将它们移到第三个 “停车场” 队列。第二个示例使用 RabbitMQ 延迟消息交换为重新排队的消息引入延迟。在这个例子中,每次尝试的延迟都会增加。这些示例使用
示例假设原始目的地是 前两个示例用于未分区目标的情况: @SpringBootApplication public class ReRouteDlqApplication { private static final String ORIGINAL_QUEUE = "so8400in.so8400"; private static final String DLQ = ORIGINAL_QUEUE + ".dlq"; private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot"; private static final String X_RETRIES_HEADER = "x-retries"; public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args); System.out.println("Hit enter to terminate"); System.in.read(); context.close(); } @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues = DLQ) public void rePublish(Message failedMessage) { Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER); if (retriesHeader == null) { retriesHeader = Integer.valueOf(0); } if (retriesHeader < 3) { failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1); this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage); } else { this.rabbitTemplate.send(PARKING_LOT, failedMessage); } } @Bean public Queue parkingLot() { return new Queue(PARKING_LOT); } } @SpringBootApplication public class ReRouteDlqApplication { private static final String ORIGINAL_QUEUE = "so8400in.so8400"; private static final String DLQ = ORIGINAL_QUEUE + ".dlq"; private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot"; private static final String X_RETRIES_HEADER = "x-retries"; private static final String DELAY_EXCHANGE = "dlqReRouter"; public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args); System.out.println("Hit enter to terminate"); System.in.read(); context.close(); } @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues = DLQ) public void rePublish(Message failedMessage) { Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders(); Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER); if (retriesHeader == null) { retriesHeader = Integer.valueOf(0); } if (retriesHeader < 3) { headers.put(X_RETRIES_HEADER, retriesHeader + 1); headers.put("x-delay", 5000 * retriesHeader); this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage); } else { this.rabbitTemplate.send(PARKING_LOT, failedMessage); } } @Bean public DirectExchange delayExchange() { DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE); exchange.setDelayed(true); return exchange; } @Bean public Binding bindOriginalToDelay() { return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE); } @Bean public Queue parkingLot() { return new Queue(PARKING_LOT); } } 对于分区目标,所有分区都有一个 DLQ。我们从头确定原始队列。
当 @SpringBootApplication public class ReRouteDlqApplication { private static final String ORIGINAL_QUEUE = "so8400in.so8400"; private static final String DLQ = ORIGINAL_QUEUE + ".dlq"; private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot"; private static final String X_DEATH_HEADER = "x-death"; private static final String X_RETRIES_HEADER = "x-retries"; public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args); System.out.println("Hit enter to terminate"); System.in.read(); context.close(); } @Autowired private RabbitTemplate rabbitTemplate; @SuppressWarnings("unchecked") @RabbitListener(queues = DLQ) public void rePublish(Message failedMessage) { Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders(); Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER); if (retriesHeader == null) { retriesHeader = Integer.valueOf(0); } if (retriesHeader < 3) { headers.put(X_RETRIES_HEADER, retriesHeader + 1); List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER); String exchange = (String) xDeath.get(0).get("exchange"); List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys"); this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage); } else { this.rabbitTemplate.send(PARKING_LOT, failedMessage); } } @Bean public Queue parkingLot() { return new Queue(PARKING_LOT); } }
当 @SpringBootApplication public class ReRouteDlqApplication { private static final String ORIGINAL_QUEUE = "so8400in.so8400"; private static final String DLQ = ORIGINAL_QUEUE + ".dlq"; private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot"; private static final String X_RETRIES_HEADER = "x-retries"; private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE; private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY; public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args); System.out.println("Hit enter to terminate"); System.in.read(); context.close(); } @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues = DLQ) public void rePublish(Message failedMessage) { Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders(); Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER); if (retriesHeader == null) { retriesHeader = Integer.valueOf(0); } if (retriesHeader < 3) { headers.put(X_RETRIES_HEADER, retriesHeader + 1); String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER); String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER); this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage); } else { this.rabbitTemplate.send(PARKING_LOT, failedMessage); } } @Bean public Queue parkingLot() { return new Queue(PARKING_LOT); } } RabbitMQ 不支持原生分区。 有时,将数据发送到特定分区是有利的 — 例如,当您想要严格地订购消息处理时,特定客户的所有消息都应该转到同一分区。
下面的 Java 和 YAML 示例展示如何配置生产者: Producer. @SpringBootApplication @EnableBinding(Source.class) public class RabbitPartitionProducerApplication { private static final Random RANDOM = new Random(System.currentTimeMillis()); private static final String[] data = new String[] { "abc1", "def1", "qux1", "abc2", "def2", "qux2", "abc3", "def3", "qux3", "abc4", "def4", "qux4", }; public static void main(String[] args) { new SpringApplicationBuilder(RabbitPartitionProducerApplication.class) .web(false) .run(args); } @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000")) public Message<?> generate() { String value = data[RANDOM.nextInt(data.length)]; System.out.println("Sending: " + value); return MessageBuilder.withPayload(value) .setHeader("partitionKey", value) .build(); } } application.yml. spring: cloud: stream: bindings: output: destination: partitioned.destination producer: partitioned: true partition-key-expression: headers['partitionKey'] partition-count: 2 required-groups: - myGroup
以下配置提供主题交换: ![]() 下列队列绑定到该交换: ![]() 下列绑定将队列与交换关联: ![]() 下面的 Java 和 YAML 示例继续前面的示例,并展示如何配置消费者: Consumer. @SpringBootApplication @EnableBinding(Sink.class) public class RabbitPartitionConsumerApplication { public static void main(String[] args) { new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class) .web(false) .run(args); } @StreamListener(Sink.INPUT) public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) { System.out.println(in + " received from queue " + queue); } } application.yml. spring: cloud: stream: bindings: input: destination: partitioned.destination group: myGroup consumer: partitioned: true instance-index: 0
|