public interface SchemaRegistryClient { SchemaRegistrationResponse register(String subject, String format, String schema); String fetch(SchemaReference schemaReference); String fetch(Integer id); }
与模式注册表服务器交互的客户端抽象是SchemaRegistryClient
接口,具备如下结构:html
public interface SchemaRegistryClient { SchemaRegistrationResponse register(String subject, String format, String schema); String fetch(SchemaReference schemaReference); String fetch(Integer id); }
Spring Cloud Stream提供了开箱即用的实现,用于与其本身的模式服务器交互,以及与Confluent Schema注册表进行交互。java
可使用@EnableSchemaRegistryClient
配置Spring Cloud Stream模式注册表的客户端,以下所示:spring
@EnableBinding(Sink.class) @SpringBootApplication @EnableSchemaRegistryClient public static class AvroSinkApplication { ... }
注意
|
优化了默认转换器,以缓存来自远程服务器的模式,并且还会很是昂贵的 |
Schema注册表客户端支持如下属性:缓存
模式服务器的位置。在设置时使用完整的URL,包括协议(http
或https
),端口和上下文路径。服务器
客户端是否应缓存模式服务器响应。一般设置为false
,由于缓存发生在消息转换器中。使用模式注册表客户端的客户端应将其设置为true
。app
true
fetch
对于在应用程序上下文中注册了SchemaRegistryClient
bean的Spring Boot应用程序,Spring Cloud Stream将自动配置使用模式注册表客户端进行模式管理的Apache Avro消息转换器。这简化了模式演进,由于接收消息的应用程序能够轻松访问可与本身的读取器模式进行协调的写入器模式。优化
对于出站邮件,若是频道的内容类型设置为application/*+avro
,MessageConverter
将被激活,例如:
spring.cloud.stream.bindings.output.contentType=application/*+avro
在出站转换期间,消息转换器将尝试基于其类型推断出站消息的模式,并使用SchemaRegistryClient
根据有效载荷类型将其注册到主题。若是已经找到相同的模式,那么将会检索对它的引用。若是没有,则将注册模式并提供新的版本号。该消息将使用application/[prefix].[subject].v[version]+avro
的方案contentType
头发送,其中prefix
是可配置的,而且从有效载荷类型推导出subject
。
例如,类型为User
的消息能够做为内容类型为application/vnd.user.v2+avro
的二进制有效载荷发送,其中user
是主题,2
是版本号。
当接收到消息时,转换器将从传入消息的头部推断出模式引用,并尝试检索它。该模式将在反序列化过程当中用做写入器模式。
若是您已经过设置spring.cloud.stream.bindings.output.contentType=application/*+avro
启用基于Avro的模式注册表客户端,则可使用如下属性自定义注册的行为。
若是您但愿转换器使用反射从POJO推断Schema,则启用。
false
Avro经过查看编写器模式(源有效载荷)和读取器模式(应用程序有效负载)来比较模式版本,查看Avro文档以获取更多信息。若是设置,这将覆盖模式服务器上的任何查找,并将本地模式用做读取器模式。
null
使用Schema服务器注册此属性中列出的任何.avsc
文件。
empty
要在Content-Type头上使用的前缀。
vnd
为了更好地了解Spring Cloud Stream注册和解决新模式以及其使用Avro模式比较功能,咱们将提供两个单独的子部分:一个用于注册,一个用于解析模式。
注册过程的第一部分是从经过信道发送的有效载荷中提取模式。Avro类型,如SpecificRecord
或GenericRecord
已经包含一个模式,能够从实例中当即检索。在POJO的状况下,若是属性spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
设置为true
(默认),则会推断出一个模式。
一旦得到了架构,转换器就会从远程服务器加载其元数据(版本)。首先,它查询本地缓存,若是没有找到它,则将数据提交到将使用版本控制信息回复的服务器。转换器将始终缓存结果,以免为每一个须要序列化的新消息查询Schema服务器的开销。
使用模式版本信息,转换器设置消息的contentType
头,以携带版本信息,如application/vnd.user.v1+avro
当读取包含版本信息的消息(即,具备上述方案的contentType
标头)时,转换器将查询Schema服务器以获取消息的写入器架构。一旦找到传入消息的正确架构,它就会检索读取器架构,并使用Avro的架构解析支持将其读入读取器定义(设置默认值和缺乏的属性)。
注意
|
了解编写器架构(写入消息的应用程序)和读取器架构(接收应用程序)之间的区别很重要。请花点时间阅读Avro术语并了解此过程。Spring Cloud Stream将始终提取writer模式以肯定如何读取消息。若是您想要Avro的架构演进支持工做,您须要确保为您的应用程序正确设置了readerSchema。 |