本文来自OPPO互联网技术团队,转载请注名做者。同时欢迎关注OPPO互联网技术团队的公众号:OPPO_tech,与你分享OPPO前沿互联网技术及活动。html
Kafka是一个分布式的基于发布、订阅的消息系统,具备着高吞吐、高容错、高可靠以及高性能等特性,主要用于应用解耦、流量削峰、异步消息等场景。nginx
为了让你们更加深刻的了解Kafka内部实现原理,文中将会从主题与日志开始介绍消息的存储、删除以及检索,而后介绍其副本机制的实现原理,最后介绍生产与消费的实现原理以便更合理的应用于实际业务。git
另外,本文较长,建议点赞后再慢慢看 :)算法
Kafka是一个分布式的基于发布、订阅的消息系统,有着强大的消息处理能力,相比与其余消息系统,具备如下特性:bootstrap
正是因其具备这些的优秀特性而普遍用于应用解耦、流量削峰、异步消息等场景,好比消息中间件、日志聚合、流处理等等。segmentfault
本文将从如下几个方面去介绍kafka:缓存
主题是存储消息的一个逻辑概念,能够简单理解为一类消息的集合,由使用方去建立。Kafka中的主题通常会有多个订阅者去消费对应主题的消息,也能够存在多个生产者往主题中写入消息。服务器
每一个主题又能够划分红多个分区,每一个分区存储不一样的消息。当消息添加至分区时,会为其分配一个位移offset(从0开始递增),并保证分区上惟一,消息在分区上的顺序由offset保证,即同一个分区内的消息是有序的,以下图所示网络
同一个主题的不一样分区会分配在不一样的节点上(broker),分区时保证Kafka集群具备水平扩展的基础。session
以主题nginx_access_log
为例,分区数为3,如上图所示。分区在逻辑上对应一个日志(Log),物理上对应的是一个文件夹。
drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx\_access\_log-0/ drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx\_access\_log-1/ drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx\_access\_log-2/
消息写入分区时,其实是将消息写入分区所在的文件夹中。日志又分红多个分片(Segment),每一个分片由日志文件与索引文件组成,每一个分片大小是有限的(在kafka集群的配置文件log.segment.bytes
配置,默认为1073741824byte,即1GB),当分片大小超过限制则会从新建立一个新的分片,外界消息的写入只会写入最新的一个分片(顺序IO)。
\-rw-r--r-- 1 root root 1835920 10月 11 19:18 00000000000000000000.index \-rw-r--r-- 1 root root 1073741684 10月 11 19:18 00000000000000000000.log \-rw-r--r-- 1 root root 2737884 10月 11 19:18 00000000000000000000.timeindex \-rw-r--r-- 1 root root 1828296 10月 11 19:30 00000000000003257573.index \-rw-r--r-- 1 root root 1073741513 10月 11 19:30 00000000000003257573.log \-rw-r--r-- 1 root root 2725512 10月 11 19:30 00000000000003257573.timeindex \-rw-r--r-- 1 root root 1834744 10月 11 19:42 00000000000006506251.index \-rw-r--r-- 1 root root 1073741771 10月 11 19:42 00000000000006506251.log \-rw-r--r-- 1 root root 2736072 10月 11 19:42 00000000000006506251.timeindex \-rw-r--r-- 1 root root 1832152 10月 11 19:54 00000000000009751854.index \-rw-r--r-- 1 root root 1073740984 10月 11 19:54 00000000000009751854.log \-rw-r--r-- 1 root root 2731572 10月 11 19:54 00000000000009751854.timeindex \-rw-r--r-- 1 root root 1808792 10月 11 20:06 00000000000012999310.index \-rw-r--r-- 1 root root 1073741584 10月 11 20:06 00000000000012999310.log \-rw-r--r-- 1 root root 10 10月 11 19:54 00000000000012999310.snapshot \-rw-r--r-- 1 root root 2694564 10月 11 20:06 00000000000012999310.timeindex \-rw-r--r-- 1 root root 10485760 10月 11 20:09 00000000000016260431.index \-rw-r--r-- 1 root root 278255892 10月 11 20:09 00000000000016260431.log \-rw-r--r-- 1 root root 10 10月 11 20:06 00000000000016260431.snapshot \-rw-r--r-- 1 root root 10485756 10月 11 20:09 00000000000016260431.timeindex \-rw-r--r-- 1 root root 8 10月 11 19:03 leader-epoch-checkpoint
一个分片包含多个不一样后缀的日志文件,分片中的第一个消息的offset将做为该分片的基准偏移量,偏移量固定长度为20,不够前面补齐0,而后将其做为索引文件以及日志文件的文件名,如00000000000003257573.index
、00000000000003257573.log
、00000000000003257573.timeindex
、相同文件名的文件组成一个分片(忽略后缀名),除了.index
、.timeindex
、 .log
后缀的日志文件外其余日志文件,对应含义以下:
文件类型 | 做用 |
---|---|
.index | 偏移量索引文件,记录<相对位移,起始地址>映射关系,其中相对位移表示该分片的第一个消息,从1开始计算,起始地址表示对应相对位移消息在分片.log文件的起始地址 |
.timeindex | 时间戳索引文件,记录<时间戳,相对位移>映射关系 |
.log | 日志文件,存储消息的详细信息 |
.snaphot | 快照文件 |
.deleted | 分片文件删除时会先将该分片的全部文件加上.delete后缀,而后有delete-file 任务延迟删除这些文件(file.delete.delay.ms能够设置延时删除的的时间) |
.cleaned | 日志清理时临时文件 |
.swap | Log Compaction 以后的临时文件 |
.leader-epoch-checkpoint |
首先介绍下.index
文件,这里以文件00000000000003257573.index
为例,首先咱们能够经过如下命令查看该索引文件的内容,咱们能够看到输出结构为<offset,position>,实际上索引文件中保存的并非offset而是相对位移,好比第一条消息的相对位移则为0,格式化输出时加上了基准偏移量,如上图所示,<114,17413>表示该分片相对位移为114的消息,其位移为3257573+114,即3257687,position表示对应offset在.log
文件的物理地址,经过.index
索引文件则能够获取对应offset所在的物理地址。索引采用稀疏索引的方式构建,并不保证分片中的每一个消息都在索引文件有映射关系(.timeindex
索引也是相似),主要是为了节省磁盘空间、内存空间,由于索引文件最终会映射到内存中。
\# 查看该分片索引文件的前10条记录 bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.index |head \-n 10 Dumping /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.index offset: 3257687 position: 17413 offset: 3257743 position: 33770 offset: 3257799 position: 50127 offset: 3257818 position: 66484 offset: 3257819 position: 72074 offset: 3257871 position: 87281 offset: 3257884 position: 91444 offset: 3257896 position: 95884 offset: 3257917 position: 100845 \# 查看该分片索引文件的后10条记录 $ bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.index |tail \-n 10 offset: 6506124 position: 1073698512 offset: 6506137 position: 1073702918 offset: 6506150 position: 1073707263 offset: 6506162 position: 1073711499 offset: 6506176 position: 1073716197 offset: 6506188 position: 1073720433 offset: 6506205 position: 1073725654 offset: 6506217 position: 1073730060 offset: 6506229 position: 1073734174 offset: 6506243 position: 1073738288
好比查看offset为6506155
的消息:首先根据offset找到对应的分片,65061所对应的分片为00000000000003257573
,而后经过二分法在00000000000003257573.index
文件中找到不大于6506155的最大索引值,获得<offset: 6506150, position: 1073707263>,而后从00000000000003257573.log
的1073707263位置开始顺序扫描找到offset为650155的消息
Kafka从0.10.0.0版本起,为分片日志文件中新增了一个.timeindex
的索引文件,能够根据时间戳定位消息。一样咱们能够经过脚本kafka-dump-log.sh
查看时间索引的文件内容。
\# 查看该分片时间索引文件的前10条记录 bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex |head \-n 10 Dumping /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex timestamp: 1570792689308 offset: 3257685 timestamp: 1570792689324 offset: 3257742 timestamp: 1570792689345 offset: 3257795 timestamp: 1570792689348 offset: 3257813 timestamp: 1570792689357 offset: 3257867 timestamp: 1570792689361 offset: 3257881 timestamp: 1570792689364 offset: 3257896 timestamp: 1570792689368 offset: 3257915 timestamp: 1570792689369 offset: 3257927 \# 查看该分片时间索引文件的前10条记录 bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex |tail \-n 10 Dumping /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex timestamp: 1570793423474 offset: 6506136 timestamp: 1570793423477 offset: 6506150 timestamp: 1570793423481 offset: 6506159 timestamp: 1570793423485 offset: 6506176 timestamp: 1570793423489 offset: 6506188 timestamp: 1570793423493 offset: 6506204 timestamp: 1570793423496 offset: 6506214 timestamp: 1570793423500 offset: 6506228 timestamp: 1570793423503 offset: 6506240 timestamp: 1570793423505 offset: 6506248
好比我想查看时间戳1570793423501
开始的消息:1.首先定位分片,将1570793423501
与每一个分片的最大时间戳进行对比(最大时间戳取时间索引文件的最后一条记录时间,若是时间为0则取该日志分段的最近修改时间),直到找到大于或等于1570793423501
的日志分段,所以会定位到时间索引文件00000000000003257573.timeindex
,其最大时间戳为1570793423505
;2.经过二分法找到大于或等于1570793423501
的最大索引项,即<timestamp: 1570793423503 offset: 6506240>(6506240为offset,相对位移为3247667);3.根据相对位移3247667去索引文件中找到不大于该相对位移的最大索引值<3248656,1073734174>;4.从日志文件00000000000003257573.log
的1073734174位置处开始扫描,查找不小于1570793423501
的数据。
与其余消息中间件不一样的是,Kafka集群中的消息不会由于消费与否而删除,跟日志同样消息最终会落盘,并提供对应的策略周期性(经过参数log.retention.check.interval.ms来设置,默认为5分钟)执行删除或者压缩操做(broker配置文件log.cleanup.policy
参数若是为“delete”则执行删除操做,若是为“compact”则执行压缩操做,默认为“delete”)。
参数 | 默认值 | 说明 |
---|---|---|
log.retention.hours | 168 | 日志保留时间(小时) |
log.retention.minutes | 无 | 日志保留时间(分钟),优先级大于小时 |
log.retention.ms | 无 | 日志保留时间(毫秒),优先级大于分钟 |
当消息在集群保留时间超过设定阈值(log.retention.hours,默认为168小时,即七天),则须要进行删除。这里会根据分片日志的最大时间戳来判断该分片的时间是否知足删除条件,最大时间戳首先会选取时间戳索引文件中的最后一条索引记录,若是对应的时间戳值大于0则取该值,不然为最近一次修改时间。
这里不直接选取最后修改时间的缘由是避免分片日志的文件被无心篡改而致使其时间不许。
若是刚好该分区下的全部日志分片均已过时,那么会先生成一个新的日志分片做为新消息的写入文件,而后再执行删除参数。
参数 | 默认值 | 说明 |
---|---|---|
log.retention.bytes | 1073741824(即1G),默认未开启,即无穷大 | 日志文件总大小,并非指单个分片的大小 |
log.segment.bytes | 1073741824(即1G) | 单个日志分片大小 |
首先会计算待删除的日志大小diff
(totalSize-log.rentention.bytes),而后从最旧的一个分片开始查看能够执行删除操做的文件集合(若是diff-segment.size>=0
,则知足删除条件),最后执行删除操做。
通常状况下,日志文件的起始偏移量(logStartOffset)会等于第一个日志分段的baseOffset,可是其值会由于删除消息请求而增加,logStartOffset的值其实是日志集合中的最小消息,而小于这个值的消息都会被清理掉。如上图所示,咱们假设logStartOffset=7421048,日志删除流程以下:
前面提到当broker配置文件log.cleanup.policy
参数值设置为“compact”时,则会执行压缩操做,这里的压缩跟普通意义的压缩不同,这里的压缩是指将相同key的消息只保留最后一个版本的value值,以下图所示,压缩以前offset是连续递增,压缩以后offset递增可能不连续,只保留5条消息记录。
Kafka日志目录下cleaner-offset-checkpoint
文件,用来记录每一个主题的每一个分区中已经清理的偏移量,经过这个偏移量能够将分区中的日志文件分红两个部分:clean
表示已经压缩过;dirty
表示还未进行压缩,以下图所示(active segment不会参与日志的压缩操做,由于会有新的数据写入该文件)。
\-rw-r--r-- 1 root root 4 10月 11 19:02 cleaner-offset-checkpoint drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx\_access\_log-0/ drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx\_access\_log-1/ drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx\_access\_log-2/ \-rw-r--r-- 1 root root 0 9月 18 09:50 .lock \-rw-r--r-- 1 root root 4 10月 16 11:19 log-start-offset-checkpoint \-rw-r--r-- 1 root root 54 9月 18 09:50 meta.properties \-rw-r--r-- 1 root root 1518 10月 16 11:19 recovery-point-offset-checkpoint \-rw-r--r-- 1 root root 1518 10月 16 11:19 replication-offset-checkpoint #cat cleaner-offset-checkpoint nginx\_access\_log 0 5033168 nginx\_access\_log 1 5033166 nginx\_access\_log 2 5033168
日志压缩时会根据dirty部分数据占日志文件的比例(cleanableRatio)来判断优先压缩的日志,而后为dirty部分的数据创建key与offset映射关系(保存对应key的最大offset)存入SkimpyoffsetMap中,而后复制segment分段中的数据,只保留SkimpyoffsetMap中记录的消息,压缩以后的相关日志文件大小会减小,为了不出现太小的日志文件与索引文件,压缩时会对全部的segment进行分组(一个组的分片大小不会超过设置的log.segment.bytes
值大小),同一个分组的多个分片日志压缩以后变成一个分片。
如上图所示,全部消息都还没压缩前clean checkpoint
值为0,表示该分区的数据还没进行压缩,第一次压缩后,以前每一个分片的日志文件大小都有所减小,同时会移动clean checkpoint
的位置到这一次压缩结束的offset值。第二次压缩时,会将前两个分片{0.5GB,0.4GB}组成一个分组,{0.7GB,0.2GB}组成一个分组进行压缩,以此类推。
如上图所示,日志压缩的主要流程以下:
deleteHorizonMs
值:当某个消息的value值为空时,该消息会被保留一段时间,超时以后会在下一次的得日志压缩中被删除,因此这里会计算deleteHorizonMs
,根据该值肯定能够删除value值为空的日志分片。(deleteHorizonMs = clean部分的最后一个分片的lastModifiedTime - deleteRetionMs
,deleteRetionMs经过配置文件log.cleaner.delete.retention.ms配置,默认为24小时)。firstDirtyOffset
表示dirty的起始位移,通常会等于clear checkpoint
值,firstUncleanableOffset
表示不能清理的最小位移,通常会等于活跃分片的baseOffset,而后从firstDirtyOffset位置开始遍历日志分片,并填充key与offset的映射关系至SkimpyoffsetMap中,当该map被填充满或到达上限firstUncleanableOffset
时,就能够肯定日志压缩上限endOffset
。kafka支持消息的冗余备份,能够设置对应主题的副本数(--replication-factor
参数设置主题的副本数可在建立主题的时候指定,offsets.topic.replication.factor
设置消费主题_consumer_offsets
副本数,默认为3),每一个副本包含的消息同样(但不是彻底一致,可能从副本的数据较主副本稍微有些落后)。每一个分区的副本集合中会有一个副本被选举为主副本(leader),其余为从副本,全部的读写请求由主副本对外提供,从副本负责将主副本的数据同步到本身所属分区,若是主副本所在分区宕机,则会从新选举出新的主副本对外提供服务。
ISR(In-Sync Replica)集合,表示目前能够用的副本集合,每一个分区中的leader副本会维护此分区的ISR集合。这里的可用是指从副本的消息量与主副本的消息量相差不大,加入至ISR集合中的副本必须知足如下几个条件:
replica.lag.max.messages
)或者副本的LEO落后于主副本的LEO时长不大于设定阈值(replica.lag.time.max.ms
),官方推荐使用后者判断,并在新版本kafka0.10.0移除了replica.lag.max.messages
参数。若是从副本不知足以上的任意条件,则会将其提出ISR集合,当其再次知足以上条件以后又会被从新加入集合中。ISR的引入主要是解决同步副本与异步复制两种方案各自的缺陷(同步副本中若是有个副本宕机或者超时就会拖慢该副本组的总体性能;若是仅仅使用异步副本,当全部的副本消息均远落后于主副本时,一旦主副本宕机从新选举,那么就会存在消息丢失状况)
HW(High Watermark)是一个比较特殊的offset标记,消费端消费时只能拉取到小于HW的消息而HW及以后的消息对于消费者来讲是不可见的,该值由主副本管理,当ISR集合中的所有从副本都拉取到HW指定消息以后,主副本会将HW值+1,即指向下一个offset位移,这样能够保证HW以前消息的可靠性。
LEO(Log End Offset)表示当前副本最新消息的下一个offset,全部副本都存在这样一个标记,若是是主副本,当生产端往其追加消息时,会将其值+1。当从副本从主副本成功拉取到消息时,其值也会增长。
从副本的数据是来自主副本,经过向主副本发送fetch请求获取数据,从副本的LEO值会保存在两个地方,一个是自身所在的节点),一个是主副本所在节点,自身节点保存LEO主要是为了更新自身的HW值,主副本保存从副本的LEO也是为了更新其HW。当从副本每写入一条新消息就会增长其自身的LEO,主副本收到从副本的fetch请求,会先从自身的日志中读取对应数据,在数据返回给从副本以前会先去更新其保存的从副本LEO值。一旦从副本数据写入完成,就会尝试更新本身的HW值,比较LEO与fetch响应中主副本的返回HW,取最小值做为新的HW值。
主副本有日志写入时就会更新其自身的LEO值,与从副本相似。而主副本的HW值是分区的HW值,决定分区数据对应消费端的可见性,如下四种状况,主副本会尝试更新其HW值:
前面是去尝试更新HW,可是不必定会更新,主副本上保存着从副本的LEO值与自身的LEO值,这里会比较全部知足条件的副本LEO值,并选择最小的LEO值最为分区的HW值,其中知足条件的副本是指知足如下两个条件之一:
前面提到若是仅仅依赖HW来进行日志截断以及水位的判断会存在问题,如上图所示,假定存在两个副本A、副本B,最开始A为主副本,B为从副本,且参数min.insync.replicas=1
,即ISR只有一个副本时也会返回成功:
min(LEO,LEOB)=1
,即不须要更新,而后将消息1以及当前分区HW=1返回给从副本B,从副本B收到响应以后写入日志并更新LEO=2,而后更新其HW=1,虽然已经写入了两条消息,可是HW值须要在下一轮的请求才会更新为2。如图所示,假定存在两个副本A、副本B,最开始A为主副本,B为从副本,且参数min.insync.replicas=1
,即ISR只有一个副本时也会返回成功:
HW值被用于衡量副本备份成功与否以及出现失败状况时候的日志截断依据可能会致使数据丢失与数据不一致状况,所以在新版的Kafka(0.11.0.0)引入了leader epoch概念,leader epoch表示一个键值对<epoch, offset>,其中epoch表示leader主副本的版本号,从0开始编码,当leader每变动一次就会+1,offset表示该epoch版本的主副本写入第一条消息的位置,好比<0,0>表示第一个主副本从位移0开始写入消息,<1,100>表示第二个主副本版本号为1并从位移100开始写入消息,主副本会将该信息保存在缓存中并按期写入到checkpoint文件中,每次发生主副本切换都会去从缓存中查询该信息,下面简单介绍下leader epoch的工做原理:
当某个副本宕机重启以后,会进行如下操做:
下面看下leader epoch机制如何避免前面提到的两种异常场景
offsetsForLeaderEpochRequest
,epoch主从副本相等,则A返回当前的LEO=2,从副本B中没有任何大于2的位移,所以不须要截断。从上能够看出引入leader epoch值以后避免了前面提到的数据丢失状况,可是这里须要注意的是若是在上面的第一步,从副本B起来以后向主副本A发送offsetsForLeaderEpochRequest
请求失败,即主副本A同时也宕机了,那么消息1就会丢失,具体可见下面数据不一致场景中有提到。
offsetsForLeaderEpochRequest
请求,因为主副本也宕机了,所以副本B将变成主副本并将消息1截断,此时接受到新消息1的写入。offsetsForLeaderEpochRequest
请求,请求的epoch值小于主副本B,所以主副本B会返回epoch=1时的开始位移,即lastoffset=1,所以从副本A会截断消息1。能够看出epoch的引入避免的数据不一致,可是两个副本均宕机,则仍是存在数据丢失的场景,前面的全部讨论都是创建在min.insync.replicas=1
的前提下,所以须要在数据的可靠性与速度方面作权衡。
生产者的做用主要是生产消息,将消息存入到Kafka对应主题的分区中,具体某个消息应该存入哪一个分区,有如下三个策略决定(优先级由上到下,依次递减):
key
,则会根据key
的哈希值选择分区。生产端往kafka集群发送消息时,能够经过request.required.acks
参数来设置数据的可靠性级别
min.insync.replicas
值设置为1,那么在这种状况下容许ISR集合只有一个副本,所以也会存在数据丢失的状况。所谓的幂等性,是指一次或者屡次请求某一个资源对于资源自己应该具备一样的结果(网络超时等问题除外),通俗一点的理解就是同一个操做任意执行屡次产生的影响或效果与一次执行影响相同,幂等的关键在于服务端可否识别出请求是否重复,而后过滤掉这些重复请求,一般状况下须要如下信息来实现幂等特性:
kafka中Producer端的幂等性是指当发送同一条消息时,消息在集群中只会被持久化一次,其幂等是在如下条件中才成立:
若是要支持垮会话或者垮多个消息分区的状况,则须要使用kafka的事务性来实现。
为了实现生成端的幂等语义,引入了Producer ID(PID)与Sequence Number的概念:
下面简单介绍下支持幂等的消息发送端工做流程
发送线程在调用sendProducerData()
方法发送数据时,会进行如下判断:
服务端(broker)在收到生产端发送的数据写请求以后,会进行一些判断来决定是否能够写入数据,这里也主要介绍关于幂等相关的操做流程。
CLUSTER_AUTHORIZATION_FAILED
。若是有PID且非重复batch,则进行如下操做:
消费者主要是从Kafka集群拉取消息,而后进行相关的消费逻辑,消费者的消费进度由其自身控制,增长消费的灵活性,好比消费端能够控制重复消费某些消息或者跳过某些消息进行消费。
多个消费者能够组成一个消费组,每一个消费者只属于一个消费组。消费组订阅主题的每一个分区只会分配给该消费组中的某个消费者处理,不一样的消费组之间彼此隔离无依赖。同一个消息只会被消费组中的一个消费者消费,若是想要让同一个消息被多个消费者消费,那么每一个消费者须要属于不一样的消费组,且对应消费组中只有该一个消费者,消费组的引入能够实现消费的“独占”或“广播”效果。
如图所示,消费组1中包含两个消费者,其中消费者1分配消费分区0,消费者2分配消费分区1与分区2。此外消费组的引入还支持消费者的水平扩展及故障转移,好比从上图咱们能够看出消费者2的消费能力不足,相对消费者1来讲消费进度比较落后,咱们能够往消费组里面增长一个消费者以提升其总体的消费能力,以下图所示。
假设消费者1所在机器出现宕机,消费组会发送重平衡,假设将分区0分配给消费者2进行消费,以下图所示。同个消费组中消费者的个数不是越多越好,最大不能超过主题对应的分区数,若是超过则会出现超过的消费者分配不到分区的状况,由于分区一旦分配给消费者就不会再变更,除非组内消费者个数出现变更而发生重平衡。
Kafka 0.9开始将消费端的位移信息保存在集群的内部主题(__consumer_offsets)中,该主题默认为50个分区,每条日志项的格式都是:<TopicPartition, OffsetAndMetadata>,其key为主题分区主要存放主题、分区以及消费组信息,value为OffsetAndMetadata对象主要包括位移、位移提交时间、自定义元数据等信息。只有消费组往kafka中提交位移才会往这个主题中写入数据,若是消费端将消费位移信息保存在外部存储,则不会有消费位移信息,下面能够经过kafka-console-consumer.sh
脚本查看主题消费位移信息。
\# bin/kafka-console-consumer.sh --topic \_\_consumer\_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning \[consumer-group01,nginx\_access\_log,2\]::OffsetAndMetadata(offset\=17104625, leaderEpoch\=Optional.\[0\], metadata\=, commitTimestamp\=1573475863555, expireTimestamp\=None) \[consumer-group01,nginx\_access\_log,1\]::OffsetAndMetadata(offset\=17103024, leaderEpoch\=Optional.\[0\], metadata\=, commitTimestamp\=1573475863555, expireTimestamp\=None) \[consumer-group01,nginx\_access\_log,0\]::OffsetAndMetadata(offset\=17107771, leaderEpoch\=Optional.\[0\], metadata\=, commitTimestamp\=1573475863555, expireTimestamp\=None)
消费端能够经过设置参数enable.auto.commit
来控制是自动提交仍是手动,若是值为true
则表示自动提交,在消费端的后台会定时的提交消费位移信息,时间间隔由auto.commit.interval.ms
(默认为5秒)。
可是若是设置为自动提交会存在如下几个问题:
手动提交须要将enable.auto.commit
值设置为false
,而后由业务消费端来控制消费进度,手动提交又分为如下三种类型:
commitSync()
,则会将poll拉取的最新位移提交到kafka集群,提交成功前会一直等待提交成功。commitAsync()
,在调用该方法以后会马上返回,不会阻塞,而后能够经过回调函数执行相关的异常处理逻辑。分组协调者(Group Coordinator)是一个服务,kafka集群中的每一个节点在启动时都会启动这样一个服务,该服务主要是用来存储消费分组相关的元数据信息,每一个消费组均会选择一个协调者来负责组内各个分区的消费位移信息存储,选择的主要步骤以下:
partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
其中groupId
为消费组的id,这个由消费端指定,groupMetadataTopicPartitionCount
为主题分区数。如下几种场景均会触发重平衡操做:
重平衡的实现能够分为如下几个阶段:
Group Coordinator
:消费者会从kafka集群中选择一个负载最小的节点发送GroupCoorinatorRequest
请求,并处理返回响应GroupCoordinatorResponse
。其中请求参数中包含消费组的id,响应中包含Coordinator所在节点id、host以及端口号信息。Join group
:当消费者拿到协调者的信息以后会往协调者发送加入消费组的请求JoinGroupRequest
,当全部的消费者都发送该请求以后,协调者会从中选择一个消费者做为leader角色,而后将组内成员信息、订阅等信息发给消费者(响应格式JoinGroupResponse
见下表),leader负责消费方案的分配。JoinGroupRequest
请求数据格式
名称 | 类型 | 说明 |
---|---|---|
group_id | String | 消费者id |
seesion_timeout | int | 协调者超过session_timeout指定的时间没有收到心跳消息,则认为该消费者下线 |
member_id | String | 协调者分配给消费者的id |
protocol_type | String | 消费组实现的协议,默认为sonsumer |
group_protocols | List | 包含此消费者支持的所有PartitionAssignor类型 |
protocol_name | String | PartitionAssignor类型 |
protocol_metadata | byte[] | 针对不一样PartitionAssignor类型序列化后的消费者订阅信息,包含用户自定义数据userData |
JoinGroupResponse
响应数据格式
名称 | 类型 | 说明 |
---|---|---|
error_code | short | 错误码 |
generation_id | int | 协调者分配的年代信息 |
group_protocol | String | 协调者选择的PartitionAssignor类型 |
leader_id | String | Leader的member_id |
member_id | String | 协调者分配给消费者的id |
members | Map集合 | 消费组中所有的消费者订阅信息 |
member_metadata | byte[] | 对应消费者的订阅信息 |
Synchronizing Group State
阶段:当leader消费者完成消费方案的分配后会发送SyncGroupRequest
请求给协调者,其余非leader节点也会发送该请求,只是请求参数为空,而后协调者将分配结果做为响应SyncGroupResponse
发给各个消费者,请求及相应的数据格式以下表所示:SyncGroupRequest
请求数据格式
名称 | 类型 | 说明 |
---|---|---|
group_id | String | 消费组的id |
generation_id | int | 消费组保存的年代信息 |
member_id | String | 协调者分配的消费者id |
member_assignment | byte[] | 分区分配结果 |
SyncGroupResponse
响应数据格式
名称 | 类型 | 说明 |
---|---|---|
error_code | short | 错误码 |
member_assignment | byte[] | 分配给当前消费者的分区 |
Kafka提供了三个分区分配策略:RangeAssignor、RoundRobinAssignor以及StickyAssignor,下面简单介绍下各个算法的实现。
RangeAssignor:kafka默认会采用此策略进行分区分配,主要流程以下
TP={TP0,Tp1,...,TPN+1}
。CG={C0,C1,...,CM+1}
。D=N/M
,R=N%M
。假设一个消费组中存在两个消费者{C0,C1},该消费组订阅了三个主题{T1,T2,T3},每一个主题分别存在三个分区,一共就有9个分区{TP1,TP2,...,TP9}。经过以上算法咱们能够获得D=4,R=1,那么消费组C0将消费的分区为{TP1,TP2,TP3,TP4,TP5},C1将消费分区{TP6,TP7,TP8,TP9}。这里存在一个问题,若是不能均分,那么前面的几个消费者将会多消费一个分区。
RoundRobinAssignor:使用该策略须要知足如下两个条件:1) 消费组中的全部消费者应该订阅主题相同;2) 同一个消费组的全部消费者在实例化时给每一个主题指定相同的流数。
在本文中,咱们围绕Kafka的特性,详细介绍了其原理实现,经过主题与日志的深刻剖析,了解了Kafka内部消息的存放、检索以及删除机制。副本系统中的ISR概念的引入解决同步副本与异步复制两种方案各自的缺陷,lead epoch机制的出现解决了数据丢失以及数据不一致问题。生产端的分区选择算法实现了数据均衡,幂等特性的支持则解决了以前存在的重复消息问题。
最后介绍了消费端的相关原理,消费组机制实现了消费端的消息隔离,既有广播也有独占的场景支持,而重平衡机制则保证的消费端的健壮性与扩展性。
[1] 徐郡明.Apach Kafka 源码剖析[M].北京.电子工业出版社,2017.
[2] Kafka深度解析.
[3] 深刻浅出理解基于 Kafka 和 ZooKeeper 的分布式消息队列.
[4] Kafka 事务性之幂等性实现.
[5] Kafka水位(high watermark)与leader epoch的讨论.
[6] kafka消费者如何分配分区.