Kafka消息存储之MessageWriter

摘要

MessageWriter是Kafka进行消息写的工具类,这一部分代码却是和整个系统设计没有多大关系,可是从局部来看,有许多有意思的细节,因此也开一篇短博客来说一讲。数组

MessageWriter的设计意图

首先让咱们列出在消息写的过程当中可能出现的变化状况,也就是这个类的设计需求:缓存

  • 输入源不一样,有bytes[] stream 基本的数据类型(Int,Long,Byte,bytes)等,均须要支持。网络

  • 写入的数据大小不肯定,因此须要考虑自适应容量机制ide

  • 须要必定的自动保障机制,好比在写入数据后自动生成CRC并填充到头部;自动计算大小并填充到头部函数

咱们将这三个需求再切分一下层次,绝大部分的基本类型写入均可以归结为字节的写入,各类类型的写入和自适应容量的功能较为底层,能够实现的更加普适,而相对的第三个需求则相对上层,能够分开来实现。工具

实际上,Kafka也是这么作的,MessagheWriter继承了一个父类BufferingOutputStream,该类主要用于从各种输入源中写入数据到缓存,而后批量写入Buffer中。性能

BufferingOutputStream解析

写入的消息会先暂存在BufferingOutputStream内部,他的容量控制是经过字节数组来构成链表完成的,每一个字节数组均是定长的(长度由构造函数传入),同时为每一个字节数组配备一个游标来标示已被写入多少字节内容,同时采用一个引用代表当前正在写的数组。下面就给出这种基本结构的定义。this

protected final class Segment(size: Int) {
    val bytes = new Array[Byte](size)
    var written = 0
    var next: Segment = null
    def freeSpace: Int = bytes.length - written
  }

BufferingOutputStream的控制策略很是简单,那就是当前的Segment写满,当即增长新的Segment。值得注意的是,这种写入是不可逆的,就是当你回退后再写入也会建立新的segment而非重用原有的segment。scala

那么更加有效和复杂的控制策略是否值得被引入呢?Kafka采用这种简单的策略是因为消息的写入是一次性的,一个序列的消息使用一个writer来写入,而并不是复用writer。另外就是Kafka的性能更多地受限于网络带宽,因此它应该采起尽可能简单的读写策略提升读写的效率而没必要费尽心机来减小内存的建立和释放(GC并非它的主要问题)。设计

基本写入

下面咱们以byte数组的写入为例,上代码:

override def write(b: Array[Byte], off: Int, len: Int) {
    if (off >= 0 && off <= b.length && len >= 0 && off + len <= b.length) {
      var remaining = len
      var offset = off
      while (remaining > 0) {
        if (currentSegment.freeSpace <= 0) addSegment()

        val amount = math.min(currentSegment.freeSpace, remaining)
        System.arraycopy(b, offset, currentSegment.bytes, currentSegment.written, amount)
        currentSegment.written += amount
        offset += amount
        remaining -= amount
      }
    } else {
      throw new IndexOutOfBoundsException()
    }
  }

对于基本类型的写入,MessageWriter使用了位操做,一个字节一个字节的写入,确保它们构建在字节写入的基础之上,这是一种漂亮的概括,咱们也以32位Int的写入为例。

private def writeInt(value: Int): Unit = {
    write(value >>> 24)
    write(value >>> 16)
    write(value >>> 8)
    write(value)
  }

保障机制的实现

咱们仍是先贴代码再废话吧

def write(key: Array[Byte] = null,
            codec: CompressionCodec,
            timestamp: Long,
            timestampType: TimestampType,
            magicValue: Byte)(writePayload: OutputStream => Unit): Unit = {
    withCrc32Prefix {
      // write magic value
      write(magicValue)
      // write attributes
      var attributes: Byte = 0
      if (codec.codec > 0)
        attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
      if (magicValue > MagicValue_V0)
        attributes = timestampType.updateAttributes(attributes)
      write(attributes)
      // Write timestamp
      if (magicValue > MagicValue_V0)
        writeLong(timestamp)
      // write the key
      if (key == null) {
        writeInt(-1)
      } else {
        writeInt(key.length)
        write(key, 0, key.length)
      }
      // write the payload with length prefix
      withLengthPrefix {
        writePayload(this)
      }
    }
  }

private def withLengthPrefix(writeData: => Unit): Unit = {
    // get a writer for length value
    val lengthWriter = reserve(ValueSizeLength)
    // save current size
    val oldSize = size
    // write data
    writeData
    // write length value
    writeInt(lengthWriter, size - oldSize)
  }

从上面这段代码能够看出scala的with很是相似于Python的decrator,将代码块当作无返回无参的函数在with中进行调用。但我想提的是另外一个问题,那就是如何记录以前的位置呢。咱们说过写入的过程是不可逆的,写入的游标不能回退,可是咱们必须在写完数据以后再写入CRC,那么咱们就须要相似于buffer的mark和reset机制同样的东东。

可是咱们不能像buffer那样直接移动游标,由于咱们须要顺利写入下一条消息,移动游标再复位实在代价太大。那咱们能不能提早把这一小段内存截取出来赋给另外一个引用,写入的时候向新引用写就好了,独立于原有的数据写入过程。MessageWriter就是这样作的,好吧让咱们介绍那所谓的一小段内存吧。

protected class ReservedOutput(seg: Segment, offset: Int, length: Int) extends OutputStream {
    private[this] var cur = seg
    private[this] var off = offset
    private[this] var len = length //预留的内存大小

    override def write(value: Int) = {
      if (len <= 0) throw new IndexOutOfBoundsException()
      if (cur.bytes.length <= off) {
        cur = cur.next
        off = 0
      }
      cur.bytes(off) = value.toByte
      off += 1
      len -= 1
    }
  }

可是大家必定看出了上面的问题了,这样写入不是将数据覆盖了吗,因此咱们在写数据时须要先预留一部份内存,这时就须要跳过一部份内存空间了。

private def skip(len: Int): Unit = {
    if (len >= 0) {
      var remaining = len
      while (remaining > 0) {
        if (currentSegment.freeSpace <= 0) addSegment()

        val amount = math.min(currentSegment.freeSpace, remaining)
        currentSegment.written += amount
        remaining -= amount
      }
    } else {
      throw new IndexOutOfBoundsException()
    }
  }

预留操做在此处

def reserve(len: Int): ReservedOutput = {
    val out = new ReservedOutput(currentSegment, currentSegment.written, len)
    skip(len)
    out
  }

咱们再来看withLengthPrefix,先预留ValueSize大小的内存,而后写入数据,最后计算整个的内存变化也就是写入的数据的大小,写入预留内存中,是否是很完美?

相关文章
相关标签/搜索