数据转换是任何消息驱动微服务体系结构的核心特性之一。假设在 Spring Cloud Stream 中,这样的数据被表示为一条 Spring Message
,那么在到达目的地之前,一条消息可能必须被转换成所需的形状或大小。这有两个原因:
为了更好地理解内容类型协商背后的机制和必要性,我们以下面的消息处理程序为例来查看一个非常简单的用例:
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String handle(Person person) {..}
![[Note]](images/note.png) |
Note |
为了简单起见,我们假设这是应用程序中唯一的处理程序(我们假设没有内部管道)。
|
前面示例中所示的处理程序需要一个 Person
对象作为参数,并生成一个 String
类型作为输出。为了框架成功地将传入 Message
作为参数传递给这个处理程序,它必须以某种方式将 Message
类型的有效负载从线格式转换为 Person
类型。换句话说,框架必须找到并应用适当的 MessageConverter
。要实现这一点,框架需要用户提供一些指令。其中一条指令已经由处理程序方法本身(Person
类型)的签名提供。因此,在理论上,这应该是(并且在某些情况下,是)足够的。但是,对于大多数用例,为了选择适当的 MessageConverter
,框架需要额外的信息。缺失的部分是 contentType
。
Spring Cloud Stream 提供了三种定义 contentType
的机制(按优先级排列):
-
HEADER:
contentType
可以通过消息本身进行通信。通过提供 contentType
头,可以声明用于查找和应用适当的 MessageConverter
的内容类型。
-
BINDING:通过设置 spring.cloud.stream.bindings.input.content-type
属性,可以为每个目标绑定设置 contentType
。
![[Note]](images/note.png) |
Note |
属性名中的 input 段对应于目的地的实际名称(在我们的例子中是 “input”)。这种方法允许你基于每个绑定声明用于定位和应用适当的 MessageConverter 的内容类型。
|
-
DEFAULT:如果
Message
头或绑定中不存在 contentType
,则使用默认的 application/json
内容类型来定位和应用适当的 MessageConverter
。
如前所述,前面的列还展示了绑定时的优先级顺序。例如,头部提供的内容类型优先于任何其他内容类型。这同样适用于基于每个绑定设置的内容类型,这基本上允许你重写默认的内容类型。然而,它也提供了一个合理的违约(这是由社区反馈决定的)。
使 application/json
成为默认应用程序的另一个原因是分布式微服务体系结构驱动的互操作性需求,其中生产者和消费者不仅在不同的 JVM 中运行,而且还可以在不同的非 JVM 平台上运行。
当非空处理程序方法返回时,如果返回值已经是一条 Message
,则该 Message
将成为有效负载。但是,如果返回值不是 Message
,则在从输入消息继承头减去由 SpringIntegrationProperties.messageHandlerNotPropagatedHeaders
定义或筛选的头时,将以返回值作为有效负载构造新 Message
。默认情况下,那里只有一个头集:contentType
。这意味着新 Message
没有 contentType
头集,从而确保 contentType
可以发展。你总是可以选择不从处理器方法返回 Message
,在该方法中可以插入你希望的任何头。
如果存在内部管道,则通过相同的转换过程将 Message
发送到下一个处理程序。但是,如果没有内部管道或者你已经到达管道的末尾,则 Message
将被发送回输出目标。
如前所述,为了让框架选择适当的 MessageConverter
,它需要参数类型和(可选)内容类型信息。选择适当 MessageConverter
的逻辑与参数冲突解决程序(HandlerMethodArgumentResolvers
)驻留在一起,该参数冲突解决程序在调用用户定义的处理程序方法(即框架已知实际参数类型时)之前触发。如果参数类型与当前有效负载的类型不匹配,则框架将委托给预先配置的 MessageConverters
堆栈,以查看它们中是否有任何一个可以转换有效负载。如你所见,MessageConverter 的Object fromMessage(Message<?> message, Class<?> targetClass);
操作将 targetClass
作为其参数之一。框架还确保提供的 Message
始终包含 contentType
头。当不存在 contentType 头时,它将插入每个绑定 contentType
头或默认 contentType
头。contentType
参数类型的组合是框架确定消息是否可以转换为目标类型的机制。如果找不到合适的 MessageConverter
,则会引发异常,你可以通过添加自定义 MessageConverter
来处理该异常(请参阅 小节 32.3, “用户定义的消息转换器”)。
但是,如果有效负载类型与处理器方法声明的目标类型匹配呢?在这种情况下,没有要转换的内容,有效负载是未修改地传递的。虽然这听起来非常简单和合乎逻辑,但请记住接受消息的处理程序方法将 Message<?>
或 Object
作为参数。通过声明目标类型为对象(它是 Java 中的所有东西的 instanceof
),基本上放弃了转换过程。
![[Note]](images/note.png) |
Note |
不要期望仅基于 contentType 将 Message 转换为其他类型。记住 contentType 是目标类型的补充。如果愿意,你可以提供一个提示,MessageConverter 可以考虑,也可以不考虑。
|
MessageConverters
定义了两个方法
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
理解这些方法的协议和它们的用法是很重要的,特别是在 Spring Cloud Stream 的上下文中。
fromMessage
方法将传入 Message
转换为参数类型。Message
的有效负载可以是任何类型,并且它取决于支持多种类型的 MessageConverter
的实际实现。例如,一些 JSON 转换器可能支持有效负载类型为 byte[]
、String
和其他类型。当应用程序包含一个内部管道(即 input → handler1 → handler2 →. . . → output)时,这一点很重要。output 和上游处理程序的输出导致 Message
可能不是初始线性格式。
但是,toMessage
方法具有更严格的约定,必须始终将 Message
转换为 wire 格式:byte[]
。
因此,对于所有意图和目的(尤其是在实现自己的转换器时),你都将这两种方法视为具有以下签名:
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<byte[]> toMessage(Object payload, @Nullable MessageHeaders headers);
如前所述,框架已经提供了一个 MessageConverters
来处理最常见的用例。以下列表按优先级顺序描述提供的 MessageConverters
(使用工作的第一个 MessageConverter
):
-
ApplicationJsonMessageMarshallingConverter
:org.springframework.messaging.converter.MappingJackson2MessageConverter
的变体。对于 contentType
为 application/json
(默认)的情况,支持将 Message
的有效负载转换为 pojo 或从 pojo 转换。
-
TupleJsonMessageConverter
:不推荐使用 支持将 Message
的有效负载转换 为/从 org.springframework.tuple.Tuple
。
-
ByteArrayMessageConverter
:当 contentType
为 application/octet-stream
时,支持将消息的有效负载从 byte[]
转换为 byte[]
。它本质上是一个传递,主要是为了向后兼容而存在的。
-
ObjectStringMessageConverter
: 当 contentType
为 text/plain
时,支持将任何类型转换为 String
。它调用对象的 toString()
方法,或者,如果有效负载是 byte[]
,则调用 new String(byte[])
。
-
JavaSerializationMessageConverter
:不推荐使用 当 contentType
是 application/x-java-serialized-object
时,支持基于 Java 序列化的转换。
-
KryoMessageConverter
:不推荐使用 当 contentType
为 application/x-java-object
t 时,支持基于 Kryo 序列化的转换。
-
JsonUnmarshallingConverter
:类似于 ApplicationJsonMessageMarshallingConverter
。当 contentType
为 application/x-java-object
时,它支持任何类型的转换。它期望实际类型信息作为一个属性嵌入到 contentType
中(例如,application/x-java-object;type=foo.bar.Cat
)。
当找不到合适的转换器时,框架抛出异常。当发生这种情况时,应该检查代码和配置,并确保没有遗漏任何内容(即,通过使用绑定或头确保提供了 contentType
)。但是,最有可能的情况是,你发现了一些不常见的情况(例如自定义 contentType
),并且提供的 MessageConverters
的当前堆栈不知道如何转换。如果是这种情况,你可以添加自定义 MessageConverter
。请参阅 小节 32.3, “用户自定义的消息转换器”。
Spring Cloud Stream 公开了一种机制来定义和注册额外的 MessageConverters
。要使用它,请实现 org.springframework.messaging.converter.MessageConverter
,将其配置为 @Bean
,并用 @StreamMessageConverter
对其进行注解。然后它被扩展到 `MessageConverter` 的现有堆栈。
![[Note]](images/note.png) |
Note |
了解自定义 MessageConverter 实现被添加到现有堆栈的头部是很重要的。因此,自定义 MessageConverter 实现优先于现有的实现,后者允许你重写并添加到现有的转换器。
|
下面的示例展示如何创建消息转换器 bean 以支持名为 application/bar
的新内容类型:
@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {
...
@Bean
@StreamMessageConverter
public MessageConverter customMessageConverter() {
return new MyCustomMessageConverter();
}
}
public class MyCustomMessageConverter extends AbstractMessageConverter {
public MyCustomMessageConverter() {
super(new MimeType("application", "bar"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (Bar.class.equals(clazz));
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
}
}
Spring Cloud Stream 还支持基于 Avro 的转换器和模式演化。有关的详细信息请参阅 小节 33, 元数据演化支持。