Spring Cloud Stream 提供了对元数据演化的支持,以便数据可以随着时间的推移而演化,并且仍然与较旧或较新的生产者和消费者一起工作,反之亦然。大多数序列化模型,尤其是那些旨在跨不同平台和语言进行可移植的模型,都依赖于描述如何在二进制负载中序列化数据的元数据。为了序列化数据,然后对其进行解释,发送端和接收端都必须能够访问描述二进制格式的元数据。在某些情况下,可以从序列化时的有效负载类型或反序列化时的目标类型推断架构。然而,许多应用程序受益于访问描述二进制数据格式的显式元数据。元数据注册允许你以文本格式(通常是 JSON)存储元数据信息,并使需要它以二进制格式接收和发送数据的各种应用程序可以访问这些信息。架构可以作为一个元组进行引用,该元组由以下部分组成:
以下各节将详细介绍元数据演化过程中涉及的各种组件。
用于与元数据注册服务器交互的客户端抽象是 public interface SchemaRegistryClient { SchemaRegistrationResponse register(String subject, String format, String schema); String fetch(SchemaReference schemaReference); String fetch(Integer id); } Spring Cloud Stream 提供了开箱即用的实现,用于与自己的元数据服务器进行交互,以及与融合元数据注册进行交互。
可以使用 @EnableBinding(Sink.class) @SpringBootApplication @EnableSchemaRegistryClient public static class AvroSinkApplication { ... }
元数据注册客户端支持以下属性:
对于在应用程序上下文中注册了 SchemaRegistryClient bean 的应用程序,Spring Cloud Stream 自动配置用于元数据管理的 Apache Avro 消息转换器。这简化了元数据的演进,因为接收消息的应用程序可以很容易地访问可与自己的读取器元数据协调的编写器模式。
对于出站消息,如果通道的内容类型设置为 spring.cloud.stream.bindings.output.contentType=application/*+avro
在出站转换过程中,消息转换器尝试推断每个出站消息的元数据(基于其类型),并使用
例如, 当接收消息时,转换器从传入消息的头推断元数据引用并尝试检索它。该元数据在反序列化过程中用作编写器架构。
如果通过设置
Spring Cloud Stream 通过其
要使用自定义转换器,只需将其添加到应用程序上下文中,可以选择指定一个或多个与之关联的
如果转换的目标类型是
下面的示例展示如何通过在没有预定义元数据的情况下注册 Apache Avro @EnableBinding(Sink.class) @SpringBootApplication public static class SinkApplication { ... @Bean public MessageConverter userMessageConverter() { return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); } } 相反,以下应用程序使用预定义的元数据(在类路径上找到)注册转换器: @EnableBinding(Sink.class) @SpringBootApplication public static class SinkApplication { ... @Bean public MessageConverter userMessageConverter() { AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); converter.setSchemaLocation(new ClassPathResource("schemas/User.avro")); return converter; } }
Spring Cloud Stream 提供了一个元数据注册服务器实现。要使用它,你可以将 元数据注册服务器使用关系数据库存储模式。默认情况下,它使用嵌入式数据库。你可以使用Spring Boot SQL 数据库和 JDBC 配置选项自定义元数据存储。 以下示例展示启用元数据注册的 Spring Boot 应用程序: @SpringBootApplication @EnableSchemaRegistryServer public class SchemaRegistryServerApplication { public static void main(String[] args) { SpringApplication.run(SchemaRegistryServerApplication.class, args); } } 元数据注册服务器 API 由以下操作组成:
要注册新元数据,请向
它的响应是 JSON 的元数据对象,包含以下字段:
要按主题、格式和版本检索现有元数据,请将 它的响应是 JSON 的元数据对象,包含以下字段:
要按主题和格式检索现有元数据,请将 它的响应是 JSON 的元数据列表,包含以下字段:
要按 ID 检索现有元数据,请将 它的响应是 JSON 的元数据对象,包含以下字段:
默认配置创建一个 @Bean public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){ ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient(); client.setEndpoint(endpoint); return client; }
为了更好地理解 Spring Cloud Stream 如何注册和解析新元数据及其对 Avro 元数据比较功能的使用,我们提供了两个单独的小节:
注册过程的第一部分是从通过通道发送的有效负载中提取元数据。诸如 一旦获得了元数据,转换器就从远程服务器加载其元数据(版本)。首先,它查询本地缓存。如果找不到结果,它会将数据提交给服务器,服务器将使用版本控制信息进行响应。转换器总是缓存结果,以避免为每个需要序列化的新消息查询元数据服务器的开销。
使用元数据版本信息,转换器将消息的
当读取包含版本信息的消息(即,具有 “小节 33.6.1, “元数据注册过程(序列化)”” 中描述的方案的
|