39. Apache Kafka 绑定器

39.1 用法

要使用 Apache Kafka 绑定器,需要将 spring-cloud-stream-binder-kafka 作为对 Spring Cloud Stream 应用程序的依赖项添加,如下面的 Maven 示例所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

或者,您也可以使用Spring Cloud Stream Kafka Starter,如下面的 Maven 示例所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

39.2 Apache Kafka 绑定器概述

下图展示了 Apache Kafka 绑定器如何操作的简化图:

图 39.1. Kafka 绑定器

kafka binder

Apache Kafka 绑定器实现将每个目标映射到 Apache Kafka 主题。消费者组直接映射到相同的 Apache Kafka 概念。分区也直接映射到 Apache Kafka 分区。

绑定器当前使用的是 Apache Kafka kafka-clients 1.0.0 jar,并且设计为至少与该版本的代理程序一起使用。此客户端可以与旧的代理进行通信(请参阅 Kafka 文档),但某些功能可能不可用。例如,如果版本早于 0.11.x.x,则不支持本地头文件。此外,0.11.x.x 不支持 autoAddPartitions 属性。

39.3 配置选项

本节包含 Apache Kafka 绑定器使用的配置选项。

有关绑定器的常见配置选项和属性,请参阅核心文档。

39.3.1 Kafka 绑定器属性

spring.cloud.stream.kafka.binder.brokers

Kafka 绑定器所连接的代理列表。

默认值: localhost.

spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers 允许使用或不使用端口信息(例如,host1,host2:port2)指定主机。这将在代理列表中未配置端口时设置默认端口。

默认值: 9092。

spring.cloud.stream.kafka.binder.configuration

传递给绑定器创建的所有客户端的客户端属性(生产者和消费者)的 Key/Value map。由于生产者和消费者都使用这些属性,因此应将使用限制为公用属性 — 例如安全设置。此处的属性取代启动时设置的任何属性。

默认值: 空 map。

spring.cloud.stream.kafka.binder.consumerProperties

任意 Kafka 客户端消费者属性的 Key/Value map。这里的属性取代在引导和上面的 configuration 属性中设置的任何属性。

默认值: 空 map。

spring.cloud.stream.kafka.binder.headers

绑定器传输的自定义头的列表。仅在与较旧的应用程序(⇐ 1.3.x)通信时需要,kafka-clients 版本 < 0.11.0.0。较新的版本支持本地头文件。

默认值: 空。

spring.cloud.stream.kafka.binder.healthTimeout

等待获取分区信息的时间(秒)。如果此计时器过期,则运行状况报告为已关闭。

默认值: 10。

spring.cloud.stream.kafka.binder.requiredAcks

代理上所需的 acks 数。请参阅 Kafka 文档了解生产者 acks 属性。

默认值: 1。

spring.cloud.stream.kafka.binder.minPartitionCount

仅当设置了 autoCreateTopics 或 autoAddPartitions 时有效。绑定器在生成或消费数据的主题上配置的全局最小分区数。它可以被生产者的 partitionCount 设置或生产者的 instanceCount * concurrency 设置(如果两者之一较大)的值取代。

默认值: 1。

spring.cloud.stream.kafka.binder.producerProperties

任意 Kafka 客户端生产者属性的 Key/Value map。这里的属性取代在引导和上面的 configuration 属性中设置的任何属性。

默认值: 空 map。

spring.cloud.stream.kafka.binder.replicationFactor

如果 autoCreateTopics 处于激活状态,则自动创建主题的复制因子。可以在每个绑定上重写。

默认值: 1。

spring.cloud.stream.kafka.binder.autoCreateTopics

如果设置为 true,绑定器将自动创建新主题。如果设置为 false,绑定器依赖于已经配置的主题。在后一种情况下,如果主题不存在,绑定器将无法启动。

[Note] Note

此设置独立于代理的 auto.topic.create.enable 设置,不影响它。如果服务器设置为自动创建主题,则可以使用默认代理设置将其创建为元数据检索请求的一部分。

默认值: true。

spring.cloud.stream.kafka.binder.autoAddPartitions

如果设置为 true,绑定器将根据需要创建新分区。如果设置为 false,绑定器依赖于已经配置的主题的分区大小。如果目标主题的分区计数小于预期值,绑定器将无法启动。

默认值: false。

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

启用绑定器中的事务。请参见 Kafka 文档中的 transaction.id 和 spring-kafka 文档中的 Transactions。启用事务时,将忽略单个 producer 属性,并且所有生产者都使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性。

默认值: null(无事务)

spring.cloud.stream.kafka.binder.transaction.producer.*

事务性绑定器中生产者的全局生产者属性。请参见 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix 和 小节 39.3.3, “Kafka 生产者属性”以及所有绑定器支持的一般生产者属性。

默认值: 请参阅单个生产者属性。

spring.cloud.stream.kafka.binder.headerMapperBeanName

用于将 spring-messaging 头映射到 Kafka 头和从 Kafka 头映射的 KafkaHeaderMapper 的 bean 名称。例如,如果你希望在一个 DefaultKafkaHeaderMapper 中定制受信任的包,该文件头使用 JSON 反序列化。

默认值: none。

39.3.2 Kafka 消费者属性

以下属性仅适用于 Kafka 消费者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.consumer. 作为前缀。

admin.configuration

提供主题时使用的 Kafka 主题属性的 Map — 例如 spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0

默认值: none。

admin.replicas-assignment

副本分配的 Map <Integer, List<Integer>>,键为分区,值为分配。提供新主题时使用。请参见 kafka-clients jar 中的 NewTopic Javadocs。

默认值: none。

admin.replication-factor

设置主题时要使用的复制因子。覆盖绑定器范围设置。如果存在 replicas-assignments,则忽略。

默认值: none (绑定器范围默认值为1)。

autoRebalanceEnabled

如果为 true,主题分区将在消费者组成员之间自动重新均衡。如果为 false,则将根据 spring.cloud.stream.instanceCount 和 spring.cloud.stream.instanceIndex 为每个使用者分配一组固定的分区。这要求在每个启动的实例上适当设置 spring.cloud.stream.instanceCount 和 spring.cloud.stream.instanceIndex 属性。在这种情况下,spring.cloud.stream.instanceCount 属性的值通常必须大于 1。

默认值: true。

ackEachRecord

当 autoCommitOffset 为 true 时,此设置指示是否在处理每个记录后提交偏移量。默认情况下,在处理 consumer.poll() 返回的一批记录中的所有记录之后提交偏移量。投票返回的记录数可以使用 max.poll.records Kafka 属性控制,该属性通过使用者配置属性设置。将此设置为 true 可能会导致性能下降,但这样做会降低发生故障时重新传递记录的可能性。另外,请参阅绑定器 requiredAcks 属性,它还影响提交偏移的性能。

默认值: false。

autoCommitOffset

处理消息后是否自动提交偏移量。如果设置为 false,则入站消息中会出现一个带有类型为 org.springframework.kafka.support.Acknowledgment 头的 kafka_acknowledgment 键的头。应用程序可以使用此头来确认消息。有关的详细信息请参阅示例部分。当该属性设置为 false 时,Kafka 绑定器将 ack 模式设置为 org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,应用程序负责确认记录。另请参阅 ackEachRecord。

默认值: true。

autoCommitOnError

仅当 autoCommitOffset 设置为 true 时有效。如果设置为 false,则对导致错误的消息取消自动提交,只对成功的消息进行提交。它允许流在持续失败的情况下自动重播上次成功处理的消息。如果设置为 true,则始终自动提交(如果启用了自动提交)。如果未设置(默认设置),它实际上具有与 enableDlq 相同的值,如果将错误消息发送到 DLQ,则自动提交错误消息,否则不提交错误消息。

默认值: 不设置。

resetOffsets

是否将消费者的偏移重置为 startOffset 提供的值。

默认值: false。

startOffset

新组的起始偏移量。允许值:earliest 和 latest。如果为消费者 'binding'(通过 spring.cloud.stream.bindings.<channelName>.group)显式设置了消费者组,'startOffset' 将设置为 earliest。否则,它将为 anonymous 消费者组设置为 latest。另请参阅 resetOffsets(本列表前面)。

默认值: null (等同于 earliest)。

enableDlq

当设置为 true 时,它为消费者启用 DLQ 行为。默认情况下,导致错误的消息将转发到名为 error.<destination>.<group> 的主题。可以通过设置 dlqName 属性来配置 DLQ 主题名称。这为 Kafka 回放场景提供了另一种选择,在错误数量相对较少的情况下,回放整个原始主题可能太麻烦。更多详细信息请参阅 小节 39.6, “死信主题处理”。从 2.0 版开始,发送到 DLQ 主题的消息将使用以下头进行增强:x-original-topic、x-exception-message 和 x-exception-stacktrace 为 byte[]。destinationIsPattern 为 true 时不允许。

默认值: false。

configuration

使用包含通用 Kafka 消费者属性的 key/value 对进行映射。

默认值: 空 map。

dlqName

接收错误消息的 DLQ 主题的名称。

默认值: null (如果未指定,则会将导致错误的消息转发到名为 error.<destination>.<group> 的主题)。

dlqProducerProperties

使用此选项,可以设置 DLQ 特定的生产者属性。通过 Kafka 生产者属性可用的所有属性都可以通过此属性设置。

默认值: 默认的 Kafka 生产者属性。

standardHeaders

指示由入站通道适配器填充的标准头。允许值:none、id、timestamp 或 both。如果使用本地反序列化,并且接收消息的第一个组件需要 id(例如,配置为使用 JDBC 消息存储的聚合器),则非常有用。

默认值: none

converterBeanName

实现 RecordMessageConverter 的 bean 的名称。在入站通道适配器中用于替换默认 MessagingMessageConverter。

默认值: null

idleEventInterval

事件之间的间隔(以毫秒为单位),表示最近没有收到任何消息。使用 ApplicationListener<ListenerContainerIdleEvent> 接收这些事件。有关用法示例,请参阅名为 “示例:暂停和恢复消费者” 的部分。

默认值: 30000

destinationIsPattern

如果为 true,则目标将被视为正则表达式 Pattern,用于由代理匹配主题名称。如果为 true,则不会设置主题,也不允许 enableDlq,因为绑定器在设置阶段不知道主题名称。注意,检测与模式匹配的新主题所用的时间由消费者属性 metadata.max.age.ms 控制,该属性(在写入时)默认为 300,000ms(5 分钟)。这可以使用上面的 configuration 属性进行配置。

默认值: false

39.3.3 Kafka 生产者属性

以下属性仅适用于 Kafka 生产者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.producer. 作为前缀。

admin.configuration

提供新主题时使用的 Kafka 主题属性的 Map — 例如 spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0

默认值: none。

admin.replicas-assignment

副本分配的 Map<Integer, List<Integer>>,键为分区,值为分配。提供新主题时使用。请参见 kafka-clients jar 中的 NewTopic javadocs。

默认值: none。

admin.replication-factor

提供新主题时要使用的复制因子。覆盖绑定器范围设置。如果存在 replicas-assignments,则忽略。

默认值: none (绑定器范围默认值为 1)。

bufferSize

Kafka 生产者在发送前尝试批处理的数据量的上限(字节)。

默认值: 16384。

sync

生产者是否同步。

默认值: false。

batchTimeout

在发送消息之前,生产者要等待多长时间才能在同一批中累积更多的消息。(通常,生产者根本不等待,只发送前一次发送过程中累积的所有消息。)非零值可能会以延迟为代价增加吞吐量。

默认值: 0。

messageKeyExpression

针对用于填充生成的 Kafka 消息的键的传出消息计算的 SpEL 表达式 — 例如,headers['myKey']。无法使用负载,因为在计算此表达式时,负载的形式已为 byte[]。

默认值: none。

headerPatterns

以逗号分隔的简单模式列表,以匹配要映射到 ProducerRecord 中的 Kafka Headers 的 Spring 消息头。模式可以以通配符(星号)开头或结尾。模式可以通过加前缀来消除 !。匹配在第一次匹配后停止(正或负)。例如 !ask,as* 会传递 ash,但 ask 不会。id 和 timestamp 从未映射。

默认值: * (所有头 - 除了 id 和 timestamp)

configuration

使用包含通用 Kafka 生产者属性的 key/value 对进行映射。

默认值: 空 map。

[Note] Note

Kafka 绑定器使用生产者的 partitionCount 设置作为提示,以创建具有给定分区计数的主题(与 minPartitionCount 一起使用,最大两个是所使用的值)。配置绑定器的 minPartitionCount 和应用程序的 partitionCount 时要小心,因为使用的值较大。如果已经存在具有较小分区数的主题,并且禁用了 autoAddPartitions(默认设置),绑定器将无法启动。如果主题已经存在,并且分区数较小,并且启用了 autoAddPartitions,则会添加新分区。如果主题的分区数大于最大分区数(minPartitionCount 或 partitionCount),则使用现有分区数。

39.3.4 使用示例

在本节中,我们将展示前面的属性在特定场景中的使用。

示例: 设置 autoCommitOffset 为 false 并依靠手动确认

此示例说明如何在消费者应用程序中手动确认偏移量。

此示例要求将 spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset 设置为 false。使用相应的输入通道名称作为示例。

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}

示例:安全配置

Apache Kafka 0.9 支持客户端和代理之间的安全连接。要利用此功能,请遵循 Apache Kafka 文档中的指导原则以及 Confluent 文档中的 Kafka 0.9 安全指导原则。使用 spring.cloud.stream.kafka.binder.configuration 选项为绑定器创建的所有客户端设置安全属性。

例如,要将 security.protocol 设置为 SASL_SSL,请设置以下属性:

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

所有其他安全属性都可以用类似的方式设置。

使用 Kerberos 时,请按照参考文档中的说明创建和引用 JAAS 配置。

Spring Cloud Stream 支持通过使用 JAAS 配置文件和 Spring Boot 属性将 JAAS 配置信息传递给应用程序。

使用 JAAS 配置文件

可以使用系统属性为 Spring Cloud Stream 应用程序设置 JAAS 和(可选)krb5 文件位置。下面的示例展示如何通过使用 JAAS 配置文件使用 SASL 和 Kerberos 启动 Spring Cloud Stream 应用程序:

 java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 属性

作为拥有 JAAS 配置文件的替代方法,Spring Cloud Stream 提供了一种机制,通过使用 Spring Boot 属性为 Spring Cloud Stream 应用程序设置 JAAS 配置。

以下属性可用于配置 Kafka 客户端的登录上下文:

spring.cloud.stream.kafka.binder.jaas.loginModule

登录模块名称。正常情况下不需要设置。

默认值: com.sun.security.auth.module.Krb5LoginModule。

spring.cloud.stream.kafka.binder.jaas.controlFlag

登录模块的控制标志。

默认值: required。

spring.cloud.stream.kafka.binder.jaas.options

使用包含登录模块选项的 key/value 对进行映射。

默认值: 空 map。

下面的示例展示如何使用 SASL 和 Kerberos 通过使用 Spring Boot 配置属性启动 Spring Cloud Stream 应用程序:

 java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
   --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
   --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
   --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
   --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM

前面的示例表示以下 JAAS 文件的等价物:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="[email protected]";
};

如果所需主题已经存在于代理上,或者将由管理员创建,则可以关闭自动创建,只需要发送客户端 JAAS 属性。

[Note] Note

不要在同一个应用程序中混合 JAAS 配置文件和 Spring Boot 属性。如果已经存在 -Djava.security.auth.login.config 系统属性,则 Spring Cloud Stream 将忽略 Spring Boot 属性。

[Note] Note

使用带有 Kerberos 的 autoCreateTopics 和 autoAddPartitions 时要小心。通常,应用程序可以使用在 Kafka 和 Zookeeper 中没有管理权限的主体。因此,依赖 Spring Cloud Stream 来创建/修改主题可能会失败。在安全环境中,我们强烈建议使用 Kafka 工具创建主题和管理 ACL。

示例:暂停和恢复消费者

如果希望暂停使用,但不导致分区重新平衡,可以暂停并恢复消费者。这可以通过将 Consumer 作为参数添加到 @StreamListener 来实现。要恢复,需要 ListenerContainerIdleEvent 实例的 ApplicationListener。事件发布的频率由 idleEventInterval 属性控制。由于使消费者不是线程安全的,所以必须在调用线程上调用这些方法。

下面的简单应用程序展示了如何暂停和恢复:

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@StreamListener(Sink.INPUT)
	public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
		System.out.println(in);
		consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
	}

	@Bean
	public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
		return event -> {
			System.out.println(event);
			if (event.getConsumer().paused().size() > 0) {
				event.getConsumer().resume(event.getConsumer().paused());
			}
		};
	}

}

39.4 错误通道

从 1.3 版开始,绑定器无条件地将异常发送到每个消费者目标的错误通道,还可以配置为将异步生产者发送失败发送到错误通道。有关的更多详细信息请参阅 小节 29.4, “错误处理”。

发送失败的 ErrorMessage 的有效负载是 KafkaSendFailureException,属性为:

  • failedMessage: 失败时发送 Spring Messaging Message<?>。
  • record: 从 failedMessage 创建的原始 ProducerRecord

没有自动处理生产者异常(例如发送到死信队列)。你可以使用自己的 Spring 集成流来使用这些异常。

39.5 Kafka 度量

Kafka 绑定器模块公开了以下指标:

spring.cloud.stream.binder.kafka.offset: 此指标表示给定的消费者组尚未从给定的绑定器主题中消耗多少消息。提供的度量基于 Mircometer 度量库。该度量包含消费者组信息、主题以及与主题上的最新偏移量的实际提交偏移量滞后。这个指标对于向 PaaS 平台提供自动伸缩反馈特别有用。

39.6 死信主题处理

由于你无法预测用户希望如何处理死信消息,因此框架不提供任何标准机制来处理这些死信消息。如果死信的原因是暂时的,你可能希望将消息路由回原始主题。但是,如果问题是一个永久性的问题,那么可能会导致无限循环。本主题中的示例 Spring Boot 应用程序是如何将这些消息路由回原始主题的示例,但它在三次尝试后将它们移动到 “parking lot” 主题。该应用程序是另一个从死信主题读取的 spring-cloud-stream 应用程序。当 5 秒钟内没有收到任何消息时,它将终止。

示例假设原始目的地是 so8400out,而消费者组是 so8400。

有几个策略需要考虑:

  • 考虑仅在主应用程序未运行时运行重新路由。否则,对瞬时错误的重试将很快用完。
  • 或者,使用两个阶段的方法:使用此应用程序路由到第三个主题,使用另一个路由从第三个主题返回到主主题。

以下代码列展示示例应用程序:

application.properties. 

spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.output.producer.partitioned=true

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.bindings.parkingLot.producer.partitioned=true

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries

Application. 

@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private MessageChannel parkingLot;

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> reRoute(Message<?> failed) {
        processed.incrementAndGet();
        Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
        if (retries == null) {
            System.out.println("First retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else if (retries.intValue() < 3) {
            System.out.println("Another retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else {
            System.out.println("Retries exhausted for " + failed);
            parkingLot.send(MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
        }
        return null;
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, terminating");
                return;
            }
        }
    }

    public interface TwoOutputProcessor extends Processor {

        @Output("parkingLot")
        MessageChannel parkingLot();

    }

}

39.7 用 Kafka 绑定器进行分区

Apache Kafka 原生支持主题分区。

有时,将数据发送到特定的分区是有利的 — 例如,当你想要严格地订阅消息处理时(特定客户的所有消息都应该转到同一个分区)。

以下示例展示如何配置生产者和消费者端:

@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}

application.yml. 

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: partitioned.topic
          producer:
            partitioned: true
            partition-key-expression: headers['partitionKey']
            partition-count: 12

[Important] 重点

必须为主题提供足够的分区,以便为所有消费者组实现所需的并发性。上述配置最多支持 12 个消费者实例(6个 用于并发性为 2,4 个用于并发性为 3,依此类推)。通常情况下,最好对分区进行 “over-provision”,以便将来增加消费者或并发性。

[Note] Note

前面的配置使用默认分区(key.hashCode() % partitionCount)。这可能提供或不提供适当的均衡算法,具体取决于键值。你可以使用 partitionSelectorExpression 或 partitionSelectorClass 属性覆盖此默认值。

由于分区是由 Kafka 原生处理的,因此在消费者端不需要特殊配置。Kafka 在实例之间分配分区。

下面的 Spring Boot 应用程序监听 Kafka 流并打印(到控制台)每个消息要发送到的分区 ID:

@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(in + " received from partition " + partition);
    }

}

application.yml. 

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: partitioned.topic
          group: myGroup

你可以根据需要添加实例。Kafka 重新均衡分区分配。如果实例计数(或 instance count * concurrency)超过分区数,则某些消费者处于空闲状态。