深刻剖析Kafka

本文来自OPPO互联网技术团队,转载请注名做者。同时欢迎关注OPPO互联网技术团队的公众号:OPPO_tech,与你分享OPPO前沿互联网技术及活动。html

Kafka是一个分布式的基于发布、订阅的消息系统,具备着高吞吐、高容错、高可靠以及高性能等特性,主要用于应用解耦、流量削峰、异步消息等场景。nginx

为了让你们更加深刻的了解Kafka内部实现原理,文中将会从主题与日志开始介绍消息的存储、删除以及检索,而后介绍其副本机制的实现原理,最后介绍生产与消费的实现原理以便更合理的应用于实际业务。git

另外,本文较长,建议点赞后再慢慢看 :)算法

1. 引言

Kafka是一个分布式的基于发布、订阅的消息系统,有着强大的消息处理能力,相比与其余消息系统,具备如下特性:bootstrap

  • 快速数据持久化,实现了O(1)时间复杂度的数据持久化能力。
  • 高吞吐,能在普通的服务器上达到10W每秒的吞吐速率。
  • 高可靠,消息持久化以及副本系统的机制保证了消息的可靠性,消息能够屡次消费。
  • 高扩展,与其余分布式系统同样,全部组件均支持分布式、自动实现负载均衡,能够快速便捷的扩容系统。
  • 离线与实时处理能力并存,提供了在线与离线的消息处理能力。

正是因其具备这些的优秀特性而普遍用于应用解耦、流量削峰、异步消息等场景,好比消息中间件、日志聚合、流处理等等。segmentfault

本文将从如下几个方面去介绍kafka:缓存

  1. 第一章简单介绍下kafka做为分布式的消息发布与订阅系统所具有的特征与优点
  2. 第二章节介绍kafka系统的主题与日志,了解消息如何存放、如何检索以及如何删除
  3. 第三章节介绍kafka副本机制以了解kafka内部如何实现消息的高可靠
  4. 第四章节将会从消息的生产端去介绍消息的分区算法以及幂等特性的具体实现
  5. 第五章节将从消息的消费端去了解消费组、消费位移以及重平衡机制具体实现
  6. 最后章节简单总结下本文

2. 主题与日志

2.1 主题

主题是存储消息的一个逻辑概念,能够简单理解为一类消息的集合,由使用方去建立。Kafka中的主题通常会有多个订阅者去消费对应主题的消息,也能够存在多个生产者往主题中写入消息。服务器

每一个主题又能够划分红多个分区,每一个分区存储不一样的消息。当消息添加至分区时,会为其分配一个位移offset(从0开始递增),并保证分区上惟一,消息在分区上的顺序由offset保证,即同一个分区内的消息是有序的,以下图所示网络

同一个主题的不一样分区会分配在不一样的节点上(broker),分区时保证Kafka集群具备水平扩展的基础。session

以主题nginx_access_log为例,分区数为3,如上图所示。分区在逻辑上对应一个日志(Log),物理上对应的是一个文件夹。

drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx\_access\_log-0/  
drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx\_access\_log-1/  
drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx\_access\_log-2/

消息写入分区时,其实是将消息写入分区所在的文件夹中。日志又分红多个分片(Segment),每一个分片由日志文件与索引文件组成,每一个分片大小是有限的(在kafka集群的配置文件log.segment.bytes配置,默认为1073741824byte,即1GB),当分片大小超过限制则会从新建立一个新的分片,外界消息的写入只会写入最新的一个分片(顺序IO)。

\-rw-r--r--  1 root root    1835920 10月 11 19:18 00000000000000000000.index  
\-rw-r--r--  1 root root 1073741684 10月 11 19:18 00000000000000000000.log  
\-rw-r--r--  1 root root    2737884 10月 11 19:18 00000000000000000000.timeindex  
\-rw-r--r--  1 root root    1828296 10月 11 19:30 00000000000003257573.index  
\-rw-r--r--  1 root root 1073741513 10月 11 19:30 00000000000003257573.log  
\-rw-r--r--  1 root root    2725512 10月 11 19:30 00000000000003257573.timeindex  
\-rw-r--r--  1 root root    1834744 10月 11 19:42 00000000000006506251.index  
\-rw-r--r--  1 root root 1073741771 10月 11 19:42 00000000000006506251.log  
\-rw-r--r--  1 root root    2736072 10月 11 19:42 00000000000006506251.timeindex  
\-rw-r--r--  1 root root    1832152 10月 11 19:54 00000000000009751854.index  
\-rw-r--r--  1 root root 1073740984 10月 11 19:54 00000000000009751854.log  
\-rw-r--r--  1 root root    2731572 10月 11 19:54 00000000000009751854.timeindex  
\-rw-r--r--  1 root root    1808792 10月 11 20:06 00000000000012999310.index  
\-rw-r--r--  1 root root 1073741584 10月 11 20:06 00000000000012999310.log  
\-rw-r--r--  1 root root         10 10月 11 19:54 00000000000012999310.snapshot  
\-rw-r--r--  1 root root    2694564 10月 11 20:06 00000000000012999310.timeindex  
\-rw-r--r--  1 root root   10485760 10月 11 20:09 00000000000016260431.index  
\-rw-r--r--  1 root root  278255892 10月 11 20:09 00000000000016260431.log  
\-rw-r--r--  1 root root         10 10月 11 20:06 00000000000016260431.snapshot  
\-rw-r--r--  1 root root   10485756 10月 11 20:09 00000000000016260431.timeindex  
\-rw-r--r--  1 root root          8 10月 11 19:03 leader-epoch-checkpoint

一个分片包含多个不一样后缀的日志文件,分片中的第一个消息的offset将做为该分片的基准偏移量,偏移量固定长度为20,不够前面补齐0,而后将其做为索引文件以及日志文件的文件名,如00000000000003257573.index00000000000003257573.log00000000000003257573.timeindex、相同文件名的文件组成一个分片(忽略后缀名),除了.index.timeindex.log后缀的日志文件外其余日志文件,对应含义以下:

文件类型 做用
.index 偏移量索引文件,记录<相对位移,起始地址>映射关系,其中相对位移表示该分片的第一个消息,从1开始计算,起始地址表示对应相对位移消息在分片.log文件的起始地址
.timeindex 时间戳索引文件,记录<时间戳,相对位移>映射关系
.log 日志文件,存储消息的详细信息
.snaphot 快照文件
.deleted 分片文件删除时会先将该分片的全部文件加上.delete后缀,而后有delete-file任务延迟删除这些文件(file.delete.delay.ms能够设置延时删除的的时间)
.cleaned 日志清理时临时文件
.swap Log Compaction 以后的临时文件
.leader-epoch-checkpoint

2.2 日志索引

首先介绍下.index文件,这里以文件00000000000003257573.index为例,首先咱们能够经过如下命令查看该索引文件的内容,咱们能够看到输出结构为<offset,position>,实际上索引文件中保存的并非offset而是相对位移,好比第一条消息的相对位移则为0,格式化输出时加上了基准偏移量,如上图所示,<114,17413>表示该分片相对位移为114的消息,其位移为3257573+114,即3257687,position表示对应offset在.log文件的物理地址,经过.index索引文件则能够获取对应offset所在的物理地址。索引采用稀疏索引的方式构建,并不保证分片中的每一个消息都在索引文件有映射关系(.timeindex索引也是相似),主要是为了节省磁盘空间、内存空间,由于索引文件最终会映射到内存中。

\# 查看该分片索引文件的前10条记录  
bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.index |head \-n 10  
Dumping /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.index  
offset: 3257687 position: 17413  
offset: 3257743 position: 33770  
offset: 3257799 position: 50127  
offset: 3257818 position: 66484  
offset: 3257819 position: 72074  
offset: 3257871 position: 87281  
offset: 3257884 position: 91444  
offset: 3257896 position: 95884  
offset: 3257917 position: 100845  
\# 查看该分片索引文件的后10条记录  
$ bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.index |tail \-n 10  
offset: 6506124 position: 1073698512  
offset: 6506137 position: 1073702918  
offset: 6506150 position: 1073707263  
offset: 6506162 position: 1073711499  
offset: 6506176 position: 1073716197  
offset: 6506188 position: 1073720433  
offset: 6506205 position: 1073725654  
offset: 6506217 position: 1073730060  
offset: 6506229 position: 1073734174  
offset: 6506243 position: 1073738288
好比查看offset为 6506155的消息:首先根据offset找到对应的分片,65061所对应的分片为 00000000000003257573,而后经过二分法在 00000000000003257573.index文件中找到不大于6506155的最大索引值,获得<offset: 6506150, position: 1073707263>,而后从 00000000000003257573.log的1073707263位置开始顺序扫描找到offset为650155的消息

Kafka从0.10.0.0版本起,为分片日志文件中新增了一个.timeindex的索引文件,能够根据时间戳定位消息。一样咱们能够经过脚本kafka-dump-log.sh查看时间索引的文件内容。

\# 查看该分片时间索引文件的前10条记录  
bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex |head \-n 10  
Dumping /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex  
timestamp: 1570792689308 offset: 3257685  
timestamp: 1570792689324 offset: 3257742  
timestamp: 1570792689345 offset: 3257795  
timestamp: 1570792689348 offset: 3257813  
timestamp: 1570792689357 offset: 3257867  
timestamp: 1570792689361 offset: 3257881  
timestamp: 1570792689364 offset: 3257896  
timestamp: 1570792689368 offset: 3257915  
timestamp: 1570792689369 offset: 3257927  
​  
\# 查看该分片时间索引文件的前10条记录  
bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex |tail \-n 10  
Dumping /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex  
timestamp: 1570793423474 offset: 6506136  
timestamp: 1570793423477 offset: 6506150  
timestamp: 1570793423481 offset: 6506159  
timestamp: 1570793423485 offset: 6506176  
timestamp: 1570793423489 offset: 6506188  
timestamp: 1570793423493 offset: 6506204  
timestamp: 1570793423496 offset: 6506214  
timestamp: 1570793423500 offset: 6506228  
timestamp: 1570793423503 offset: 6506240  
timestamp: 1570793423505 offset: 6506248
好比我想查看时间戳 1570793423501开始的消息:1.首先定位分片,将 1570793423501与每一个分片的最大时间戳进行对比(最大时间戳取时间索引文件的最后一条记录时间,若是时间为0则取该日志分段的最近修改时间),直到找到大于或等于 1570793423501的日志分段,所以会定位到时间索引文件 00000000000003257573.timeindex,其最大时间戳为 1570793423505;2.经过二分法找到大于或等于 1570793423501的最大索引项,即<timestamp: 1570793423503 offset: 6506240>(6506240为offset,相对位移为3247667);3.根据相对位移3247667去索引文件中找到不大于该相对位移的最大索引值<3248656,1073734174>;4.从日志文件 00000000000003257573.log的1073734174位置处开始扫描,查找不小于 1570793423501的数据。

2.3 日志删除

与其余消息中间件不一样的是,Kafka集群中的消息不会由于消费与否而删除,跟日志同样消息最终会落盘,并提供对应的策略周期性(经过参数log.retention.check.interval.ms来设置,默认为5分钟)执行删除或者压缩操做(broker配置文件log.cleanup.policy参数若是为“delete”则执行删除操做,若是为“compact”则执行压缩操做,默认为“delete”)。

2.3.1 基于时间的日志删除

参数 默认值 说明
log.retention.hours 168 日志保留时间(小时)
log.retention.minutes 日志保留时间(分钟),优先级大于小时
log.retention.ms 日志保留时间(毫秒),优先级大于分钟

当消息在集群保留时间超过设定阈值(log.retention.hours,默认为168小时,即七天),则须要进行删除。这里会根据分片日志的最大时间戳来判断该分片的时间是否知足删除条件,最大时间戳首先会选取时间戳索引文件中的最后一条索引记录,若是对应的时间戳值大于0则取该值,不然为最近一次修改时间。

这里不直接选取最后修改时间的缘由是避免分片日志的文件被无心篡改而致使其时间不许。

若是刚好该分区下的全部日志分片均已过时,那么会先生成一个新的日志分片做为新消息的写入文件,而后再执行删除参数。

2.3.2 基于空间的日志删除

参数 默认值 说明
log.retention.bytes 1073741824(即1G),默认未开启,即无穷大 日志文件总大小,并非指单个分片的大小
log.segment.bytes 1073741824(即1G) 单个日志分片大小

首先会计算待删除的日志大小diff(totalSize-log.rentention.bytes),而后从最旧的一个分片开始查看能够执行删除操做的文件集合(若是diff-segment.size>=0,则知足删除条件),最后执行删除操做。

2.3.3 基于日志起始偏移量的日志删除

通常状况下,日志文件的起始偏移量(logStartOffset)会等于第一个日志分段的baseOffset,可是其值会由于删除消息请求而增加,logStartOffset的值其实是日志集合中的最小消息,而小于这个值的消息都会被清理掉。如上图所示,咱们假设logStartOffset=7421048,日志删除流程以下:

  • 从最旧的日志分片开始遍历,判断其下一个分片的baseOffset是否小于或等于logStartOffset值,若是知足,则须要删除,所以第一个分片会被删除。
  • 分片二的下一个分片baseOffset=6506251<7421048,因此分片二也须要删除。
  • 分片三的下一个分片baseOffset=9751854>7421048,因此分片三不会被删除。

2.4 日志压缩

前面提到当broker配置文件log.cleanup.policy参数值设置为“compact”时,则会执行压缩操做,这里的压缩跟普通意义的压缩不同,这里的压缩是指将相同key的消息只保留最后一个版本的value值,以下图所示,压缩以前offset是连续递增,压缩以后offset递增可能不连续,只保留5条消息记录。

Kafka日志目录下cleaner-offset-checkpoint文件,用来记录每一个主题的每一个分区中已经清理的偏移量,经过这个偏移量能够将分区中的日志文件分红两个部分:clean表示已经压缩过;dirty表示还未进行压缩,以下图所示(active segment不会参与日志的压缩操做,由于会有新的数据写入该文件)。

\-rw-r--r--  1 root root    4 10月 11 19:02 cleaner-offset-checkpoint  
drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx\_access\_log-0/  
drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx\_access\_log-1/  
drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx\_access\_log-2/  
\-rw-r--r--  1 root root    0 9月  18 09:50 .lock  
\-rw-r--r--  1 root root    4 10月 16 11:19 log-start-offset-checkpoint  
\-rw-r--r--  1 root root   54 9月  18 09:50 meta.properties  
\-rw-r--r--  1 root root 1518 10月 16 11:19 recovery-point-offset-checkpoint  
\-rw-r--r--  1 root root 1518 10月 16 11:19 replication-offset-checkpoint  
​  
#cat cleaner-offset-checkpoint  
nginx\_access\_log 0 5033168  
nginx\_access\_log 1 5033166  
nginx\_access\_log 2 5033168

日志压缩时会根据dirty部分数据占日志文件的比例(cleanableRatio)来判断优先压缩的日志,而后为dirty部分的数据创建key与offset映射关系(保存对应key的最大offset)存入SkimpyoffsetMap中,而后复制segment分段中的数据,只保留SkimpyoffsetMap中记录的消息,压缩以后的相关日志文件大小会减小,为了不出现太小的日志文件与索引文件,压缩时会对全部的segment进行分组(一个组的分片大小不会超过设置的log.segment.bytes值大小),同一个分组的多个分片日志压缩以后变成一个分片。

如上图所示,全部消息都还没压缩前 clean checkpoint值为0,表示该分区的数据还没进行压缩,第一次压缩后,以前每一个分片的日志文件大小都有所减小,同时会移动 clean checkpoint的位置到这一次压缩结束的offset值。第二次压缩时,会将前两个分片{0.5GB,0.4GB}组成一个分组,{0.7GB,0.2GB}组成一个分组进行压缩,以此类推。

如上图所示,日志压缩的主要流程以下:

  1. 计算deleteHorizonMs值:当某个消息的value值为空时,该消息会被保留一段时间,超时以后会在下一次的得日志压缩中被删除,因此这里会计算deleteHorizonMs,根据该值肯定能够删除value值为空的日志分片。(deleteHorizonMs = clean部分的最后一个分片的lastModifiedTime - deleteRetionMs,deleteRetionMs经过配置文件log.cleaner.delete.retention.ms配置,默认为24小时)。
  2. 肯定压缩dirty部分的offset范围[firstDirtyOffset,endOffset):其中firstDirtyOffset表示dirty的起始位移,通常会等于clear checkpoint值,firstUncleanableOffset表示不能清理的最小位移,通常会等于活跃分片的baseOffset,而后从firstDirtyOffset位置开始遍历日志分片,并填充key与offset的映射关系至SkimpyoffsetMap中,当该map被填充满或到达上限firstUncleanableOffset时,就能够肯定日志压缩上限endOffset
  3. 将[logStartOffset,endOffset)中的日志分片进行分组,而后按照分组的方式进行压缩。

3. 副本

kafka支持消息的冗余备份,能够设置对应主题的副本数(--replication-factor参数设置主题的副本数可在建立主题的时候指定,offsets.topic.replication.factor设置消费主题_consumer_offsets副本数,默认为3),每一个副本包含的消息同样(但不是彻底一致,可能从副本的数据较主副本稍微有些落后)。每一个分区的副本集合中会有一个副本被选举为主副本(leader),其余为从副本,全部的读写请求由主副本对外提供,从副本负责将主副本的数据同步到本身所属分区,若是主副本所在分区宕机,则会从新选举出新的主副本对外提供服务。

3.1 ISR集合

ISR(In-Sync Replica)集合,表示目前能够用的副本集合,每一个分区中的leader副本会维护此分区的ISR集合。这里的可用是指从副本的消息量与主副本的消息量相差不大,加入至ISR集合中的副本必须知足如下几个条件:

  1. 副本所在节点须要与ZooKeeper维持心跳。
  2. 从副本的最后一条消息的offset须要与主副本的最后一条消息offset差值不超过设定阈值(replica.lag.max.messages)或者副本的LEO落后于主副本的LEO时长不大于设定阈值(replica.lag.time.max.ms),官方推荐使用后者判断,并在新版本kafka0.10.0移除了replica.lag.max.messages参数。

若是从副本不知足以上的任意条件,则会将其提出ISR集合,当其再次知足以上条件以后又会被从新加入集合中。ISR的引入主要是解决同步副本与异步复制两种方案各自的缺陷(同步副本中若是有个副本宕机或者超时就会拖慢该副本组的总体性能;若是仅仅使用异步副本,当全部的副本消息均远落后于主副本时,一旦主副本宕机从新选举,那么就会存在消息丢失状况)

3.2 HW&LEO

HW(High Watermark)是一个比较特殊的offset标记,消费端消费时只能拉取到小于HW的消息而HW及以后的消息对于消费者来讲是不可见的,该值由主副本管理,当ISR集合中的所有从副本都拉取到HW指定消息以后,主副本会将HW值+1,即指向下一个offset位移,这样能够保证HW以前消息的可靠性。

LEO(Log End Offset)表示当前副本最新消息的下一个offset,全部副本都存在这样一个标记,若是是主副本,当生产端往其追加消息时,会将其值+1。当从副本从主副本成功拉取到消息时,其值也会增长。

3.2.1 从副本更新LEO与HW

从副本的数据是来自主副本,经过向主副本发送fetch请求获取数据,从副本的LEO值会保存在两个地方,一个是自身所在的节点),一个是主副本所在节点,自身节点保存LEO主要是为了更新自身的HW值,主副本保存从副本的LEO也是为了更新其HW。当从副本每写入一条新消息就会增长其自身的LEO,主副本收到从副本的fetch请求,会先从自身的日志中读取对应数据,在数据返回给从副本以前会先去更新其保存的从副本LEO值。一旦从副本数据写入完成,就会尝试更新本身的HW值,比较LEO与fetch响应中主副本的返回HW,取最小值做为新的HW值。

3.2.2 主副本更新LEO与HW

主副本有日志写入时就会更新其自身的LEO值,与从副本相似。而主副本的HW值是分区的HW值,决定分区数据对应消费端的可见性,如下四种状况,主副本会尝试更新其HW值:

  • 副本成为主副本:当某个副本成为主副本时,kafka会尝试更新分区的HW值。
  • broker出现奔溃致使副本被踢出ISR集合:若是有broker节点奔溃则会看是否影响对应分区,而后会去检查分区的HW值是否须要更新。
  • 生成端往主副本写入消息时:消息写入会增长其LEO值,此时会查看是否须要修改HW值。
  • 主副本接受到从副本的fetch请求时:主副本在处理从副本的fetch请求时会尝试更新分区HW值。

前面是去尝试更新HW,可是不必定会更新,主副本上保存着从副本的LEO值与自身的LEO值,这里会比较全部知足条件的副本LEO值,并选择最小的LEO值最为分区的HW值,其中知足条件的副本是指知足如下两个条件之一:

  • 副本在ISR集合中
  • 副本的LEO落后于主副本的LEO时长不大于设定阈值(replica.lag.time.max.ms,默认为10s)

3.3 数据丢失场景

前面提到若是仅仅依赖HW来进行日志截断以及水位的判断会存在问题,如上图所示,假定存在两个副本A、副本B,最开始A为主副本,B为从副本,且参数min.insync.replicas=1,即ISR只有一个副本时也会返回成功:

  • 初始状况为主副本A已经写入了两条消息,对应HW=1,LEO=2,LEOB=1,从副本B写入了一条消息,对应HW=1,LEO=1。
  • 此时从副本B向主副本A发起fetchOffset=1请求,主副本收到请求以后更新LEOB=1,表示副本B已经收到了消息0,而后尝试更新HW值,min(LEO,LEOB)=1,即不须要更新,而后将消息1以及当前分区HW=1返回给从副本B,从副本B收到响应以后写入日志并更新LEO=2,而后更新其HW=1,虽然已经写入了两条消息,可是HW值须要在下一轮的请求才会更新为2。
  • 此时从副本B重启,重启以后会根据HW值进行日志截断,即消息1会被删除。
  • 从副本B向主副本A发送fetchOffset=1请求,若是此时主副本A没有什么异常,则跟第二步骤同样没有什么问题,假设此时主副本也宕机了,那么从副本B会变成主副本。
  • 当副本A恢复以后会变成从副本并根据HW值进行日志截断,即把消息1丢失,此时消息1就永久丢失了。

3.4 数据不一致场景

如图所示,假定存在两个副本A、副本B,最开始A为主副本,B为从副本,且参数min.insync.replicas=1,即ISR只有一个副本时也会返回成功:

  • 初始状态为主副本A已经写入了两条消息对应HW=1,LEO=2,LEOB=1,从副本B也同步了两条消息,对应HW=1,LEO=2。
  • 此时从副本B向主副本发送fetchOffset=2请求,主副本A在收到请求后更新分区HW=2并将该值返回给从副本B,若是此时从副本B宕机则会致使HW值写入失败。
  • 咱们假设此时主副本A也宕机了,从副本B先恢复并成为主副本,此时会发生日志截断,只保留消息0,而后对外提供服务,假设外部写入了一个消息1(这个消息与以前的消息1不同,用不一样的颜色标识不一样消息)。
  • 等副本A起来以后会变成从副本,不会发生日志截断,由于HW=2,可是对应位移1的消息实际上是不一致的

3.5 leader epoch机制

HW值被用于衡量副本备份成功与否以及出现失败状况时候的日志截断依据可能会致使数据丢失与数据不一致状况,所以在新版的Kafka(0.11.0.0)引入了leader epoch概念,leader epoch表示一个键值对<epoch, offset>,其中epoch表示leader主副本的版本号,从0开始编码,当leader每变动一次就会+1,offset表示该epoch版本的主副本写入第一条消息的位置,好比<0,0>表示第一个主副本从位移0开始写入消息,<1,100>表示第二个主副本版本号为1并从位移100开始写入消息,主副本会将该信息保存在缓存中并按期写入到checkpoint文件中,每次发生主副本切换都会去从缓存中查询该信息,下面简单介绍下leader epoch的工做原理:

  • 每条消息会都包含一个4字节的leader epoch number值
  • 每一个log目录都会建立一个leader epoch sequence文件用来存放主副本版本号以及开始位移。
  • 当一个副本成为主副本以后,会在leader epoch sequence文件末尾添加一条新的记录,而后每条新的消息就会变成新的leader epoch值。
  • 当某个副本宕机重启以后,会进行如下操做:

    • 从leader epoch sequence文件中恢复全部的leader epoch。
    • 向分区主副本发送LeaderEpoch请求,请求包含了从副本的leader epoch sequence文件中的最新leader epoch值。
    • 主副本返回从副本对应LeaderEpoch的lastOffset,返回的lastOffset分为两种状况,一种是返回比从副本请求中leader epoch版本大1的开始位移,另一种是与请求中的leader epoch相等则直接返回当前主副本的LEO值。
    • 若是从副本的leader epoch开始位移大于从leader中返回的lastOffset,那么会将从副本的leader epoch sequence值保持跟主副本一致。
    • 从副本截断本地消息到主副本返回的LastOffset所在位移处。
    • 从副本开始从主副本开始拉取数据。
    • 在获取数据时,若是从副本发现消息中的leader epoch值比自身的最新leader epoch值大,则会将该leader epoch 值写到leader epoch sequence文件,而后继续同步文件。

下面看下leader epoch机制如何避免前面提到的两种异常场景

3.5.1 数据丢失场景解决

  • 如图所示,当从副本B重启以后向主副本A发送offsetsForLeaderEpochRequest,epoch主从副本相等,则A返回当前的LEO=2,从副本B中没有任何大于2的位移,所以不须要截断。
  • 当从副本B向主副本A发送fetchoffset=2请求时,A宕机,因此从副本B成为主副本,并更新epoch值为<epoch=1, offset=2>,HW值更新为2。
  • 当A恢复以后成为从副本,并向B发送fetcheOffset=2请求,B返回HW=2,则从副本A更新HW=2。
  • 主副本B接受外界的写请求,从副本A向主副本A不断发起数据同步请求。

从上能够看出引入leader epoch值以后避免了前面提到的数据丢失状况,可是这里须要注意的是若是在上面的第一步,从副本B起来以后向主副本A发送offsetsForLeaderEpochRequest请求失败,即主副本A同时也宕机了,那么消息1就会丢失,具体可见下面数据不一致场景中有提到。

3.5.2 数据不一致场景解决

  • 从副本B恢复以后向主副本A发送offsetsForLeaderEpochRequest请求,因为主副本也宕机了,所以副本B将变成主副本并将消息1截断,此时接受到新消息1的写入。
  • 副本A恢复以后变成从副本并向主副本A发送offsetsForLeaderEpochRequest请求,请求的epoch值小于主副本B,所以主副本B会返回epoch=1时的开始位移,即lastoffset=1,所以从副本A会截断消息1。
  • 从副本A从主副本B拉取消息,并更新epoch值<epoch=1, offset=1>。

能够看出epoch的引入避免的数据不一致,可是两个副本均宕机,则仍是存在数据丢失的场景,前面的全部讨论都是创建在min.insync.replicas=1的前提下,所以须要在数据的可靠性与速度方面作权衡。

4. 生产者

4.1 消息分区选择

生产者的做用主要是生产消息,将消息存入到Kafka对应主题的分区中,具体某个消息应该存入哪一个分区,有如下三个策略决定(优先级由上到下,依次递减):

  • 若是消息发送时指定了消息所属分区,则会直接发往指定分区。
  • 若是没有指定消息分区,可是设置了消息的key,则会根据key的哈希值选择分区。
  • 若是前二者均不知足,则会采用轮询的方式选择分区。

4.2 ack参数设置及意义

生产端往kafka集群发送消息时,能够经过request.required.acks参数来设置数据的可靠性级别

  • 1:默认为1,表示在ISR中的leader副本成功接收到数据并确认后再发送下一条消息,若是主节点宕机则可能出现数据丢失场景,详细分析可参考前面提到的副本章节。
  • 0:表示生产端不须要等待节点的确认就能够继续发送下一批数据,这种状况下数据传输效率最高,可是数据的可靠性最低。
  • -1:表示生产端须要等待ISR中的全部副本节点都收到数据以后才算消息写入成功,可靠性最高,可是性能最低,若是服务端的min.insync.replicas值设置为1,那么在这种状况下容许ISR集合只有一个副本,所以也会存在数据丢失的状况。

4.3 幂等特性

所谓的幂等性,是指一次或者屡次请求某一个资源对于资源自己应该具备一样的结果(网络超时等问题除外),通俗一点的理解就是同一个操做任意执行屡次产生的影响或效果与一次执行影响相同,幂等的关键在于服务端可否识别出请求是否重复,而后过滤掉这些重复请求,一般状况下须要如下信息来实现幂等特性:

  • 惟一标识:判断某个请求是否重复,须要有一个惟一性标识,而后服务端就能根据这个惟一标识来判断是否为重复请求。
  • 记录已经处理过的请求:服务端须要记录已经处理过的请求,而后根据惟一标识来判断是不是重复请求,若是已经处理过,则直接拒绝或者不作任何操做返回成功。

kafka中Producer端的幂等性是指当发送同一条消息时,消息在集群中只会被持久化一次,其幂等是在如下条件中才成立:

  • 只能保证生产端在单个会话内的幂等,若是生产端由于某些缘由意外挂掉而后重启,此时是没办法保证幂等的,由于这时没办法获取到以前的状态信息,即没法作到垮会话级别的幂等。
  • 幂等性不能垮多个主题分区,只能保证单个分区内的幂等,涉及到多个消息分区时,中间的状态并无同步。

若是要支持垮会话或者垮多个消息分区的状况,则须要使用kafka的事务性来实现。

为了实现生成端的幂等语义,引入了Producer ID(PID)与Sequence Number的概念:

  • Producer ID(PID):每一个生产者在初始化时都会分配一个惟一的PID,PID的分配对于用户来讲是透明的。
  • Sequence Number(序列号):对于给定的PID而言,序列号从0开始单调递增,每一个主题分区均会产生一个独立序列号,生产者在发送消息时会给每条消息添加一个序列号。broker端缓存了已经提交消息的序列号,只有比缓存分区中最后提交消息的序列号大1的消息才会被接受,其余会被拒绝。

4.3.1 生产端消息发送流程

下面简单介绍下支持幂等的消息发送端工做流程

  1. 生产端经过Kafkaproducer会将数据添加到RecordAccumulator中,数据添加时会判断是否须要新建一个ProducerBatch。
  2. 生产端后台启动发送线程,会判断当前的PID是否须要重置,重置的缘由是由于某些消息分区的batch重试屡次仍然失败最后由于超时而被移除,这个时候序列号没法连续,致使后续消息没法发送,所以会重置PID,并将相关缓存信息清空,这个时候消息会丢失。
  3. 发送线程判断是否须要新申请PID,若是须要则会阻塞直到获取到PID信息。
  4. 发送线程在调用sendProducerData()方法发送数据时,会进行如下判断:

    • 判断主题分区是否能够继续发送、PID是否有效、若是是重试batch须要判断以前的batch是否发送完成,若是没有发送完成则会跳过当前主题分区的消息发送,直到前面的batch发送完成。
    • 若是对应ProducerBatch没有分配对应的PID与序列号信息,则会在这里进行设置。

4.3.2 服务端消息接受流程

服务端(broker)在收到生产端发送的数据写请求以后,会进行一些判断来决定是否能够写入数据,这里也主要介绍关于幂等相关的操做流程。

  1. 若是请求设置了幂等特性,则会检查是否对ClusterResource有IdempotentWrite权限,若是没有,则会返回错误CLUSTER_AUTHORIZATION_FAILED
  2. 检查是否有PID信息。
  3. 根据batch的序列号检查该batch是否重复,服务端会缓存每一个PID对应主题分区的最近5个batch信息,若是有重复,则直接返回写入成功,可是不会执行真正的数据写入操做。
  4. 若是有PID且非重复batch,则进行如下操做:

    • 判断该PID是否已经存在缓存中。
    • 若是不存在则判断序列号是不是从0开始,若是是则表示为新的PID,在缓存中记录PID的信息(包括PID、epoch以及序列号信息),而后执行数据写入操做;若是不存在可是序列号不是从0开始,则直接返回错误,表示PID在服务端以及过时或者PID写的数据已通过期。
    • 若是PID存在,则会检查PID的epoch版本是否与服务端一致,若是不一致且序列号不是从0开始,则返回错误。若是epoch不一致可是序列号是从0开始,则能够正常写入。
    • 若是epoch版本一致,则会查询缓存中最近一次序列号是否连续,不连续则会返回错误,不然正常写入。

5. 消费者

消费者主要是从Kafka集群拉取消息,而后进行相关的消费逻辑,消费者的消费进度由其自身控制,增长消费的灵活性,好比消费端能够控制重复消费某些消息或者跳过某些消息进行消费。

5.1 消费组

多个消费者能够组成一个消费组,每一个消费者只属于一个消费组。消费组订阅主题的每一个分区只会分配给该消费组中的某个消费者处理,不一样的消费组之间彼此隔离无依赖。同一个消息只会被消费组中的一个消费者消费,若是想要让同一个消息被多个消费者消费,那么每一个消费者须要属于不一样的消费组,且对应消费组中只有该一个消费者,消费组的引入能够实现消费的“独占”或“广播”效果。

  • 消费组下能够有多个消费者,个数支持动态变化。
  • 消费组订阅主题下的每一个分区只会分配给消费组中的一个消费者。
  • group.id标识消费组,相同则属于同一消费组。
  • 不一样消费组之间相互隔离互不影响。

如图所示,消费组1中包含两个消费者,其中消费者1分配消费分区0,消费者2分配消费分区1与分区2。此外消费组的引入还支持消费者的水平扩展及故障转移,好比从上图咱们能够看出消费者2的消费能力不足,相对消费者1来讲消费进度比较落后,咱们能够往消费组里面增长一个消费者以提升其总体的消费能力,以下图所示。

假设消费者1所在机器出现宕机,消费组会发送重平衡,假设将分区0分配给消费者2进行消费,以下图所示。同个消费组中消费者的个数不是越多越好,最大不能超过主题对应的分区数,若是超过则会出现超过的消费者分配不到分区的状况,由于分区一旦分配给消费者就不会再变更,除非组内消费者个数出现变更而发生重平衡。

5.2 消费位移

5.2.1 消费位移主题

Kafka 0.9开始将消费端的位移信息保存在集群的内部主题(__consumer_offsets)中,该主题默认为50个分区,每条日志项的格式都是:<TopicPartition, OffsetAndMetadata>,其key为主题分区主要存放主题、分区以及消费组信息,value为OffsetAndMetadata对象主要包括位移、位移提交时间、自定义元数据等信息。只有消费组往kafka中提交位移才会往这个主题中写入数据,若是消费端将消费位移信息保存在外部存储,则不会有消费位移信息,下面能够经过kafka-console-consumer.sh脚本查看主题消费位移信息。

\# bin/kafka-console-consumer.sh --topic \_\_consumer\_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning  
​  
\[consumer-group01,nginx\_access\_log,2\]::OffsetAndMetadata(offset\=17104625, leaderEpoch\=Optional.\[0\], metadata\=, commitTimestamp\=1573475863555, expireTimestamp\=None)  
\[consumer-group01,nginx\_access\_log,1\]::OffsetAndMetadata(offset\=17103024, leaderEpoch\=Optional.\[0\], metadata\=, commitTimestamp\=1573475863555, expireTimestamp\=None)  
\[consumer-group01,nginx\_access\_log,0\]::OffsetAndMetadata(offset\=17107771, leaderEpoch\=Optional.\[0\], metadata\=, commitTimestamp\=1573475863555, expireTimestamp\=None)

5.2.2 消费位移自动提交

消费端能够经过设置参数enable.auto.commit来控制是自动提交仍是手动,若是值为true则表示自动提交,在消费端的后台会定时的提交消费位移信息,时间间隔由auto.commit.interval.ms(默认为5秒)。

可是若是设置为自动提交会存在如下几个问题:

  1. 可能存在重复的位移数据提交到消费位移主题中,由于每隔5秒会往主题中写入一条消息,无论是否有新的消费记录,这样就会产生大量的同key消息,其实只须要一条,所以须要依赖前面提到日志压缩策略来清理数据。
  2. 重复消费,假设位移提交的时间间隔为5秒,那么在5秒内若是发生了rebalance,则全部的消费者会从上一次提交的位移处开始消费,那么期间消费的数据则会再次被消费。

5.2.3 消费位移手动提交

手动提交须要将enable.auto.commit值设置为false,而后由业务消费端来控制消费进度,手动提交又分为如下三种类型:

  • 同步手动提交位移:若是调用的是同步提交方法commitSync(),则会将poll拉取的最新位移提交到kafka集群,提交成功前会一直等待提交成功。
  • 异步手动提交位移:调用异步提交方法commitAsync(),在调用该方法以后会马上返回,不会阻塞,而后能够经过回调函数执行相关的异常处理逻辑。
  • 指定提交位移:指定位移提交也分为异步跟同步,传参为Map<TopicPartition, OffsetAndMetadata>,其中key为消息分区,value为位移对象。

5.3 分组协调者

分组协调者(Group Coordinator)是一个服务,kafka集群中的每一个节点在启动时都会启动这样一个服务,该服务主要是用来存储消费分组相关的元数据信息,每一个消费组均会选择一个协调者来负责组内各个分区的消费位移信息存储,选择的主要步骤以下:

  • 首选肯定消费组的位移信息存入哪一个分区:前面提到默认的__consumer_offsets主题分区数为50,经过如下算法能够计算出对应消费组的位移信息应该存入哪一个分区 partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) 其中groupId为消费组的id,这个由消费端指定,groupMetadataTopicPartitionCount为主题分区数。
  • 根据partition寻找该分区的leader所对应的节点broker,该broker的Coordinator即为该消费组的Coordinator。

5.4 重平衡机制

5.4.1 重平衡发生场景

如下几种场景均会触发重平衡操做:

  1. 新的消费者加入到消费组中。
  2. 消费者被动下线。好比消费者长时间的GC、网络延迟致使消费者长时间未向Group Coordinator发送心跳请求,均会认为该消费者已经下线并踢出。
  3. 消费者主动退出消费组。
  4. 消费组订阅的任意一个主题分区数出现变化。
  5. 消费者取消某个主题的订阅。

5.4.2 重平衡操做流程

重平衡的实现能够分为如下几个阶段:

  1. 查找Group Coordinator:消费者会从kafka集群中选择一个负载最小的节点发送GroupCoorinatorRequest请求,并处理返回响应GroupCoordinatorResponse。其中请求参数中包含消费组的id,响应中包含Coordinator所在节点id、host以及端口号信息。
  2. Join group:当消费者拿到协调者的信息以后会往协调者发送加入消费组的请求JoinGroupRequest,当全部的消费者都发送该请求以后,协调者会从中选择一个消费者做为leader角色,而后将组内成员信息、订阅等信息发给消费者(响应格式JoinGroupResponse见下表),leader负责消费方案的分配。

JoinGroupRequest请求数据格式

名称 类型 说明
group_id String 消费者id
seesion_timeout int 协调者超过session_timeout指定的时间没有收到心跳消息,则认为该消费者下线
member_id String 协调者分配给消费者的id
protocol_type String 消费组实现的协议,默认为sonsumer
group_protocols List 包含此消费者支持的所有PartitionAssignor类型
protocol_name String PartitionAssignor类型
protocol_metadata byte[] 针对不一样PartitionAssignor类型序列化后的消费者订阅信息,包含用户自定义数据userData

JoinGroupResponse响应数据格式

名称 类型 说明
error_code short 错误码
generation_id int 协调者分配的年代信息
group_protocol String 协调者选择的PartitionAssignor类型
leader_id String Leader的member_id
member_id String 协调者分配给消费者的id
members Map集合 消费组中所有的消费者订阅信息
member_metadata byte[] 对应消费者的订阅信息
  1. Synchronizing Group State阶段:当leader消费者完成消费方案的分配后会发送SyncGroupRequest请求给协调者,其余非leader节点也会发送该请求,只是请求参数为空,而后协调者将分配结果做为响应SyncGroupResponse发给各个消费者,请求及相应的数据格式以下表所示:

SyncGroupRequest请求数据格式

名称 类型 说明
group_id String 消费组的id
generation_id int 消费组保存的年代信息
member_id String 协调者分配的消费者id
member_assignment byte[] 分区分配结果

SyncGroupResponse响应数据格式

名称 类型 说明
error_code short 错误码
member_assignment byte[] 分配给当前消费者的分区

5.4.3 分区分配策略

Kafka提供了三个分区分配策略:RangeAssignor、RoundRobinAssignor以及StickyAssignor,下面简单介绍下各个算法的实现。

  1. RangeAssignor:kafka默认会采用此策略进行分区分配,主要流程以下

    • 将全部订阅主题下的分区进行排序获得集合TP={TP0,Tp1,...,TPN+1}
    • 对消费组中的全部消费者根据名字进行字典排序获得集合CG={C0,C1,...,CM+1}
    • 计算D=N/MR=N%M
    • 消费者Ci获取消费分区起始位置=D*i+min(i,R),Ci获取的分区总数=D+(if (i+1>R)0 else 1)。
假设一个消费组中存在两个消费者{C0,C1},该消费组订阅了三个主题{T1,T2,T3},每一个主题分别存在三个分区,一共就有9个分区{TP1,TP2,...,TP9}。经过以上算法咱们能够获得D=4,R=1,那么消费组C0将消费的分区为{TP1,TP2,TP3,TP4,TP5},C1将消费分区{TP6,TP7,TP8,TP9}。这里存在一个问题,若是不能均分,那么前面的几个消费者将会多消费一个分区。
  1. RoundRobinAssignor:使用该策略须要知足如下两个条件:1) 消费组中的全部消费者应该订阅主题相同;2) 同一个消费组的全部消费者在实例化时给每一个主题指定相同的流数。

    • 对全部主题的全部分区根据主题+分区获得的哈希值进行排序。
    • 对全部消费者按字典排序。
    • 经过轮询的方式将分区分配给消费者。
  2. StickyAssignor:该分配方式在0.11版本开始引入,主要是保证如下特性:1) 尽量的保证分配均衡;2) 当从新分配时,保留尽量多的现有分配。其中第一条的优先级要大于第二条。

6. 总结

在本文中,咱们围绕Kafka的特性,详细介绍了其原理实现,经过主题与日志的深刻剖析,了解了Kafka内部消息的存放、检索以及删除机制。副本系统中的ISR概念的引入解决同步副本与异步复制两种方案各自的缺陷,lead epoch机制的出现解决了数据丢失以及数据不一致问题。生产端的分区选择算法实现了数据均衡,幂等特性的支持则解决了以前存在的重复消息问题。

最后介绍了消费端的相关原理,消费组机制实现了消费端的消息隔离,既有广播也有独占的场景支持,而重平衡机制则保证的消费端的健壮性与扩展性。

参考文献

[1] 徐郡明.Apach Kafka 源码剖析[M].北京.电子工业出版社,2017.
[2] Kafka深度解析.
[3] 深刻浅出理解基于 Kafka 和 ZooKeeper 的分布式消息队列.
[4] Kafka 事务性之幂等性实现.
[5] Kafka水位(high watermark)与leader epoch的讨论.
[6] kafka消费者如何分配分区.

相关文章
相关标签/搜索