A RecordBatch is a collection of many Records. The RecordBatch interface extends Iterable&lt;Record&gt;, providing a way to iterate over the Records it contains.
AbstractRecordBatch implements RecordBatch and provides the following default methods:
abstract class AbstractRecordBatch implements RecordBatch {

    @Override
    public boolean hasProducerId() {
        return RecordBatch.NO_PRODUCER_ID < producerId();
    }

    // Compute the offset that follows this batch
    @Override
    public long nextOffset() {
        return lastOffset() + 1;
    }

    // Whether the batch's records are compressed
    @Override
    public boolean isCompressed() {
        return compressionType() != CompressionType.NONE;
    }
}
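As a worked example with hypothetical values: a batch whose baseOffset() is 100 and which holds 3 records has lastOffset() = 102, so nextOffset() returns 103; and a batch whose attributes encode GZIP compression reports isCompressed() == true.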
DefaultRecordBatch implements the RecordBatch interface. Its iterator() method chooses an iteration strategy based on whether the batch is compressed:
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {

    @Override
    public Iterator<Record> iterator() {
        // count() returns the number of records
        if (count() == 0)
            return Collections.emptyIterator();

        // Check whether the data is compressed
        if (!isCompressed())
            // Not compressed: return the uncompressed iterator
            return uncompressedIterator();

        // Compressed: decode eagerly through the compression-specific iterator
        try (CloseableIterator<Record> iterator = compressedIterator(BufferSupplier.NO_CACHING)) {
            List<Record> records = new ArrayList<>(count());
            while (iterator.hasNext())
                records.add(iterator.next());
            return records.iterator();
        }
    }

    ......
}
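Because RecordBatch is Iterable&lt;Record&gt;, consuming a batch is just a for-each loop; iterator() transparently picks the uncompressed or compressed path. A minimal sketch, assuming the Kafka clients library is on the classpath and that batchBytes is a hypothetical ByteBuffer already holding serialized batches (for example, read from a log segment):

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

public class BatchIterationSketch {
    public static void main(String[] args) {
        // Hypothetical input: bytes of one or more serialized record batches
        ByteBuffer batchBytes = ByteBuffer.allocate(0);

        MemoryRecords records = MemoryRecords.readableRecords(batchBytes);
        for (RecordBatch batch : records.batches()) {
            // The inner for-each calls DefaultRecordBatch.iterator()
            for (Record record : batch)
                System.out.println(record.offset() + " -> " + record.value());
        }
    }
}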
RecordIterator, defined inside DefaultRecordBatch, drives the iteration over Records:
private abstract class RecordIterator implements CloseableIterator<Record> {
    private final Long logAppendTime;
    private final long baseOffset;
    private final long baseTimestamp;
    private final int baseSequence;
    private final int numRecords;   // total number of records in the batch
    private int readRecords = 0;    // number of records read so far

    public RecordIterator() {
        this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
        // Read the batch's baseOffset
        this.baseOffset = baseOffset();
        // Read the batch's baseTimestamp
        this.baseTimestamp = baseTimestamp();
        // Read the batch's baseSequence
        this.baseSequence = baseSequence();
        // Read the batch's record count
        int numRecords = count();
        if (numRecords < 0)
            throw new InvalidRecordException("Found invalid record count " + numRecords +
                    " in magic v" + magic() + " batch");
        this.numRecords = numRecords;
    }

    @Override
    public boolean hasNext() {
        // Records remain as long as fewer than numRecords have been read
        return readRecords < numRecords;
    }

    @Override
    public Record next() {
        if (readRecords >= numRecords)
            throw new NoSuchElementException();
        // Update readRecords
        readRecords++;
        // Read the next Record
        Record rec = readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime);
        if (readRecords == numRecords) {
            // After reading the last Record, verify the buffer has been fully consumed
            if (!ensureNoneRemaining())
                throw new InvalidRecordException("Incorrect declared batch size, records still remaining in file");
        }
        return rec;
    }

    // Read the next Record
    protected abstract Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime);

    // Whether the underlying buffer has been fully consumed
    protected abstract boolean ensureNoneRemaining();

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
private CloseableIterator<Record> uncompressedIterator() {
    // Duplicate the buffer (shares the content but has an independent position)
    final ByteBuffer buffer = this.buffer.duplicate();
    // Skip the header and position at the Records section
    buffer.position(RECORDS_OFFSET);
    return new RecordIterator() {
        @Override
        protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
            try {
                // Read from the buffer and instantiate a DefaultRecord
                return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
            } catch (BufferUnderflowException e) {
                throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
            }
        }

        @Override
        protected boolean ensureNoneRemaining() {
            // Fully consumed when the buffer has no bytes left
            return !buffer.hasRemaining();
        }

        @Override
        public void close() {}
    };
}
CompressionType describes how the Records data is compressed. Four types are built in: NONE, GZIP, SNAPPY, and LZ4.
public enum CompressionType {
    NONE(0, "none", 1.0f) {
        @Override
        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
            return buffer;
        }

        @Override
        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
            return new ByteBufferInputStream(buffer);
        }
    },

    GZIP(1, "gzip", 1.0f) {
        @Override
        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
            try {
                // Return an OutputStream that gzips what is written to it.
                // GZIPOutputStream has a default buffer size of 512 bytes, which is too small
                return new GZIPOutputStream(buffer, 8 * 1024);
            } catch (Exception e) {
                throw new KafkaException(e);
            }
        }

        @Override
        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
            try {
                // Return an InputStream that gunzips the buffer's contents
                return new GZIPInputStream(new ByteBufferInputStream(buffer));
            } catch (Exception e) {
                throw new KafkaException(e);
            }
        }
    },

    ......

    // Look up the CompressionType for a given id
    public static CompressionType forId(int id) {
        switch (id) {
            case 0:
                return NONE;
            case 1:
                return GZIP;
            case 2:
                return SNAPPY;
            case 3:
                return LZ4;
            default:
                throw new IllegalArgumentException("Unknown compression type id: " + id);
        }
    }
}
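The wrapForOutput/wrapForInput pair is symmetric: bytes written through one can be read back through the other. A minimal round-trip sketch using the signatures shown above (the buffer sizes and payload are illustrative):

import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

public class CompressionRoundTrip {
    public static void main(String[] args) throws Exception {
        byte[] payload = "hello kafka".getBytes(StandardCharsets.UTF_8);

        // Compress: wrapForOutput decorates the raw buffer stream with GZIP
        ByteBufferOutputStream out = new ByteBufferOutputStream(256);
        try (OutputStream compressed = CompressionType.GZIP.wrapForOutput(
                out, RecordBatch.MAGIC_VALUE_V2)) {
            compressed.write(payload);
        }

        // Decompress: wrapForInput decorates the buffer with a GZIP reader
        ByteBuffer buffer = out.buffer();
        buffer.flip();
        try (InputStream decompressed = CompressionType.GZIP.wrapForInput(
                buffer, RecordBatch.MAGIC_VALUE_V2, BufferSupplier.NO_CACHING)) {
            byte[] result = new byte[payload.length];
            int n = decompressed.read(result);
            System.out.println(new String(result, 0, n, StandardCharsets.UTF_8));
        }
    }
}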
Only the Records section is compressed; the header fields that precede it are stored uncompressed.
private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier) {
    final ByteBuffer buffer = this.buffer.duplicate();
    buffer.position(RECORDS_OFFSET);
    // compressionType() returns the compression type; wrapForInput returns the decorated DataInputStream
    final DataInputStream inputStream = new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
    return new RecordIterator() {
        @Override
        protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
            try {
                // Read from the decompressing inputStream and instantiate a DefaultRecord
                return DefaultRecord.readFrom(inputStream, baseOffset, baseTimestamp, baseSequence, logAppendTime);
            } catch (EOFException e) {
                throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
            } catch (IOException e) {
                throw new KafkaException("Failed to decompress record stream", e);
            }
        }

        @Override
        protected boolean ensureNoneRemaining() {
            try {
                // read() returning -1 means the inputStream is exhausted
                return inputStream.read() == -1;
            } catch (IOException e) {
                throw new KafkaException("Error checking for remaining bytes after reading batch", e);
            }
        }

        @Override
        public void close() {
            try {
                inputStream.close();
            } catch (IOException e) {
                throw new KafkaException("Failed to close record stream", e);
            }
        }
    };
}
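Note that iterator() above materializes a compressed batch into a list, whereas the RecordBatch interface also exposes streamingIterator(BufferSupplier), which reaches this compressedIterator lazily and decodes records one at a time. A minimal sketch, assuming a batch obtained elsewhere (e.g. one element of MemoryRecords.batches()):

import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.CloseableIterator;

public class StreamingReadSketch {
    // batch is hypothetical here; any RecordBatch instance works
    static void streamRecords(RecordBatch batch) {
        // BufferSupplier.create() lets decompression buffers be reused across batches
        try (CloseableIterator<Record> it = batch.streamingIterator(BufferSupplier.create())) {
            while (it.hasNext())
                System.out.println(it.next().offset());
        }
    }
}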
The data layout of DefaultRecordBatch:
RecordBatch =>
  FirstOffset => int64          # Also serves as BaseOffset; each Record's OffsetDelta is relative to this field
  Length => int32               # Length of the data, counted from PartitionLeaderEpoch onward
  PartitionLeaderEpoch => int32
  Magic => int8                 # Version
  CRC => int32                  # CRC used to verify data integrity (computed from Attributes onward)
  Attributes => int16           # Attributes, including the compression type
  LastOffsetDelta => int32      # Offset delta of the last record relative to FirstOffset
  FirstTimestamp => int64       # Also serves as BaseTimestamp; each Record's TimestampDelta is relative to this field
  MaxTimestamp => int64         # Largest timestamp among the records
  ProducerId => int64
  ProducerEpoch => int16
  FirstSequence => int32
  RecordsCount => int32         # Number of entries in the records list
  Records => [Record]
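Summing the fixed-width header fields gives the batch overhead before the first record: 8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4 + 4 = 61 bytes, which is exactly the RECORD_BATCH_OVERHEAD constant built up field by field in the class below.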
The DefaultRecordBatch class:
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
    static final int BASE_OFFSET_OFFSET = 0;
    static final int BASE_OFFSET_LENGTH = 8;
    static final int LENGTH_OFFSET = BASE_OFFSET_OFFSET + BASE_OFFSET_LENGTH;
    static final int LENGTH_LENGTH = 4;
    static final int PARTITION_LEADER_EPOCH_OFFSET = LENGTH_OFFSET + LENGTH_LENGTH;
    static final int PARTITION_LEADER_EPOCH_LENGTH = 4;
    static final int MAGIC_OFFSET = PARTITION_LEADER_EPOCH_OFFSET + PARTITION_LEADER_EPOCH_LENGTH;
    static final int MAGIC_LENGTH = 1;
    static final int CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
    static final int CRC_LENGTH = 4;
    static final int ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH;
    static final int ATTRIBUTE_LENGTH = 2;
    static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
    static final int LAST_OFFSET_DELTA_LENGTH = 4;
    static final int BASE_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
    static final int BASE_TIMESTAMP_LENGTH = 8;
    static final int MAX_TIMESTAMP_OFFSET = BASE_TIMESTAMP_OFFSET + BASE_TIMESTAMP_LENGTH;
    static final int MAX_TIMESTAMP_LENGTH = 8;
    static final int PRODUCER_ID_OFFSET = MAX_TIMESTAMP_OFFSET + MAX_TIMESTAMP_LENGTH;
    static final int PRODUCER_ID_LENGTH = 8;
    static final int PRODUCER_EPOCH_OFFSET = PRODUCER_ID_OFFSET + PRODUCER_ID_LENGTH;
    static final int PRODUCER_EPOCH_LENGTH = 2;
    static final int BASE_SEQUENCE_OFFSET = PRODUCER_EPOCH_OFFSET + PRODUCER_EPOCH_LENGTH;
    static final int BASE_SEQUENCE_LENGTH = 4;
    static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
    static final int RECORDS_COUNT_LENGTH = 4;
    static final int RECORDS_OFFSET = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
    public static final int RECORD_BATCH_OVERHEAD = RECORDS_OFFSET;

    // Read the record count
    private int count() {
        return buffer.getInt(RECORDS_COUNT_OFFSET);
    }

    // Return the total size of the DefaultRecordBatch in bytes
    @Override
    public int sizeInBytes() {
        // LOG_OVERHEAD, defined in Records, is the combined length of FirstOffset and Length
        return LOG_OVERHEAD + buffer.getInt(LENGTH_OFFSET);
    }

    @Override
    public long checksum() {
        return ByteUtils.readUnsignedInt(buffer, CRC_OFFSET);
    }

    // Verify the CRC
    public boolean isValid() {
        return sizeInBytes() >= RECORD_BATCH_OVERHEAD && checksum() == computeChecksum();
    }

    private long computeChecksum() {
        // Crc32C.compute takes the ByteBuffer, an offset, and a size
        return Crc32C.compute(buffer, ATTRIBUTES_OFFSET, buffer.limit() - ATTRIBUTES_OFFSET);
    }

    @Override
    public CompressionType compressionType() {
        return CompressionType.forId(attributes() & COMPRESSION_CODEC_MASK);
    }

    @Override
    public long baseOffset() {
        return buffer.getLong(BASE_OFFSET_OFFSET);
    }

    @Override
    public long lastOffset() {
        return baseOffset() + lastOffsetDelta();
    }

    ......
}
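To see these header accessors in action, one can build a batch in memory and read the fields back. A minimal sketch, assuming the Kafka clients library; MemoryRecordsBuilder writes a DefaultRecordBatch header in the layout shown above (the offsets and values here are illustrative):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.TimestampType;

public class BatchHeaderSketch {
    public static void main(String[] args) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(
                ByteBuffer.allocate(1024), CompressionType.NONE,
                TimestampType.CREATE_TIME, 100L); // baseOffset = 100
        builder.append(System.currentTimeMillis(),
                "k0".getBytes(StandardCharsets.UTF_8), "v0".getBytes(StandardCharsets.UTF_8));
        builder.append(System.currentTimeMillis(),
                "k1".getBytes(StandardCharsets.UTF_8), "v1".getBytes(StandardCharsets.UTF_8));
        MemoryRecords records = builder.build();

        for (MutableRecordBatch batch : records.batches()) {
            // Two records appended, so lastOffset = baseOffset + 1 = 101
            System.out.println(batch.baseOffset() + ".." + batch.lastOffset());
            System.out.println("valid CRC: " + batch.isValid());
            System.out.println("total bytes: " + batch.sizeInBytes());
        }
    }
}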
To recap: RecordBatch is a collection of multiple Records. It extends the Iterable&lt;Record&gt; interface, providing a way to iterate over Records.
DefaultRecordBatch implements the RecordBatch interface and defines its own data layout. Based on the compression type, it returns the corresponding Iterator&lt;Record&gt; for traversing the Records.