要使用 Apache Kafka 绑定器,需要将 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> 或者,您也可以使用Spring Cloud Stream Kafka Starter,如下面的 Maven 示例所示: <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> 下图展示了 Apache Kafka 绑定器如何操作的简化图: Apache Kafka 绑定器实现将每个目标映射到 Apache Kafka 主题。消费者组直接映射到相同的 Apache Kafka 概念。分区也直接映射到 Apache Kafka 分区。
绑定器当前使用的是 Apache Kafka 本节包含 Apache Kafka 绑定器使用的配置选项。 有关绑定器的常见配置选项和属性,请参阅核心文档。
以下属性仅适用于 Kafka 消费者,并且必须以
以下属性仅适用于 Kafka 生产者,并且必须以
在本节中,我们将展示前面的属性在特定场景中的使用。 此示例说明如何在消费者应用程序中手动确认偏移量。
此示例要求将 @SpringBootApplication @EnableBinding(Sink.class) public class ManuallyAcknowdledgingConsumer { public static void main(String[] args) { SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args); } @StreamListener(Sink.INPUT) public void process(Message<?> message) { Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); if (acknowledgment != null) { System.out.println("Acknowledgment provided"); acknowledgment.acknowledge(); } } }
Apache Kafka 0.9 支持客户端和代理之间的安全连接。要利用此功能,请遵循 Apache Kafka 文档中的指导原则以及 Confluent 文档中的 Kafka 0.9 安全指导原则。使用
例如,要将 spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL 所有其他安全属性都可以用类似的方式设置。 使用 Kerberos 时,请按照参考文档中的说明创建和引用 JAAS 配置。 Spring Cloud Stream 支持通过使用 JAAS 配置文件和 Spring Boot 属性将 JAAS 配置信息传递给应用程序。 可以使用系统属性为 Spring Cloud Stream 应用程序设置 JAAS 和(可选)krb5 文件位置。下面的示例展示如何通过使用 JAAS 配置文件使用 SASL 和 Kerberos 启动 Spring Cloud Stream 应用程序: java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
作为拥有 JAAS 配置文件的替代方法,Spring Cloud Stream 提供了一种机制,通过使用 Spring Boot 属性为 Spring Cloud Stream 应用程序设置 JAAS 配置。 以下属性可用于配置 Kafka 客户端的登录上下文:
下面的示例展示如何使用 SASL 和 Kerberos 通过使用 Spring Boot 配置属性启动 Spring Cloud Stream 应用程序: java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \ --spring.cloud.stream.bindings.input.destination=stream.ticktock \ --spring.cloud.stream.kafka.binder.autoCreateTopics=false \ --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \ --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \ --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \ --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \ --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM 前面的示例表示以下 JAAS 文件的等价物: KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_client.keytab" principal="[email protected]"; }; 如果所需主题已经存在于代理上,或者将由管理员创建,则可以关闭自动创建,只需要发送客户端 JAAS 属性。
如果希望暂停使用,但不导致分区重新平衡,可以暂停并恢复消费者。这可以通过将 下面的简单应用程序展示了如何暂停和恢复: @SpringBootApplication @EnableBinding(Sink.class) public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @StreamListener(Sink.INPUT) public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) { System.out.println(in); consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0))); } @Bean public ApplicationListener<ListenerContainerIdleEvent> idleListener() { return event -> { System.out.println(event); if (event.getConsumer().paused().size() > 0) { event.getConsumer().resume(event.getConsumer().paused()); } }; } } 从 1.3 版开始,绑定器无条件地将异常发送到每个消费者目标的错误通道,还可以配置为将异步生产者发送失败发送到错误通道。有关的更多详细信息请参阅 小节 29.4, “错误处理”。
发送失败的
没有自动处理生产者异常(例如发送到死信队列)。你可以使用自己的 Spring 集成流来使用这些异常。 Kafka 绑定器模块公开了以下指标:
由于你无法预测用户希望如何处理死信消息,因此框架不提供任何标准机制来处理这些死信消息。如果死信的原因是暂时的,你可能希望将消息路由回原始主题。但是,如果问题是一个永久性的问题,那么可能会导致无限循环。本主题中的示例 Spring Boot 应用程序是如何将这些消息路由回原始主题的示例,但它在三次尝试后将它们移动到 “parking lot” 主题。该应用程序是另一个从死信主题读取的 spring-cloud-stream 应用程序。当 5 秒钟内没有收到任何消息时,它将终止。
示例假设原始目的地是 有几个策略需要考虑:
以下代码列展示示例应用程序: application.properties. spring.cloud.stream.bindings.input.group=so8400replay spring.cloud.stream.bindings.input.destination=error.so8400out.so8400 spring.cloud.stream.bindings.output.destination=so8400out spring.cloud.stream.bindings.output.producer.partitioned=true spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot spring.cloud.stream.bindings.parkingLot.producer.partitioned=true spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest spring.cloud.stream.kafka.binder.headers=x-retries Application. @SpringBootApplication @EnableBinding(TwoOutputProcessor.class) public class ReRouteDlqKApplication implements CommandLineRunner { private static final String X_RETRIES_HEADER = "x-retries"; public static void main(String[] args) { SpringApplication.run(ReRouteDlqKApplication.class, args).close(); } private final AtomicInteger processed = new AtomicInteger(); @Autowired private MessageChannel parkingLot; @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public Message<?> reRoute(Message<?> failed) { processed.incrementAndGet(); Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class); if (retries == null) { System.out.println("First retry for " + failed); return MessageBuilder.fromMessage(failed) .setHeader(X_RETRIES_HEADER, new Integer(1)) .setHeader(BinderHeaders.PARTITION_OVERRIDE, failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID)) .build(); } else if (retries.intValue() < 3) { System.out.println("Another retry for " + failed); return MessageBuilder.fromMessage(failed) .setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1)) .setHeader(BinderHeaders.PARTITION_OVERRIDE, failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID)) .build(); } else { System.out.println("Retries exhausted for " + failed); parkingLot.send(MessageBuilder.fromMessage(failed) .setHeader(BinderHeaders.PARTITION_OVERRIDE, failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID)) .build()); } return null; } @Override public void run(String... args) throws Exception { while (true) { int count = this.processed.get(); Thread.sleep(5000); if (count == this.processed.get()) { System.out.println("Idle, terminating"); return; } } } public interface TwoOutputProcessor extends Processor { @Output("parkingLot") MessageChannel parkingLot(); } } Apache Kafka 原生支持主题分区。 有时,将数据发送到特定的分区是有利的 — 例如,当你想要严格地订阅消息处理时(特定客户的所有消息都应该转到同一个分区)。 以下示例展示如何配置生产者和消费者端: @SpringBootApplication @EnableBinding(Source.class) public class KafkaPartitionProducerApplication { private static final Random RANDOM = new Random(System.currentTimeMillis()); private static final String[] data = new String[] { "foo1", "bar1", "qux1", "foo2", "bar2", "qux2", "foo3", "bar3", "qux3", "foo4", "bar4", "qux4", }; public static void main(String[] args) { new SpringApplicationBuilder(KafkaPartitionProducerApplication.class) .web(false) .run(args); } @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000")) public Message<?> generate() { String value = data[RANDOM.nextInt(data.length)]; System.out.println("Sending: " + value); return MessageBuilder.withPayload(value) .setHeader("partitionKey", value) .build(); } } application.yml. spring: cloud: stream: bindings: output: destination: partitioned.topic producer: partitioned: true partition-key-expression: headers['partitionKey'] partition-count: 12
由于分区是由 Kafka 原生处理的,因此在消费者端不需要特殊配置。Kafka 在实例之间分配分区。 下面的 Spring Boot 应用程序监听 Kafka 流并打印(到控制台)每个消息要发送到的分区 ID: @SpringBootApplication @EnableBinding(Sink.class) public class KafkaPartitionConsumerApplication { public static void main(String[] args) { new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class) .web(false) .run(args); } @StreamListener(Sink.INPUT) public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println(in + " received from partition " + partition); } } application.yml. spring: cloud: stream: bindings: input: destination: partitioned.topic group: myGroup
你可以根据需要添加实例。Kafka 重新均衡分区分配。如果实例计数(或 |