经过KIP32,Kafka的每条消息都加进了时间戳,这个KIP在0.10.0.0被加入。算法
说到“时间”,先贴张图,娱乐一下(若是对星球大战系列电影不熟的话,请自动略过……)apache

这个KIP的文档在服务器
下面贴一下这个KIP的关键部分,俺的注解部分用灰色的字标识。app
Motivation 动机
This KIP tries to address the following issues in Kafka.ide
- Log retention might not be honored: Log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens because the newly created log segment will effectively have its modification time reset to now.
- Log rolling might break for a newly created replica as well because of the same reason as (1).
- Some use cases such as streaming processing needs a timestamp in messages.
To solve the above issues, we propose to add a timestamp to Kafka messages.性能
前两个缘由都和replica的从新分配有关,replica从新分配就是把某个分区的副本迁移到另外一台机器上,一般是为了调节机器的负载,增长副本数,或者移除机器。在进行replica迁移的时候,Kafka会在迁移的目的地新建一个replica,而且从当前的leader处抓取消息,直到新的副本和leader同步,而后停掉再也不保留的replica。相关的文档在这里。 那么,这里就存在一个问题,就是新的replic在得到数据时,从leader的哪一个offset开始拉取数据呢?若是直接从最新的数据开始拉,那么这个replica的数据就不足以承担它做为“副本”的任务,所以,确定是从最旧的offset开始拉。这个操做,在源码里的调用过程挺复杂的……大概会通过LeaderAndIsrRequest 处理 -> becomeFollower -> createReplica -> ReplicaFetcherThreader.handleOffsetOutOfRange,而后新的replica就会从leader的最先的offset开始拉取消息,而且写成文件。这里就迁扯到了Log的rolling和retention的问题。retention的本意是为了消除再也不须要的消息或者限制Kafka在本地存储的大小。当为了第一个目的时,应该删除已经保存了好久的消息,这个“好久”是指消息进入到Kafka的时间(或者消息产生的时间)距离当前时间过了好久。roll的本意是为了保持单个文件不要太大,过大的文件不利于retention。可是,因为Kafka的消息中没有时间戳,因此新的replica是不知道消息真正进入Kafka的时间(或产生的时间)的,因此roll和rotention机制就没法正确地工做。“没法正确的工做”表如今log retention是依据于log segment(对应于一个log文件)的最后修改时间,而log rolling是依据于依据于log segment的建立时间。当replica reassign发生时,新的replica里最初的这些log segment的建立时间和修改时间都不能反应这个log segment里边消息的产生或处理时间。flex
第三个增长timestamp的缘由,是因为流处理系统的须要,好比上边那幅图,是StephanEwan在讲Apache Flink的时间举的例子。ui
Public Interfaces 与外部协议相关的改变
This KIP has the following public interface changes:this
- Add a new timestamp field to the message format.
- Use the forth least significant bit to indicate the timestamp type. (0 for CreateTime, 1 for LogAppendTime)
- Add the following two configurations to the broker
- message.timestamp.type - This topic level configuration defines the type of timestamp in the messages of a topic. The valid values are CreateTime or LogAppendTime.
- max.message.time.difference.ms - This configuration only works when message.timestamp.type=CreateTime. The broker will only accept messages whose timestamp differs no more than max.message.time.difference.ms from the broker local time.
- Add a timestamp field to ProducerRecord and ConsumerRecord. A producer will be able to set a timestamp for a ProducerRecord. A consumer will see the message timestamp when it sees the messages.
- Add ProduceRequest/ProduceResponse V2 which uses the new message format.
- Add a timestamp in ProduceResponse V2 for each partition. The timestamp will either be LogAppendTime if the topic is configured to use it or it will be NoTimestamp if create time is used.
- Add FetchRequest/FetchResponse V2 which uses the new message format.
- Add a timestamp variable to RecordMetadata. The timestamp is the timestamp of messages appended to partition log.
最重要的有三点:spa
1. 由用户来指定这个时间戳的确切含义,能够指定两种含义里一个:1. 建立时间,2. 消息append到log的时间。当用户指定时间戳的含义是create time时,broker会拒绝消息的create time与它进入到broker的时间相差过大的消息。实际上,对于用户而言,这是个艰难的选择。例如,选择create time的话,这个timestamp是由用户指定的,因此就可能在错误或者误差,从而影响到rolling和retention的正常工做(好比可能根本不retention,从而写满磁盘)。对这种状况,能够经过max.message.time.difference.ms来避免。可是,还有些其它的状况比这种要复杂,在这个KIP的文档里讨论了各类选择的优缺点。
2. 用户能够经过producer指定这个时间以人为戳,consumer能够得到这个时间戳。可是时间戳的含义仍是由第一点来肯定。
3. 须要更改Kafka协议,因此会致使与旧版本的兼容性问题。在KIP的文档里描述了升级的方案,因此也没必要过去担忧。
CreateTime和LogAppendTime的优劣(简要)
There are three options proposed before this proposal. The details of option 1, option 2 and Option 3 are in the Rejected Alternatives section.
The key decision we made in this KIP is whether use LogAppendTime(Broker Time) or CreateTime(Application Time)
The good things about LogAppendTime are:
- Broker is more robust.
- Monotonically increasing.
- Deterministic behavior for log rolling and retention.
- If CreateTime is required, it can always be put into the message payload.
LogAppendTime的好处是:
- Broker更加健壮(与create time相比,timestamp彻底在broker的掌握之中,因此broker的行为更肯定)
- 单调增加
- log rolling和retention的行为是肯定的
- 若是须要CreateTime,老是能够把它放在消息的负载中
The good things about CreateTime are:
- More intuitive to users.
- User may want to have log retention based on when the message is created instead of when the message enters the pipeline.
- Immutable after entering the pipeline.
CreateTime的好处在于:
- 对于用户更直观
- 用户可能但愿log retention基于消息的建立时间而不是消息进入流水线(指消息的处理流程)的时间。
- 进入流水线之后就再也不改变。
Because both LogAppendTime and CreateTime have their own use cases, the proposed change provides users with the flexibility to choose which one they want to use.
For more detail discussion please refer to the mail thread as well as the Rejected Alternatives section.
This KIP is closely related to KIP-33. Some of the contents listed in this KIP are actually part of KIP-33. They are put here because they are important for the design decision. More specifically, KIP-33 will implement the following changes:
- Build a time index for each log segment using the timestamps in messages.
- Enforce log retention and log rolling use time based index.
因为LogAppendTime和CreateTime各自有它们的使用场景,这个KIP的提议是由用户本身选择。
关于更详尽地讨论请参照邮件组的讨论,以及下面的Rejected Alternatives一节。
这个KIP与KIP-33紧密相关,KIP-33将会作下面的变化:
1. 基于消息中的时间戳,为每一个log segment建立基于时间的索引。
2. 使用基于时间的索引来增强log retention和log rolling。
- 容许用户在生产消息时候加入时间戳
- 当一个做为leader的broker收到消息时
- 若是message.timestamp.type=LogAppendTime, broker会用本身的本地时间覆盖消息的时间戳,而且把消息追加到log
- 若是收到的是压缩的消息,那么包装后的消息的TS(timestamp)将会用当前的服务器时间覆盖。Broker将会把包装后消息的timestamp type位 置为1。Broker会忽略内部消息的时间戳。在使用LogAppendTime时,之因此不修改每一个内部消息的TS,是为了不从新压缩带来的性能损失。
- 若是消息没有压缩,那么消息的TS将会被覆盖为服务器的本地时间
- 若是message.timestamp.type=CreateTime
- 若是时间差在max.message.time.difference.ms以内,那么broker将会接收这个消息而且把它追加到log。对于压缩后的消息,broker将会把压缩后消息的TS更新为内部消息的最大的TS。
- 若是时间差超过了max.message.time.difference.ms, broker将会以TimestampExceededThresholdException的形式拒绝整批消息。
- 当一个follwer broker收到一个消息时
- 若是这个消息是压缩后的消息,follower broker会使用压缩后消息的TS来构建索引。也就是说,一个压缩后消息的TS老是它的全部内部消息的TS里最大的一个(译注:之因此这么作,是为了构建索引,这与按TS索引的算法有关)。
- 若是这个消息是一个没有压缩的消息,那么这个消息的TS将会被用来构建索引。
- 当一个consumer收到消息时
- 若是这个消息是一个压缩后的消息
- 若是包装后消息的timestamp属性位是0(CreateTime),那么将会使用内部消息的时间戳
- 若是包装后消息的timestamp属性位是1,那么包装消息的TS将会被用做内部消息的TS
- 若是消息是一个没有压缩的消息,那么这个消息的TS将会被使用。
- message.timestamp.type和max.message.time.difference.ms将会是能够按topic配置的
- 在ProduceResponseV2中,每一个partition都会返回一个TS
- 若是topic使用LogAppendTime,那么返回的TS将会是这个message set的LogAppendTime.
- 若是topic使用CreateTime,那么返回的TS将会是NoTimestamp
- 若是producer为每一个消息启用了callback,那么若是produce response不是NoTimestamp,它就会使用produce response中的TS,不然就使用producer记录的的TS。
- 在这种状况下,producer将没法分辨TS是LogAppendTime仍是CreateTime。
- 基于使用的索引有如下的保证(请注意基于时间的索引将会在KIP-33中被实现,而不是这个KIP。之因此在这里讨论索引的问题,是由于它和这个KIP的设计紧密相关)
- 若是用户索引一个时间戳:
- 因此在这个时间戳以后的消息都将会被消息
- 用户可能会看到更早的消息
- log retention将会按照时间索引文件的最后一个条目。由于最后一个条目将会是整个log segment里将新的timestamp。若是这个entry过时了,那么整个log segment将会被删除。
- log rolling将会依据全部见到过的消息的最大的timestamp。If no message is appended in log.roll.ms after largest appended timestamp, a new log segment will be rolled out.(????这个不合逻辑呀,明显要用最先的timestamp)
- 这个提议的很差的方面包括:
- 若是message.timestamp.typ=CreateTime的话,timestamp可能不会是递增的
- log retention可能不是肯定的。也就是说,一个应该被删除的消息如今依赖于同一个log segment的其它消息。而且,这些由用户提供的时间戳依赖于被配置的时间差的阀值(即,max.message.time.differnece.ms)。
- 尽管这个提议有这些缺点,可是它给了用户使有时间戳的灵活性。
- 若是message.timestamp.type=CreateTime
- 若是时间差阀值被设为Long.MaxValue,那么消息里的时间戳就等于CreateTime
- 若是时间差阀值在0和Long.MaxValue之间,就能保证消息的时间戳总能在一个肯定的范围以内
- 若是message.timestamp.type=LogAppendTime,那么时间戳就会是log append time.
总结:
关于时间戳的类型的选择:CreateTime仍是LogAppendTime,仍是得依据于具体的使用场景。好比,若是强烈须要使用event time来进行后续的处理,那就只能选create time。重要的是在选择好一种类型之后,了解它对于Kafka的各类行为的影响。