Kafka消息序列化

Kafka消息序列化

阅读文章,但愿能解决如下问题:java

  • 序列化主要解决的问题
  • 不一样的序列化对消息大小的影响
  • 能够用序列化来解决消息太大的问题吗

归纳

序列化主要是用来解决数据在网络中传输的问题. 在网络中传输的数据必须全是字节,也称为字节流. 而文本数据到字节数据的这一步就是序列化(将非字节数据 -> 字节数组).数组

Kafka中序列化

Kafka中的序列化主要是将发送的消息序列化成字节数组. 在Java中,有八大基本数据类型和引用类型. Kafka预先内置了一些相应的序列化和反序列化网络

Java类型 序列化 反序列化
int IntegerSerializer IntegerDeserializer
long LongSerializer LongDeserializer
double DoubleSerializer DoubleDeserializer
byte BytesSerializer BytesDeserializer
byte ByteArraySerializer ByteArrayDeserializer
byte ByteBufferSerializer ByteBufferDeserializer
String StringSerializer StringDeserializer

经过上面表格能够看出,Kafka并非为全部的基本类型内置了对应的序列化器和反序列化器. 并且Kafka为对byte提供方便,内置了三个不一样的序列化器和反序列化器. 同时,Kafka为一个引用类型-String,提供了序列化器和反序列化器,由于String太经常使用了.ide

// StringSerializer序列化代码
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }
}

从代码能够看出,默认状况下会把字符串编码成UTF-8格式,而后在网络中传输.工具

自定义序列化器

Kafka自带的序列化器并不能知足全部的需求,假如我有一个用户对象,里面包含用户姓名,用户年龄... 可是Kafka中没有提供相对应的序列化器,须要本身实现一个. 实现一个序列化器很简单,只须要实现一个接口.性能

public interface Serializer<T> extends Closeable {

    // 配置该类
    void configure(Map<String, ?> configs, boolean isKey);

    // 将数据转变为字节数组
    byte[] serialize(String topic, T data);

    // 默认方法
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }

    // 关闭序列化器
    @Override
    void close();
}

接下来,本身实现一个序列化器. 下面序列化器是商店顾客序列化器. 这里采用硬编码的方式,将该对象序列化成字节数组.编码

public class CustomerSerializer implements Serializer<Customer> {
    @Override
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int stringSize;
            if (data == null) {
                return null;
            } else {
                if (data.getName() != null) {
                    serializedName = data.getName().getBytes(StandardCharsets.UTF_8);
                    stringSize = serializedName.length;
                } else {
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }

            final ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getCustomerId());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer to byte[] " + e);
        }
    }
}

自定义序列化器的劣势:code

  • 须要考虑向前兼容和向后兼容的问题,假如更新的反序列化可否对之前的消息进行支持.
  • 须要将序列化和反序列化成匹配的出现

用第三方jar包实现自定义序列化

用JSON,ProtoBuf,Protostuff,Thrift...实现经过的序列化工具.对象

public class JsonSerializer implements Serializer<Customer> {

    private final Logger log = LoggerFactory.getLogger(JsonSerializer.class);

    @Override
    public byte[] serialize(String topic, Customer data) {
        byte[] result = null;
        try {
            // 关键代码,把对象序列化为字节数组.
            result = JSON.toJSONBytes(data);
            log.info("{} is serialize after the size is {}", data, result.length);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }
}

总结

序列化只是用来将非字节数据变为字节数组,最终实现数据在网络传输的目的. 然而想要经过序列化提高传输的性能(例如把序列化后的字节变少)是比较难实现的. 由于最终的字节数组要在消费端反序列化,所以消费者须要和生产者约定好(例如 1 表明 K, 2 表明 A ...).接口

相关文章
相关标签/搜索