29. 编程模型

要理解编程模型,你应该熟悉以下核心概念:

  • Destination Binders: 负责提供与外部消息传递系统集成的组件。
  • Destination Bindings: 外部消息传递系统和应用程序提供的消息 Producers 和 Consumers(由目的地绑定器创建)之间的桥梁。
  • Message: 生产者和消费者用来与目的地绑定器(以及通过外部消息传递系统的其他应用程序)通信的规范数据结构。
SCSt overview

29.1 目的地绑定器

目的地绑定器是 Spring Cloud Stream 的扩展组件,负责提供必要的配置和实现,以促进与外部消息传递系统的集成。这种集成负责连接、委派和路由来自生产者和消费者的消息、数据类型转换、用户代码调用等。

绑定器处理很多样板文件的责任,否则将落在你的肩上。然而,要实现这一点,绑定器仍然需要一些以极简的形式提供的帮助,需要用户提供一组指令,这些指令通常以某种配置的形式提供。

虽然讨论所有可用的绑定器和绑定配置选项(手册的其余部分对它们进行了广泛的介绍)超出了本节的范围,但是 Destination Binding 确实需要特别注意。下一节将详细讨论它。

29.2 目标绑定

如前所述,Destination Bindings 在外部消息传递系统和应用程序提供的 Producers 和 Consumers 之间提供了一个桥梁。

将 @EnableBinding 注解应用到应用程序的一个配置类中,可以定义目标绑定。@EnableBinding 注解本身使用 @Configuration 进行元注解,并触发 Spring Cloud Stream 基础设施的配置。

下面的示例展示了一个完全配置且运行正常的 Spring Cloud Stream 应用程序,该应用程序以 String 类型从 INPUT 目的地接收消息的有效负载(请参阅 小节 32, 内容类型协商部分 部分),将其记录到控制台,并在将其转换为大写后将其发送到 OUTPUT 目的地。

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyApplication {

	public static void main(String[] args) {
		SpringApplication.run(MyApplication.class, args);
	}

	@StreamListener(Processor.INPUT)
	@SendTo(Processor.OUTPUT)
	public String handle(String value) {
		System.out.println("Received: " + value);
		return value.toUpperCase();
	}
}

如你所见,@EnableBinding 注解可以采用一个或多个接口类作为参数。这些参数被称为 bindings,它们包含表示 bindable components 的方法。这些组件通常是基于通道的绑定器(如 RRabbitabbit、Kafka 和其他)的消息通道(参见 Spring Messaging)。但是,其他类型的绑定可以为相应技术的本地功能提供支持。例如,Kafka Streams binder(以前称为 KStream)允许直接将本机绑定到 Kafka Streams(有关详细信息请参阅 Kafka Streams)。

Spring Cloud Stream 已经为典型的消息交换契约提供了 binding 接口,其中包括:

  • Sink: 通过提供消息的使用目的地来标识消息消费者的协定。
  • Source: 通过提供将生成的消息发送到的目标,标识消息生产者的约定。
  • Processor: 通过公开允许消息消费和生成的两个目的地,封装接收器和源的约定。
public interface Sink {

  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();
}
public interface Source {

  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();
}
public interface Processor extends Source, Sink {}

虽然前面的示例满足大多数情况,但是你也可以通过定义自己的绑定接口来定义自己的约定,并使用 @Input 和 @Output 注解来标识实际的 bindable components。

例如:

public interface Barista {

    @Input
    SubscribableChannel orders();

    @Output
    MessageChannel hotDrinks();

    @Output
    MessageChannel coldDrinks();
}

使用前面示例中所示的接口作为 @EnableBinding 的参数,将分别触发名为 orders、hotDrinks 和 coldDrinks 的三个绑定通道的创建。

你可以根据需要提供任意多的绑定接口,作为 @EnableBinding 注解的参数,如下例所示:

@EnableBinding(value = { Orders.class, Payment.class })

在 Spring Cloud Stream 中,可绑定的 MessageChannel 组件是 Spring Messaging MessageChannel(用于出站)及其扩展 SubscribableChannel(用于入站)。

可轮询目标绑定

虽然前面描述的绑定支持基于事件的消息消耗,但有时你需要更多的控制,例如消耗率。

从 2.0 版开始,现在可以绑定可轮询的消费者:

下面的示例展示如何绑定可轮询的消费者:

public interface PolledBarista {

    @Input
    PollableMessageSource orders();
	. . .
}

在这种情况下,PollableMessageSource 的实现绑定到 orders “channel”。更多详细信息请参阅 小节 29.3.5, “使用轮询的消费者”。

自定义通道名称

通过使用 @Input 和 @Output 注解,可以为通道指定自定义的通道名称,如下例所示:

public interface Barista {
    @Input("inboundOrders")
    SubscribableChannel orders();
}

在前面的示例中,创建的绑定通道命名为 inboundOrders。

通常,你不需要直接访问单个通道或绑定(然后通过 @EnableBinding 注解配置它们)。不过,有时也会出现这种情况,比如测试或其他特殊案例。

除了为每个绑定生成通道并将其注册为 Spring Bean 之外,对于每个绑定的接口,Spring Cloud Stream 还生成一个实现接口的 bean。这意味着你可以通过在应用程序中注入来访问表示绑定或单个通道的接口,如下两个示例所示:

注入绑定接口

@Autowire
private Source source

public void sayHello(String name) {
    source.output().send(MessageBuilder.withPayload(name).build());
}

注入单个通道

@Autowire
private MessageChannel output;

public void sayHello(String name) {
    output.send(MessageBuilder.withPayload(name).build());
}

您还可以使用标准 Spring 的 @Qualifier 注解,用于定制通道名称或需要特定名称通道的多个通道方案中的情况。

下面的示例展示如何以这种方式使用 @Qualifier 注解:

@Autowire
@Qualifier("myChannel")
private MessageChannel output;

29.3 生成和消费消息

你可以使用 Spring 集成注解或 Spring Cloud Stream 本地注解来编写 Spring Cloud Stream 应用程序。

29.3.1 Spring 集成支持

Spring Cloud Stream 建立在由企业集成模式定义的概念和模式的基础上,其内部实现依赖于 Spring 项目组合(Spring 集成框架)中已经建立并流行的企业集成模式的实现。

因此,支持 Spring 集成已经建立的基础、语义和配置选项是很自然的。

例如,可以将 Source 的输出通道附加到 MessageSource,并使用熟悉的 @InboundChannelAdapter 注解,如下所示:

@EnableBinding(Source.class)
public class TimerSource {

  @Bean
  @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
  public MessageSource<String> timerMessageSource() {
    return () -> new GenericMessage<>("Hello Spring Cloud Stream");
  }
}

同样,在为 Processor 绑定约定提供消息处理程序方法的实现时,可以使用 @Transformer 或 @ServiceActivator,如下例所示:

@EnableBinding(Processor.class)
public class TransformProcessor {
  @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
  public Object transform(String message) {
    return message.toUpperCase();
  }
}
[Note] Note

虽然这可能有点超前,但必须理解的是,当你使用 @StreamListener 注解从同一绑定使用时,会使用 pub-sub 模型。用 @StreamListener 注解的每个方法都会收到自己的消息副本,并且每个方法都有自己的消费者组。但是,如果你使用一个 Spring 集成注解(如 @Aggregator、@Transformer 或 @ServiceActivator)使用同一绑定,那么这些注解将在竞争模型中使用。不会为每个订阅创建单独的使用者组。

29.3.2 使用 @StreamListener 注解

作为对 Spring 集成支持的补充,Spring Cloud Stream 提供了自己的 @streamListener 注解,该注解是根据其他 Spring 消息注解(@MessageMapping、@JmsListener、@RabbitListener 和其他注解)建模的,并提供了基于内容的路由和其他方便方法。

@EnableBinding(Sink.class)
public class VoteHandler {

  @Autowired
  VotingService votingService;

  @StreamListener(Sink.INPUT)
  public void handle(Vote vote) {
    votingService.record(vote);
  }
}

与其他 Spring 消息传递方法一样,方法参数可以用 @Payload、@Headers 和 @Header 注解。

对于返回数据的方法,必须使用 @SendTo 注解为该方法返回的数据指定输出绑定目标,如下例所示:

@EnableBinding(Processor.class)
public class TransformProcessor {

  @Autowired
  VotingService votingService;

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public VoteResult handle(Vote vote) {
    return votingService.record(vote);
  }
}

29.3.3 使用 @StreamListener 进行基于内容的路由

Spring Cloud Stream 支持将消息分派到多个处理程序方法,这些方法根据条件用 @StreamListener 注解。

为了有资格支持条件分派,方法必须满足以下条件:

  • 它不能返回值。
  • 它必须是单独的消息处理方法(不支持响应式 API 方法)。

条件由注解的 condition 参数中的 SpEL 表达式指定,并对每条消息进行计算。所有与条件匹配的处理程序都在同一个线程中调用,并且不必对调用发生的顺序进行假设。

在具有调度条件的 @StreamListener 的以下示例中,所有具有值 bogey 的头 type 的消息都被调度到 receiveBogey 方法,并且所有具有值 bacall 的头 type 的消息都被调度到 receiveBacall 方法。

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
    public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'")
    public void receiveBacall(@Payload BacallPojo bacallPojo) {
       // handle the message
    }
}

condition 上下文中的内容类型协商

使用 @StreamListener 的 condition 参数了解基于内容的路由背后的一些机制是很重要的,尤其是在整个消息类型的上下文中。 如果你在继续之前熟悉小节 32, 内容类型协商,也会有所帮助。

考虑以下情况:

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class CatsAndDogs {

    @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Dog'")
    public void bark(Dog dog) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Cat'")
    public void purr(Cat cat) {
       // handle the message
    }
}

上述代码完全有效。它编译和部署没有任何问题,但是它从来不会产生你期望的结果。

那是因为你正在测试一些还没有在你期望的状态中存在的东西。这是因为消息的有效负载尚未从有线格式(byte[])转换为所需类型。换句话说,它还没有完成 小节 32, 内容类型协商 中描述的类型转换过程。

因此,除非使用计算原始数据的 SPeL 表达式(例如,字节数组中第一个字节的值),否则请使用基于消息头的表达式(例如,condition = "headers['type']=='dog'")。

[Note] Note

目前,通过 @StreamListener 条件进行调度仅支持基于通道的绑定(不支持响应式编程)。

29.3.4 Spring Cloud 函数支持

从 Spring Cloud Stream V2.1 开始,定义 stream handlers 和 sources 的另一种选择是使用 Spring Cloud Function 的内置支持,它们可以被表示为 java.util.function.[Supplier/Function/Consumer] 类型的 bean。

要指定要绑定到绑定公开的外部目标的功能 bean,必须提供 spring.cloud.stream.function.definition 属性。

下面是将消息处理程序公开为 java.util.function.Function 的处理器应用程序示例。

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyFunctionBootApp {

	public static void main(String[] args) {
		SpringApplication.run(MyFunctionBootApp.class, "--spring.cloud.stream.function.definition=toUpperCase");
	}

	@Bean
	public Function<String, String> toUpperCase() {
		return s -> s.toUpperCase();
	}
}

在上面的内容中,我们简单地定义了一个名为 toUpperCase 的 java.util.function.Function 类型的 bean,并将其标识为要用作消息处理程序的 bean,其 'input' 和 'output' 必须绑定到由处理器绑定公开的外部目标。

下面是支持源、处理器和接收器的简单功能应用程序的示例。

下面是定义为 java.util.function.Supplier 的源应用程序示例

@SpringBootApplication
@EnableBinding(Source.class)
public static class SourceFromSupplier {
	public static void main(String[] args) {
		SpringApplication.run(SourceFromSupplier.class, "--spring.cloud.stream.function.definition=date");
	}
	@Bean
	public Supplier<Date> date() {
		return () -> new Date(12345L);
	}
}

下面是定义为 java.util.function.Function 的处理器应用程序的示例

@SpringBootApplication
@EnableBinding(Processor.class)
public static class ProcessorFromFunction {
	public static void main(String[] args) {
		SpringApplication.run(ProcessorFromFunction.class, "--spring.cloud.stream.function.definition=toUpperCase");
	}
	@Bean
	public Function<String, String> toUpperCase() {
		return s -> s.toUpperCase();
	}
}

下面是定义为 java.util.function.Consumer 的接收器应用程序的示例

@EnableAutoConfiguration
@EnableBinding(Sink.class)
public static class SinkFromConsumer {
	public static void main(String[] args) {
		SpringApplication.run(SinkFromConsumer.class, "--spring.cloud.stream.function.definition=sink");
	}
	@Bean
	public Consumer<String> sink() {
		return System.out::println;
	}
}

函数组合

使用这个编程模型,你还可以从函数组合中获益,在函数组合中,你可以从一组简单函数中动态地组合复杂的处理程序。作为一个例子,我们将下面的函数 bean 添加到上面定义的应用程序中。

@Bean
public Function<String, String> wrapInQuotes() {
	return s -> "\"" + s + "\"";
}

并修改 spring.cloud.stream.function.definition 属性,以反映你从 ‘toUpperCase’ 和 ‘wrapInQuotes’ 组合新函数的意图。要执行 Spring Cloud 函数,你可以使用 |(管道)符号。为了完成我们的示例,我们的属性现在将如下所示:

—spring.cloud.stream.function.definition=toUpperCase|wrapInQuotes

29.3.5 使用轮询的消费者

概述

使用轮询的消费者时,将按需轮询 PollableMessageSource。考虑以下轮询的消费者的例子:

public interface PolledConsumer {

    @Input
    PollableMessageSource destIn();

    @Output
    MessageChannel destOut();

}

考虑到前面示例中的轮询的消费者,你可以按如下方式使用它:

@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition()) {
            try {
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload()).toUpperCase();
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            }
            catch (Exception e) {
                // handle failure
            }
        }
    };
}

PollableMessageSource.poll() 方法接受 MessageHandler 参数(通常是 lambda 表达式,如图所示)。如果接收到消息并成功处理,则返回 true。

与消息驱动的消费者一样,如果消息处理程序抛出异常,则消息将发布到错误通道,如 “???” 中所述。

通常,poll() 方法在 MessageHandler 退出时确认消息。如果方法异常退出,则消息将被拒绝(而不是重新排队),但请参阅 “错误处理” 部分。你可以通过对确认承担责任来覆盖该行为,如下例所示:

@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
    return args -> {
        while (someCondition()) {
            if (!dest1In.poll(m -> {
                StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
                // e.g. hand off to another thread which can perform the ack
                // or acknowledge(Status.REQUEUE)

            })) {
                Thread.sleep(1000);
            }
        }
    };
}
[Important] 重点

你必须在某个时刻 ack(或 nack)消息,以避免资源泄漏。

[Important] 重点

一些消息传递系统(如 Apache Kafka)在日志中维护一个简单的偏移量。如果传递失败,并使用 StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE); 重新排队,则会重新传递任何稍后成功确认的消息。

还有一个重载的 poll 方法,其定义如下:

poll(MessageHandler handler, ParameterizedTypeReference<?> type)

type 是一个转换提示,允许转换传入消息有效负载,如下面的示例所示:

boolean result = pollableSource.poll(received -> {
			Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
            ...

		}, new ParameterizedTypeReference<Map<String, Foo>>() {});

错误处理

默认情况下,为可轮询源配置一个错误通道;如果回调引发异常,则会向错误通道发送一条 ErrorMessage(<destination>.<group>.errors);此错误通道也桥接到全局 Spring 集成 errorChannel。

你可以使用 @ServiceActivator 订阅任何一个错误通道来处理错误;如果不订阅,则只会记录错误并确认消息成功。如果错误通道服务激活器引发异常,则消息将被拒绝(默认情况下),并且不会被重新传递。如果服务激活器抛出 RequeueCurrentMessageException,则消息将在代理处重新排队,并在随后的轮询中再次检索。

如果监听器直接抛出 RequeueCurrentMessageException,则消息将被重新排队,如上所述,并且不会发送到错误通道。

29.4 错误处理

错误会发生,Spring Cloud Stream 提供了几种灵活的机制来处理它们。错误处理有两种方式:

  • application: 错误处理在应用程序中完成(自定义错误处理程序)。
  • system: 错误处理委托给绑定器(重新排队、DL 和其他)。注意,这些技术依赖于绑定器实现和底层消息传递中间件的功能。

Spring Cloud Stream 使用 Spring Retry 库来促进成功的消息处理。有关详细信息,请参阅 小节 29.4.3, “重试模板”。但是,当所有操作都失败时,消息处理程序抛出的异常将传播回绑定器。此时,绑定器调用自定义错误处理程序或将错误传递回消息传递系统(重新排队、DLQ 和其他)。

29.4.1 应用程序错误处理

有两种类型的应用程序级错误处理。可以在每个绑定订阅处处理错误,或者全局处理程序可以处理所有绑定订阅错误。让我们回顾一下细节。

图 29.1. 带有自定义和全局错误处理程序的 Spring Cloud Stream 接收器应用程序

custom vs global error channels

对于每个输入绑定,Spring Cloud Stream 使用以下语义创建一个专用的错误通道 <destinationName>.errors。

[Note] Note

<destinationName> 由绑定的名称(如 input)和组的名称(如 myGroup)组成。

考虑以下事项:

spring.cloud.stream.bindings.input.group=myGroup
@StreamListener(Sink.INPUT) // destination name 'input.myGroup'
public void handle(Person value) {
	throw new RuntimeException("BOOM!");
}

@ServiceActivator(inputChannel = Processor.INPUT + ".myGroup.errors") //channel name 'input.myGroup.errors'
public void error(Message<?> message) {
	System.out.println("Handling ERROR: " + message);
}

在前面的示例中,目标名称是 input.myGroup,而专用的错误通道名称是 input.myGroup.errors。

[Note] Note

@StreamListener 注解的使用专门用于定义连接内部通道和外部目标的绑定。考虑到特定于目的地的错误通道没有关联的外部目的地,这种通道是 Spring 集成(SI)的特权。这意味着,必须使用一个 SI 处理程序注解(即 @ServiceActivator、@Transformer 等)定义此类目标的处理程序。

[Note] Note

如果未指定 group,则使用匿名组(如 input.anonymous.2K37rb06Q6m2r51-SPIDDQ),这不适用于错误处理场景,因为在创建目标之前你不知道它将是什么。

此外,如果你绑定到现有目标,例如:

spring.cloud.stream.bindings.input.destination=myFooDestination
spring.cloud.stream.bindings.input.group=myGroup

完整的目标名称是 myFooDestination.myGroup,然后专用的错误通道名称是 myFooDestination.myGroup.errors。

回到示例…​

订阅名为 input 的通道的 handle(..) 方法引发异常。如果还存在错误通道 input.myGroup.errors 的订阅服务器,则所有错误消息都由该订阅服务器处理。

如果有多个绑定,则可能需要一个错误处理程序。Spring Cloud Stream 通过将每个错误通道桥接到名为 errorChannel 的通道自动提供对全局错误通道的支持,允许单个订阅处理所有错误,如下例所示:

@StreamListener("errorChannel")
public void error(Message<?> message) {
	System.out.println("Handling ERROR: " + message);
}

如果错误处理逻辑相同,不管哪个处理程序产生了错误,这可能是一个方便的选项。

29.4.2 系统错误处理

系统级的错误处理意味着错误被传回消息传递系统,并且,考虑到并非每个消息传递系统都是相同的,不同的绑定器之间的功能可能不同。

也就是说,在本节中,我们将解释系统级错误处理背后的一般思想,并以 Rabbit 绑定器为例。注意:Kafka 绑定器提供了类似的支持,尽管一些配置属性确实不同。此外,有关详细信息和配置选项,请参阅各个绑定器的文档。

如果没有配置内部错误处理程序,则错误将传播到绑定器,绑定器随后将这些错误传播回消息传递系统。根据消息传递系统的功能,此类系统可能会丢弃消息、重新排队以重新处理消息或将失败消息发送给 DLQ。Rabbit 和 Kafka 都支持这些概念。但是,其他绑定器可能不会,因此有关支持的系统级错误处理选项的详细信息,请参阅单个绑定器的文档。

丢弃失败消息

默认情况下,如果没有提供额外的系统级配置,消息传递系统将删除失败的消息。虽然在某些情况下是可以接受的,但在大多数情况下是不能接受的,我们需要一些恢复机制来避免消息丢失。

DLQ - 死信队列

DLQ 允许失败的消息被发送到一个特殊的目的地:死信队列。

配置后,失败的消息将发送到此目标,以进行后续的重新处理或审核和调节。

例如,继续上一个示例并使用 Rabbit 绑定器设置 DLQ,需要设置以下属性:

spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true

请记住,在上述属性中,input 对应于输入目标绑定的名称。consumer 表示它是消费者属性,而 auto-bind-dlq 指示绑定器为 input 目的地配置 DLQ,这将导致名为 input.myGroup.dlq 的额外 Rabbit 队列。

配置后,所有失败的消息都将路由到此队列,错误消息类似于:

delivery_mode:	1
headers:
x-death:
count:	1
reason:	rejected
queue:	input.hello
time:	1522328151
exchange:
routing-keys:	input.myGroup
Payload {"name”:"Bob"}

正如你从上面看到的,原始消息将被保留,以供进一步操作。

然而,你可能注意到的一件事是,对于消息处理的原始问题,信息是有限的。例如,看不到与原始错误对应的堆栈跟踪。要获取有关原始错误的更多相关信息,必须设置其他属性:

spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=true

这样做会强制内部错误处理程序截获错误消息,并在将其发布到 DLQ 之前向其添加其他信息。配置后,可以看到错误消息包含与原始错误相关的更多信息,如下所示:

delivery_mode:	2
headers:
x-original-exchange:
x-exception-message:	has an error
x-original-routingKey:	input.myGroup
x-exception-stacktrace:	org.springframework.messaging.MessageHandlingException: nested exception is
      org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
      headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
      deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
      amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
      at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
      at. . . . .
Payload {"name”:"Bob"}

这有效地结合了应用程序级和系统级的错误处理,以进一步帮助下游的故障排除机制。

重新排队失败的消息

如前所述,当前支持的绑定器(Rabbit 和 Kafka)依赖于 RetryTemplate 来促进成功的消息处理。有关详细信息,请参阅 小节 29.4.3, “重试模板”。但是,对于 max-attempts 属性设置为 1 的情况,将禁用消息的内部重新处理。此时,你可以通过指示消息传递系统对失败的消息重新排队来促进消息重新处理(重试)。一旦重新排队,失败的消息将被发送回原始处理程序,实质上是创建了一个重试循环。

如果错误的性质与某些零星但短期的资源不可用有关,则此选项可能是可行的。

要完成此操作,必须设置以下属性:

spring.cloud.stream.bindings.input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true

在前面的示例中,max-attempts 设置为 1,基本上禁用内部重试,requeue-rejected(requeue rejected messages 的缩写)设置为 true。一旦设置,失败的消息将被重新提交到同一个处理程序并连续循环,或者直到处理程序抛出 AmqpRejectAndDontRequeueException,基本上允许你在处理程序内部构建自己的重试逻辑。

29.4.3 重试模版

RetryTemplate 是 Spring Retry 库的一部分。虽然覆盖 RetryTemplate 的所有功能超出了本文档的范围,但我们将提到与 RetryTemplate 特定相关的以下消费者属性:

maxAttempts

尝试处理消息的次数。

默认值: 3。

backOffInitialInterval

重试时的回退初始间隔。

默认 1000 毫秒。

backOffMaxInterval

最大回退间隔。

默认 10000 毫秒。

backOffMultiplier

回退乘数。

默认 2.0。

defaultRetryable

监听器引发的未在 retryableExceptions 中列出的异常是否可重试。

默认值: true。

retryableExceptions

键中的可丢弃类名和值中的布尔值的映射。指定将重试或不重试的异常(和子类)。另请参见 defaultRetriable。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false。

默认值: empty。

虽然前面的设置足以满足大多数自定义需求,但它们可能无法满足某些复杂的需求,此时你可能需要提供自己的 RetryTemplate 实例。为此,在应用程序配置中将其配置为 bean。应用程序提供的实例将覆盖框架提供的实例。此外,为了避免冲突,必须将绑定器要使用的 RetryTemplate 实例限定为 @StreamRetryTemplate。例如,

@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
    return new RetryTemplate();
}

从上面的示例中可以看到,由于 @StreamRetryTemplate 是一个合格的 @Bean,所以不需要用 @Bean 对其进行注解。

29.5 响应式编程支持

Spring Cloud Stream 还支持使用响应式 API,其中传入和传出数据作为连续数据流处理。对响应式 API 的支持可以通过 spring-cloud-stream-reactive 获得,它需要显式地添加到项目中。

带有响应式 API 的编程模型是声明性的。可以使用描述从入站到出站数据流的功能转换的运算符,而不是指定如何处理每个单独的消息。

目前,Spring Cloud Stream 只支持 Reactor API。在未来,我们打算支持一个更通用的基于响应流的模型。

响应式编程模型还使用 @StreamListener 注解来设置响应式处理程序。区别在于:

  • @StreamListener 注解不能指定输入或输出,因为它们作为参数提供,并从方法返回值。
  • 方法的参数必须用 @Input 和 @Output 注解,分别指示传入和传出数据流连接到哪个输入或输出。
  • 方法的返回值(如果有)用 @Output 注解,指示应将数据发送到哪里的输入。
[Note] Note

响应式编程支持需要 Java 1.8。

[Note] Note

从 Spring Cloud Stream 1.1.1 及更高版本(从 release train Brooklyn.SR2 开始),响应式编程支持需要使用 Reactor 3.0.4.RELEASE 及更高版本。不支持早期的 Reactor 版本(包括 3.0.1.RELEASE、3.0.2.RELEASE 和 3.0.3.RELEASE)。spring-cloud-stream-reactive 可传递地检索正确的版本,但项目结构可以管理 io.projectreactor:reactor-core 到早期版本,特别是在使用 maven 时。这是使用Spring Initializr 和 Spring Boot 1.x 生成的项目的情况,Spring Boot 1.x 将覆盖 Reactor 版本到 2.0.8.RELEASE。在这种情况下,你必须确保发布工件的正确版本。可以通过添加对 io.projectreactor:reactor-core 的直接依赖来实现,该版本为 3.0.4.RELEASE 或更高版本。

[Note] Note

术语 “reactive” 的使用目前是指正在使用的响应式 API,而不是指正在响应的执行模型(即,绑定端点仍然使用 'push' 而不是 'pull' 模型)。虽然一些响应式支持是由 Reactor 的使用提供的,但在未来的版本中,我们确实打算通过为连接的中间件使用本地响应式客户端来支持完全响应式管道。

29.5.1 基于 Reactor 的处理程序

基于 Reactor 的处理程序可以具有以下参数类型:

  • 对于用 @Input 注解的参数,它支持 Reactor Flux 类型。入站 Flux 的参数化遵循与单个消息处理相同的规则:它可以是整个 Message、可以是 Message 有效负载的 POJO,或者是基于 Message 内容类型头的转换的 POJO。提供多个输入。
  • 对于带 Output 注解的参数,它支持 FluxSender 类型,该类型将方法生成的 Flux 与输出连接起来。一般来说,只有当方法可以有多个输出时,才建议将输出指定为参数。

基于 Reactor 的处理程序支持 Flux 的返回类型。在这种情况下,它必须用 @Output 注解。我们建议在单输出 Flux 可用时使用该方法的返回值。

以下示例显示了基于 Reactor 的 Processor:

@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {

  @StreamListener
  @Output(Processor.OUTPUT)
  public Flux<String> receive(@Input(Processor.INPUT) Flux<String> input) {
    return input.map(s -> s.toUpperCase());
  }
}

使用输出参数的同一个处理程序如下所示:

@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {

  @StreamListener
  public void receive(@Input(Processor.INPUT) Flux<String> input,
     @Output(Processor.OUTPUT) FluxSender output) {
     output.send(input.map(s -> s.toUpperCase()));
  }
}

29.5.2 响应式源

Spring Cloud Stream 响应式主持还提供了通过 @StreamEmitter 注解创建响应式源的能力。通过使用 @StreamEmitter 注解,可以将常规源转换为响应式源。@StreamEmitter 是一个方法级注解,它将方法标记为使用 @EnableBinding 声明的输出的发射器。不能将 @Input 注解与 @StreamEmitter 一起使用,因为用此注解标记的方法没有监听任何输入。相反,标记为 @StreamEmitter 的方法会生成输出。按照 @StreamListener 中使用的相同编程模型,@StreamEmitter 还允许灵活地使用 @Output 注解,这取决于该方法是否具有任何参数、返回类型和其他注意事项。

本节中的剩余部分包括使用 @StreamEmitter 注解的不同风格。

以下示例每毫秒发出 Hello, World 消息,并发布到一个 Reactor Flux:

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  @Output(Source.OUTPUT)
  public Flux<String> emit() {
    return Flux.intervalMillis(1)
            .map(l -> "Hello World");
  }
}

在前面的例子中,Flux 中产生的消息被发送到 Source 的输出通道。

下一个例子是发送 Reactor Flux 的 @StreamEmmitter 的另一种风格。以下方法使用 FluxSender 以编程方式从源发送 Flux,而不是返回 Flux:

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  @Output(Source.OUTPUT)
  public void emit(FluxSender output) {
    output.send(Flux.intervalMillis(1)
            .map(l -> "Hello World"));
  }
}

下一个例子在功能和风格上与上面的片段完全相同。但是,它不在方法上使用显式的 @Output 注解,而是在方法参数上使用注解。

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  public void emit(@Output(Source.OUTPUT) FluxSender output) {
    output.send(Flux.intervalMillis(1)
            .map(l -> "Hello World"));
  }
}

本节中的最后一个示例是使用 Reactive Streams Publisher API 和利用 Spring Integration Java DSL 对它的支持的另一种编写响应式源的特色。下面的示例中的 Publisher 仍然使用上面的 Reactor Flux,但是,从应用的角度来看,这对于用户是透明的,并且只需要 Reactive Stream 和 Java DSL 用于 Spring集成:

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  @Output(Source.OUTPUT)
  @Bean
  public Publisher<Message<String>> emit() {
    return IntegrationFlows.from(() ->
                new GenericMessage<>("Hello World"),
        e -> e.poller(p -> p.fixedDelay(1)))
        .toReactivePublisher();
  }
}