Server端处理fetchRequest请求

Server端处理fetchRequest请求

1前言

在consumer章节,咱们知道,在consumer的pollOnce()中调用sendFetches()方法,html

 

本节主要介绍服务端处理fetchRequest请求的过程,FetchRequest由服务端函数KafkaApis.handleFetchRequest处理, FetchReuqest中重要的字段是requestInfo: Map[TopicAndPartition, PartitionFetchInfo])函数

即对于Fetch请求来讲,关注点是TopicAndParititon执行Fetch的offset以及FetchSize。 post

其实Kafka的主从同步也是经过FetchRequest来完成,与consumer拉取消息的过程类似,都在handleFetchRequest()中进行处理,不过broker对他们的处理在身份验证上作了区分,下面咱们看具体的FetchRequest处理过程:fetch

2 handleFetchRequest()处理过程

该函数的源码以下:ui

 

 

 

 

在函数的开头部分spa

 

 

先是执行readMessageSets读取了log上当前可读的数据,这个数据量若是已经知足了Fetch的条件的话,后面会直接返回。以后会判断Fetch是不是来自于Follower的同步请求,若是是,则会调用recordFollowerLogEndOffsets记录Follower的offset。.net

该函数会调用ReplicaManager.updateReplicaLEOAndPartitionHW:3d

 

 

虽然执行的代码量不多,但recordFollowerLogEndOffsets带来的影响很大:日志

  • l  根据Fetch读到的message的offset表明了follower的leo,因此replica中的logEndOffsetMetadata和logEndOffsetUpdateTimeMsValue变量会更新;
  • l  replicaManager.maybeShrinkIsr函数做为一个按期任务,会根据replica的logEndOffsetMetadata和logEndOffsetUpdateTimeMsValue变量清理ISR,将leo落后太多或者长时间没更新的replica从ISR中踢出;
  • l  replica的leo更新,若是知足条件leo > leaderHw,则该replica有可能会成为ISR中的一员,并更新zk中的ISR记录。
  • l  若是replica原本就是ISR,leo更新意味着leaderHw也有可能会发生变化。
  • l  在requiredAcks>1或者=-1时,DelayedProduce请求所需条件与replica是否知足leo>requiredOffset有关,因此须要调用producerRequestPurgatory.update;
  • l  若是FetchRequest不想等待,或者已经读到了足够的数据,FetchRequest会直接使用已经读到的数据进行返回。
  • l  不然,会执行以下代码:

 

 

这里和ProducerRequest同样,将FetchRequest组装为DelayedFetch并加入到Purgatory中。code

若是不是来自replica的请求,调用ReplicaManager.fetchMessages()方法,从本地副本中获取数据,并等待足够多的数据进行返回,其中传入的responseCallback方法在超时或者是知足fetch条件时将会被调用,将结果返回给客户端。

 

 

 

总体来讲,分为如下几步:

  • l  readFromLocalLog():调用该方法,从本地日志拉取相应的数据;
  • l  判断 Fetch 请求来源,若是来自副本同步,那么更新该副本的 the end offset 记录,若是该副本不在 isr 中,并判断是否须要更新 isr;
  • l  返回结果,知足条件的话立马返回,不然的话,经过延迟操做,延迟返回结果。

 

 

 

 

 

 

readFromLocalLog() 方法的处理过程:

  • l  先根据要拉取的 topic-partition 获取对应的 Partition 对象,根据 Partition 对象获取对应的 Replica 对象;
  • l  根据 Replica 对象找到对应的 Log 对象,而后调用其 read() 方法从指定的位置读取数据。

存储层对 Fetch 请求的处理

每一个 Replica 会对应一个 log 对象,而每一个 log 对象会管理相应的 LogSegment 实例。

Log 对象的 read() 方法的实现以下所示:

 

 

从实现能够看出,该方法会先查找对应的 Segment 对象(日志分段),而后循环直到读取到数据结束,若是当前的日志分段没有读取到相应的数据,那么会更新日志分段及对应的最大位置。读取日志分段时,会先读取 offset 索引文件再读取数据文件,具体步骤以下:

  • l  根据要读取的起始偏移量(startOffset)读取 offset 索引文件中对应的物理位置;
  • l  查找 offset 索引文件最后返回:起始偏移量对应的最近物理位置(startPosition);
  • l  根据 startPosition 直接定位到数据文件,而后读取数据文件内容;
  • l  最多能读到数据文件的结束位置(maxPosition)。

LogSegment

关乎 数据文件、offset 索引文件和时间索引文件真正的操做都是在 LogSegment 对象中的,日志读取也与这个方法息息相关。

read() 方法的实现以下:

 

 

从上面的实现来看,上述过程分为如下三部分:

  • l  根据 startOffset 获得实际的物理位置(translateOffset());
  • l  计算要读取的实际物理长度;
  • l  根据实际起始物理位置和要读取实际物理长度读取数据文件。

translateOffset()

translateOffset() 方法的实现过程主要分为两部分:

  • l  查找 offset 索引文件:调用 offset 索引文件的 lookup() 查找方法,获取离 startOffset 最接近的物理位置;
  • l  调用数据文件的 searchFor() 方法,从指定的物理位置开始读取每条数据,知道找到对应 offset 的物理位置。

 

 

 

参考资料:

https://blog.csdn.net/c395318621/article/details/53164123

http://www.daleizhou.tech/posts/consume-messages.html

相关文章
相关标签/搜索