33. 元数据演化支持

Spring Cloud Stream 提供了对元数据演化的支持,以便数据可以随着时间的推移而演化,并且仍然与较旧或较新的生产者和消费者一起工作,反之亦然。大多数序列化模型,尤其是那些旨在跨不同平台和语言进行可移植的模型,都依赖于描述如何在二进制负载中序列化数据的元数据。为了序列化数据,然后对其进行解释,发送端和接收端都必须能够访问描述二进制格式的元数据。在某些情况下,可以从序列化时的有效负载类型或反序列化时的目标类型推断架构。然而,许多应用程序受益于访问描述二进制数据格式的显式元数据。元数据注册允许你以文本格式(通常是 JSON)存储元数据信息,并使需要它以二进制格式接收和发送数据的各种应用程序可以访问这些信息。架构可以作为一个元组进行引用,该元组由以下部分组成:

  • 元数据的逻辑名称的主题
  • 元数据版本
  • 元数据格式,描述数据的二进制格式

以下各节将详细介绍元数据演化过程中涉及的各种组件。

33.1 元数据注册客户端

用于与元数据注册服务器交互的客户端抽象是 SchemaRegistryClient 接口,它具有以下结构:

public interface SchemaRegistryClient {

    SchemaRegistrationResponse register(String subject, String format, String schema);

    String fetch(SchemaReference schemaReference);

    String fetch(Integer id);

}

Spring Cloud Stream 提供了开箱即用的实现,用于与自己的元数据服务器进行交互,以及与融合元数据注册进行交互。

可以使用 @EnableSchemaRegistryClient 配置 Spring Cloud Stream 元数据注册的客户端,如下所示:

  @EnableBinding(Sink.class)
  @SpringBootApplication
  @EnableSchemaRegistryClient
  public static class AvroSinkApplication {
    ...
  }
[Note] Note

默认的转换器被优化为不仅缓存来自远程服务器的模式,而且缓存 parse() 和 toString() 方法,这是非常昂贵的。因此,它使用了一个不缓存响应的 DefaultSchemaRegistryClient。如果要更改默认行为,可以直接在代码上使用客户端,并将其重写为所需的结果。为此,必须将 spring.cloud.stream.schemaRegistryClient.cached=true 属性添加到应用程序属性中。

33.1.1 元数据注册客户端属性

元数据注册客户端支持以下属性:

spring.cloud.stream.schemaRegistryClient.endpoint
元数据服务器的位置。设置时,请使用完整的 URL,包括协议(http 或 https)、端口和上下文路径。
默认值
http://localhost:8990/
spring.cloud.stream.schemaRegistryClient.cached
客户端是否应缓存元数据服务器响应。通常设置为 false,因为缓存发生在消息转换器中。使用元数据注册客户端的客户端应将此设置为 true。
默认值
true

33.2 Avro 元数据注册客户端消息转换器

对于在应用程序上下文中注册了 SchemaRegistryClient bean 的应用程序,Spring Cloud Stream 自动配置用于元数据管理的 Apache Avro 消息转换器。这简化了元数据的演进,因为接收消息的应用程序可以很容易地访问可与自己的读取器元数据协调的编写器模式。

对于出站消息,如果通道的内容类型设置为 application/*+avro,则会激活 MessageConverter,如下例所示:

spring.cloud.stream.bindings.output.contentType=application/*+avro

在出站转换过程中,消息转换器尝试推断每个出站消息的元数据(基于其类型),并使用 SchemaRegistryClient 将其注册到主题(基于有效负载类型)。如果已找到相同的元数据,则检索对该元数据的引用。如果没有,则注册元数据,并提供新的版本号。使用以下方案发送带有 contentType 头的消息:application/[prefix].[subject].v[version]+avro,其中 prefix 是可配置的,subject 是从有效负载类型推导出来的。

例如,User 类型的消息可以作为二进制有效负载发送,其内容类型为 application/vnd.user.v2+avro,其中 user 是主题,2 是版本号。

当接收消息时,转换器从传入消息的头推断元数据引用并尝试检索它。该元数据在反序列化过程中用作编写器架构。

33.2.1 Avro 元数据注册客户端消息转换器属性

如果通过设置 spring.cloud.stream.bindings.output.contentType=application/*+avro 启用了基于 Avro 的元数据注册客户端,则可以通过设置以下属性自定义注册行为。

spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled

如果希望转换器使用反射从 POJO 推断元数据,请启用。

默认值:false

spring.cloud.stream.schema.avro.readerSchema

Avro 通过查看一个编写器模式(原始负载)和一个读取器模式(应用程序负载)来比较元数据版本。有关的更多信息请参阅 Avro 文档。如果设置了,这将覆盖元数据服务器上的任何查找,并使用本地元数据作为读取器元数据。

默认值:null

spring.cloud.stream.schema.avro.schemaLocations

向元数据服务器注册此属性中列出的任何 .avsc 文件。

默认值:empty

spring.cloud.stream.schema.avro.prefix

要在内容类型标题上使用的前缀。

默认值:vnd

33.3 Apache Avro 消息转换器

Spring Cloud Stream 通过其 spring-cloud-stream-schema 模块为基于元数据的消息转换器提供支持。目前,对于基于元数据的消息转换器,唯一现成支持的序列化格式是 Apache Avro,在未来的版本中会添加更多的格式。

spring-cloud-stream-schema 模块包含两种类型的消息转换器,可用于 Apache Avro 序列化:

  • 使用序列化或反序列化对象的类信息或在启动时具有已知位置的元数据的转换器。
  • 使用元数据注册的转换器。它们在运行时定位元数据,并随着域对象的发展动态注册新元数据。

33.4 支持元数据的转换器

AvroSchemaMessageConverter 支持通过使用预定义元数据或使用类中可用的元数据信息(反射或包含在 SpecificRecord 中)对消息进行序列化和反序列化。如果提供自定义转换器,则不会创建默认的 AvroSchemaMessageConverter bean。以下示例展示自定义转换器:

要使用自定义转换器,只需将其添加到应用程序上下文中,可以选择指定一个或多个与之关联的 MimeTypes。默认 MimeTypes 是 application/avro。

如果转换的目标类型是 GenericRecord,则必须设置元数据。

下面的示例展示如何通过在没有预定义元数据的情况下注册 Apache Avro MessageConverter 来配置接收器应用程序中的转换器。在本例中,请注意 mime type 值是 avro/bytes,而不是默认的 application/avro。

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

  ...

  @Bean
  public MessageConverter userMessageConverter() {
      return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
  }
}

相反,以下应用程序使用预定义的元数据(在类路径上找到)注册转换器:

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

  ...

  @Bean
  public MessageConverter userMessageConverter() {
      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
      converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
      return converter;
  }
}

33.5 元数据注册服务器

Spring Cloud Stream 提供了一个元数据注册服务器实现。要使用它,你可以将 spring-cloud-stream-schema-server 项目添加到项目中,并使用 @EnableSchemaRegistryServer 注解,该注解将元数据注册服务器 REST 控制器添加到应用程序中。此注解用于 Spring Boot web 应用程序,服务器的监听端口由 server.port 属性控制。spring.cloud.stream.schema.server.path 属性可用于控制元数据服务器的根路径(尤其是当它嵌入到其他应用程序中时)。spring.cloud.stream.schema.server.allowSchemaDeletion 布尔属性允许删除元数据。默认情况下,这是禁用的。

元数据注册服务器使用关系数据库存储模式。默认情况下,它使用嵌入式数据库。你可以使用Spring Boot SQL 数据库和 JDBC 配置选项自定义元数据存储。

以下示例展示启用元数据注册的 Spring Boot 应用程序:

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(SchemaRegistryServerApplication.class, args);
    }
}

33.5.1 元数据注册服务器 API

元数据注册服务器 API 由以下操作组成:

  • POST / — 请参阅 “名为 “注册一个新元数据” 的部分”
  • 'GET /{subject}/{format}/{version}' — 请参阅 “名为 “按主题、格式和版本检索现有元数据” 的部分”
  • GET /{subject}/{format} — 请参阅 “名为 “按主题和格式检索现有元数据” 的部分”
  • GET /schemas/{id} — 请参阅 “名为 “按 ID 检索现有元数据” 的部分”
  • DELETE /{subject}/{format}/{version} — 请参阅 “名为 “按主题、格式和版本删除元数据” 的部分”
  • DELETE /schemas/{id} — 请参阅 “名为 “按 ID 删除元数据” 的部分”
  • DELETE /{subject} — 请参阅 “名为 “按主题删除元数据” 的部分”

注册一个新元数据

要注册新元数据,请向 / 端点发送 POST 请求。

/ 接受具有以下字段的 JSON 负载:

  • subject: 元数据主题
  • format: 元数据格式
  • definition: 元数据定义

它的响应是 JSON 的元数据对象,包含以下字段:

  • id: 元数据 ID
  • subject: 元数据主题
  • format: 元数据格式
  • version: 元数据版本
  • definition: 元数据定义

按主题、格式和版本检索现有元数据

要按主题、格式和版本检索现有元数据,请将 GET 请求发送到 /{subject}/{format}/{version} 端点。

它的响应是 JSON 的元数据对象,包含以下字段:

  • id: 元数据 ID
  • subject: 元数据主题
  • format: 元数据格式
  • version: 元数据版本
  • definition: 元数据定义

按主题和格式检索现有元数据

要按主题和格式检索现有元数据,请将 GET 请求发送到 /subject/format 端点。

它的响应是 JSON 的元数据列表,包含以下字段:

  • id: 元数据 ID
  • subject: 元数据主题
  • format: 元数据格式
  • version: 元数据版本
  • definition: 元数据定义

按 ID 检索现有元数据

要按 ID 检索现有元数据,请将 GET 请求发送到 /schemas/{id} 端点。

它的响应是 JSON 的元数据对象,包含以下字段:

  • id: 元数据 ID
  • subject: 元数据主题
  • format: 元数据格式
  • version: 元数据版本
  • definition: 元数据定义

按主题、格式和版本删除元数据

要按主题、格式和版本删除元数据,请将 DELETE 请求发送到 /{subject}/{format}/{version} 端点。

按 ID 删除元数据

要按 ID 删除元数据,请将 DELETE 请求发送到 /schemas/{id} 端点。

按主题删除元数据

DELETE /{subject}

按主题删除已存在的元数据

[Note] Note

本说明仅适用于 Spring Cloud Stream 1.1.0.RELEASE 的用户。Spring Cloud Stream 1.1.0.RELEASE 使用表名 schema 存储 Schema 对象。Schema 是许多数据库实现中的关键字。为了避免将来的任何冲突,从 1.1.1.RELEASE 开始,我们选择了存储表的名称 SCHEMA_REPOSITORY。升级的任何 Stream 1.1.0.RELEASE 用户都应在升级前将其现有元数据迁移到新表中。

33.5.2 使用 Confluent 的元数据注册

默认配置创建一个 DefaultSchemaRegistryClient bean。如果要使用 Confluent 元数据注册,则需要创建 ConfluentSchemaRegistryClient 类型的 bean,它将取代元数据默认配置的 bean。下面的示例展示如何创建这样的 bean:

@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
  ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
  client.setEndpoint(endpoint);
  return client;
}
[Note] Note

ConfluentSchemaRegistryClient 在 Confluent 平台 4.0.0 版上进行测试。

33.6 元数据注册和解析

为了更好地理解 Spring Cloud Stream 如何注册和解析新元数据及其对 Avro 元数据比较功能的使用,我们提供了两个单独的小节:

  • “小节 33.6.1, “元数据注册过程(序列化)””
  • “小节 33.6.2, “元数据解析过程(反序列化)””

33.6.1 元数据注册过程(序列化)

注册过程的第一部分是从通过通道发送的有效负载中提取元数据。诸如 SpecificRecord 或 GenericRecord 之类的 Avro 类型已经包含一个元数据,可以立即从实例中检索该元数据。在 POJOs 的情况下,如果 spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled 属性设置为 true(默认值),则会推断出元数据。

图 33.1. 架构写入解析过程

schema resolution

一旦获得了元数据,转换器就从远程服务器加载其元数据(版本)。首先,它查询本地缓存。如果找不到结果,它会将数据提交给服务器,服务器将使用版本控制信息进行响应。转换器总是缓存结果,以避免为每个需要序列化的新消息查询元数据服务器的开销。

图 33.2. 元数据注册过程

registration

使用元数据版本信息,转换器将消息的 contentType 头设置为携带版本信息  —  例如:application/vnd.user.v1+avro。

33.6.2 元数据解析过程(反序列化)

当读取包含版本信息的消息(即,具有 “小节 33.6.1, “元数据注册过程(序列化)”” 中描述的方案的 contentType 头)时,转换器查询元数据服务器以获取消息的编写器元数据。一旦找到传入消息的正确元数据,它就会检索读卡器元数据,并通过使用 Avro 的元数据解析支持,将其读取到读卡器定义中(设置默认值和任何缺少的属性)。

图 33.3. 元数据读取解析过程

schema reading

[Note] Note

你应该理解编写器元数据(编写消息的应用程序)和读卡器元数据(接收应用程序)之间的区别。我们建议你花点时间阅读 Avro 术语并了解该过程。Spring Cloud Stream 总是获取编写器元数据以确定如何读取消息。如果你想让 Avro 的元数据演化支持正常工作,需要确保为应用程序正确设置了 readerSchema。