阅读文章,但愿能解决如下问题:java
序列化主要是用来解决数据在网络中传输的问题. 在网络中传输的数据必须全是字节,也称为字节流. 而文本数据到字节数据的这一步就是序列化(将非字节数据 -> 字节数组).数组
Kafka中的序列化主要是将发送的消息序列化成字节数组. 在Java中,有八大基本数据类型和引用类型. Kafka预先内置了一些相应的序列化和反序列化网络
Java类型 | 序列化 | 反序列化 |
---|---|---|
int | IntegerSerializer | IntegerDeserializer |
long | LongSerializer | LongDeserializer |
double | DoubleSerializer | DoubleDeserializer |
byte | BytesSerializer | BytesDeserializer |
byte | ByteArraySerializer | ByteArrayDeserializer |
byte | ByteBufferSerializer | ByteBufferDeserializer |
String | StringSerializer | StringDeserializer |
经过上面表格能够看出,Kafka并非为全部的基本类型内置了对应的序列化器和反序列化器. 并且Kafka为对byte提供方便,内置了三个不一样的序列化器和反序列化器. 同时,Kafka为一个引用类型-String,提供了序列化器和反序列化器,由于String太经常使用了.ide
// StringSerializer序列化代码 public class StringSerializer implements Serializer<String> { private String encoding = "UTF8"; @Override public byte[] serialize(String topic, String data) { try { if (data == null) return null; else return data.getBytes(encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding); } } }
从代码能够看出,默认状况下会把字符串编码成UTF-8格式,而后在网络中传输.工具
Kafka自带的序列化器并不能知足全部的需求,假如我有一个用户对象,里面包含用户姓名,用户年龄... 可是Kafka中没有提供相对应的序列化器,须要本身实现一个. 实现一个序列化器很简单,只须要实现一个接口.性能
public interface Serializer<T> extends Closeable { // 配置该类 void configure(Map<String, ?> configs, boolean isKey); // 将数据转变为字节数组 byte[] serialize(String topic, T data); // 默认方法 default byte[] serialize(String topic, Headers headers, T data) { return serialize(topic, data); } // 关闭序列化器 @Override void close(); }
接下来,本身实现一个序列化器. 下面序列化器是商店顾客序列化器. 这里采用硬编码的方式,将该对象序列化成字节数组.编码
public class CustomerSerializer implements Serializer<Customer> { @Override public byte[] serialize(String topic, Customer data) { try { byte[] serializedName; int stringSize; if (data == null) { return null; } else { if (data.getName() != null) { serializedName = data.getName().getBytes(StandardCharsets.UTF_8); stringSize = serializedName.length; } else { serializedName = new byte[0]; stringSize = 0; } } final ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize); buffer.putInt(data.getCustomerId()); buffer.putInt(stringSize); buffer.put(serializedName); return buffer.array(); } catch (Exception e) { throw new SerializationException("Error when serializing Customer to byte[] " + e); } } }
自定义序列化器的劣势:code
用JSON,ProtoBuf,Protostuff,Thrift...实现经过的序列化工具.对象
public class JsonSerializer implements Serializer<Customer> { private final Logger log = LoggerFactory.getLogger(JsonSerializer.class); @Override public byte[] serialize(String topic, Customer data) { byte[] result = null; try { // 关键代码,把对象序列化为字节数组. result = JSON.toJSONBytes(data); log.info("{} is serialize after the size is {}", data, result.length); } catch (Exception e) { e.printStackTrace(); } return result; } }
序列化只是用来将非字节数据变为字节数组,最终实现数据在网络传输的目的. 然而想要经过序列化提高传输的性能(例如把序列化后的字节变少)是比较难实现的. 由于最终的字节数组要在消费端反序列化,所以消费者须要和生产者约定好(例如 1 表明 K, 2 表明 A ...).接口