Kafka的消息格式

Commit Log

Kafka储存消息的文件被它叫作log,按照Kafka文档的说法是:java

Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit logapache

这反应出来的Kafka的行为是:消息被不断地append到文件末尾,并且消息是不可变的。数组

这种行为源于Kafka想要实现的功能:高吞吐量,多副本,消息持久化。这种简单的log形式的文件结构可以更好地实现这些功能,不过也会在其它方面有所欠缺,好比检索消息的能力。缓存

而Kafka的行为也决定了它的消息格式。对于Kafka来讲,消息的主体部分的格式在网络传输中和磁盘上是一致的,也就是说消息的主体部分能够直接从网络读取的字节buffer中写入到文件(部分状况下),也能够直接从文件中copy到网络,而不须要在程序中再加工,这有利于下降服务器端的开销,以及提升IO速度(好比使用zero-copy的传输)。服务器

这也就决定了Kafka的消息格式必须是适于被直接append到文件中的。固然啥均可以append到文件后面,问题在于怎么从文件中拆分出来一条条记录。网络

记录的划分以及消息的格式

对于日志来讲,一条记录以"\n"结尾,或者经过其它特定的分隔符分隔,这样就能够从文件中拆分出一条一条的记录,不过这种格式更适用于文本,对于Kafka来讲,须要的是二进制的格式。因此,Kafka使用了另外一种经典的格式:在消息前面固定长度的几个字节记录下这条消息的大小(以byte记),因此Kafka的记录格式变成了:app

Offset MessageSize Messageide

消息被以这样格式append到文件里,在读的时候经过MessageSize能够肯定一条消息的边界。性能

须要注意的是,在Kafka的文档以及源码中,消息(Message)并不包括它的offset。Kafka的log是由一条一条的记录构成的,Kafka并无给这种记录起个专门的名字,可是须要记住的是这个“记录”并不等于"Message"。Offset MessageSize Message加在一块儿,构成一条记录。而在Kafka Protocol中,Message具体的格式为fetch

Message => Crc MagicByte Attributes Key Value
   Crc => int32
   MagicByte => int8
   Attributes => int8
   Key => bytes
   Value => bytes

各个部分的含义是

Field

Description

Attributes

This byte holds metadata attributes about the message. The lowest 2 bits contain the compression codec used for the message. The other bits should be set to 0.

Crc

The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer.

Key

The key is an optional message key that was used for partition assignment. The key can be null.

MagicByte

This is a version id used to allow backwards compatible evolution of the message binary format. The current value is 0.

Offset

This is the offset used in kafka as the log sequence number. When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes.

Value

The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. The message can be null.

 

MessageSet

之因此要强调记录与Message的区别,是为了更好地理解MessageSet的概念。Kafka protocol里对于MessageSet的定义是这样的

MessageSet => [Offset MessageSize Message]
   Offset => int64
   MessageSize => int32

也就是说MessageSet是由多条记录组成的,而不是消息,这就决定了一个MessageSet实际上不须要借助其它信息就能够从它对应的字节流中切分出消息,而这决定了更重要的性质:Kafka的压缩是以MessageSet为单位的。而以MessageSet为单位压缩,决定了对于压缩后的MessageSet,不须要在它的外部记录这个MessageSet的结构,也就决定了Kafka的消息是能够递归包含的,也就是前边"value"字段的说明“Kafka supports recursive messages in which case this may itself contain a message set"。

具体地说,对于Kafka来讲,能够对一个MessageSet作为总体压缩,把压缩后获得的字节数组做为一条Message的value。因而,Message既能够表示未压缩的单条消息,也能够表示压缩后的MessageSet。

压缩后的消息的读取

就看Message头部的Attributes里的压缩格式标识。说到这个,得说下递归包含的事情,理论上,一个压缩的的MessageSet里的一个Message可能会是另外一个压缩后的MessageSet,或者包含更深层的MessageSet。可是实际上,Kafka中的一个Message最多只含有一个MessageSet。从Message中读取MessageSet的逻辑,能够在ByteBufferMessageSet的internalIterator方法中找到:

        if(isShallow) { //是否要进行深层迭代
          new MessageAndOffset(newMessage, offset)
        } else { //若是要深层迭代的话
          newMessage.compressionCodec match {
            case NoCompressionCodec =>
              innerIter = null
              new MessageAndOffset(newMessage, offset) //若是这个Message没有压缩,就直接把它做为一个Message返回
            case _ =>
              innerIter = ByteBufferMessageSet.deepIterator(newMessage) //若是这个Message采用了压缩,就对它进行深层迭代
              if(!innerIter.hasNext)
                innerIter = null
              makeNext()
          }
        }

而ByteBufferMessageSet的deepIterator方法就是对这个Message的value进行解压,而后从中按照Offset MessageSize Message的格式读取一条条记录,对于此次读取的Message,就再也不进行深层迭代了。下面是deepIterator的makeNext方法,它被不断调用以生成迭代器的元素

      override def makeNext(): MessageAndOffset = {
        try {
          // read the offset
          val offset = compressed.readLong()
          // read record size
          val size = compressed.readInt()

          if (size < Message.MinHeaderSize)
            throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator")

          // read the record into an intermediate record buffer
          // and hence has to do extra copy
          val bufferArray = new Array[Byte](size)
          compressed.readFully(bufferArray, 0, size)
          val buffer = ByteBuffer.wrap(bufferArray)

          val newMessage = new Message(buffer)

          // the decompressed message should not be a wrapper message since we do not allow nested compression
          new MessageAndOffset(newMessage, offset)
        } catch {
          case eofe: EOFException =>
            compressed.close()
            allDone()
          case ioe: IOException =>
            throw new KafkaException(ioe)
        }
      }

KAFKA-1718

至于一个MessageSet中不能包含多个压缩后的Message(压缩后的Message也就是以压缩后的MessageSet做为value的Message),Kafka Protocol中是这么说的

The outer MessageSet should contain only one compressed "Message" (see KAFKA-1718 for details).

KAFKA-1718就是在Protocol里添加这么一个特殊说明的缘由。事情是这样的:

报各这个问题的人是Go语言client的做者,他发现本身发的Message明显没有过大,可是发生了MessageSizeTooLargeException。后来跟其它人讨论,发现是由于broker端在调用Log.append时,会把传送给这个方法的MessageSet解压开,而后再组合成一个压缩后的MessageSet(ByteBufferMessageSet)。而Go语言的客户端发送的MessageSet中包含了多个压缩后的Message,这样即便发送时的Message不会超过message.max.bytes的限制,可是broker端再次生成的Message就超过了这个限制。因此,Kafka Protocol对这种状况作了特殊说明:The outer MessageSet should contain only one compressed "Message"。

Compressed Message的offset

即然能够把压缩后的MessageSet做为Message的value,那么这个Message的offset该如何设置呢?

这个offset的值只有两种可能:1, 被压缩的MessageSet里Message的最大offset; 2, 被压缩的MessageSet里Message的最小offset.

这两种取值没有功能的不一样,只有效率的不一样。

因为FetchRequest协议中的offset是要求broker提供大于等于这个offset的消息,所以broker会检查log,找到符合条件的,而后传输出去。那么因为FetchRequest中的offset位置的消息可位于一个compressed message中,因此broker须要肯定一个compressed Message是否须要被包含在respone中。

  • 若是compressed Message的offset是它包含的MessageSet的最小offset。那么,咱们对于这个Message是否应包含在response中,没法给出"是”或"否“的回答。好比FetchRequest中指明的开始读取的offset是14,而一个compressed Message的offset是13,那么这个Message中可能包含offset为14的消息,也可能不包含。
  • 若是compressed Message的offset是它包含的MessageSet的最大offset,那么,能够根据这个offset肯定这个Message“不该该”包含在response中。好比FetchRequest中指明的开始读取的offset是14,那么若是一个compressed Message的offset是13,那它就不应被包含在response中。而当咱们顺序排除这种不符合条件的Message,就能够找到第一个应该被包含在response中的Message(压缩或者未压缩), 从它开始读取。

在第一种状况下(最小offset),咱们尽管能够经过连续的两个Message肯定第一个Message的offset范围,可是这样在读取时须要在读取第二个Message的offset以后跳回到第一个Message,  这一般会使得最近一次读(也就读第二个offset)的文件系统的缓存失效。并且逻辑比第二种状况更复杂。在第二种状况下,broker只须要找到第一个其offset大于或等于目标offset的Message,从它能够读取便可,并且也一般能利用到文件系统缓存,由于offset和消息内容有可能在同一个缓存块中。

在处理FetchRequest时,broker的逻辑也正是如此。对FetchRequest的处理会调用到Log#read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None)方法,而后调用到LogSegment的read方法,它的以后的调用有不少,全部不贴代码了,它的注释说明了读取的逻辑

* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified

即,返回的MessageSet的第一条Message的offset >= startOffset。

而在broker给compressed Message赋予offset时,其逻辑也是赋予其包含的messages中的最大offset。这段逻辑在ByteBufferMessageSet的create方法中:

      messageWriter.write(codec = compressionCodec) { outputStream =>
        val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream)) //建立压缩流
        try {
          for (message <- messages) {
            offset = offsetCounter.getAndIncrement //offsetCounter是一个AtomicLong,使用它的当前值做为这条Message的offset,而后+1做为下一条消息的offset
            output.writeLong(offset)//写入这条日志记录的offset
            output.writeInt(message.size)//写入这条日志记录的大小
            output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) //写入这条记录的Message
          }
        } finally {
          output.close()
        }
      }
      val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
      writeMessage(buffer, messageWriter, offset)//以最后一个Message的offset做为这个compressed Message的offset

Validate Message

什么须要验证?

先看一下消息的哪些特征须要被验证。

首先,网络传输过程当中,数据可能会产生错误,即便是写在磁盘上的消息,也可能会因为磁盘的问题产生错误。所以,broker对接收到的消息须要验证其完整性。这里的消息就是前边协议里定义的Message。对于消息完整性的检测,是使用CRC32校验,可是并非对消息的全部部分计算CRC,而是对Message的Crc部分之后的部分,不包括记录的offset和MessageSize部分。把offset和MessageSize加到CRC计算中,能够对完整性有更强的估证,可是坏处在于这两个部分在消息由producer到达broker之后,会被broker重写,所以若是把它们计算在crc里边,就须要在broker端从新计算crc32,这样会带来额外的开销。

CRC32没有检测出错误的几率在0.0047%如下,加上TCP自己也有校验机制,不能检测出错误的几率就很小了(这个还须要再仔细算一下)。

除了消息的完整性,还须要对消息的合规性进行检验,主要是检验offset是不是单调增加的,以及MessageSize是超过了最大值。

这里检验时使用的MessageSize就不是Message自己的大小了,而是一个记录的大小,包括offset和MessageSize,这个也挺奇怪的,有必要非拉上这俩吗?

并且在broker端检验producer发来的MessageSet时,也不必检验它的offset是不是单调增加的呀,毕竟leader还要对Message的offset从新赋值。而follower是从leader处拉取的,若是网络或者磁盘出错,经过对offset的单调性检查也可能会漏掉出错了的记录,对于consumer来讲也是同理。因此这里有点奇怪。

什么时候须要验证?

在broker把收到的producer request里的MessageSet append到Log以前,以及consumer和follower获取消息以后,都须要进行校验。

这种状况分红两种:

1. broker和consumer把收到的消息append到log以前

2. consumser收到消息后

第一种状况都是在调用Log#append时进行检验的。

如何验证?

先看下Log#append的方法声明

def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo

在replica的fetcher线程调用append方法时,会把assignOffsets设成false,而leader处理produce request时,会把assignOffsets设成true。

下面append方法的一部分代码

    val appendInfo = analyzeAndValidateMessageSet(messages) //验证消息
    
    // if we have any valid messages, append them to the log
    if(appendInfo.shallowCount == 0)
      return appendInfo
      
    // trim any invalid bytes or partial messages before appending it to the on-disk log
    var validMessages = trimInvalidBytes(messages, appendInfo)//trim掉不可用的部分或者残缺的消息

    try {
      // they are valid, insert them in the log
      lock synchronized {
        appendInfo.firstOffset = nextOffsetMetadata.messageOffset 

       if(assignOffsets) { //若是须要从新赋予offset
          // assign offsets to the message set
          val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
          try {
            validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact) //验证消息而且赋予offset
          } catch {
            case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
          }
          appendInfo.lastOffset = offset.get - 1
        } else {
          // we are taking the offsets we are given
          if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
            throw new IllegalArgumentException("Out of order offsets found in " + messages)
        }

        // re-validate message sizes since after re-compression some may exceed the limit 对压缩后消息从新验证MessageSize是否超过了容许的最大值
        for(messageAndOffset <- validMessages.shallowIterator) {
          if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
            // we record the original message set size instead of trimmed size
            // to be consistent with pre-compression bytesRejectedRate recording
            BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
            BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
            throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
              .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
          }
        }

注意到对MessageSize验证了两次,第二次是对从新压缩后的消息。KAFKA-1718里提到MessageSizeToLargeException,就是在这时候检测出来的。

初步检验:analyzeAndValidateMessageSet

具体的检验消息完整性和offset单调增加的逻辑在analyzeAndValidateMessageSet方法里。这个方法的实现里,须要注意几点:

  1. 它是使用ByteBufferMessageSize的shallowIterator来对这个MessageSet的消息进行迭代,这也意味着并不会对compressed message里边的MessageSet解压后再进行检验,而是把comprssed message做为单个Message进行检验。
  2. 它计算checksum时,是计算的MagicByte及其之后的内容。
     def computeChecksum(): Long = 
        CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset,  buffer.limit - MagicOffset)

     

  3. 它比较的是entrySize与MaxMessageSize的大小,来肯定这个消息是否太大
      def entrySize(message: Message): Int = LogOverhead + message.size
    
    ---------------------------------
    
      val MessageSizeLength = 4
      val OffsetLength = 8
      val LogOverhead = MessageSizeLength + OffsetLength

     

  4. 它返回的LogAppendInfo中会包括一个targetCodec,指明这个MessageSet将要使用的压缩方式。leader处理produce request时,将使用这个压缩方式从新压缩整个MessageSet。
        val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)

    config.compressionType就是broker配置里的compression.type的值,若是它是“producer", 就会使用producer request使用压缩方式,不然就使用config.compressionType指明的压缩方式。注意若是一个MessageSet里的Message采用了不一样的压缩方式,最后被当成sourceCodec的是最后一个压缩了的消息的压缩方式。

再次检验而且赋予offset :validateMessagesAndAssignOffsets

只有leader处理produce request时,会调用ByteBufferMessageSet的这个方法。 它不会检测analyzeAndValidateMessageSet已经检测的内容,可是会把这个MessageSet进行深度遍历(即若是它里边的消息是压缩后,就把这个消息解压开再遍历),这样它就能作analyzeAndValidateMessageSet不能进行的检测:对于compacted topic检测其key是否为空,若是为空就抛出InvalidMessageException。

另外,它会把深度遍历后得到的Message放在一块儿从新压缩。

若是MessageSet的尾部不是完整的Message呢?

这是在获取ByteBufferMessageSet的iternalIterator时候处理的。

      def makeNextOuter: MessageAndOffset = {
        // if there isn't at least an offset and size, we are done
        if (topIter.remaining < 12)
          return allDone()
        val offset = topIter.getLong()
        val size = topIter.getInt()
        if(size < Message.MinHeaderSize)
          throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator")

        // we have an incomplete message
        if(topIter.remaining < size)
          return allDone()
    .    ...
    }    

注意返回allDone()和抛出InvalidMessageException的时机。

  • 若是这个MessageSet剩下部分不到12bytes,那剩下的部分就是下一个MessageSet头部的一部分,是无法处理的,也是没办法检验的,所以就返回allDone。
  • 若是够12bytes,就能够读出offset和MessageSize。MessageSize至少会大于Message头里边的那些crc、Attributes, MagicBytes等加起来的大小,所以若是MessageSize比这个还小,就确定是个entry有问题,因此就抛出异常。这里的问题在于,即便MessageSet最后的那个Message是不完整的,只要MessageSize有问题,也会抛异常,而不是忽略这个不完整的Message。(这个多是没考虑到,也多是有别的考虑,不过不管怎么处理最后的这个不完整的Message,都有必定的道理)。

 consumer端的验证

consumer(0.9)会检查checksum,不过是能够配置的,缘由正如config里说的同样。

    public static final String CHECK_CRCS_CONFIG = "check.crcs";
    private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";

config的文档说,检查checksum是为了"ensures no on-the-wire or on-disk corruption to the message occurred."即,为了保证没有在网络传输出或者磁盘存储时出现了消息的损坏。可是checksum计算时会带来开销,因此追求最佳性能,能够关掉checksum的检查。


 

下面来看一下几个与消息格式相关的KIP。为何须要这些改变呢?为何以前没有实现这些改变呢?都是由于各类折衷吧,需求与性能折衷,需求与实现所需的工做量的折衷……

下面的几个KIP可能会一块儿加上去,毕竟都是对消息格式的修改,不能搞冲突了。

KIP-31 - Move to relative offsets in compressed message sets

前边提到了,在leader收到ProduceRequet以后,它会解压开compressed message(也就是是这个KIP里的compressed messageset,这两说说法的确有些乱),而后给里边包含的message set的每条消息从新赋予offset。这个作法也是应该的,乍一看也没什么很差。可是问题在于,不只是直接改个offset这么简单,在改完以后,须要从新压缩这些消息,还要计算。这么一搞,开销就大了。KIP-31就是想把这部分的性能损失降下来。(这个KIP已是accepted状态)

作法是把在一个compressed message set里边的每一个message的offset里记下当前message相对于外层的wrapper message的偏移。用汉语说这个意思比较费劲,KIP里这么说

When the producer compresses a message, write the relative offset value in the raw message's offset field. Leave the wrapped message's offset blank.

When broker receives a compressed message, it only needs to 

    1. Decompress the message to verify the CRC and relative offset.
    2. Set outer message's base offset. The outer message's base offset will be the offset of the last inner message.  (Since the broker only needs to update the message-set header, there is no need to re-compress message sets.)

注意,这个wrapper message里记的base offset, 是它所含的message set里的最后一个message的offset。这个和当前的compressed message的offset是一致的。

而后当broker收到一个压缩后的消息时,它只须要

  • 验证CRC与realtive offset的正确性
  • 从新设定外层消息的offset,也就是base offset。

KIP-32 - Add timestamps to Kafka message

在消息里加时间戳。须要注意的是,这个KIP还在讨论中(如下的内容是基于2016年1月7日的版本)。不像上一个已经肯定了。

(俺是以为这个事情早该作了……)

首先,来看一下动机,这个提有意思

Motivation

This KIP tries to address the following issues in Kafka.

  1. Log retention might not be honored: Log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens because the newly created log segment will effectively have its modification time reset to now.
  2. Log rolling might break for a newly created replica as well because of the same reason as (1).
  3. Some use cases such as streaming processing needs a timestamp in messages.

说的是这几个缘由

1. Log retention会不靠谱。当前log retention是在log segment层面作的,是按照log segment的最后修改时间肯定是否要删除一个log segment. 可是,当replica重分配发生时,新被分配的这个replica的log segment的修改时间会被设成当前时间。这么一来,它就不能被按照log retention想要作的那样(其实是想把一段时间以前的消息删除)被删除。

2. 因为和1一样的缘由,对于一个新建立的replica(意思应该是移动位置的replica, 并非增长分区后新加的replica)log rolling有时候也会不靠谱。

3. 有些场景中须要消息含有时间戳,好比流处理。

感受,貌似第三个缘由才是决定性的,拥抱流处理。

接口的变化

准备在Message里加入timestamp字段

准备增长两个配置

  • message.timestamp.type 能够选CreateTime或者LogAppendTime,CreateTime就是这条消息生成的时间,是在producer端指定的。LogAppendTime就是append到log的时间(实现细节没有说明)。
  • max.message.time.difference.ms 若是选择了CreateTime, 那么只有当createTime和broker的本地时间相差在这个配置指定的差距以内,broker才会接受这条消息。

纠结之处

以前关于这个KIP的讨论主要是关于使用哪一个时间, 是使用LogAppendTime(broker time),仍是CreateTime(application time)。

两种都有利有弊:

The good things about LogAppendTime are: 使用LogAppendTime的好处在于

  1. Broker is more robust. Broker比起用户程序更健壮(更不容易出错,好比用户程序可能有bug,致使CreateTime设置的不正确,想想KIP-33,若是错得离谱,索引怎么建?)
  2. Monotonically increasing. LogAppendTime是单调增加的。(可是,follower收到的消息的timestamp该怎么设呢?若是不用leader带来的,就不能肯定是否monotonically increasing)
  3. Deterministic behavior for log rolling and retention.log rolling和retention的行为是肯定性的。(若是按消息里的这个timestamp来决定这两个操做的行为,那么让用户指定timestamp的确挺危险的)
  4. If CreateTime is required, it can always be put into the message payload.若是须要CreateTime,能够加到消息的内容里。(这个的确是……)

The good things about CreateTime are: 使用CreateTime的好处是

  1. More intuitive to users. 更符合用户的思惟(用户固然是想使用本身填进去的时间)。
  2. User may want to have log retention based on when the message is created instead of when the message enters the pipeline.用户可能更但愿用消息被建立的时间来决定log retention的行为,而不是消息进行处理管道的时间。
  3. Immutable after entering the pipeline.这样,消息的timestamp在进入管道后就不会再改变了。

在俺看来,这两个选择的确挺纠结的。用户确定是想用本身产生消息的时间,否则很难准确地找到一条消息。可是,若是使用用户指定的时间,broker端的行为就变得复杂了,好比,若是用户指定的时间不是单调递增的,该怎么建时间索引。可是用户产生畸形的时间,倒能够经过配置里max.message.time.difference.ms来控制。或许能够加另外一个配置,容许broker在必定范围内修改CreateTime,好比最多能够更改1000ms。这样就能即便消息的timestamp单调增加,也能使用户对消息的时间的估计比较准确。不过,这样可能就须要让broker time的含义变成broker收到消息时间,而不是append到log的时间。不然就难以肯定什么时候该拒绝没法在指定范围内修改timestamp的消息。

 

KIP-33 - Add a time based log index

动机:

当前按照时间戳查找offset获得的结果是很是粗粒度的,只能在log segment的级别。(对于reassigned replica就差得没谱了。)因此这个KIP提议建一个基于时间的对日志的索引,来容许按timestamp搜索消息的结果更准确。

这个KIP和KIP-32是紧密相关的。这俩KIP都在讨论过程当中。

相关文章
相关标签/搜索