在consumer章节,咱们知道,在consumer的pollOnce()中调用sendFetches()方法,html
本节主要介绍服务端处理fetchRequest请求的过程,FetchRequest由服务端函数KafkaApis.handleFetchRequest处理, FetchReuqest中重要的字段是requestInfo: Map[TopicAndPartition, PartitionFetchInfo])函数
即对于Fetch请求来讲,关注点是TopicAndParititon执行Fetch的offset以及FetchSize。 post
其实Kafka的主从同步也是经过FetchRequest来完成,与consumer拉取消息的过程类似,都在handleFetchRequest()中进行处理,不过broker对他们的处理在身份验证上作了区分,下面咱们看具体的FetchRequest处理过程:fetch
该函数的源码以下:ui
在函数的开头部分spa
先是执行readMessageSets读取了log上当前可读的数据,这个数据量若是已经知足了Fetch的条件的话,后面会直接返回。以后会判断Fetch是不是来自于Follower的同步请求,若是是,则会调用recordFollowerLogEndOffsets记录Follower的offset。.net
该函数会调用ReplicaManager.updateReplicaLEOAndPartitionHW:3d
虽然执行的代码量不多,但recordFollowerLogEndOffsets带来的影响很大:日志
这里和ProducerRequest同样,将FetchRequest组装为DelayedFetch并加入到Purgatory中。code
若是不是来自replica的请求,调用ReplicaManager.fetchMessages()方法,从本地副本中获取数据,并等待足够多的数据进行返回,其中传入的responseCallback方法在超时或者是知足fetch条件时将会被调用,将结果返回给客户端。
总体来讲,分为如下几步:
readFromLocalLog() 方法的处理过程:
存储层对 Fetch 请求的处理
每一个 Replica 会对应一个 log 对象,而每一个 log 对象会管理相应的 LogSegment 实例。
Log 对象的 read()
方法的实现以下所示:
从实现能够看出,该方法会先查找对应的 Segment 对象(日志分段),而后循环直到读取到数据结束,若是当前的日志分段没有读取到相应的数据,那么会更新日志分段及对应的最大位置。读取日志分段时,会先读取 offset 索引文件再读取数据文件,具体步骤以下:
LogSegment
关乎 数据文件、offset 索引文件和时间索引文件真正的操做都是在 LogSegment 对象中的,日志读取也与这个方法息息相关。
read()
方法的实现以下:
从上面的实现来看,上述过程分为如下三部分:
translateOffset()
translateOffset()
方法的实现过程主要分为两部分:
lookup()
查找方法,获取离 startOffset 最接近的物理位置;searchFor()
方法,从指定的物理位置开始读取每条数据,知道找到对应 offset 的物理位置。
参考资料: