A Partition is made up of multiple Segments, and each Segment consists of a data file and an index file.
The data file ends with .log, and the index file ends with .index.
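As a small illustration of that layout, the following sketch builds the pair of file names for one segment. The directory path is made up; the 20-digit zero-padded base offset follows Kafka's usual segment-naming convention.

import java.io.File

object SegmentFiles {
  // Segment files are named after the segment's base offset, zero-padded to 20 digits
  def filenamePrefix(baseOffset: Long): String = f"$baseOffset%020d"

  def logFile(dir: File, baseOffset: Long): File   = new File(dir, filenamePrefix(baseOffset) + ".log")
  def indexFile(dir: File, baseOffset: Long): File = new File(dir, filenamePrefix(baseOffset) + ".index")

  def main(args: Array[String]): Unit = {
    val dir = new File("/tmp/my-topic-0")   // hypothetical partition directory
    println(logFile(dir, 368769L))          // /tmp/my-topic-0/00000000000000368769.log
    println(indexFile(dir, 368769L))        // /tmp/my-topic-0/00000000000000368769.index
  }
}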
OffsetIndex represents one such index file.
OffsetIndex extends AbstractIndex, which reads the index file through a memory-mapped MappedByteBuffer.
abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long,
                                   val maxIndexSize: Int = -1, val writable: Boolean) extends Logging {

  // Size of each entry in the index
  protected def entrySize: Int

  protected val lock = new ReentrantLock

  @volatile
  protected var mmap: MappedByteBuffer = {
    val newlyCreated = file.createNewFile()
    val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
    try {
      // If the file is newly created, pre-allocate space. The size is the largest
      // multiple of entrySize that does not exceed maxIndexSize
      if(newlyCreated) {
        if(maxIndexSize < entrySize)
          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
        raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
      }

      // Get the file size. Note that the file is initialized to maxIndexSize, but when
      // the file is closed the unused tail is truncated, so the size is not always the same
      val len = raf.length()
      val idx = {
        // Create the MappedByteBuffer
        if (writable)
          raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
        else
          raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, len)
      }

      // Set the position of the MappedByteBuffer
      if(newlyCreated)
        idx.position(0)
      else
        // Point to the end of the existing data (must be a multiple of entrySize)
        idx.position(roundDownToExactMultiple(idx.limit, entrySize))
      idx
    } finally {
      // Close the RandomAccessFile. As long as the MappedByteBuffer has not been
      // garbage collected, the mapping itself remains usable
      CoreUtils.swallow(raf.close())
    }
  }

  private def roundDownToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
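To make the pre-allocation step concrete, here is a minimal sketch that reuses the same rounding logic with a sample maxIndexSize; the numbers are made up for illustration.

object RoundDownExample {
  // Same logic as AbstractIndex.roundDownToExactMultiple: the largest multiple of factor <= number
  def roundDownToExactMultiple(number: Int, factor: Int): Int = factor * (number / factor)

  def main(args: Array[String]): Unit = {
    // e.g. maxIndexSize = 10 MB plus 3 stray bytes, entrySize = 8
    println(roundDownToExactMultiple(10485763, 8)) // 10485760, so the mapped length holds a whole number of entries
  }
}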
swallow takes the passed-in function action and executes it. If an exception is thrown, it is only logged, not rethrown.
def swallow(log: (Object, Throwable) => Unit, action: => Unit) {
  try {
    action
  } catch {
    case e: Throwable => log(e.getMessage(), e)
  }
}
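A hedged usage sketch of the same idea: the action is passed by name, so a failure inside it is logged and swallowed instead of propagating. The logging function here is a stand-in, not Kafka's logger.

object SwallowExample {
  // Same shape as CoreUtils.swallow: run the action, log and swallow any Throwable
  def swallow(log: (Object, Throwable) => Unit, action: => Unit): Unit = {
    try action
    catch { case e: Throwable => log(e.getMessage, e) }
  }

  def main(args: Array[String]): Unit = {
    val logError: (Object, Throwable) => Unit =
      (msg, t) => println(s"swallowed: $msg (${t.getClass.getSimpleName})")
    swallow(logError, { throw new IllegalStateException("close failed") }) // logged, not rethrown
    println("still running")
  }
}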
// Maximum number of entries, computed from mmap.limit and entrySize
@volatile
private[this] var _maxEntries = mmap.limit / entrySize

// Current number of entries
@volatile
protected var _entries = mmap.position / entrySize

// Whether the index is full
def isFull: Boolean = _entries >= _maxEntries

def maxEntries: Int = _maxEntries

def entries: Int = _entries
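A quick arithmetic sketch of these two fields, with assumed values for mmap.limit and mmap.position:

object EntryCountExample {
  def main(args: Array[String]): Unit = {
    val entrySize = 8                  // OffsetIndex entry size
    val limit     = 10 * 1024 * 1024   // assumed mmap.limit for a 10 MB index file
    val position  = 800                // assumed mmap.position after some appends
    println(limit / entrySize)         // _maxEntries = 1310720
    println(position / entrySize)      // _entries    = 100
  }
}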
OffsetPosition has two properties:
sealed trait IndexEntry {
  // We always use Long for both key and value to avoid boxing.
  def indexKey: Long
  def indexValue: Long
}

case class OffsetPosition(offset: Long, position: Int) extends IndexEntry {
  override def indexKey = offset
  override def indexValue = position.toLong
}
Each OffsetIndex entry is 8 bytes: a 4-byte offset relative to baseOffset followed by a 4-byte physical position.
class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
    extends AbstractIndex[Long, Int](file, baseOffset, maxIndexSize, writable) {

  override def entrySize = 8

  // Append an entry
  def append(offset: Long, position: Int) {
    inLock(lock) {
      require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
      if (_entries == 0 || offset > _lastOffset) {
        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
        // Compute the offset relative to baseOffset and write it into the mapped buffer
        mmap.putInt((offset - baseOffset).toInt)
        // Write the physical position into the mapped buffer
        mmap.putInt(position)
        // Update _entries
        _entries += 1
        // Update _lastOffset
        _lastOffset = offset
        require(_entries * entrySize == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
      } else {
        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
          .format(offset, entries, _lastOffset, file.getAbsolutePath))
      }
    }
  }
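A minimal, self-contained sketch (not Kafka code) of the same 8-byte layout that append writes: 4 bytes of relative offset followed by 4 bytes of physical position, here written into a plain ByteBuffer; the offsets and positions are invented.

import java.nio.ByteBuffer

object EntryLayoutExample {
  def main(args: Array[String]): Unit = {
    val baseOffset = 368769L
    val buf = ByteBuffer.allocate(8)              // one 8-byte entry

    // One entry: message offset 368900 stored at byte 4096 of the .log file
    buf.putInt((368900L - baseOffset).toInt)      // 4-byte relative offset = 131
    buf.putInt(4096)                              // 4-byte physical position

    buf.flip()
    println(buf.getInt())                         // 131
    println(buf.getInt())                         // 4096
  }
}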
OffsetIndex exists as an index over the data file, but it only covers part of it: a new index entry is added whenever the physical positions of two records in the data file differ by more than a configured threshold (indexInterval). A small sketch of this interval-based appending follows, and then the lookup process is explained in detail.
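The sketch below only illustrates the idea; indexIntervalBytes and the batch sizes are invented, and this is not the Kafka log-append code.

object SparseIndexSketch {
  def main(args: Array[String]): Unit = {
    val indexIntervalBytes = 4096        // assumed threshold, in the spirit of index.interval.bytes
    var bytesSinceLastIndexEntry = 0
    var physicalPosition = 0

    // Pretend these are the byte sizes of successive message batches written to the .log file
    for ((batchBytes, batchOffset) <- Seq(1200, 1500, 2000, 800, 5000).zipWithIndex) {
      // Only add an index entry once enough log bytes have accumulated since the previous entry
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        println(s"index entry: offset=$batchOffset -> position=$physicalPosition")
        bytesSinceLastIndexEntry = 0
      }
      physicalPosition += batchBytes
      bytesSinceLastIndexEntry += batchBytes
    }
  }
}

The lookup path below is what turns a target offset back into one of these sparse entries.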
// targetOffset is the offset to look up
def lookup(targetOffset: Long): OffsetPosition = {
  maybeLock(lock) {
    val idx = mmap.duplicate
    // Find the slot of the largest entry whose offset is less than or equal to targetOffset
    val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
    if(slot == -1)
      OffsetPosition(baseOffset, 0)
    else
      parseEntry(idx, slot).asInstanceOf[OffsetPosition]
  }
}

// Compute the start position of the n-th entry, then read the int value there, i.e. offsetDelta
private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)

// Compute the start position of the n-th entry, skip offsetDelta, then read the int value, i.e. position
private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4)

// Return the n-th entry as an OffsetPosition instance
override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
  // Note that the full offset is computed as baseOffset + offsetDelta
  OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
}

// Return the slot of the largest entry that is less than or equal to target
protected def largestLowerBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int =
  indexSlotRangeFor(idx, target, searchEntity)._1

// Binary search. The two returned slots are the ones closest to target:
// the first is less than or equal to target, the second is greater than or equal to target
private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
  // check if the index is empty
  if(_entries == 0)
    return (-1, -1)

  // If target is smaller than the offset of the first entry, it does not exist in the index
  if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
    return (-1, 0)

  var lo = 0
  var hi = _entries - 1
  while(lo < hi) {
    val mid = ceil(hi/2.0 + lo/2.0).toInt
    val found = parseEntry(idx, mid)
    val compareResult = compareIndexEntry(found, target, searchEntity)
    if(compareResult > 0)
      hi = mid - 1
    else if(compareResult < 0)
      // the lo slot always stays less than or equal to target
      lo = mid
    else
      return (mid, mid)
  }
  (lo, if (lo == _entries - 1) -1 else lo + 1)
}
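The same largest-lower-bound binary search, extracted into a self-contained sketch over a plain Array[Long] so the boundary cases are easy to check. This is a simplified re-implementation for illustration, not the Kafka source.

object LowerBoundSearch {
  // Return the index of the largest element <= target, or -1 if every element is greater
  def largestLowerBound(offsets: Array[Long], target: Long): Int = {
    if (offsets.isEmpty || offsets(0) > target) return -1
    var lo = 0
    var hi = offsets.length - 1
    while (lo < hi) {
      val mid = math.ceil(hi / 2.0 + lo / 2.0).toInt
      if (offsets(mid) > target) hi = mid - 1
      else if (offsets(mid) < target) lo = mid
      else return mid
    }
    lo
  }

  def main(args: Array[String]): Unit = {
    val offsets = Array(0L, 100L, 250L, 400L)
    println(largestLowerBound(offsets, 260)) // 2  -> the entry with offset 250
    println(largestLowerBound(offsets, 400)) // 3  -> exact match
    println(largestLowerBound(offsets, -5))  // -1 -> target is before the first entry
  }
}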
def resize(newSize: Int) {
  inLock(lock) {
    val raf = new RandomAccessFile(file, "rw")
    // Round newSize down to the largest multiple of entrySize it can hold
    val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
    // Remember the current position
    val position = mmap.position

    // On Windows the mapped region must be unmapped before the file can be resized
    if (OperatingSystem.IS_WINDOWS)
      forceUnmap(mmap);
    try {
      // If roundedNewSize is smaller than the current file size, this truncates the file;
      // otherwise it extends the file
      raf.setLength(roundedNewSize)
      // Re-create mmap over the resized file
      mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
      _maxEntries = mmap.limit / entrySize
      // Restore the saved position
      mmap.position(position)
    } finally {
      // Close the file
      CoreUtils.swallow(raf.close())
    }
  }
}

// Clear the index file
override def truncate() = truncateToEntries(0)

// Keep only the first `entries` entries
private def truncateToEntries(entries: Int) {
  inLock(lock) {
    // Update the fields
    _entries = entries
    mmap.position(_entries * entrySize)
    _lastOffset = lastEntry.offset
  }
}
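A small arithmetic sketch of what resize does with an assumed newSize; the numbers are invented and this is not Kafka code.

object ResizeArithmetic {
  // Same rounding helper as AbstractIndex
  def roundDownToExactMultiple(number: Int, factor: Int): Int = factor * (number / factor)

  def main(args: Array[String]): Unit = {
    val entrySize = 8
    val currentPosition = 800                          // 100 entries already written
    val requestedSize = 1003                           // hypothetical newSize in bytes
    val rounded = roundDownToExactMultiple(requestedSize, entrySize)
    println(rounded)              // 1000: the new file length is a whole number of entries
    println(rounded / entrySize)  // 125: the new _maxEntries
    println(currentPosition)      // the saved position is restored, keeping the 100 existing entries
  }
}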
OffsetIndex is the index of the data file; its purpose is to make lookups faster. To save space, OffsetIndex only records index entries for some of the data, at intervals.
To speed up reads of the index file, OffsetIndex uses a memory-mapping mechanism underneath.
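For readers unfamiliar with that mechanism, here is a self-contained sketch, independent of Kafka, of memory-mapping a file and accessing it through a MappedByteBuffer; the file path and values are made up.

import java.io.RandomAccessFile
import java.nio.channels.FileChannel

object MmapSketch {
  def main(args: Array[String]): Unit = {
    val raf = new RandomAccessFile("/tmp/demo.index", "rw") // hypothetical file path
    try {
      raf.setLength(16)                                     // room for two 8-byte entries
      val mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, raf.length())
      mmap.putInt(131)                                      // relative offset
      mmap.putInt(4096)                                     // physical position
      // Absolute reads by byte offset, with no explicit read() calls in user code
      println(mmap.getInt(0) + " -> " + mmap.getInt(4))     // 131 -> 4096
    } finally raf.close()
  }
}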
OffsetIndex maps a record's offset to its physical position in the data file. Given a target offset, it finds the entry whose offset is less than or equal to, and closest to, the target. For example, if the index only has entries for offsets 100 and 200, a lookup of offset 150 returns the entry for offset 100, and the caller then scans the data file forward from that position.