Querying the latest offset
The high watermark indicated the offset of messages that are fully replicated, while the end-of-log offset might be larger if there are newly appended records to the leader partition which are not replicated yet.
The main classes involved are: AbstractFetcherThread, ReplicaFetcherThread, PartitionFetchState, ReplicaFetcherManager, ShutdownableThread, and ReplicaManager.
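The fetcher thread is created along this path:
ReplicaManager.becomeLeaderOrFollower, then ReplicaManager.makeFollowers
ReplicaFetcherManager.addFetcherForPartitions
ReplicaFetcherManager.createFetcherThread, which creates the fetcher thread with new ReplicaFetcherThread.... Notes on some of the field assignments: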
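The inheritance hierarchy of ReplicaFetcherThread is:
ShutdownableThread
|-- AbstractFetcherThread
|---|-- ReplicaFetcherThread
Because ShutdownableThread is a spin thread (its run method keeps calling doWork in a loop), a subclass only has to implement doWork to plug in its own logic.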
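The spin-thread idea described above can be sketched in a few lines of Java. This is a minimal illustration, not Kafka's ShutdownableThread: the names SpinThread, CountingFetcher, and rounds are hypothetical, and the real class has more shutdown and error handling than is shown here.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the spin-thread pattern; NOT Kafka's ShutdownableThread.
abstract class SpinThread extends Thread {
    private final AtomicBoolean running = new AtomicBoolean(true);

    SpinThread(String name) {
        super(name);
    }

    /** One unit of work; the loop in run() calls it again and again. */
    protected abstract void doWork();

    @Override
    public void run() {
        // The "spin": keep calling doWork until shutdown() flips the flag.
        while (running.get()) {
            doWork();
        }
    }

    /** Ask the loop to stop, then wait for the thread to exit. */
    void shutdown() {
        running.set(false);
        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Hypothetical subclass: each doWork call stands for one fetch round.
class CountingFetcher extends SpinThread {
    final AtomicInteger rounds = new AtomicInteger();

    CountingFetcher() {
        super("counting-fetcher");
    }

    @Override
    protected void doWork() {
        rounds.incrementAndGet();
    }
}
```

Starting a CountingFetcher and later calling shutdown() runs doWork many times and then stops. The subclass never writes the loop itself, which is the same division of labour as between ShutdownableThread and AbstractFetcherThread.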
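The doWork logic in AbstractFetcherThread is organized as follows:
maybeTruncate runs first (the call stacks recorded further down show it at the start of doWork).
A FetchRequest is built and sent to pull data.
The response to the FetchRequest is handled in processFetchRequest.
Note: this request does not necessarily return any message data (records), and it does not ask for messages only; it also fetches values such as the leader's high watermark.
maybeTruncate logic: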
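It handles the partitions whose PartitionFetchState instance has isTruncatingLog set to true. As for when the truncatingLog field of PartitionFetchState is true: my current reading is that it is set to true by default when the ReplicaFetcherThread is first created, so a truncation may happen right at the start.
ReplicaFetcherThread.buildLeaderEpochRequest (OffsetsForLeaderEpochRequest).
ReplicaFetcherThread.fetchEpochsFromLeader.
Logic for handling the response to the FetchRequest: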
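Call processPartitionData, then update the partition state: partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
ReplicaFetcherThread.processPartitionData
processPartitionData is implemented by ReplicaFetcherThread. Judging from the call stack in the notes at the end, its main logic includes appending the fetched records to the local log through Log.appendAsFollower.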
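The partitionStates.updateAndMoveToEnd call mentioned above can be pictured with a small sketch. It assumes the container keeps partitions in insertion order, which is what the method name suggests and, as far as I understand, how Kafka's partition-state container is backed; the class FetchStateSketch, its methods, and the use of a plain String for the topic partition are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the partitionStates container.
class FetchStateSketch {
    // Insertion-ordered map: iteration order is the order partitions get fetched in.
    private final Map<String, Long> fetchOffsets = new LinkedHashMap<>();

    /** Record the new fetch offset and move the partition to the back of the order. */
    void updateAndMoveToEnd(String topicPartition, long newOffset) {
        fetchOffsets.remove(topicPartition);
        fetchOffsets.put(topicPartition, newOffset);
    }

    /** Partitions in the order the next fetch round would visit them. */
    List<String> fetchOrder() {
        return new ArrayList<>(fetchOffsets.keySet());
    }

    long fetchOffset(String topicPartition) {
        return fetchOffsets.get(topicPartition);
    }
}
```

Moving a partition to the end after it has received data gives the other partitions a turn at the front of the next request, so one busy partition cannot starve the rest.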
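Call stack for handling a fetch request. Fetch requests from consumer clients and fetch requests from followers doing replication go through the same code path; the handling only differs in a few places depending on whether the request comes from a follower.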
ReplicaManager.kafka$server$ReplicaManager$$read$1(TopicPartition, FetchRequest$PartitionData, int, boolean, int, boolean, boolean, boolean, ReplicaQuota, IsolationLevel) line: 856
ReplicaManager$$anonfun$readFromLocalLog$1.apply(Tuple2<TopicPartition,PartitionData>) line: 962
ReplicaManager$$anonfun$readFromLocalLog$1.apply(Object) line: 961
ResizableArray$class.foreach(ResizableArray, Function1) line: 59
ArrayBuffer<A>.foreach(Function1<A,U>) line: 48
ReplicaManager.readFromLocalLog(int, boolean, boolean, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, IsolationLevel) line: 961
ReplicaManager.readFromLog$1(int, int, boolean, Seq, ReplicaQuota, IsolationLevel, boolean, boolean, boolean) line: 790
ReplicaManager.fetchMessages(long, int, int, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, Function1<Seq<Tuple2<TopicPartition,FetchPartitionData>>,BoxedUnit>, IsolationLevel) line: 803
KafkaApis.handleFetchRequest(RequestChannel$Request) line: 597
KafkaApis.handle(RequestChannel$Request) line: 101
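Offsets involved and how they are updated:
fetchOffset: the offset at which the fetch starts; it comes from the body of the fetch request.
highWatermark: update logic
What kafka.server.ReplicaManager.readFromLocalLog(...).read(...) does:
Mark the metrics.
Get the replica object localReplica. It is looked up by replica id (that is, broker id); a fetch request carries the replica id in its body.
val initialHighWatermark = localReplica.highWatermark.messageOffset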
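Update the LEO of the remote replicas that the leader keeps track of
This is a little convoluted: the leader also maintains the LEO of every remote replica, because the leader's high watermark is advanced based on those remote LEOs. The leader's high watermark is the minimum LEO over the replicas (more precisely, Kafka takes the minimum over the in-sync replicas, the leader's own LEO included).
The call stack is as follows: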
Replica.logEndOffset_$eq(LogOffsetMetadata) line: 98
Replica.updateLogReadResult(LogReadResult) line: 83
Partition.updateReplicaLogReadResult(Replica, LogReadResult) line: 276
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Tuple2<TopicPartition,LogReadResult>) line: 1314
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Object) line: 1308
TraversableLike$$anonfun$map$1.apply(A) line: 234
TraversableLike$$anonfun$map$1.apply(Object) line: 234
ResizableArray$class.foreach(ResizableArray, Function1) line: 59
ArrayBuffer<A>.foreach(Function1<A,U>) line: 48
TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) line: 234
ArrayBuffer<A>(AbstractTraversable<A>).map(Function1<A,B>, CanBuildFrom<Traversable<A>,B,That>) line: 104
ReplicaManager.updateFollowerLogReadResults(int, Seq<Tuple2<TopicPartition,LogReadResult>>) line: 1308
ReplicaManager.readFromLog$1(int, int, boolean, Seq, ReplicaQuota, IsolationLevel, boolean, boolean, boolean) line: 799
ReplicaManager.fetchMessages(long, int, int, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, Function1<Seq<Tuple2<TopicPartition,FetchPartitionData>>,BoxedUnit>, IsolationLevel) line: 803
KafkaApis.handleFetchRequest(RequestChannel$Request) line: 597
The stack shows that this happens while the leader is handling a fetch request: the fetch offset carried by the request is taken as the remote replica's LEO.
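The bookkeeping described above (a follower's fetch offset is recorded as its LEO, and the leader's high watermark follows the smallest LEO) can be sketched as follows. This is a simplified model under stated assumptions, not Kafka's code: LeaderOffsetsSketch and its methods are hypothetical, every follower is treated as in sync, and the leader's own LEO is included in the minimum.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the offsets a leader keeps for one partition.
class LeaderOffsetsSketch {
    private long leaderLogEndOffset = 0L;
    private long highWatermark = 0L;
    // Assumption: every follower listed here counts as in sync.
    private final Map<Integer, Long> followerLogEndOffsets = new HashMap<>();

    LeaderOffsetsSketch(int... followerIds) {
        for (int id : followerIds) {
            followerLogEndOffsets.put(id, 0L);
        }
    }

    /** A produce request appended recordCount records to the leader's log. */
    void appendAsLeader(int recordCount) {
        leaderLogEndOffset += recordCount;
    }

    /** A follower fetched from fetchOffset: treat that as its LEO, then recompute the HW. */
    long onFollowerFetch(int replicaId, long fetchOffset) {
        followerLogEndOffsets.put(replicaId, fetchOffset);
        long minLogEndOffset = leaderLogEndOffset;
        for (long logEndOffset : followerLogEndOffsets.values()) {
            minLogEndOffset = Math.min(minLogEndOffset, logEndOffset);
        }
        // The high watermark only moves forward in this sketch.
        highWatermark = Math.max(highWatermark, minLogEndOffset);
        return highWatermark;
    }

    long highWatermark() {
        return highWatermark;
    }

    long logEndOffset() {
        return leaderLogEndOffset;
    }
}
```

With two followers and ten records appended on the leader, the high watermark stays behind the leader's end-of-log offset until both followers have sent a fetch whose offset reaches 10. That is the gap between high watermark and end-of-log offset that the quotation at the top describes.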
TODO
Creating the replication fetch thread
ReplicaFetcherManager.createFetcherThread(int, BrokerEndPoint) line: 30
ReplicaFetcherManager(AbstractFetcherManager).kafka$server$AbstractFetcherManager$$addAndStartFetcherThread$1(BrokerAndFetcherId, BrokerIdAndFetcherId) line: 80
AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(Tuple2<BrokerAndFetcherId,Map<TopicPartition,BrokerAndInitialOffset>>) line: 94
AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(Object) line: 85
TraversableLike$WithFilter$$anonfun$foreach$1.apply(A) line: 733
Map$Map1<A,B>.foreach(Function1<Tuple2<A,B>,U>) line: 116
TraversableLike$WithFilter.foreach(Function1<A,U>) line: 732
ReplicaFetcherManager(AbstractFetcherManager).addFetcherForPartitions(Map<TopicPartition,BrokerAndInitialOffset>) line: 85
ReplicaManager.makeFollowers(int, int, Map<Partition,PartitionState>, int, Map<TopicPartition,Errors>) line: 1272
ReplicaManager.becomeLeaderOrFollower(int, LeaderAndIsrRequest, Function2<Iterable
KafkaApis.handleLeaderAndIsrRequest(RequestChannel$Request) line: 173
KafkaApis.handle(RequestChannel$Request) line: 103
KafkaRequestHandler.run() line: 65
KafkaThread(Thread).run() line: 748
Building the request
ReplicaFetcherThread.buildFetchRequest(Seq<Tuple2<TopicPartition,PartitionFetchState>>) line: 234
AbstractFetcherThread$$anonfun$2.apply() line: 104
AbstractFetcherThread$$anonfun$2.apply() line: 103
CoreUtils$.inLock(Lock, Function0
ReplicaFetcherThread(AbstractFetcherThread).doWork() line: 103
ReplicaFetcherThread(ShutdownableThread).run() line: 64
Temporary notes taken while reading the code:
[2019-09-25 18:07:13,787] INFO Handling request:RequestHeader(apiKey=OFFSET_FOR_LEADER_EPOCH, apiVersion=0, clientId=broker-0-fetcher-0, correlationId=0) -- {topics=[{topic=test.vv19,partitions=[{partition=0,leader_epoch=25}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
[2019-09-25 22:13:02,501] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [37]lso.messageOffset: [37] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 22:13:02,502] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [37]lso.messageOffset: [37] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 22:13:03,006] INFO testEnter0005-Received response:apikey:FETCH correlationId 24; (com.code260.ss.KafkaTestUtils$)

LeaderEpochFileCache$$anonfun$assign$1.apply$mcV$sp() line: 62
LeaderEpochFileCache$$anonfun$assign$1.apply() line: 61
LeaderEpochFileCache$$anonfun$assign$1.apply() line: 61
CoreUtils$.inLock(Lock, Function0<T>) line: 217
CoreUtils$.inWriteLock(ReadWriteLock, Function0<T>) line: 225
LeaderEpochFileCache.assign(int, long) line: 60
Log$$anonfun$append$2$$anonfun$apply$9.apply(MutableRecordBatch) line: 689
Log$$anonfun$append$2$$anonfun$apply$9.apply(Object) line: 687
Iterator$class.foreach(Iterator, Function1) line: 891
Wrappers$JIteratorWrapper<A>(AbstractIterator<A>).foreach(Function1<A,U>) line: 1334
IterableLike$class.foreach(IterableLike, Function1) line: 72
Wrappers$JIterableWrapper<A>(AbstractIterable<A>).foreach(Function1<A,U>) line: 54
Log$$anonfun$append$2.apply() line: 687
Log$$anonfun$append$2.apply() line: 624
Log.maybeHandleIOException(Function0<String>, Function0<T>) line: 1669
Log.append(MemoryRecords, boolean, boolean, int) line: 624
Log.appendAsLeader(MemoryRecords, int, boolean) line: 597
Partition$$anonfun$13.apply() line: 500
Partition$$anonfun$13.apply() line: 488
CoreUtils$.inLock(Lock, Function0<T>) line: 217
CoreUtils$.inReadLock(ReadWriteLock, Function0<T>) line: 223
Partition.appendRecordsToLeader(MemoryRecords, boolean, int) line: 487
ReplicaManager$$anonfun$appendToLocalLog$2.apply(Tuple2<TopicPartition,MemoryRecords>) line: 724
ReplicaManager$$anonfun$appendToLocalLog$2.apply(Object) line: 708
TraversableLike$$anonfun$map$1.apply(A) line: 234
TraversableLike$$anonfun$map$1.apply(Object) line: 234
HashMap$$anonfun$foreach$1.apply(DefaultEntry<A,B>) line: 130
HashMap$$anonfun$foreach$1.apply(Object) line: 130
HashTable$class.foreachEntry(HashTable, Function1) line: 236
HashMap<A,B>.foreachEntry(Function1<DefaultEntry<A,B>,U>) line: 40
HashMap<A,B>.foreach(Function1<Tuple2<A,B>,U>) line: 130
TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) line: 234
HashMap<A,B>(AbstractTraversable<A>).map(Function1<A,B>, CanBuildFrom<Traversable<A>,B,That>) line: 104
ReplicaManager.appendToLocalLog(boolean, boolean, Map<TopicPartition,MemoryRecords>, short) line: 708
ReplicaManager.appendRecords(long, short, boolean, boolean, Map<TopicPartition,MemoryRecords>, Function1<Map<TopicPartition,PartitionResponse>,BoxedUnit>, Option<Lock>, Function1<Map<TopicPartition,RecordsProcessingStats>,BoxedUnit>) line: 458
KafkaApis.handleProduceRequest(RequestChannel$Request) line: 460
KafkaApis.handle(RequestChannel$Request) line: 100
KafkaRequestHandler.run() line: 65
KafkaThread(Thread).run() line: 748

LeaderEpochFileCache$$anonfun$assign$1.apply$mcV$sp() line: 62
LeaderEpochFileCache$$anonfun$assign$1.apply() line: 61
LeaderEpochFileCache$$anonfun$assign$1.apply() line: 61
CoreUtils$.inLock(Lock, Function0<T>) line: 217
CoreUtils$.inWriteLock(ReadWriteLock, Function0<T>) line: 225
LeaderEpochFileCache.assign(int, long) line: 60
Log$$anonfun$append$2$$anonfun$apply$9.apply(MutableRecordBatch) line: 689
Log$$anonfun$append$2$$anonfun$apply$9.apply(Object) line: 687
Iterator$class.foreach(Iterator, Function1) line: 891
Wrappers$JIteratorWrapper<A>(AbstractIterator<A>).foreach(Function1<A,U>) line: 1334
IterableLike$class.foreach(IterableLike, Function1) line: 72
Wrappers$JIterableWrapper<A>(AbstractIterable<A>).foreach(Function1<A,U>) line: 54
Log$$anonfun$append$2.apply() line: 687
Log$$anonfun$append$2.apply() line: 624
Log.maybeHandleIOException(Function0<String>, Function0<T>) line: 1669
Log.append(MemoryRecords, boolean, boolean, int) line: 624
Log.appendAsFollower(MemoryRecords) line: 607
ReplicaFetcherThread.processPartitionData(TopicPartition, long, ReplicaFetcherThread$PartitionData) line: 123
ReplicaFetcherThread.processPartitionData(TopicPartition, long, AbstractFetcherThread$PartitionData) line: 62
AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(PartitionFetchState) line: 184
AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(Object) line: 172
Some<A>(Option<A>).foreach(Function1<A,U>) line: 257
AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Tuple2<TopicPartition,PartitionData>) line: 172
AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Object) line: 169
ResizableArray$class.foreach(ResizableArray, Function1) line: 59
ArrayBuffer<A>.foreach(Function1<A,U>) line: 48
AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp() line: 169
AbstractFetcherThread$$anonfun$processFetchRequest$2.apply() line: 169
AbstractFetcherThread$$anonfun$processFetchRequest$2.apply() line: 169
CoreUtils$.inLock(Lock, Function0<T>) line: 217
ReplicaFetcherThread(AbstractFetcherThread).processFetchRequest(AbstractFetcherThread$FetchRequest) line: 167
ReplicaFetcherThread(AbstractFetcherThread).doWork() line: 113
ReplicaFetcherThread(ShutdownableThread).run() line: 64

ReplicaFetcherThread.fetchEpochsFromLeader(Map<TopicPartition,Object>) line: 332
ReplicaFetcherThread(AbstractFetcherThread).maybeTruncate() line: 130
ReplicaFetcherThread(AbstractFetcherThread).doWork() line: 102
ReplicaFetcherThread(ShutdownableThread).run() line: 64

kafka.server.ReplicaManager.lastOffsetForLeaderEpoch
kafka.server.ReplicaFetcherThread.maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset])
kafka.server.ReplicaFetcherThread.handleOffsetOutOfRange(topicPartition: TopicPartition)
kafka.server.ReplicaFetcherThread.maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset])