In Kafka, a message queue is called a Topic. Because Kafka is distributed, a Topic is made up of multiple Partitions spread across different machines. To further improve read efficiency, Kafka splits each Partition into multiple Segments. This article describes in detail how a Segment appends messages, looks up messages, and recovers its indexes.
Messages are appended to a segment through LogSegment.append, which writes the records to the end of the data file (FileRecords) and, once enough bytes have accumulated, adds a sparse entry to the offset index and time index:

class LogSegment(val log: FileRecords,
                 val index: OffsetIndex,
                 val timeIndex: TimeIndex,
                 val txnIndex: TransactionIndex,
                 val baseOffset: Long,
                 val indexIntervalBytes: Int,
                 val rollJitterMs: Long,
                 time: Time) extends Logging {

  def append(firstOffset: Long,
             largestOffset: Long,
             largestTimestamp: Long,
             shallowOffsetOfMaxTimestamp: Long,
             records: MemoryRecords): Unit = {
    if (records.sizeInBytes > 0) {
      trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
        .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
      // The current end of the FileRecords file is the physical position of the new records
      val physicalPosition = log.sizeInBytes()
      if (physicalPosition == 0)
        rollingBasedTimestamp = Some(largestTimestamp)
      // Check that the offset range can be stored as a relative offset
      require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
      // Delegate to FileRecords to append the records
      val appendedBytes = log.append(records)
      trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")
      // Update the maximum timestamp and its corresponding offset
      if (largestTimestamp > maxTimestampSoFar) {
        maxTimestampSoFar = largestTimestamp
        offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
      }
      // An index entry is only added once more than indexIntervalBytes bytes
      // have been written to FileRecords since the last entry (sparse index)
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        // Add the index entries
        index.append(firstOffset, physicalPosition)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
        bytesSinceLastIndexEntry = 0
      }
      bytesSinceLastIndexEntry += records.sizeInBytes
    }
  }

  // The index stores the difference between the offset and baseOffset,
  // which must fit into four bytes
  def canConvertToRelativeOffset(offset: Long): Boolean = {
    (offset - baseOffset) <= Integer.MAX_VALUE
  }
}
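To make the two ideas in append concrete, here is a minimal, hedged sketch in plain Scala. It is not Kafka's real OffsetIndex; the class and method names are invented for the example. It shows why storing offset - baseOffset lets an index entry fit in four bytes, and how the indexIntervalBytes threshold keeps the index sparse.

import scala.collection.mutable.ArrayBuffer

// Invented for illustration: each entry stores a 4-byte relative offset and
// a 4-byte file position, similar to the entries in Kafka's offset index.
final case class IndexEntry(relativeOffset: Int, position: Int)

final class SparseOffsetIndexSketch(baseOffset: Long, indexIntervalBytes: Int) {
  private val entries = ArrayBuffer.empty[IndexEntry]
  private var bytesSinceLastEntry = 0

  // Called once per appended batch; an entry is only recorded after more than
  // indexIntervalBytes bytes have accumulated, which keeps the index sparse.
  def maybeAppend(firstOffset: Long, physicalPosition: Int, batchBytes: Int): Unit = {
    require(firstOffset - baseOffset <= Int.MaxValue,
      "offset cannot be stored as a 4-byte relative offset")
    if (bytesSinceLastEntry > indexIntervalBytes) {
      entries += IndexEntry((firstOffset - baseOffset).toInt, physicalPosition)
      bytesSinceLastEntry = 0
    }
    bytesSinceLastEntry += batchBytes
  }

  def entryCount: Int = entries.size
}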
Looking up a message by offset is done by translateOffset: it first queries the offset index for the closest preceding entry, then scans the data file forward from that position:

class LogSegment {
  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
    // Look up the closest preceding entry in the offset index
    val mapping = index.lookup(offset)
    // Search the data file for the RecordBatch containing the offset
    log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
  }
}

class FileRecords {
  public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
    // Iterate over the RecordBatches sequentially, starting at startingPosition
    for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
      long offset = batch.lastOffset();
      if (offset >= targetOffset)
        // Stop at the first batch whose lastOffset is >= targetOffset and
        // return its lastOffset, starting position, and size
        return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes());
    }
    return null;
  }
}
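The lookup is therefore a two-step process: a search in the sparse index narrows the range, and a short sequential scan of the data file finds the exact batch. The hedged sketch below illustrates this with plain Scala collections instead of Kafka's memory-mapped index; OffsetPositionSketch, BatchSketch, and the two functions are invented for the example.

final case class OffsetPositionSketch(offset: Long, position: Int)
final case class BatchSketch(lastOffset: Long, position: Int, sizeInBytes: Int)

// Step 1: find the largest index entry whose offset is <= targetOffset.
// (Kafka does this with a binary search over the memory-mapped index file;
// this sketch assumes the entries are sorted by offset.)
def lookup(index: Seq[OffsetPositionSketch], targetOffset: Long): OffsetPositionSketch =
  index.filter(_.offset <= targetOffset)
    .lastOption
    .getOrElse(OffsetPositionSketch(0L, 0))

// Step 2: scan batches forward from that file position until the first batch
// whose lastOffset reaches the target, as searchForOffsetWithSize does.
def search(batches: Seq[BatchSketch], startPosition: Int, targetOffset: Long): Option[BatchSketch] =
  batches.filter(_.position >= startPosition).find(_.lastOffset >= targetOffset)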
The read method provides a convenient way to fetch messages: it reads messages according to the specified conditions (start offset, optional maximum offset, maximum size, and maximum position).
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
         minOneMessage: Boolean = false): FetchDataInfo = {
  if (maxSize < 0)
    throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

  val logSize = log.sizeInBytes
  // Find the physical position of startOffset
  val startOffsetAndSize = translateOffset(startOffset)
  // if the start position is already off the end of the log, return null
  if (startOffsetAndSize == null)
    return null

  val startPosition = startOffsetAndSize.position
  val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
  val adjustedMaxSize =
    // To guarantee that at least one complete message is read, the fetch size must be
    // at least the size of the RecordBatch containing startOffset
    if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
    else maxSize

  if (adjustedMaxSize == 0)
    return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

  val fetchSize: Int = maxOffset match {
    case None =>
      // If no maxOffset is specified, only the size limits apply
      min((maxPosition - startPosition).toInt, adjustedMaxSize)
    case Some(offset) =>
      if (offset < startOffset)
        return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
      // Find the physical position corresponding to maxOffset
      val mapping = translateOffset(offset, startPosition)
      val endPosition =
        if (mapping == null)
          logSize
        else
          mapping.position
      // Take the minimum so that all of the constraints are satisfied
      min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
  }

  FetchDataInfo(offsetMetadata, log.read(startPosition, fetchSize),
    firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
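A hedged usage sketch of read, assuming a LogSegment instance named segment (the offset and size values are made up for the example): fetch at most 64 KB starting at offset 1200, while still returning at least one complete batch even if that batch exceeds the limit.

val fetchInfo: FetchDataInfo = segment.read(
  startOffset = 1200L,     // first offset to read
  maxOffset = None,        // no upper offset bound
  maxSize = 64 * 1024,     // cap on the number of bytes returned
  minOneMessage = true)    // allow exceeding maxSize for the first batch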
When the index files are corrupted, Kafka rebuilds them automatically by scanning the data file in the recover method.
def recover(producerStateManager: ProducerStateManager,
            leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
  // Truncate the offset index file
  index.truncate()
  index.resize(index.maxIndexSize)
  // Truncate the time index file
  timeIndex.truncate()
  timeIndex.resize(timeIndex.maxIndexSize)
  // Truncate the transaction index file
  txnIndex.truncate()
  // Current read position in the data file
  var validBytes = 0
  var lastIndexEntry = 0
  maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
  try {
    // Iterate over the RecordBatches in the data file
    for (batch <- log.batches.asScala) {
      batch.ensureValid()

      // Update the maximum timestamp and its corresponding offset
      if (batch.maxTimestamp > maxTimestampSoFar) {
        maxTimestampSoFar = batch.maxTimestamp
        offsetOfMaxTimestamp = batch.lastOffset
      }

      // If more than indexIntervalBytes bytes have been scanned since the last
      // index entry, add a new entry
      if (validBytes - lastIndexEntry > indexIntervalBytes) {
        val startOffset = batch.baseOffset
        index.append(startOffset, validBytes)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
        lastIndexEntry = validBytes
      }
      // Update validBytes
      validBytes += batch.sizeInBytes()

      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
        leaderEpochCache.foreach { cache =>
          if (batch.partitionLeaderEpoch > cache.latestEpoch()) // this is to avoid unnecessary warning in cache.assign()
            cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
        }
        updateProducerState(producerStateManager, batch)
      }
    }
  } catch {
    case e: CorruptRecordException =>
      logger.warn("Found invalid messages in log segment %s at byte offset %d: %s."
        .format(log.file.getAbsolutePath, validBytes, e.getMessage))
  }

  // Check whether the data file has extra bytes after the last valid batch
  val truncated = log.sizeInBytes - validBytes
  if (truncated > 0)
    logger.debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")

  log.truncateTo(validBytes)
  index.trimToValidSize()
  timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
  timeIndex.trimToValidSize()
  truncated
}
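The essence of recover is to walk the batches in order, keep a running count of bytes that validate successfully, rebuild the index entries along the way, and truncate whatever follows the last valid batch. A minimal, hedged sketch of that core idea (SimpleBatch and countValidBytes are invented for the example):

final case class SimpleBatch(sizeInBytes: Int, valid: Boolean)

// Accumulate bytes until the first batch that fails validation, mirroring the
// CorruptRecordException that breaks out of Kafka's recovery loop.
def countValidBytes(batches: Seq[SimpleBatch]): Int =
  batches.takeWhile(_.valid).map(_.sizeInBytes).sum

// Anything after the valid prefix would then be dropped:
//   log.truncateTo(countValidBytes(batches))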
A Segment consists of a data file (FileRecords) and index files (OffsetIndex and TimeIndex). Both are updated when new messages are appended, and lookups make full use of the index files to narrow the search to a small range of the data file.