关于Kafka __consumer_offests的讨论

 众所周知,__consumer__offsets是一个内部topic,对用户而言是透明的,除了它的数据文件以及偶尔在日志中出现这两点以外,用户通常是感受不到这个topic的。不过咱们的确知道它保存的是Kafka新版本consumer的位移信息。本文咱们简单梳理一下这个内部topic(以1.0.0代码为分析对象)java

1、什么时候被建立?json

首先,咱们先来看下 它是什么时候被建立的?__consumer_offsets建立的时机有不少种,主要包括:数组

  • broker响应FindCoordinatorRequest请求时
  • broker响应MetadataRequest显式请求__consumer_offsets元数据时

其中以第一种最为常见,而第一种时机的表现形式可能有不少,好比用户启动了一个消费者组(下称consumer group)进行消费或调用kafka-consumer-groups --describe等spa

2、消息种类线程

__consumer_offsets中保存的记录是普通的Kafka消息,只是它的格式彻底由Kafka来维护,用户不能干预。严格来讲,__consumer_offsets中保存三类消息,分别是:3d

  • Consumer group组元数据消息
  • Consumer group位移消息
  • Tombstone消息

2.1 Consumer group组元数据消息日志

咱们都知道__consumer_offsets是保存位移的,但实际上每一个消费者组的元数据信息也保存在这个topic。这些元数据包括:orm

这里不详细展开组元数据各个字段的含义。咱们只须要知道组元数据消息也是保存在__consumer_offsets中便可。值得一提的是, 若是用户使用standalone consumer(即consumer.assign(****)方法),那么就不会写入这类消息,毕竟咱们使用的是独立的消费者,而没有使用消费者组。server

这类消息的key是一个二元组,格式是【版本+groupId】,这里的版本表征这类消息的版本号,无实际用途;而value就是上图全部这些信息打包而成的字节数组。对象

2.2 Consumer group组位移提交消息

若是只容许说出__consumer_offsets的一个功能,那么咱们就记住这个好了:__consumer_offsets保存consumer提交到Kafka的位移数据。这句话有两个要点:1. 只有当consumer group向Kafka提交位移时才会向__consumer_offsets写入这类消息。若是你的consumer压根就不提交位移,或者你将位移保存到了外部存储中(好比Apache Flink的检查点机制或老版本的Storm Kafka Spout),那么__consumer_offsets中就是无位移数据;2. 这句话中的consumer既包含consumer group也包含standalone consumer。也就是说,只要你向Kafka提交位移,不论使用哪一种java consumer,它都是向__consumer_offsets写消息。

这类消息的key是一个三元组,格式是【groupId + topic + 分区号】,value则是要提交的位移信息,以下图所示:

位移就是待提交的位移,提交时间是提交位移时的时间戳,而过时时间则是用户指定的过时时间。因为目前consumer代码在提交位移时并无明确指定过时间隔,故broker端默认设置过时时间为提交时间+offsets.retention.minutes参数值,即提交1天以后自动过时。Kafka会按期扫描__consumer_offsets中的位移消息并删除掉那些过时的位移数据。

上图中还有个“自定义元数据”,实际上consumer容许用户在提交位移时指定一些特殊的自定义信息。咱们不对此进行详细展开,由于java consumer根本就没有使用到它。相反地,Kafka Streams利用该字段来完成某些定制任务。

2.3 tombstone消息或Delete Mark消息

 第三类消息成为tombstone消息或delete mark消息。这类消息只出如今源码中而不暴露给用户。它和第一类消息很像,key都是二元组【版本+groupId】,惟一的区别在于这类消息的消息体是null,即空消息体。什么时候写入这类消息?前面说过了,Kafka会按期扫描过时位移消息并删除之。一旦某个consumer group下已没有任何active成员且全部的位移数据都已被删除时,Kafka会将该group的状态置为Dead并向__consumer__offsets对应分区写入tombstone消息,代表要完全删除这个group的信息。简单来讲,这类消息就是用于完全删除group信息的。

3、什么时候写入?

第一类消息是在组rebalance时写入的;第二类消息是在提交位移时写入的;第三类消息是在Kafka后台线程扫描并删除过时位移或者__consumer_offsets分区副本重分配时写入的。

4、消息留存策略

__consumer_offsets目前的留存策略是compact,__consumer_offsets会按期对消息内容进行compact操做——用户也能够同时启用两种留存策略来减小该topic所占的磁盘空间,不过要承担可能丢失位移数据的风险。

5、副本因子

__consumer_offest不受server.properties中num.partitions和default.replication.factor参数的制约。相反地,它的分区数和备份因子分别由offsets.topic.num.partitions和offsets.topic.replication.factor参数决定。这两个参数的默认值分别是50和1,表示该topic有50个分区,副本因子是1。鉴于位移和group元数据等信息都保存在该topic中,实际使用过程当中不少用户都会将offsets.topic.replication.factor设置成大于1的数以增长可靠性,这是推荐的作法。不过在0.11.0.0以前,这个设置是有缺陷的:假设你设置了offsets.topic.replication.factor = 3,只要Kafka建立该topic时可用broker数<3,那么建立出来的__consumer_offsets的备份因子就是2。也就是说Kafka并无尊重咱们设置的offsets.topic.replication.factor参数。好在这个问题在0.11.0.0版本获得了解决,如今用户在使用时,一旦须要建立__consumer_offsets了Kafka必定会保证凑齐足量的broker才会开始建立,不然就抛出异常给用户。

平常使用中,另外一个常见的问题是如何扩展该topic的副本因子。因为它依然是一个Kafka topic,所以咱们能够调用bin/kafka-reassign-partitions.sh(bat)脚原本扩展replication factor。作法以下:

1. 构造一个json文件,以下所示,其中1,2,3表示3台broker的ID

{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":1,"replicas":[2,3,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[3,1,2]},
{"topic":"__consumer_offsets","partition":3,"replicas":[1,2,3]},
...
{"topic":"__consumer_offsets","partition":49,"replicas":[2,3,1]}
]}

2. 运行bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassign.json --execute

若是一切正常,你会发现__consumer_offsets的replication factor已然被扩展为3。

6、如何删除group信息

首先明确一点,Kafka是会删除consumer group信息的,既包括位移信息,也包括组元数据信息。对于位移信息而言,前面提到过每条位移消息都设置了过时时间。每一个Kafka broker在后台会启动一个线程,按期(由offsets.retention.check.interval.ms肯定,默认10分钟)扫描过时位移,并删除之。而对组元数据而言,删除它们的条件有两个:1. 这个group下不能存在active成员,即全部成员都已经退出了group;2. 这个group的全部位移信息都已经被删除了。当知足了这两个条件后,Kafka后台线程会删除group运输局信息。

好了, 咱们总说删除,那么Kafka究竟是怎么删除的呢——正是经过写入具备相同key的tombstone消息。咱们举个例子,假设__consumer_offsets当前保存有一条位移消息,key是【testGroupid,test, 0】(三元组),value是待提交的位移信息。不管什么时候,只要咱们向__consumer_offsets相同分区写入一条key=【testGroupid,test, 0】,value=null的消息,那么Kafka就会认为以前的那条位移信息是能够删除的了——即至关于咱们向__consumer_offsets中插入了一个delete mark。

再次强调一下,向__consumer_offsets写入tombstone消息仅仅是标记它以前的具备相同key的消息是能够被删除的,但删除操做一般不会当即开始。真正的删除操做是由log cleaner的Cleaner线程来执行的。

 

鉴于目前水平有限,能想到的就这么多。有相关问题的读者能够将问题发动评论区,若是具备较大的共性,我会添加到本文的末尾~~

相关文章
相关标签/搜索