kafka协议-record解析

介绍

Kafka的消息是以Record的形式存储下来的。

Record

Record是接口,DefaultRecord实现了Record接口。

DefaultRecord的存储结构

Record =>
  Length => varint    #  record总长度(不包括Length自己)
  Attributes => int8    # 属性
  TimestampDelta => varlong    # timestamp的偏移量(相对于RecordBatch的baseTimestamp)
  OffsetDelta => varint    # offset的偏移量(相对于RecordBatch的baseOffset)
  KeyLen => varint    # key的长度
  Key => data    # key的数据
  ValueLen => varint    # value的长度
  Value => data    # value的数据
  NumHeaders => varint    # header的数量
  Headers => [Header]    # Header列表
  
Header =>
  HeaderKeyLen => varint    # key的长度
  HeaderKey => string    # key的数据
  HeaderValueLen => varint    # value的长度
  HeaderValue => data    # value的数据

上面的数据类型中有varint,这是Protocol Buffers中的变长整数类型,在存储较小的数值时可以节省空间。具体可参考链接:https://developers.google.com/protocol-buffers/docs/encoding

DefaultRecord类

DefaultRecord是对上述数据结构的封装。

public class DefaultRecord implements Record {

    // Maximum per-record overhead,
    // excluding key, value and headers: 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes
    public static final int MAX_RECORD_OVERHEAD = 21;

    // Encoded size of the varint -1; the size helpers add this for a null key, value or header value.
    private static final int NULL_VARINT_SIZE_BYTES = ByteUtils.sizeOfVarint(-1);

    // Total encoded size of the record; readFrom passes the body size plus the size of the length varint.
    private final int sizeInBytes;
    // Attributes byte of the record.
    private final byte attributes;
    // Absolute offset; readFrom computes it as the batch base offset plus the record's offset delta.
    private final long offset;
    // Record timestamp; readFrom uses base timestamp plus delta, or the log append time when one is supplied.
    private final long timestamp;
    // Sequence number; readFrom derives it from the batch base sequence, or uses RecordBatch.NO_SEQUENCE.
    private final int sequence;
    // Record key; null when the encoded key length is negative.
    private final ByteBuffer key;
    // Record value; null when the encoded value length is negative.
    private final ByteBuffer value;
    // Record headers; readFrom stores Record.EMPTY_HEADERS when the record has none.
    private final Header[] headers;

    /**
     * Creates a record from already-decoded field values.
     *
     * <p>The arguments are stored as given: no validation is performed and the
     * key, value and header array are not copied.
     *
     * @param sizeInBytes encoded size of the record
     * @param attributes  attributes byte
     * @param offset      offset of the record
     * @param timestamp   timestamp of the record
     * @param sequence    sequence number of the record
     * @param key         record key, may be null
     * @param value       record value, may be null
     * @param headers     record headers
     */
    private DefaultRecord(int sizeInBytes,
                          byte attributes,
                          long offset,
                          long timestamp,
                          int sequence,
                          ByteBuffer key,
                          ByteBuffer value,
                          Header[] headers) {
        this.sizeInBytes = sizeInBytes;
        this.attributes = attributes;
        this.offset = offset;
        this.timestamp = timestamp;
        this.sequence = sequence;
        this.key = key;
        this.value = value;
        this.headers = headers;
    }

    /** Returns the offset stored in this record. */
    @Override
    public long offset() {
        return offset;
    }

    /** Returns the sequence number stored in this record. */
    @Override
    public int sequence() {
        return sequence;
    }

    ........

    /** Returns the header array held by this record; the array itself is returned, not a copy. */
    @Override
    public Header[] headers() {
        return headers;
    }

    /**

DefaultRecord读取

DefaultRecord提供了从buffer中读取数据并实例化的方法。

// 从DataInput读取数据
    /**
     * Reads one record from a {@link DataInput}.
     *
     * <p>The length varint is read first, then exactly that many bytes are copied
     * into a freshly allocated heap buffer, which is handed to the buffer-based parser.
     *
     * @param input         source positioned at the record's length varint
     * @param baseOffset    base offset of the enclosing batch
     * @param baseTimestamp base timestamp of the enclosing batch
     * @param baseSequence  base sequence of the enclosing batch
     * @param logAppendTime if non-null, used as the record timestamp
     * @return the parsed record
     * @throws IOException if reading from {@code input} fails
     */
    public static DefaultRecord readFrom(DataInput input,
                                         long baseOffset,
                                         long baseTimestamp,
                                         int baseSequence,
                                         Long logAppendTime) throws IOException {
        // Length field: number of body bytes that follow it.
        final int bodyLength = ByteUtils.readVarint(input);

        // Pull the whole body into a heap buffer so it can be parsed like any other ByteBuffer.
        final ByteBuffer body = ByteBuffer.allocate(bodyLength);
        input.readFully(body.array(), 0, bodyLength);

        // Full record size = size of the length varint + body.
        final int recordLength = ByteUtils.sizeOfVarint(bodyLength) + bodyLength;

        return readFrom(body, recordLength, bodyLength,
                baseOffset, baseTimestamp, baseSequence, logAppendTime);
    }


    /**
     * Reads one record from a {@link ByteBuffer}.
     *
     * <p>{@code baseOffset}, {@code baseTimestamp} and {@code baseSequence} are
     * properties of the enclosing RecordBatch.
     *
     * @param buffer        buffer positioned at the record's length varint
     * @param baseOffset    base offset of the enclosing batch
     * @param baseTimestamp base timestamp of the enclosing batch
     * @param baseSequence  base sequence of the enclosing batch
     * @param logAppendTime if non-null, used as the record timestamp
     * @return the parsed record, or {@code null} when the buffer holds fewer
     *         bytes than the declared body length
     */
    public static DefaultRecord readFrom(ByteBuffer buffer,
                                         long baseOffset,
                                         long baseTimestamp,
                                         int baseSequence,
                                         Long logAppendTime) {
        // Length field: number of body bytes that follow it.
        final int bodyLength = ByteUtils.readVarint(buffer);

        // Not enough data for a complete record.
        if (bodyLength > buffer.remaining()) {
            return null;
        }

        // Full record size = size of the length varint + body.
        final int recordLength = ByteUtils.sizeOfVarint(bodyLength) + bodyLength;

        return readFrom(buffer, recordLength, bodyLength,
                baseOffset, baseTimestamp, baseSequence, logAppendTime);
    }

    /**
     * Parses the body of one record (everything after the length varint) from {@code buffer}.
     *
     * <p>The key and value are obtained with {@link ByteBuffer#slice()}, so they share
     * the content of {@code buffer} instead of copying it.
     *
     * @param buffer            buffer positioned at the first body byte (the attributes byte)
     * @param sizeInBytes       total record size, stored in the returned record
     * @param sizeOfBodyInBytes declared body size; the number of bytes consumed must match it
     * @param baseOffset        base offset of the enclosing batch
     * @param baseTimestamp     base timestamp of the enclosing batch
     * @param baseSequence      base sequence of the enclosing batch; when negative the record
     *                          gets {@code RecordBatch.NO_SEQUENCE}
     * @param logAppendTime     if non-null, replaces the timestamp computed from the delta
     * @return the parsed record
     * @throws InvalidRecordException if the header count is negative or larger than the number
     *         of bytes left in the buffer, if the bytes consumed differ from
     *         {@code sizeOfBodyInBytes}, or if a buffer operation fails with
     *         {@link BufferUnderflowException} or {@link IllegalArgumentException}
     */
    private static DefaultRecord readFrom(ByteBuffer buffer, int sizeInBytes,
                                          int sizeOfBodyInBytes, long baseOffset, long baseTimestamp,
                                          int baseSequence, Long logAppendTime) {
        try {
            // Remember where the body starts so the consumed length can be verified at the end.
            int recordStart = buffer.position();

            byte attributes = buffer.get();

            // Timestamp is stored as a delta against the batch's base timestamp;
            // an explicit log append time overrides it.
            long timestampDelta = ByteUtils.readVarlong(buffer);
            long timestamp = baseTimestamp + timestampDelta;
            if (logAppendTime != null) {
                timestamp = logAppendTime;
            }

            // Offset is stored as a delta against the batch's base offset.
            int offsetDelta = ByteUtils.readVarint(buffer);
            long offset = baseOffset + offsetDelta;

            int sequence = baseSequence >= 0 ?
                    DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) :
                    RecordBatch.NO_SEQUENCE;

            // Key: a negative length means a null key.
            ByteBuffer key = null;
            int keySize = ByteUtils.readVarint(buffer);
            if (keySize >= 0) {
                key = buffer.slice();
                key.limit(keySize);
                // Skip over the key bytes in the source buffer.
                buffer.position(buffer.position() + keySize);
            }

            // Value: a negative length means a null value.
            ByteBuffer value = null;
            int valueSize = ByteUtils.readVarint(buffer);
            if (valueSize >= 0) {
                value = buffer.slice();
                value.limit(valueSize);
                // Skip over the value bytes in the source buffer.
                buffer.position(buffer.position() + valueSize);
            }

            int numHeaders = ByteUtils.readVarint(buffer);
            if (numHeaders < 0) {
                throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
            }
            // Every header occupies at least one byte, so a count larger than the bytes left can
            // only come from corrupt data. Reject it before allocating the header array: an
            // oversized allocation would fail with an Error that the catch below does not handle.
            if (numHeaders > buffer.remaining()) {
                throw new InvalidRecordException("Found invalid number of record headers. " + numHeaders +
                        " is larger than the remaining size of the buffer");
            }

            final Header[] headers;
            if (numHeaders == 0) {
                headers = Record.EMPTY_HEADERS;
            } else {
                headers = readHeaders(buffer, numHeaders);
            }

            // The whole body has been read; the consumed length must equal the declared length.
            if (buffer.position() - recordStart != sizeOfBodyInBytes) {
                throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
                        " bytes in record payload, but instead read " + (buffer.position() - recordStart));
            }

            return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
        } catch (BufferUnderflowException | IllegalArgumentException e) {
            throw new InvalidRecordException("Found invalid record structure", e);
        }
    }

读取headers

/**
     * Reads {@code numHeaders} consecutive headers from {@code buffer}.
     *
     * <p>Each header is a varint key length, the key bytes (decoded as UTF-8), a varint
     * value length and the value bytes. A negative value length yields a null value.
     *
     * @param buffer     buffer positioned at the first header
     * @param numHeaders number of headers to read
     * @return the headers in the order they were read
     * @throws InvalidRecordException if a header key length is negative
     */
    private static Header[] readHeaders(ByteBuffer buffer, int numHeaders) {
        final Header[] result = new Header[numHeaders];

        for (int slot = 0; slot < numHeaders; slot++) {
            // Header key: length-prefixed and never null.
            final int keyLength = ByteUtils.readVarint(buffer);
            if (keyLength < 0) {
                throw new InvalidRecordException("Invalid negative header key size " + keyLength);
            }
            final String name = Utils.utf8(buffer, keyLength);
            // Move past the key bytes that were just decoded.
            buffer.position(buffer.position() + keyLength);

            // Header value: length-prefixed; a negative length leaves it null.
            final int valueLength = ByteUtils.readVarint(buffer);
            ByteBuffer payload = null;
            if (valueLength >= 0) {
                payload = buffer.slice();
                payload.limit(valueLength);
                // Move past the value bytes.
                buffer.position(buffer.position() + valueLength);
            }

            // RecordHeader wraps the key/value pair.
            result[slot] = new RecordHeader(name, payload);
        }

        return result;
    }

DefaultRecord长度计算

/**
     * Computes the full encoded size of a record: the body plus the varint that stores the body length.
     *
     * @param offsetDelta    offset delta of the record
     * @param timestampDelta timestamp delta of the record
     * @param keySize        key length in bytes, negative for a null key
     * @param valueSize      value length in bytes, negative for a null value
     * @param headers        record headers, must not be null
     * @return the size in bytes of the record including its length field
     */
    public static int sizeInBytes(int offsetDelta,
                                  long timestampDelta,
                                  int keySize,
                                  int valueSize,
                                  Header[] headers) {
        final int bodyLength = sizeOfBodyInBytes(offsetDelta, timestampDelta, keySize, valueSize, headers);
        // Space taken by the length field itself.
        final int lengthFieldSize = ByteUtils.sizeOfVarint(bodyLength);
        return bodyLength + lengthFieldSize;
    }

    /**
     * Computes the record body size from key and value buffers.
     *
     * <p>A null buffer is treated as length -1; otherwise the buffer's remaining
     * byte count is used.
     *
     * @param offsetDelta    offset delta of the record
     * @param timestampDelta timestamp delta of the record
     * @param key            record key, may be null
     * @param value          record value, may be null
     * @param headers        record headers, must not be null
     * @return the size in bytes of the record body
     */
    private static int sizeOfBodyInBytes(int offsetDelta,
                                         long timestampDelta,
                                         ByteBuffer key,
                                         ByteBuffer value,
                                         Header[] headers) {
        int keyLength = -1;
        if (key != null) {
            keyLength = key.remaining();
        }

        int valueLength = -1;
        if (value != null) {
            valueLength = value.remaining();
        }

        return sizeOfBodyInBytes(offsetDelta, timestampDelta, keyLength, valueLength, headers);
    }

    /**
     * Computes the record body size from explicit key and value lengths.
     *
     * @param offsetDelta    offset delta of the record
     * @param timestampDelta timestamp delta of the record
     * @param keySize        key length in bytes, negative for a null key
     * @param valueSize      value length in bytes, negative for a null value
     * @param headers        record headers, must not be null
     * @return the size in bytes of the record body
     */
    private static int sizeOfBodyInBytes(int offsetDelta,
                                         long timestampDelta,
                                         int keySize,
                                         int valueSize,
                                         Header[] headers) {
        // The attributes field is a single byte.
        final int attributesSize = 1;
        final int offsetDeltaSize = ByteUtils.sizeOfVarint(offsetDelta);
        final int timestampDeltaSize = ByteUtils.sizeOfVarlong(timestampDelta);
        // Key, value and headers, each including its own length prefix.
        final int payloadSize = sizeOf(keySize, valueSize, headers);

        return attributesSize + offsetDeltaSize + timestampDeltaSize + payloadSize;
    }

    /**
     * Computes the encoded size of the key, the value and the headers, each with its length prefix.
     *
     * <p>A negative key or value size, or a null header value, contributes only
     * {@code NULL_VARINT_SIZE_BYTES}.
     *
     * @param keySize   key length in bytes, negative for a null key
     * @param valueSize value length in bytes, negative for a null value
     * @param headers   record headers
     * @return the combined size in bytes
     * @throws IllegalArgumentException if {@code headers} is null or a header has a null key
     */
    private static int sizeOf(int keySize, int valueSize, Header[] headers) {
        // Key: length prefix plus the key bytes, or just the null marker.
        int total = keySize < 0
                ? NULL_VARINT_SIZE_BYTES
                : ByteUtils.sizeOfVarint(keySize) + keySize;

        // Value: length prefix plus the value bytes, or just the null marker.
        total += valueSize < 0
                ? NULL_VARINT_SIZE_BYTES
                : ByteUtils.sizeOfVarint(valueSize) + valueSize;

        if (headers == null) {
            throw new IllegalArgumentException("Headers cannot be null");
        }

        // Header count prefix.
        total += ByteUtils.sizeOfVarint(headers.length);

        for (int i = 0; i < headers.length; i++) {
            final Header current = headers[i];

            final String name = current.key();
            if (name == null) {
                throw new IllegalArgumentException("Invalid null header key found in headers");
            }
            // Header key: length prefix plus its UTF-8 byte length.
            final int nameLength = Utils.utf8Length(name);
            total += ByteUtils.sizeOfVarint(nameLength) + nameLength;

            // Header value: length prefix plus the bytes, or just the null marker.
            final byte[] payload = current.value();
            total += payload == null
                    ? NULL_VARINT_SIZE_BYTES
                    : ByteUtils.sizeOfVarint(payload.length) + payload.length;
        }

        return total;
    }

归纳

Kafka的一条消息对应着一条Record。DefaultRecord实现了Record接口,其数据结构采用了varint这类变长整数类型,减少了存储空间。Record依赖于RecordBatch的存储,里面的offset、timestamp等都是相对于RecordBatch计算出来的。RecordBatch在下节会有介绍。

相关文章
相关标签/搜索