30. 绑定器

Spring Cloud Stream 提供了一个绑定抽象,用于连接到外部中间件的物理目的地。本节提供了关于绑定器 SPI 背后的主要概念、其主要组件和实现特定细节的信息。

30.1 生产者和消费者

下图展示了生产者和消费者的一般关系:

图 30.1. 生产者和消费者

producers consumers

生产者是将消息发送到通道的任何组件。可以使用该代理的 Binder 实现将通道绑定到外部消息代理。调用 bindProducer() 方法时,第一个参数是代理中目标的名称,第二个参数是生产者向其发送消息的本地通道实例,第三个参数包含要在为该通道创建的适配器中使用的属性(如分区键表达式)。

消费者是从通道接收消息的任何组件。与生产者一样,消费者的通道可以绑定到外部消息代理。调用 bindConsumer() 方法时,第一个参数是目标名称,第二个参数提供逻辑消费者组的名称。由给定目标的消费者绑定表示的每个组都会收到生产者发送到该目标的每个消息的副本(即,它遵循正常的发布订阅语义)。如果存在多个绑定到相同组名的消费者实例,那么消息将在这些消费者实例之间进行负载平衡,以便生产者发送的每个消息仅由每个组中的一个消费者实例使用(即,它遵循正常的排队语义)。

30.2 绑定器 SPI

绑定器 SPI 由许多接口、现成的实用程序类和发现策略组成,这些发现策略为连接到外部中间件提供了可插拔的机制。

SPI 的关键点是 Binder 接口,它是一种将输入和输出连接到外部中间件的策略。下面的列展示 Binder 接口的定义:

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);

    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}

接口是参数化的,提供了许多扩展点:

  • 输入和输出绑定目标。从 1.0 版开始,只支持 MessageChannel,但将来它将用作扩展点。
  • 扩展的消费者和生产者属性,允许特定的绑定器实现添加以类型安全方式支持的补充属性。

典型的绑定器实现包括以下内容:

  • 实现 Binder 接口的类;
  • 一个 Spring @Configuration 类,它与中间件连接基础设施一起创建一个 Binder 类型的 bean。
  • 在类路径上找到的 META-INF/spring.binders 文件包含一个或多个绑定器定义,如下面的示例所示:

    kafka:\
    org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration

30.3 绑定器检测

Spring Cloud Stream 依赖绑定器 SPI 的实现来执行将通道连接到消息代理的任务。每个绑定器实现通常连接到一种类型的消息传递系统。

30.3.1 类路径检测

默认情况下,Spring Cloud Stream 依赖于 Spring Boot 的自动配置来配置绑定过程。如果在类路径上找到单个绑定器实现,那么 Spring Cloud Stream 将自动使用它。例如,旨在仅绑定到 RabbitMQ 的 Spring Cloud Stream 项目可以添加以下依赖项:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

有关其他绑定器依赖项的特定 Maven 坐标,请参阅该绑定器实现的文档。

30.4 类路径上的多个绑定器

当类路径上存在多个绑定器时,应用程序必须指明每个通道绑定将使用哪个绑定器。每个绑定器配置都包含一个 META-INF/spring.binders 文件,它是一个简单的属性文件,如下例所示:

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration

对于其他提供的绑定器实现(如 Kafka),也存在类似的文件,并且期望自定义绑定器实现也提供这些文件。键表示绑定器实现的标识名,而该值是配置类的逗号分隔列表,每个配置类都包含一个且只有一个类型为 org.springframework.cloud.stream.binder.Binder 的 bean 定义。

可以使用 spring.cloud.stream.defaultBinder 属性(例如 spring.cloud.stream.defaultBinder=rabbit)全局执行绑定器选择,也可以通过在每个通道绑定上配置绑定器来单独执行。例如,从 Kafka 读取并写入 rabbitmq 的处理器应用程序(分别具有名为 input 和 output 的通道)可以指定以下配置:

spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.binder=rabbit

30.5 连接到多个系统

默认情况下,绑定器共享应用程序的 Spring Boot 自动配置,以便在类路径上创建每个绑定器的一个实例。如果应用程序应连接到同一类型的多个代理,则可以指定多个绑定器配置,每个配置具有不同的环境设置。

[Note] Note

打开显式绑定器配置将完全禁用默认绑定器配置过程。如果这样做,所有使用中的绑定器都必须包含在配置中。打算透明地使用 Spring Cloud Stream 的框架可以创建可以通过名称引用的绑定器配置,但它们不会影响默认的绑定器配置。为此,绑定器配置可能将其 defaultCandidate 标志设置为 false(例如,spring.cloud.stream.binders.<configurationName>.defaultCandidate=false)。这表示独立于默认绑定器配置过程存在的配置。

以下示例展示了连接到两个 RabbitMQ 代理实例的处理器应用程序的典型配置:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: thing1
          binder: rabbit1
        output:
          destination: thing2
          binder: rabbit2
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host1>
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host2>

30.6 绑定可视化和控制

自 2.0 版以来,Spring Cloud Stream 支持通过执行器端点可视化和控制绑定。

从 2.0 版开始,执行器和 web 是可选的,你必须首先添加一个 web 依赖项,并手动添加执行器依赖项。以下示例展示如何添加 web 框架的依赖项:

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

以下示例显示如何添加 WebFlux 框架的依赖项:

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

可以按如下方式添加执行器依赖项:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
[Note] Note

要在 Cloud Foundry 中运行 Spring Cloud Stream 2.0 应用程序,必须将 spring-boot-starter-web 和 spring-boot-starter-actuator 添加到类路径。否则,由于健康检查失败,应用程序将无法启动。

还必须通过设置以下属性来启用 bindings 执行器终结点:--management.endpoints.web.exposure.include=bindings。

一旦这些先决条件得到满足。当应用程序启动时,应该在日志中看到以下内容:

: Mapped "{[/actuator/bindings/{name}],methods=[POST]. . .
: Mapped "{[/actuator/bindings],methods=[GET]. . .
: Mapped "{[/actuator/bindings/{name}],methods=[GET]. . .

要可视化当前绑定,请访问以下 URL: http://<host>:<port>/actuator/bindings

或者,要查看单个绑定,请访问以下类似的 URL 之一:http://<host>:<port>/actuator/bindings/myBindingName

你还可以停止、启动、暂停和恢复各个绑定,方法是在提供 state 参数(如 JSON)的同时将其发布到同一个 URL,如下例所示:

  • curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
  • curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
  • curl -d '{"state":"PAUSED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
  • curl -d '{"state":"RESUMED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName

[Note] Note

只有当相应的绑定器及其底层技术支持时,才会 PAUSED 和 RESUMED 工作。否则,你将在日志中看到警告消息。目前,只有 Kafka 绑定器支持 PAUSED 和 RESUMED 状态。

30.7 绑定器配置属性

自定义绑定器配置时,以下属性可用。这些属性通过 org.springframework.cloud.stream.config.BinderProperties 公开

它们必须以 spring.cloud.stream.binders.<configurationName> 作为前缀。

类型

绑定器类型。它通常引用类路径上的一个绑定器 — 特别是 META-INF/spring.binders 文件中的一个键。

默认情况下,它与配置名称具有相同的值。

继承环境

配置是否继承应用程序本身的环境。

默认值: true.

环境

可用于自定义绑定器环境的一组属性的根。设置此属性时,创建绑定器的上下文不是应用程序上下文的子上下文。此设置允许绑定器组件和应用程序组件之间完全分离。

默认值: empty.

默认候选

绑定器配置是作为默认绑定器的候选配置,还是仅在显式引用时使用。此设置允许在不干扰默认处理的情况下添加绑定器配置。

默认值: true.