Spring Cloud Stream 支持应用程序之间的通信。应用程序间通信是一个涉及多个问题的复杂问题,如以下主题所述: 尽管 Spring Cloud Stream 使单个 Spring Boot 应用程序很容易连接到消息传递系统,但 Spring Cloud Stream 的典型场景是创建多应用程序管道,其中微服务应用程序互相发送数据。你可以通过关联 “adjacent” 应用程序的输入和输出目标来实现这个场景。
假设设计要求时间源应用程序向日志接收器应用程序发送数据。你可以为两个应用程序中的绑定使用一个名为
时间源(具有通道名称 spring.cloud.stream.bindings.output.destination=ticktock
日志接收器(具有通道名称 spring.cloud.stream.bindings.input.destination=ticktock
当扩展 Spring Cloud Stream 应用程序时,每个实例都可以接收有关同一应用程序存在多少其他实例以及其自身实例索引是什么的信息。Spring Cloud Stream 通过
当通过 Spring Cloud 数据流部署 Spring Cloud Stream 应用程序时,这些属性是自动配置的;当独立启动 Spring Cloud Stream 应用程序时,必须正确设置这些属性。默认情况下, 在按比例放大的场景中,这两个属性的正确配置通常对于解决分区行为(见下文)很重要,并且某些绑定器(例如,Kafka 绑定器)始终需要这两个属性,以确保数据在多个消费者实例中正确地分割。 Spring Cloud Stream 中的分区包括两个任务:
你可以配置输出绑定以通过设置其 例如,以下是一个有效且典型的配置: spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id spring.cloud.stream.bindings.output.producer.partitionCount=5 基于该示例配置,数据通过以下逻辑发送到目标分区。
根据
如果一个 SpEL 表达式不足以满足你的需要,可以通过提供 --spring.cloud.stream.bindings.output.producer.partitionKeyExtractorName=customPartitionKeyExtractor --spring.cloud.stream.bindings.output.producer.partitionCount=5 . . . @Bean public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() { return new CustomPartitionKeyExtractorClass(); }
计算完消息键后,分区选择过程将目标分区确定为介于 --spring.cloud.stream.bindings.output.producer.partitionSelectorName=customPartitionSelector . . . @Bean public CustomPartitionSelectorClass customPartitionSelector() { return new CustomPartitionSelectorClass(); }
通过设置分区属性以及应用程序本身的 spring.cloud.stream.bindings.input.consumer.partitioned=true spring.cloud.stream.instanceIndex=3 spring.cloud.stream.instanceCount=5
虽然在独立情况下使用多个实例进行分区数据处理的场景可能很复杂,但 Spring Cloud 数据流可以通过正确填充输入和输出值以及让你依赖运行时基础结构提供有关实例索引和实例计数的信息来显著简化该过程。 |