31. 配置选项

Spring Cloud Stream 支持常规配置选项以及绑定和绑定的配置。一些绑定器允许附加的绑定属性支持中间件特定的特性。

配置选项可以通过 Spring Boot 支持的任何机制提供给 Spring Cloud Stream 应用程序。这包括应用程序参数、环境变量和 YAML 或 .properties 文件。

31.1 绑定服务属性

这些属性通过 org.springframework.cloud.stream.config.BindingServiceProperties 公开。

spring.cloud.stream.instanceCount

应用程序的已部署实例数。必须为生产方端的分区设置。使用 RabbitMQ 时必须在消费者端设置,如果 autoRebalanceEnabled=false,则必须在 Kafka 中设置。

默认值: 1.

spring.cloud.stream.instanceIndex

应用程序的实例索引:从 0 到 instanceCount - 1 的数字。如果 autoRebalanceEnabled=false,则用于使用 RabbitMQ 和 Kafka 进行分区。在 Cloud Foundry 中自动设置以匹配应用程序的实例索引。

spring.cloud.stream.dynamicDestinations

可以动态绑定的目的地列表(例如,在动态路由方案中)。如果设置,则只能绑定列出的目标。

默认值: 空(允许绑定任何目的地)。

spring.cloud.stream.defaultBinder

如果配置了多个绑定器,则使用默认绑定器。请参阅类路径上的多个绑定器。

默认值: 空。

spring.cloud.stream.overrideCloudConnectors

仅当 cloud 配置组处于激活状态并且应用程序附带了 Spring Cloud Connectors 时,此属性才适用。如果属性为 false(默认值),绑定器将检测合适的绑定服务(例如,在 Cloud Foundry 中为 RabbitMQ 绑定器绑定的 RabbitMQ 服务),并将其用于创建连接(通常通过 Spring Cloud Connectors)。当设置为 true 时,此属性指示绑定器完全忽略绑定服务并依赖于 Spring Boot 属性(例如,依赖于在环境中为 RabbitMQ 绑定器提供的 spring.rabbitmq.* 属性)。当连接到多个系统时,此属性的典型用法是嵌套在自定义环境中。

默认值: false。

spring.cloud.stream.bindingRetryInterval

当绑定器不支持后期绑定和代理(例如,Apache Kafka)关闭时,重试绑定创建之间的间隔(秒)。将其设置为零以将此类情况视为致命,从而阻止应用程序启动。

默认值: 30

31.2 绑定属性

绑定属性是通过使用 spring.cloud.stream.bindings.<channelName>.<property>=<value> 的格式提供的。<channelName> 表示正在配置的通道的名称(例如,Source 的 output)。

为了避免重复,Spring Cloud Stream 支持以 spring.cloud.stream.default.<property>=<value> 格式为所有通道设置值。

在避免扩展绑定属性重复时,应使用此格式 - spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>。

在下面的内容中,我们指出忽略 spring.cloud.stream.bindings.<channelName>. 前缀并只关注属性名,理解前缀 ise 包含在运行时。

31.2.1 常用绑定属性

这些属性通过 org.springframework.cloud.stream.config.BindingProperties 公开。

以下绑定属性可用于输入和输出绑定,并且必须以 spring.cloud.stream.bindings.<channelName>. 作为前缀。(例如,spring.cloud.stream.bindings.input.destination=ticktock)。

可以使用 spring.cloud.stream.default 前缀设置默认值(例如 `spring.cloud.stream.default.contentType=application/json`)。

destination

绑定中间件上通道的目标目的地(例如,RabbitMQ exchange 或 Kafka 主题)。如果通道作为消费者绑定,则可以将其绑定到多个目标,并且可以将目标名称指定为逗号分隔的 String 值。如果未设置,则使用通道名称。不能重写此属性的默认值。

group

通道的消费者组。仅适用于入站绑定。请参阅 消费者组。

默认值: null (表示匿名用户)。

contentType

通道的内容类型。请参阅 小节 32, 内容类型协商。

默认值: application/json。

binder

此绑定使用的绑定器。有关的详细信息请参阅 小节 30.4, “类路径上的多个绑定器”。

默认值: null (如果存在,则使用默认绑定器)。

31.2.2 消费者属性

这些属性通过 org.springframework.cloud.stream.binder.ConsumerProperties 公开。

以下绑定属性仅可用于输入绑定,并且必须以 spring.cloud.stream.bindings.<channelName>.consumer. 作为前缀。(例如,spring.cloud.stream.bindings.input.consumer.concurrency=3)。

可以使用 spring.cloud.stream.default.consumer 前缀设置默认值(例如 spring.cloud.stream.default.consumer.headerMode=none)。

concurrency

入站消费者的并发性。

默认值: 1。

partitioned

消费者是否从分区生产者接收数据。

默认值: false。

headerMode

当设置为 none 时,禁止对输入进行头解析。仅对本地不支持消息头且需要嵌入消息头的消息传递中间件有效。在不支持本地头的情况下,使用非 Spring Cloud Stream 应用程序中的数据时,此选项非常有用。当设置为 headers 时,它使用中间件的本地 header 机制。当设置为 embeddedHeaders 时,它将 Headers 嵌入到消息有效负载中。

默认值:取决于绑定器实现。

maxAttempts

如果处理失败,则表示尝试处理消息的次数(包括第一次)。设置为 1 以禁用重试。

默认值:3。

backOffInitialInterval

重试时的回退初始间隔。

默认值:1000。

backOffMaxInterval

最大回退间隔。

默认值:10000。

backOffMultiplier

回退乘数。

默认值:2.0。

defaultRetryable

监听器引发的未在 retryableExceptions 中列出的异常是否可重试。

默认值:true。

instanceIndex

当设置为大于零的值时,它允许自定义此消费者的实例索引(如果与 spring.cloud.stream.instanceIndex 不同)。当设置为负值时,它默认为 spring.cloud.stream.instanceIndex。有关的详细信息请参阅 小节 34.2, “实例索引和实例计数”。

默认值:-1。

instanceCount

当设置为大于零的值时,它允许自定义此消费者的实例计数(如果与 spring.cloud.stream.instanceCount 不同)。当设置为负值时,它默认为 spring.cloud.stream.instanceCount。有关的详细信息请参阅 小节 34.2, “实例索引和实例计数”。

默认值:-1。

retryableExceptions

键中的可丢弃类名和值中的布尔值的映射。指定将重试或不重试的异常(和子类)。另请参阅 defaultRetriable。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false。

默认值:空。

useNativeDecoding

当设置为 true 时,入站消息将直接由客户端库反序列化,客户端库必须进行相应配置(例如,设置适当的 Kafka 生产者值反序列化程序)。使用此配置时,入站消息解析组不基于绑定的 contentType。当使用本地解码时,生产者有责任使用适当的编码器(例如,Kafka 生产者值序列化器)来序列化出站消息。此外,当使用本地编码和解码时,headerMode=embeddedHeaders 属性将被忽略,消息中不会嵌入头。请参阅生产者属性 useNativeEncoding。

默认值:false。

31.2.3 生产者属性

这些属性通过 org.springframework.cloud.stream.binder.ProducerProperties 公开。

以下绑定属性仅可用于输出绑定,并且必须以 spring.cloud.stream.bindings.<channelName>.producer. 作为前缀。(例如,spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id)。

可以使用前缀 spring.cloud.stream.default.producer 设置默认值(例如,spring.cloud.stream.default.producer.partitionKeyExpression=payload.id)。

partitionKeyExpression

确定如何划分出站数据的 SpEL 表达式。如果设置了,或者设置了 partitionKeyExtractorClass 类,则此通道上的出站数据将被分区。必须将 partitionCount 设置为大于 1 的值才能生效。与 partitionKeyExtractorClass 互斥。请参阅 小节 28.6, “分区支持”。

默认值:null。

partitionKeyExtractorClass

PartitionKeyExtractorStrategy 实现。如果设置了,或者设置了 partitionKeyExpression,则分区此通道上的出站数据。必须将 partitionCount 设置为大于 1 的值才能生效。与 partitionKeyExpression 互斥。请参阅 小节 28.6, “分区支持”。

默认值:null。

partitionSelectorClass

PartitionSelectorStrategy 实现。与 partitionSelectorExpression 互斥。如果两者都不设置,则选择分区作为 hashCode(key) % partitionCount,其中 key 通过 partitionKeyExpression 或 partitionKeyExtractorClass 类计算。

默认值:null。

partitionSelectorExpression

自定义分区选择的 SpEL 表达式。与 partitionSelectorClass 互斥。如果两者都不设置,则选择分区作为 hashCode(key) % partitionCount,其中 key 通过 partitionKeyExpression 或 partitionKeyExtractorClass 类计算。

默认值:null。

partitionCount

如果启用分区,则为数据的目标分区数。如果生产者已分区,则必须设置为大于 1 的值。在 Kafka,它被解释为一个提示。其中较大的一个和目标主题的分区计数将被使用。

默认值:1。

requiredGroups

由逗号分隔的组列表,生产者必须确保消息传递到这些组,即使它们是在创建之后开始的(例如,通过在 RabbitMQ 中预先创建持久队列)。

headerMode

当设置为 none 时,将禁用在输出上嵌入头。它只对本地不支持消息头并且需要嵌入消息头的消息传递中间件有效。当不支持本地头时,此选项在为非 Spring Cloud Stream 应用程序生成数据时非常有用。当设置为 headers 时,它使用中间件的本地 header 机制。当设置为 embeddedHeaders 时,它将 headers 嵌入到消息有效负载中。

默认值:取决于绑定器实现。

useNativeEncoding

当设置为 true 时,出站消息直接由客户端库序列化,必须相应地配置出站消息(例如,设置适当的 Kafka 生产者值序列化程序)。使用此配置时,出站消息编组不基于绑定的 contentType。使用本地编码时,使用者有责任使用适当的解码器(例如,Kafka 消费者值反序列化器)对入站消息进行反序列化。此外,当使用本地编码和解码时,headerMode=embeddedHeaders 属性将被忽略,消息中不会嵌入头。请参见消费者属性 useNativeDecoding。

默认值:false。

errorChannelEnabled

当设置为 true 时,如果绑定器支持异步发送结果,则发送失败将发送到目标的错误通道。

默认值:false。

31.3 使用动态绑定的目标

除了使用 @EnableBinding 定义的通道外,Spring Cloud Stream 还允许应用程序向动态绑定的目的地发送消息。例如,当需要在运行时确定目标目标时,这很有用。应用程序可以通过使用 BinderAwareChannelResolver bean 来实现这一点,其由 @EnableBinding 注解自动注册。

'spring.cloud.stream.dynamicDestinations' 属性可用于将动态目标名称限制为已知集(白名单)。如果未设置此属性,则可以动态绑定任何目标。

BinderAwareChannelResolver 可以直接使用,如下所示,REST 控制器使用路径变量来确定目标通道:

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @PathVariable("target") target,
           @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, target, contentType);
    }

    private void sendMessage(String body, String target, Object contentType) {
        resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }
}

现在考虑当我们在默认端口(8080)上启动应用程序并使用 CURL 发出以下请求时会发生什么情况:

curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers

curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders

目的地 'customers' 和 'orders' 是在经纪人(在 Rabbit exchange 或 Kafka 主题中)中创建的,名称为 'customers' 和 'orders',并将数据发布到相应的目的地。

BinderAwareChannelResolver 是一个通用的 Spring 集成 DestinationResolver,可以注入其他组件中,例如,在使用基于传入 JSON 消息的 target 字段的 SpEL 表达式的路由器中。以下示例包括一个读取 SpEL 表达式的路由器:

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

    @Autowired
    private BinderAwareChannelResolver resolver;


    @RequestMapping(path = "/", method = POST, consumes = "application/json")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, contentType);
    }

    private void sendMessage(Object body, Object contentType) {
        routerChannel().send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }

    @Bean(name = "routerChannel")
    public MessageChannel routerChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "routerChannel")
    public ExpressionEvaluatingRouter router() {
        ExpressionEvaluatingRouter router =
            new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
        router.setDefaultOutputChannelName("default-output");
        router.setChannelResolver(resolver);
        return router;
    }
}

Router Sink Application 使用此技术按需创建目标。

如果事先知道通道名称,则可以将生产者属性配置为与任何其他目标相同。或者,如果你注册了一个 NewBindingCallback<> bean,它将在创建绑定之前被调用。回调采用绑定器使用的扩展生产者属性的泛型类型。它有一种方法:

void configure(String channelName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

以下示例展示如何使用 RabbitMQ 绑定器:

@Bean
public NewBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}
[Note] Note

如果需要支持具有多个绑定器类型的动态目标,请使用 Object 作为泛型类型,并根据需要强制转换 extended 参数。