27. 介绍 Spring Cloud Stream

Spring Cloud Stream 是构建消息驱动微服务应用程序的框架。Spring Cloud Stream 建立在 Spring Boot 的基础上,创建独立的、生产级的 Spring 应用程序,并使用 Spring 集成为消息代理提供连接。它提供了来自多个供应商的中间件的独立配置,介绍了持久发布订阅语义、使用者组和分区的概念。

你可以将 @EnableBinding 注解添加到应用程序中,以立即连接到消息代理,还可以将 @StreamListener 添加到方法中,使其接收用于流处理的事件。以下示例展示接收外部消息的接收器应用程序:

@SpringBootApplication
@EnableBinding(Sink.class)
public class VoteRecordingSinkApplication {

  public static void main(String[] args) {
    SpringApplication.run(VoteRecordingSinkApplication.class, args);
  }

  @StreamListener(Sink.INPUT)
  public void processVote(Vote vote) {
      votingService.recordVote(vote);
  }
}

@EnableBinding 注解将一个或多个接口作为参数(在本例中,参数是单个接收器接口)。接口声明输入和输出通道。Spring Cloud 流提供 Source、Sink 和 Processor 接口。还可以定义自己的接口。

下面的列展示了 Sink 接口的定义:

public interface Sink {
  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();
}

@Input 注解标识一个输入通道,通过该通道接收到的消息进入应用程序。@Output 注解标识一个输出通道,通过该通道发布的消息将离开应用程序。@Input 和 @Output 注解可以采用通道名称作为参数。如果未提供名称,则使用带注解的方法的名称。

Spring Cloud Stream 为你创建接口的实现。你可以在应用程序中使用它,方法是自动注入它,如下面的示例(来自测试用例)所示:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = VoteRecordingSinkApplication.class)
@WebAppConfiguration
@DirtiesContext
public class StreamApplicationTests {

  @Autowired
  private Sink sink;

  @Test
  public void contextLoads() {
    assertNotNull(this.sink.input());
  }
}