kafka的消息是以record方式存储下来的。
Record是接口,DefaultRecord实现了Record接口。数据结构
Record =>
  Length => varint          # record总长度(不包括Length自己)
  Attributes => int8        # 属性
  TimestampDelta => varint  # timestamp的偏移量(相对于RecordBatch的baseTimestamp)
  OffsetDelta => varint     # offset的偏移量(相对于RecordBatch的baseOffset)
  KeyLen => varint          # key的长度
  Key => data               # key的数据
  ValueLen => varint        # value的长度
  Value => data             # value的数据
  NumHeaders => varint      # header的数量
  Headers => [Header]       # Header列表

Header =>
  HeaderKeyLen => varint    # key的长度
  HeaderKey => string       # key的数据
  HeaderValueLen => varint  # value的长度
  HeaderValue => data       # value的数据
上面数据类型有varint,这个类型是Protocol Buffers的类型。在存储数值比较小的时候,会节省空间。具体参考链接 https://developers.google.com/protocol-buffers/docs/encoding
DefaultRecord是上述数据结构的封装
// DefaultRecord: immutable value object representing a single Kafka record
// (message format v2). Offsets and timestamps are stored here as absolute
// values; the readFrom() factories reconstruct them from the batch-relative
// deltas plus the RecordBatch base values.
// NOTE(review): this excerpt is truncated ("........" marks omitted members).
public class DefaultRecord implements Record {

    // Worst-case fixed overhead, excluding key, value and headers:
    // 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes
    public static final int MAX_RECORD_OVERHEAD = 21;

    // Size of varint(-1), the encoding used for a null key/value length.
    private static final int NULL_VARINT_SIZE_BYTES = ByteUtils.sizeOfVarint(-1);

    private final int sizeInBytes;   // total serialized size, including the leading Length varint
    private final byte attributes;   // per-record attributes byte
    private final long offset;       // absolute offset (baseOffset + offsetDelta)
    private final long timestamp;    // absolute timestamp (baseTimestamp + timestampDelta, or logAppendTime)
    private final int sequence;      // producer sequence, or RecordBatch.NO_SEQUENCE
    private final ByteBuffer key;    // may be null (null key)
    private final ByteBuffer value;  // may be null (null value)
    private final Header[] headers;  // never null; Record.EMPTY_HEADERS when absent

    // Private: instances are created via the readFrom() factory methods.
    private DefaultRecord(int sizeInBytes,
                          byte attributes,
                          long offset,
                          long timestamp,
                          int sequence,
                          ByteBuffer key,
                          ByteBuffer value,
                          Header[] headers) {
        this.sizeInBytes = sizeInBytes;
        this.attributes = attributes;
        this.offset = offset;
        this.timestamp = timestamp;
        this.sequence = sequence;
        this.key = key;
        this.value = value;
        this.headers = headers;
    }

    @Override
    public long offset() {
        return offset;
    }

    @Override
    public int sequence() {
        return sequence;
    }

    ........

    @Override
    public Header[] headers() {
        return headers;
    }

    /**
DefaultRecord提供了从buffer读取数据,实例化的方法
// Reads one record from a DataInput and materializes it as a DefaultRecord.
// baseOffset, baseTimestamp and baseSequence come from the enclosing
// RecordBatch; a non-null logAppendTime overrides the record's own timestamp.
public static DefaultRecord readFrom(DataInput input,
                                     long baseOffset,
                                     long baseTimestamp,
                                     int baseSequence,
                                     Long logAppendTime) throws IOException {
    // Read the Length varint: size of the record body, excluding the varint itself
    int sizeOfBodyInBytes = ByteUtils.readVarint(input);
    // Allocate a buffer for the body
    ByteBuffer recordBuffer = ByteBuffer.allocate(sizeOfBodyInBytes);
    // Read the body bytes
    input.readFully(recordBuffer.array(), 0, sizeOfBodyInBytes);
    // Total record size, including the Length varint itself
    int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
    return readFrom(recordBuffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset,
            baseTimestamp, baseSequence, logAppendTime);
}

// Reads one record from a ByteBuffer. baseOffset, baseTimestamp and
// baseSequence are properties of the enclosing RecordBatch.
// Returns null when the buffer holds fewer bytes than the declared body size.
public static DefaultRecord readFrom(ByteBuffer buffer,
                                     long baseOffset,
                                     long baseTimestamp,
                                     int baseSequence,
                                     Long logAppendTime) {
    // Read the Length varint
    int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
    // Incomplete record: not enough bytes remaining
    if (buffer.remaining() < sizeOfBodyInBytes)
        return null;
    // Total record size, including the Length varint
    int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
    return readFrom(buffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset,
            baseTimestamp, baseSequence, logAppendTime);
}

// Parses the record body (Attributes, deltas, key, value, headers) starting at
// the buffer's current position, and validates that exactly sizeOfBodyInBytes
// bytes were consumed.
private static DefaultRecord readFrom(ByteBuffer buffer,
                                      int sizeInBytes,
                                      int sizeOfBodyInBytes,
                                      long baseOffset,
                                      long baseTimestamp,
                                      int baseSequence,
                                      Long logAppendTime) {
    try {
        // Remember where the body starts, for the final length check
        int recordStart = buffer.position();
        // Read Attributes
        byte attributes = buffer.get();
        // Read TimestampDelta
        long timestampDelta = ByteUtils.readVarlong(buffer);
        // Absolute timestamp; baseTimestamp is a RecordBatch property.
        // logAppendTime (when set) wins over the producer-supplied timestamp.
        long timestamp = baseTimestamp + timestampDelta;
        if (logAppendTime != null)
            timestamp = logAppendTime;

        // Read OffsetDelta
        int offsetDelta = ByteUtils.readVarint(buffer);
        // Absolute offset; baseOffset is a RecordBatch property
        long offset = baseOffset + offsetDelta;
        // Derive the producer sequence from the batch's base sequence, if any
        int sequence = baseSequence >= 0 ?
                DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) :
                RecordBatch.NO_SEQUENCE;

        ByteBuffer key = null;
        // Read KeyLen; a negative length encodes a null key
        int keySize = ByteUtils.readVarint(buffer);
        if (keySize >= 0) {
            // Zero-copy slice covering the key bytes
            key = buffer.slice();
            // Bound the slice to the key length
            key.limit(keySize);
            // Advance the buffer past the key bytes
            buffer.position(buffer.position() + keySize);
        }

        ByteBuffer value = null;
        // Read ValueLen; a negative length encodes a null value
        int valueSize = ByteUtils.readVarint(buffer);
        if (valueSize >= 0) {
            // Zero-copy slice covering the value bytes
            value = buffer.slice();
            // Bound the slice to the value length
            value.limit(valueSize);
            // Advance the buffer past the value bytes
            buffer.position(buffer.position() + valueSize);
        }

        // Read NumHeaders
        int numHeaders = ByteUtils.readVarint(buffer);
        if (numHeaders < 0)
            throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);

        final Header[] headers;
        if (numHeaders == 0)
            headers = Record.EMPTY_HEADERS;
        else
            // Decode the header list
            headers = readHeaders(buffer, numHeaders);

        // The body must have been consumed exactly; anything else is corruption
        if (buffer.position() - recordStart != sizeOfBodyInBytes)
            throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
                    " bytes in record payload, but instead read " + (buffer.position() - recordStart));

        return new DefaultRecord(sizeInBytes, attributes, offset, timestamp,
                sequence, key, value, headers);
    } catch (BufferUnderflowException | IllegalArgumentException e) {
        throw new InvalidRecordException("Found invalid record structure", e);
    }
}
读取headers
private static Header[] readHeaders(ByteBuffer buffer, int numHeaders) { Header[] headers = new Header[numHeaders]; for (int i = 0; i < numHeaders; i++) { // 读取header的KeySize int headerKeySize = ByteUtils.readVarint(buffer); if (headerKeySize < 0) throw new InvalidRecordException("Invalid negative header key size " + headerKeySize); // 读取header的key,而且转为utf-8 String headerKey = Utils.utf8(buffer, headerKeySize); // 设置buffer的postion,跳过key buffer.position(buffer.position() + headerKeySize); ByteBuffer headerValue = null; // 读取header的value int headerValueSize = ByteUtils.readVarint(buffer); if (headerValueSize >= 0) { // 切片,取出headerValue这一段 headerValue = buffer.slice(); // 设置headerValue的limit值 headerValue.limit(headerValueSize); // 设置buffer的postion,跳过value buffer.position(buffer.position() + headerValueSize); } // 实例化 RecordHeader, RecordHeader只是对key和value的封装 headers[i] = new RecordHeader(headerKey, headerValue); } return headers; }
public static int sizeInBytes(int offsetDelta, long timestampDelta, int keySize, int valueSize, Header[] headers) { // 计算body的长度 int bodySize = sizeOfBodyInBytes(offsetDelta, timestampDelta, keySize, valueSize, headers); // 加上Length的长度 return bodySize + ByteUtils.sizeOfVarint(bodySize); } private static int sizeOfBodyInBytes(int offsetDelta, long timestampDelta, ByteBuffer key, ByteBuffer value, Header[] headers) { // 计算keySize的值 int keySize = key == null ? -1 : key.remaining(); // 计算valueSize的值 int valueSize = value == null ? -1 : value.remaining(); return sizeOfBodyInBytes(offsetDelta, timestampDelta, keySize, valueSize, headers); } private static int sizeOfBodyInBytes(int offsetDelta, long timestampDelta, int keySize, int valueSize, Header[] headers) { // attribute占1byte int size = 1; // 计算offsetDelta的长度 size += ByteUtils.sizeOfVarint(offsetDelta); // 计算timestampDelta的长度 size += ByteUtils.sizeOfVarlong(timestampDelta); // 计算剩下key,value和headers的长度 size += sizeOf(keySize, valueSize, headers); return size; } private static int sizeOf(int keySize, int valueSize, Header[] headers) { int size = 0; if (keySize < 0) size += NULL_VARINT_SIZE_BYTES; else // keySize自己的长度,和key的长度 size += ByteUtils.sizeOfVarint(keySize) + keySize; if (valueSize < 0) size += NULL_VARINT_SIZE_BYTES; else // valueSize自己的长度,和value的长度 size += ByteUtils.sizeOfVarint(valueSize) + valueSize; if (headers == null) throw new IllegalArgumentException("Headers cannot be null"); // NumHeaders自己的长度 size += ByteUtils.sizeOfVarint(headers.length); for (Header header : headers) { String headerKey = header.key(); if (headerKey == null) throw new IllegalArgumentException("Invalid null header key found in headers"); // header的keySize int headerKeySize = Utils.utf8Length(headerKey); // keySize自己的长度,和key的长度 size += ByteUtils.sizeOfVarint(headerKeySize) + headerKeySize; byte[] headerValue = header.value(); if (headerValue == null) { size += NULL_VARINT_SIZE_BYTES; } else { // valueSize自己的长度,和value的长度 size += 
ByteUtils.sizeOfVarint(headerValue.length) + headerValue.length; } } return size; }
kafka的一条消息,对应着一条Record。DefaultRecord实现了Record接口,数据结构采用了新的varint类型,减少了存储空间。Record依赖着RecordBatch的存储,里面的offset,timestamp等都和RecordBatch有关。RecordBatch在下节会有介绍。