34. 应用程序间通信

Spring Cloud Stream 支持应用程序之间的通信。应用程序间通信是一个涉及多个问题的复杂问题,如以下主题所述:

  • “小节 34.1, “连接多个应用程序实例””
  • “小节 34.2, “实例索引和实例计数””
  • “小节 34.3, “分区””

34.1 连接多个应用程序实例

尽管 Spring Cloud Stream 使单个 Spring Boot 应用程序很容易连接到消息传递系统,但 Spring Cloud Stream 的典型场景是创建多应用程序管道,其中微服务应用程序互相发送数据。你可以通过关联 “adjacent” 应用程序的输入和输出目标来实现这个场景。

假设设计要求时间源应用程序向日志接收器应用程序发送数据。你可以为两个应用程序中的绑定使用一个名为 ticktock 的公共目标。

时间源(具有通道名称 output)将设置以下属性:

spring.cloud.stream.bindings.output.destination=ticktock

日志接收器(具有通道名称 input)将设置以下属性:

spring.cloud.stream.bindings.input.destination=ticktock

34.2 实例索引和实例计数

当扩展 Spring Cloud Stream 应用程序时,每个实例都可以接收有关同一应用程序存在多少其他实例以及其自身实例索引是什么的信息。Spring Cloud Stream 通过 spring.cloud.stream.instanceCount 和 spring.cloud.stream.instanceIndex 属性执行此操作。例如,如果 HDFS 接收器应用程序有三个实例,则所有三个实例的 spring.cloud.stream.instanceCount 都设置为 3,而各个应用程序的 spring.cloud.stream.instanceIndex 分别设置为 0、1 和 2。

当通过 Spring Cloud 数据流部署 Spring Cloud Stream 应用程序时,这些属性是自动配置的;当独立启动 Spring Cloud Stream 应用程序时,必须正确设置这些属性。默认情况下,spring.cloud.stream.instanceCount 为 1,spring.cloud.stream.instanceIndex 为 0。

在按比例放大的场景中,这两个属性的正确配置通常对于解决分区行为(见下文)很重要,并且某些绑定器(例如,Kafka 绑定器)始终需要这两个属性,以确保数据在多个消费者实例中正确地分割。

34.3 分区

Spring Cloud Stream 中的分区包括两个任务:

  • “小节 34.3.1, “为分区配置输出绑定””
  • “小节 34.3.2, “为分区配置输入绑定””

34.3.1 为分区配置输出绑定

你可以配置输出绑定以通过设置其 partitionKeyExpression 或 partitionKeyExtractorName 属性中的一个(也是唯一一个)及其 partitionCount 属性来发送分区数据。

例如,以下是一个有效且典型的配置:

spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.output.producer.partitionCount=5

基于该示例配置,数据通过以下逻辑发送到目标分区。

根据 partitionKeyExpression 为发送到分区输出通道的每条消息计算分区键的值。partitionKeyExpression 是一个 SpEL 表达式,根据用于提取分区键的出站消息进行计算。

如果一个 SpEL 表达式不足以满足你的需要,可以通过提供 org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy 的实现并将其配置为 bean(通过使用 @Bean 注解)来计算分区键值。如果在应用程序上下文中有多个类型为 org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy 的 bean,则可以通过使用 partitionKeyExtractorName 属性指定其名称来进一步筛选它,如下例所示:

--spring.cloud.stream.bindings.output.producer.partitionKeyExtractorName=customPartitionKeyExtractor
--spring.cloud.stream.bindings.output.producer.partitionCount=5
. . .
@Bean
public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
    return new CustomPartitionKeyExtractorClass();
}
[Note] Note

在 Spring Cloud Stream 的早期版本中,可以通过设置 spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass 属性来指定 org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy 的实现。从版本 2.0 起,此属性已被弃用,将来的版本中将删除对它的支持。

计算完消息键后,分区选择过程将目标分区确定为介于 0 和 partitionCount - 1 之间的值。适用于大多数方案的默认计算基于以下公式:key.hashCode() % partitionCount。这可以在绑定上自定义,方法是设置要根据 'key'(通过 partitionSelectorExpression 属性)对其进行计算的 SpEL 表达式,或者将 org.springframework.cloud.stream.binder.PartitionSelectorStrategy 的实现配置为 bean(通过使用 @Bean 注解)。与 PartitionKeyExtractorStrategy 类似,当应用程序上下文中有多个此类 bean 可用时,可以使用 spring.cloud.stream.bindings.output.producer.partitionSelectorName 属性进一步筛选它,如下例所示:

--spring.cloud.stream.bindings.output.producer.partitionSelectorName=customPartitionSelector
. . .
@Bean
public CustomPartitionSelectorClass customPartitionSelector() {
    return new CustomPartitionSelectorClass();
}
[Note] Note

在 Spring Cloud Stream 的早期版本中,可以通过设置 spring.cloud.stream.bindings.output.producer.partitionSelectorClass 属性来指定 org.springframework.cloud.stream.binder.PartitionSelectorStrategy 的实现。从版本 2.0 起,此属性已被弃用,将来的版本中将删除对它的支持。

34.3.2 为分区配置输入绑定

通过设置分区属性以及应用程序本身的 instanceIndex 和 instanceCount 属性,将输入绑定(使用通道名称 input)配置为接收 partitioned 数据,如下例所示:

spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5

instanceCount 值表示应用程序实例的总数,应在这些实例之间对数据进行分区。instanceIndex 必须是多个实例之间的唯一值,其值介于 0 和 instanceCount - 1 之间。实例索引帮助每个应用程序实例识别从中接收数据的唯一分区。使用不支持本地分区的技术的绑定器需要它。例如,对于 RabbitMQ,每个分区都有一个队列,队列名称包含实例索引。对于 Kafka,如果 autoRebalanceEnabled 为 true(默认值),则 Kafka 负责跨实例分布分区,而不需要这些属性。如果 autoRebalanceEnabled 设置为 false,绑定器将使用 instanceCount 和 instanceIndex 来确定实例订阅的分区(必须至少拥有与实例相同的分区)。绑定器分配分区而不是 Kafka。如果你希望特定分区的消息总是转到同一个实例,这可能很有用。当绑定器配置需要它们时,必须正确设置这两个值,以确保使用所有数据,并且应用程序实例接收互斥的数据集。

虽然在独立情况下使用多个实例进行分区数据处理的场景可能很复杂,但 Spring Cloud 数据流可以通过正确填充输入和输出值以及让你依赖运行时基础结构提供有关实例索引和实例计数的信息来显著简化该过程。