Spring Cloud Stream 支持常规配置选项以及绑定和绑定的配置。一些绑定器允许附加的绑定属性支持中间件特定的特性。
配置选项可以通过 Spring Boot 支持的任何机制提供给 Spring Cloud Stream 应用程序。这包括应用程序参数、环境变量和 YAML 或 .properties 文件。
绑定属性是通过使用 spring.cloud.stream.bindings.<channelName>.<property>=<value>
的格式提供的。<channelName>
表示正在配置的通道的名称(例如,Source
的 output
)。
为了避免重复,Spring Cloud Stream 支持以 spring.cloud.stream.default.<property>=<value>
格式为所有通道设置值。
在避免扩展绑定属性重复时,应使用此格式 - spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>
。
在下面的内容中,我们指出忽略 spring.cloud.stream.bindings.<channelName>.
前缀并只关注属性名,理解前缀 ise 包含在运行时。
这些属性通过 org.springframework.cloud.stream.config.BindingProperties
公开。
以下绑定属性可用于输入和输出绑定,并且必须以 spring.cloud.stream.bindings.<channelName>.
作为前缀。(例如,spring.cloud.stream.bindings.input.destination=ticktock
)。
可以使用 spring.cloud.stream.default
前缀设置默认值(例如 `spring.cloud.stream.default.contentType=application/json`)。
- destination
-
绑定中间件上通道的目标目的地(例如,RabbitMQ exchange 或 Kafka 主题)。如果通道作为消费者绑定,则可以将其绑定到多个目标,并且可以将目标名称指定为逗号分隔的 String
值。如果未设置,则使用通道名称。不能重写此属性的默认值。
- group
-
通道的消费者组。仅适用于入站绑定。请参阅 消费者组。
默认值: null
(表示匿名用户)。
- contentType
-
通道的内容类型。请参阅 小节 32, 内容类型协商。
默认值: application/json
。
- binder
-
此绑定使用的绑定器。有关的详细信息请参阅 小节 30.4, “类路径上的多个绑定器”。
默认值: null
(如果存在,则使用默认绑定器)。
这些属性通过 org.springframework.cloud.stream.binder.ConsumerProperties
公开。
以下绑定属性仅可用于输入绑定,并且必须以 spring.cloud.stream.bindings.<channelName>.consumer.
作为前缀。(例如,spring.cloud.stream.bindings.input.consumer.concurrency=3
)。
可以使用 spring.cloud.stream.default.consumer
前缀设置默认值(例如 spring.cloud.stream.default.consumer.headerMode=none
)。
- concurrency
-
入站消费者的并发性。
默认值: 1
。
- partitioned
-
消费者是否从分区生产者接收数据。
默认值: false
。
- headerMode
-
当设置为 none
时,禁止对输入进行头解析。仅对本地不支持消息头且需要嵌入消息头的消息传递中间件有效。在不支持本地头的情况下,使用非 Spring Cloud Stream 应用程序中的数据时,此选项非常有用。当设置为 headers
时,它使用中间件的本地 header 机制。当设置为 embeddedHeaders
时,它将 Headers 嵌入到消息有效负载中。
默认值:取决于绑定器实现。
- maxAttempts
-
如果处理失败,则表示尝试处理消息的次数(包括第一次)。设置为 1
以禁用重试。
默认值:3
。
- backOffInitialInterval
-
重试时的回退初始间隔。
默认值:1000
。
- backOffMaxInterval
-
最大回退间隔。
默认值:10000
。
- backOffMultiplier
-
回退乘数。
默认值:2.0
。
- defaultRetryable
-
监听器引发的未在 retryableExceptions
中列出的异常是否可重试。
默认值:true
。
- instanceIndex
-
当设置为大于零的值时,它允许自定义此消费者的实例索引(如果与 spring.cloud.stream.instanceIndex
不同)。当设置为负值时,它默认为 spring.cloud.stream.instanceIndex
。有关的详细信息请参阅 小节 34.2, “实例索引和实例计数”。
默认值:-1
。
- instanceCount
-
当设置为大于零的值时,它允许自定义此消费者的实例计数(如果与 spring.cloud.stream.instanceCount
不同)。当设置为负值时,它默认为 spring.cloud.stream.instanceCount
。有关的详细信息请参阅 小节 34.2, “实例索引和实例计数”。
默认值:-1
。
- retryableExceptions
-
键中的可丢弃类名和值中的布尔值的映射。指定将重试或不重试的异常(和子类)。另请参阅 defaultRetriable
。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false
。
默认值:空。
- useNativeDecoding
-
当设置为 true
时,入站消息将直接由客户端库反序列化,客户端库必须进行相应配置(例如,设置适当的 Kafka 生产者值反序列化程序)。使用此配置时,入站消息解析组不基于绑定的 contentType
。当使用本地解码时,生产者有责任使用适当的编码器(例如,Kafka 生产者值序列化器)来序列化出站消息。此外,当使用本地编码和解码时,headerMode=embeddedHeaders
属性将被忽略,消息中不会嵌入头。请参阅生产者属性 useNativeEncoding
。
默认值:false
。
这些属性通过 org.springframework.cloud.stream.binder.ProducerProperties
公开。
以下绑定属性仅可用于输出绑定,并且必须以 spring.cloud.stream.bindings.<channelName>.producer.
作为前缀。(例如,spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id
)。
可以使用前缀 spring.cloud.stream.default.producer
设置默认值(例如,spring.cloud.stream.default.producer.partitionKeyExpression=payload.id
)。
- partitionKeyExpression
-
确定如何划分出站数据的 SpEL 表达式。如果设置了,或者设置了 partitionKeyExtractorClass
类,则此通道上的出站数据将被分区。必须将 partitionCount
设置为大于 1 的值才能生效。与 partitionKeyExtractorClass
互斥。请参阅 小节 28.6, “分区支持”。
默认值:null
。
- partitionKeyExtractorClass
-
PartitionKeyExtractorStrategy
实现。如果设置了,或者设置了 partitionKeyExpression
,则分区此通道上的出站数据。必须将 partitionCount
设置为大于 1 的值才能生效。与 partitionKeyExpression
互斥。请参阅 小节 28.6, “分区支持”。
默认值:null
。
- partitionSelectorClass
-
PartitionSelectorStrategy
实现。与 partitionSelectorExpression
互斥。如果两者都不设置,则选择分区作为 hashCode(key) % partitionCount
,其中 key
通过 partitionKeyExpression
或 partitionKeyExtractorClass
类计算。
默认值:null
。
- partitionSelectorExpression
-
自定义分区选择的 SpEL 表达式。与 partitionSelectorClass
互斥。如果两者都不设置,则选择分区作为 hashCode(key) % partitionCount
,其中 key
通过 partitionKeyExpression
或 partitionKeyExtractorClass
类计算。
默认值:null
。
- partitionCount
-
如果启用分区,则为数据的目标分区数。如果生产者已分区,则必须设置为大于 1 的值。在 Kafka,它被解释为一个提示。其中较大的一个和目标主题的分区计数将被使用。
默认值:1
。
- requiredGroups
-
由逗号分隔的组列表,生产者必须确保消息传递到这些组,即使它们是在创建之后开始的(例如,通过在 RabbitMQ 中预先创建持久队列)。
- headerMode
-
当设置为 none
时,将禁用在输出上嵌入头。它只对本地不支持消息头并且需要嵌入消息头的消息传递中间件有效。当不支持本地头时,此选项在为非 Spring Cloud Stream 应用程序生成数据时非常有用。当设置为 headers
时,它使用中间件的本地 header 机制。当设置为 embeddedHeaders
时,它将 headers 嵌入到消息有效负载中。
默认值:取决于绑定器实现。
- useNativeEncoding
-
当设置为 true
时,出站消息直接由客户端库序列化,必须相应地配置出站消息(例如,设置适当的 Kafka 生产者值序列化程序)。使用此配置时,出站消息编组不基于绑定的 contentType
。使用本地编码时,使用者有责任使用适当的解码器(例如,Kafka 消费者值反序列化器)对入站消息进行反序列化。此外,当使用本地编码和解码时,headerMode=embeddedHeaders
属性将被忽略,消息中不会嵌入头。请参见消费者属性 useNativeDecoding
。
默认值:false
。
- errorChannelEnabled
-
当设置为 true
时,如果绑定器支持异步发送结果,则发送失败将发送到目标的错误通道。
默认值:false
。
除了使用 @EnableBinding
定义的通道外,Spring Cloud Stream 还允许应用程序向动态绑定的目的地发送消息。例如,当需要在运行时确定目标目标时,这很有用。应用程序可以通过使用 BinderAwareChannelResolver
bean 来实现这一点,其由 @EnableBinding
注解自动注册。
'spring.cloud.stream.dynamicDestinations' 属性可用于将动态目标名称限制为已知集(白名单)。如果未设置此属性,则可以动态绑定任何目标。
BinderAwareChannelResolver
可以直接使用,如下所示,REST 控制器使用路径变量来确定目标通道:
@EnableBinding
@Controller
public class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @PathVariable("target") target,
@RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, target, contentType);
}
private void sendMessage(String body, String target, Object contentType) {
resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
}
现在考虑当我们在默认端口(8080)上启动应用程序并使用 CURL 发出以下请求时会发生什么情况:
curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers
curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders
目的地 'customers' 和 'orders' 是在经纪人(在 Rabbit exchange 或 Kafka 主题中)中创建的,名称为 'customers' 和 'orders',并将数据发布到相应的目的地。
BinderAwareChannelResolver
是一个通用的 Spring 集成 DestinationResolver
,可以注入其他组件中,例如,在使用基于传入 JSON 消息的 target
字段的 SpEL 表达式的路由器中。以下示例包括一个读取 SpEL 表达式的路由器:
@EnableBinding
@Controller
public class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@RequestMapping(path = "/", method = POST, consumes = "application/json")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, contentType);
}
private void sendMessage(Object body, Object contentType) {
routerChannel().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
@Bean(name = "routerChannel")
public MessageChannel routerChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "routerChannel")
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router =
new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
router.setDefaultOutputChannelName("default-output");
router.setChannelResolver(resolver);
return router;
}
}
Router Sink Application 使用此技术按需创建目标。
如果事先知道通道名称,则可以将生产者属性配置为与任何其他目标相同。或者,如果你注册了一个 NewBindingCallback<>
bean,它将在创建绑定之前被调用。回调采用绑定器使用的扩展生产者属性的泛型类型。它有一种方法:
void configure(String channelName, MessageChannel channel, ProducerProperties producerProperties,
T extendedProducerProperties);
以下示例展示如何使用 RabbitMQ 绑定器:
@Bean
public NewBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
return (name, channel, props, extended) -> {
props.setRequiredGroups("bindThisQueue");
extended.setQueueNameGroupOnly(true);
extended.setAutoBindDlq(true);
extended.setDeadLetterQueueName("myDLQ");
};
}
![[Note]](images/note.png) |
Note |
如果需要支持具有多个绑定器类型的动态目标,请使用 Object 作为泛型类型,并根据需要强制转换 extended 参数。
|