都在用 Kafka ! 消息队列序列化怎么处理?

生产者须要用序列化器(Serializer)把对象转换成字节数组才能经过网络发送给Kafka。而在对侧,消费者须要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象。apache

 

 

先参考下面代码实现一个简单的客户端。数组

image.png

为了方便,消息的 key 和 value 都使用了字符串,对应程序中的序列化器也使用了客户端自带的 org.apache.kafka.common.serialization.StringSerializer,除了用于 String 类型的序列化器,还有 ByteArray、ByteBuffer、Bytes、Double、Integer、Long 这几种类型,它们都实现了 org.apache.kafka.common.serialization.Serializer 接口,此接口有3个方法:网络

 

configure() 方法用来配置当前类,serialize() 方法用来执行序列化操做。而 close() 方法用来关闭当前的序列化器,通常状况下 close() 是一个空方法,若是实现了此方法,则必须确保此方法的幂等性,由于这个方法极可能会被 KafkaProducer 调用屡次。工具

 

生产者使用的序列化器和消费者使用的反序列化器是须要一一对应的,若是生产者使用了某种序列化器,好比 StringSerializer,而消费者使用了另外一种序列化器,好比 IntegerSerializer,那么是没法解析出想要的数据的编码

 

下面就以 StringSerializer 为例来看看 Serializer 接口中的3个方法的使用方法,StringSerializer 类的具体实现如代码spa

首先是 configure() 方法,这个方法是在建立 KafkaProducer 实例的时候调用的,主要用来肯定编码类型,不过通常客户端对于 key.serializer.encoding、value.serializer. encoding 和 serializer.encoding 这几个参数都不会配置,在 KafkaProducer 的参数集合(ProducerConfig)里也没有这几个参数(它们能够看做用户自定义的参数),因此通常状况下 encoding 的值就为默认的“UTF-8”。serialize() 方法很是直观,就是将 String 类型转为 byte[] 类型。3d

 

若是 Kafka 客户端提供的几种序列化器都没法知足应用需求,则能够选择使用如 Avro、JSON、Thrift、ProtoBuf 和 Protostuff 等通用的序列化工具来实现,或者使用自定义类型的序列化器来实现。下面就以一个简单的例子来介绍自定义类型的使用方法对象

 

假设咱们要发送的消息都是 Company 对象,这个 Company 的定义很简单,只有名称 name 和地址 address,示例代码参考以下blog

 

下面咱们再来看一下 Company 对应的序列化器 CompanySerializer,示例代码如代码接口

 

如何使用自定义的序列化器 CompanySerializer 呢?只需将 KafkaProducer 的 value.serializer 参数设置为 CompanySerializer 类的全限定名便可。假如咱们要发送一个 Company 对象到 Kafka,关键代码如代码

 

注意,示例中消息的 key 对应的序列化器仍是 StringSerializer,这个并无改动。其实 key.serializer 和 value.serializer 并无太大的区别

原文地址

相关文章
相关标签/搜索