32. 内容类型协商

数据转换是任何消息驱动微服务体系结构的核心特性之一。假设在 Spring Cloud Stream 中,这样的数据被表示为一条 Spring Message,那么在到达目的地之前,一条消息可能必须被转换成所需的形状或大小。这有两个原因:

  1. 转换传入消息的内容以匹配应用程序提供的处理程序的签名。
  2. 将传出消息的内容转换为线格式。

线格式通常是 byte[](对于 Kafka 和 Rabbit 绑定器是这样),但它受绑定器实现的控制。

在 Spring Cloud Stream 中,消息转换是通过 org.springframework.messaging.converter.MessageConverter 完成的。

[Note] Note

作为后续细节的补充,你可能还需要阅读以下博客文章。

32.1 机制

为了更好地理解内容类型协商背后的机制和必要性,我们以下面的消息处理程序为例来查看一个非常简单的用例:

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String handle(Person person) {..}
[Note] Note

为了简单起见,我们假设这是应用程序中唯一的处理程序(我们假设没有内部管道)。

前面示例中所示的处理程序需要一个 Person 对象作为参数,并生成一个 String 类型作为输出。为了框架成功地将传入 Message 作为参数传递给这个处理程序,它必须以某种方式将 Message 类型的有效负载从线格式转换为 Person 类型。换句话说,框架必须找到并应用适当的 MessageConverter。要实现这一点,框架需要用户提供一些指令。其中一条指令已经由处理程序方法本身(Person 类型)的签名提供。因此,在理论上,这应该是(并且在某些情况下,是)足够的。但是,对于大多数用例,为了选择适当的 MessageConverter,框架需要额外的信息。缺失的部分是 contentType。

Spring Cloud Stream 提供了三种定义 contentType 的机制(按优先级排列):

  1. HEADER:contentType 可以通过消息本身进行通信。通过提供 contentType 头,可以声明用于查找和应用适当的 MessageConverter 的内容类型。
  2. BINDING:通过设置 spring.cloud.stream.bindings.input.content-type 属性,可以为每个目标绑定设置 contentType。

    [Note] Note

    属性名中的 input 段对应于目的地的实际名称(在我们的例子中是 “input”)。这种方法允许你基于每个绑定声明用于定位和应用适当的 MessageConverter 的内容类型。

  3. DEFAULT:如果 Message 头或绑定中不存在 contentType,则使用默认的 application/json 内容类型来定位和应用适当的 MessageConverter。

如前所述,前面的列还展示了绑定时的优先级顺序。例如,头部提供的内容类型优先于任何其他内容类型。这同样适用于基于每个绑定设置的内容类型,这基本上允许你重写默认的内容类型。然而,它也提供了一个合理的违约(这是由社区反馈决定的)。

使 application/json 成为默认应用程序的另一个原因是分布式微服务体系结构驱动的互操作性需求,其中生产者和消费者不仅在不同的 JVM 中运行,而且还可以在不同的非 JVM 平台上运行。

当非空处理程序方法返回时,如果返回值已经是一条 Message,则该 Message 将成为有效负载。但是,如果返回值不是 Message,则在从输入消息继承头减去由 SpringIntegrationProperties.messageHandlerNotPropagatedHeaders 定义或筛选的头时,将以返回值作为有效负载构造新 Message。默认情况下,那里只有一个头集:contentType。这意味着新 Message 没有 contentType 头集,从而确保 contentType 可以发展。你总是可以选择不从处理器方法返回 Message,在该方法中可以插入你希望的任何头。

如果存在内部管道,则通过相同的转换过程将 Message 发送到下一个处理程序。但是,如果没有内部管道或者你已经到达管道的末尾,则 Message 将被发送回输出目标。

32.1.1 内容类型与参数类型

如前所述,为了让框架选择适当的 MessageConverter,它需要参数类型和(可选)内容类型信息。选择适当 MessageConverter 的逻辑与参数冲突解决程序(HandlerMethodArgumentResolvers)驻留在一起,该参数冲突解决程序在调用用户定义的处理程序方法(即框架已知实际参数类型时)之前触发。如果参数类型与当前有效负载的类型不匹配,则框架将委托给预先配置的 MessageConverters 堆栈,以查看它们中是否有任何一个可以转换有效负载。如你所见,MessageConverter 的Object fromMessage(Message<?> message, Class<?> targetClass); 操作将 targetClass 作为其参数之一。框架还确保提供的 Message 始终包含 contentType 头。当不存在 contentType 头时,它将插入每个绑定 contentType 头或默认 contentType 头。contentType 参数类型的组合是框架确定消息是否可以转换为目标类型的机制。如果找不到合适的 MessageConverter,则会引发异常,你可以通过添加自定义 MessageConverter 来处理该异常(请参阅 小节 32.3, “用户定义的消息转换器”)。

但是,如果有效负载类型与处理器方法声明的目标类型匹配呢?在这种情况下,没有要转换的内容,有效负载是未修改地传递的。虽然这听起来非常简单和合乎逻辑,但请记住接受消息的处理程序方法将 Message<?> 或 Object 作为参数。通过声明目标类型为对象(它是 Java 中的所有东西的 instanceof),基本上放弃了转换过程。

[Note] Note

不要期望仅基于 contentType 将 Message 转换为其他类型。记住 contentType 是目标类型的补充。如果愿意,你可以提供一个提示,MessageConverter 可以考虑,也可以不考虑。

32.1.2 消息转换器

MessageConverters 定义了两个方法

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);

理解这些方法的协议和它们的用法是很重要的,特别是在 Spring Cloud Stream 的上下文中。

fromMessage 方法将传入 Message 转换为参数类型。Message 的有效负载可以是任何类型,并且它取决于支持多种类型的 MessageConverter 的实际实现。例如,一些 JSON 转换器可能支持有效负载类型为 byte[]、String 和其他类型。当应用程序包含一个内部管道(即 input → handler1 → handler2 →. . . → output)时,这一点很重要。output 和上游处理程序的输出导致 Message 可能不是初始线性格式。

但是,toMessage 方法具有更严格的约定,必须始终将 Message 转换为 wire 格式:byte[]。

因此,对于所有意图和目的(尤其是在实现自己的转换器时),你都将这两种方法视为具有以下签名:

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<byte[]> toMessage(Object payload, @Nullable MessageHeaders headers);

32.2 提供的消息转换器

如前所述,框架已经提供了一个 MessageConverters 来处理最常见的用例。以下列表按优先级顺序描述提供的 MessageConverters(使用工作的第一个 MessageConverter):

  1. ApplicationJsonMessageMarshallingConverter:org.springframework.messaging.converter.MappingJackson2MessageConverter 的变体。对于 contentType 为 application/json(默认)的情况,支持将 Message 的有效负载转换为 pojo 或从 pojo 转换。
  2. TupleJsonMessageConverter:不推荐使用 支持将 Message 的有效负载转换 为/从 org.springframework.tuple.Tuple。
  3. ByteArrayMessageConverter:当 contentType 为 application/octet-stream 时,支持将消息的有效负载从 byte[] 转换为 byte[]。它本质上是一个传递,主要是为了向后兼容而存在的。
  4. ObjectStringMessageConverter: 当 contentType 为 text/plain 时,支持将任何类型转换为 String。它调用对象的 toString() 方法,或者,如果有效负载是 byte[],则调用 new String(byte[])。
  5. JavaSerializationMessageConverter:不推荐使用 当 contentType 是 application/x-java-serialized-object 时,支持基于 Java 序列化的转换。
  6. KryoMessageConverter:不推荐使用 当 contentType 为 application/x-java-objectt 时,支持基于 Kryo 序列化的转换。
  7. JsonUnmarshallingConverter:类似于 ApplicationJsonMessageMarshallingConverter。当 contentType 为 application/x-java-object 时,它支持任何类型的转换。它期望实际类型信息作为一个属性嵌入到 contentType 中(例如,application/x-java-object;type=foo.bar.Cat)。

当找不到合适的转换器时,框架抛出异常。当发生这种情况时,应该检查代码和配置,并确保没有遗漏任何内容(即,通过使用绑定或头确保提供了 contentType)。但是,最有可能的情况是,你发现了一些不常见的情况(例如自定义 contentType),并且提供的 MessageConverters 的当前堆栈不知道如何转换。如果是这种情况,你可以添加自定义 MessageConverter。请参阅 小节 32.3, “用户自定义的消息转换器”。

32.3 用户自定义的消息转换器

Spring Cloud Stream 公开了一种机制来定义和注册额外的 MessageConverters。要使用它,请实现 org.springframework.messaging.converter.MessageConverter,将其配置为 @Bean,并用 @StreamMessageConverter 对其进行注解。然后它被扩展到 `MessageConverter` 的现有堆栈。

[Note] Note

了解自定义 MessageConverter 实现被添加到现有堆栈的头部是很重要的。因此,自定义 MessageConverter 实现优先于现有的实现,后者允许你重写并添加到现有的转换器。

下面的示例展示如何创建消息转换器 bean 以支持名为 application/bar 的新内容类型:

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

    ...

    @Bean
    @StreamMessageConverter
    public MessageConverter customMessageConverter() {
        return new MyCustomMessageConverter();
    }
}

public class MyCustomMessageConverter extends AbstractMessageConverter {

    public MyCustomMessageConverter() {
        super(new MimeType("application", "bar"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (Bar.class.equals(clazz));
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
    }
}

Spring Cloud Stream 还支持基于 Avro 的转换器和模式演化。有关的详细信息请参阅 小节 33, 元数据演化支持。