@EnableBinding(Sink.class) @SpringBootApplication public static class SinkApplication { ... @Bean public MessageConverter customMessageConverter() { return new MyCustomMessageConverter(); }
类型转换Spring Cloud Stream提供的开箱即用以下表所示:“源有效载荷”是指转换前的有效载荷,“目标有效载荷”是指转换后的“有效载荷”。类型转换能够在“生产者”一侧(输出)或“消费者”一侧(输入)上进行。html
来源有效载荷 | 目标有效载荷 | content-type 标题(来源讯息) |
content-type 标题(转换后) |
注释 |
---|---|---|---|---|
POJOjava |
JSON Stringspring |
ignoredsql |
application/json数据库 |
|
Tupleapache |
JSON Stringjson |
ignoredapi |
application/json服务器 |
JSON是为Tuple量身定制的架构 |
POJO |
String (toString()) |
ignored |
text/plain, java.lang.String |
|
POJO |
byte[] (java.io serialized) |
ignored |
application/x-java-serialized-object |
|
JSON byte[] or String |
POJO |
application/json (or none) |
application/x-java-object |
|
byte[] or String |
Serializable |
application/x-java-serialized-object |
application/x-java-object |
|
JSON byte[] or String |
Tuple |
application/json (or none) |
application/x-spring-tuple |
|
byte[] |
String |
any |
text/plain, java.lang.String |
将应用在content-type头中指定的任何Charset |
String |
byte[] |
any |
application/octet-stream |
将应用在content-type头中指定的任何Charset |
注意
|
转换适用于须要类型转换的有效内容。例如,若是应用程序生成带有outputType = application / json的XML字符串,则该有效载荷将不会从XML转换为JSON。这是由于发送到出站通道的有效载荷已是一个String,因此在运行时不会应用转换。一样重要的是要注意,当使用默认的序列化机制时,必须在发送和接收应用程序之间共享有效负载类,而且与二进制内容兼容。当应用程序代码在两个应用程序中独立更改时,这可能会产生问题,由于二进制格式和代码可能会变得不兼容。 |
提示
|
虽然入站和出站渠道都支持转换,但特别推荐将其用于转发出站邮件。对于入站邮件的转换,特别是当目标是POJO时, |
除了支持开箱即用的转换,Spring Cloud Stream还支持注册您本身的邮件转换实现。这容许您以各类自定义格式(包括二进制)发送和接收数据,并将其与特定的contentTypes
关联。Spring Cloud Stream将全部类型为org.springframework.messaging.converter.MessageConverter
的bean注册为自定义消息转换器以及开箱即用消息转换器。
若是您的消息转换器须要使用特定的content-type
和目标类(用于输入和输出),则消息转换器须要扩展org.springframework.messaging.converter.AbstractMessageConverter
。对于使用@StreamListener
的转换,实现org.springframework.messaging.converter.MessageConverter
的消息转换器就足够了。
如下是在Spring Cloud Stream应用程序中建立消息转换器bean(内容类型为application/bar
)的示例:
@EnableBinding(Sink.class) @SpringBootApplication public static class SinkApplication { ... @Bean public MessageConverter customMessageConverter() { return new MyCustomMessageConverter(); }
public class MyCustomMessageConverter extends AbstractMessageConverter { public MyCustomMessageConverter() { super(new MimeType("application", "bar")); } @Override protected boolean supports(Class<?> clazz) { return (Bar.class == clazz); } @Override protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) { Object payload = message.getPayload(); return (payload instanceof Bar ? payload : new Bar((byte[]) payload)); } }
Spring Cloud Stream还为基于Avro的转换器和模式演进提供支持。详情请参阅具体章节。
@StreamListener
和讯息转换@StreamListener
注释提供了一种方便的方式来转换传入的消息,而不须要指定输入通道的内容类型。在使用@StreamListener
注释的方法的调度过程当中,若是参数须要转换,将自动应用转换。
例如,让咱们考虑一个带有{"greeting":"Hello, world"}
的String内容的消息,而且在输入通道上收到application/json
的application/json
标题。让咱们考虑接收它的如下应用程序:
public class GreetingMessage { String greeting; public String getGreeting() { return greeting; } public void setGreeting(String greeting) { this.greeting = greeting; } } @EnableBinding(Sink.class) @EnableAutoConfiguration public static class GreetingSink { @StreamListener(Sink.INPUT) public void receive(Greeting greeting) { // handle Greeting } }
该方法的参数将自动填充包含JSON字符串的未编组形式的POJO。
Spring Cloud Stream经过其spring-cloud-stream-schema
模块为基于模式的消息转换器提供支持。目前,基于模式的消息转换器开箱即用的惟一序列化格式是Apache Avro,在未来的版本中能够添加更多的格式。
spring-cloud-stream-schema
模块包含可用于Apache Avro序列化的两种类型的消息转换器:
使用序列化/反序列化对象的类信息的转换器,或者启动时已知位置的模式;
转换器使用模式注册表 - 他们在运行时定位模式,以及随着域对象的发展动态注册新模式。
AvroSchemaMessageConverter
支持使用预约义模式或使用类中可用的模式信息(反射或包含在SpecificRecord
)中的序列化和反序列化消息。若是转换的目标类型是GenericRecord
,则必须设置模式。
对于使用它,您能够简单地将其添加到应用程序上下文中,可选地指定一个或多个MimeTypes
将其关联。默认MimeType
为application/avro
。
如下是在注册Apache Avro MessageConverter
的宿应用程序中进行配置的示例,而不须要预约义的模式:
@EnableBinding(Sink.class) @SpringBootApplication public static class SinkApplication { ... @Bean public MessageConverter userMessageConverter() { return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); } }
相反,这里是一个应用程序,注册一个具备预约义模式的转换器,能够在类路径中找到:
@EnableBinding(Sink.class) @SpringBootApplication public static class SinkApplication { ... @Bean public MessageConverter userMessageConverter() { AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); converter.setSchemaLocation(new ClassPathResource("schemas/User.avro")); return converter; } }
为了了解模式注册表客户端转换器,咱们将首先描述模式注册表支持。
大多数序列化模型,特别是旨在跨不一样平台和语言进行可移植性的序列化模型,依赖于描述数据如何在二进制有效载荷中被序列化的模式。为了序列化数据而后解释它,发送方和接收方都必须访问描述二进制格式的模式。在某些状况下,能够从序列化的有效载荷类型或从反序列化时的目标类型中推断出模式,可是在许多状况下,应用程序能够从访问描述二进制数据格式的显式模式中受益。模式注册表容许您以文本格式(一般为JSON)存储模式信息,并使该信息可访问须要它的各类应用程序以二进制格式接收和发送数据。一个模式能够做为一个元组引用,它由
做为模式的逻辑名称的主题 ;
模式版本 ;
描述数据 的二进制格式的模式格式。
Spring Cloud Stream提供了模式注册表服务器实现。为了使用它,您能够简单地将spring-cloud-stream-schema-server
工件添加到项目中,并使用@EnableSchemaRegistryServer
注释,将模式注册表服务器REST控制器添加到应用程序中。此注释旨在与Spring Boot Web应用程序一块儿使用,服务器的监听端口由server.port
设置控制。spring.cloud.stream.schema.server.path
设置可用于控制模式服务器的根路径(特别是嵌入其余应用程序时)。spring.cloud.stream.schema.server.allowSchemaDeletion
布尔设置能够删除模式。默认状况下,这是禁用的。
模式注册表服务器使用关系数据库来存储模式。默认状况下,它使用一个嵌入式数据库。您能够使用Spring Boot SQL数据库和JDBC配置选项自定义模式存储。
启用模式注册表的Spring Boot应用程序以下所示:
@SpringBootApplication @EnableSchemaRegistryServer public class SchemaRegistryServerApplication { public static void main(String[] args) { SpringApplication.run(SchemaRegistryServerApplication.class, args); } }
Schema注册服务器API由如下操做组成:
POST /
注册一个新的架构
接受具备如下字段的JSON有效载荷:
subject
模式主题;
format
模式格式;
definition
模式定义。
响应是JSON格式的模式对象,包含如下字段:
id
模式标识;
subject
模式主题;
format
模式格式;
version
模式版本;
definition
模式定义。
GET /{subject}/{format}/{version}
根据其主题,格式和版本检索现有模式。
响应是JSON格式的模式对象,包含如下字段:
id
模式标识;
subject
模式主题;
format
模式格式;
version
模式版本;
definition
模式定义。
GET /{subject}/{format}
根据其主题和格式检索现有模式的列表。
响应是JSON格式的每一个模式对象的模式列表,包含如下字段:
id
模式标识;
subject
模式主题;
format
模式格式;
version
模式版本;
definition
模式定义。
GET /schemas/{id}
经过其id来检索现有的模式。
响应是JSON格式的模式对象,包含如下字段:
id
模式标识;
subject
模式主题;
format
模式格式;
version
模式版本;
definition
模式定义。
DELETE /{subject}/{format}/{version}
按其主题,格式和版本删除现有模式。
DELETE /schemas/{id}
按其ID删除现有模式。
DELETE /{subject}
按其主题删除现有模式。
注意
|
本说明仅适用于Spring Cloud Stream 1.1.0.RELEASE的用户。Spring Cloud Stream 1.1.0.RELEASE使用表名 |