>舒适提示:本文基于 Kafka 2.2.1 版本。java
上文 《源码分析 Kafka 消息发送流程》 已经详细介绍了 KafkaProducer send 方法的流程,该方法只是将消息追加到 KafKaProducer 的缓存中,并未真正的向 broker 发送消息,本文未来探讨 Kafka 的 Sender 线程。node
在 KafkaProducer 中会启动一个单独的线程,其名称为 “kafka-producer-network-thread | clientID”,其中 clientID 为生产者的 id 。api
咱们先来看一下其各个属性的含义:缓存
Sender#run服务器
public void run() { log.debug("Starting Kafka producer I/O thread."); while (running) { try { runOnce(); // [@1](https://my.oschina.net/u/1198) } catch (Exception e) { log.error("Uncaught error in kafka producer I/O thread: ", e); } } log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records."); while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) { // @2 try { runOnce(); } catch (Exception e) { log.error("Uncaught error in kafka producer I/O thread: ", e); } } if (forceClose) { // [@3](https://my.oschina.net/u/2648711) log.debug("Aborting incomplete batches due to forced shutdown"); this.accumulator.abortIncompleteBatches(); } try { this.client.close(); // @4 } catch (Exception e) { log.error("Failed to close network client", e); } log.debug("Shutdown of Kafka producer I/O thread has completed."); }
代码@1:Sender 线程在运行状态下主要的业务处理方法,将消息缓存区中的消息向 broker 发送。 代码@2:若是主动关闭 Sender 线程,若是不是强制关闭,则若是缓存区还有消息待发送,再次调用 runOnce 方法将剩余的消息发送完毕后再退出。 代码@3:若是强制关闭 Sender 线程,则拒绝未完成提交的消息。 代码@4:关闭 Kafka Client 即网络通讯对象。网络
接下来将分别探讨其上述方法的实现细节。数据结构
Sender#runOnce架构
void runOnce() { // 此处省略与事务消息相关的逻辑 long currentTimeMs = time.milliseconds(); long pollTimeout = sendProducerData(currentTimeMs); // @1 client.poll(pollTimeout, currentTimeMs); // @2 }
本文不关注事务消息的实现原理,故省略了该部分的代码。 代码@1:调用 sendProducerData 方法发送消息。 代码@2:调用这个方法的做用?并发
接下来分别对上述两个方法进行深刻探究。app
接下来将详细分析其实现步骤。 Sender#sendProducerData
Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
Step1:首先根据当前时间,根据缓存队列中的数据判断哪些 topic 的 哪些分区已经达到发送条件。达到可发送的条件将在 2.1.1.1 节详细分析。
Sender#sendProducerData
if (!result.unknownLeaderTopics.isEmpty()) { for (String topic : result.unknownLeaderTopics) this.metadata.add(topic); log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics); this.metadata.requestUpdate(); }
Step2:若是在待发送的消息未找到其路由信息,则须要首先去 broker 服务器拉取对应的路由信息(分区的 leader 节点信息)。
Sender#sendProducerData
long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now)); } }
Step3:移除在网络层面没有准备好的分区,而且计算在接下来多久的时间间隔内,该分区都将处于未准备状态。 一、在网络环节没有准备好的标准以下:
二、client pollDelayMs 预估分区在接下来多久的时间间隔内都将处于未转变好状态(not ready),其标准以下:
Sender#sendProducerData
// create produce requests Map<integer, list<producerbatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
Step4:根据已准备的分区,从缓存区中抽取待发送的消息批次(ProducerBatch),而且按照 nodeId:List<producerbatch> 组织,注意,抽取后的 ProducerBatch 将不能再追加消息了,就算还有剩余空间可用,具体抽取将在下文在详细介绍。
Sender#sendProducerData
addToInflightBatches(batches); public void addToInflightBatches(Map<integer, list<producerbatch>> batches) { for (List<producerbatch> batchList : batches.values()) { addToInflightBatches(batchList); } } private void addToInflightBatches(List<producerbatch> batches) { for (ProducerBatch batch : batches) { List<producerbatch> inflightBatchList = inFlightBatches.get(batch.topicPartition); if (inflightBatchList == null) { inflightBatchList = new ArrayList<>(); inFlightBatches.put(batch.topicPartition, inflightBatchList); } inflightBatchList.add(batch); } }
Step5:将抽取的 ProducerBatch 加入到 inFlightBatches 数据结构,该属性的声明以下:Map<topicpartition, list< producerbatch>> inFlightBatches,即按照 topic-分区 为键,存放已抽取的 ProducerBatch,这个属性的含义就是存储待发送的消息批次。能够根据该数据结构得知在消息发送时以分区为维度反馈 Sender 线程的“积压状况”,max.in.flight.requests.per.connection 就是来控制积压的最大数量,若是积压达到这个数值,针对该队列的消息发送会限流。
Sender#sendProducerData
accumulator.resetNextBatchExpiryTime(); List<producerbatch> expiredInflightBatches = getExpiredInflightBatches(now); List<producerbatch> expiredBatches = this.accumulator.expiredBatches(now); expiredBatches.addAll(expiredInflightBatches);
Step6:从 inflightBatches 与 batches 中查找已过时的消息批次(ProducerBatch),判断是否过时的标准是系统当前时间与 ProducerBatch 建立时间之差是否超过120s,过时时间能够经过参数 delivery.timeout.ms 设置。
Sender#sendProducerData
if (!expiredBatches.isEmpty()) log.trace("Expired {} batches in accumulator", expiredBatches.size()); for (ProducerBatch expiredBatch : expiredBatches) { String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation"; failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false); if (transactionManager != null && expiredBatch.inRetry()) { // This ensures that no new batches are drained until the current in flight batches are fully resolved. transactionManager.markSequenceUnresolved(expiredBatch.topicPartition); } }
Step7:处理已超时的消息批次,通知该批消息发送失败,即经过设置 KafkaProducer#send 方法返回的凭证中的 FutureRecordMetadata 中的 ProduceRequestResult result,使之调用其 get 方法不会阻塞。
Sender#sendProducerData
sensors.updateProduceRequestMetrics(batches);
Step8:收集统计指标,本文不打算详细分析,但后续会专门对 Kafka 的 Metrics 设计进行一个深刻的探讨与学习。
Sender#sendProducerData
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now); pollTimeout = Math.max(pollTimeout, 0); if (!result.readyNodes.isEmpty()) { log.trace("Nodes with data ready to send: {}", result.readyNodes); pollTimeout = 0; }
Step9:设置下一次的发送延时,待补充详细分析。
Sender#sendProducerData
sendProduceRequests(batches, now); private void sendProduceRequests(Map<integer, list<producerbatch>> collated, long now) { for (Map.Entry<integer, list<producerbatch>> entry : collated.entrySet()) sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue()); }
Step10:该步骤按照 brokerId 分别构建发送请求,即每个 broker 会将多个 ProducerBatch 一块儿封装成一个请求进行发送,同一时间,每个 与 broker 链接只会只能发送一个请求,注意,这里只是构建请求,并最终会经过 NetworkClient#send 方法,将该批数据设置到 NetworkClient 的待发送数据中,此时并无触发真正的网络调用。
sendProducerData 方法就介绍到这里了,既然这里尚未进行真正的网络请求,那在何时触发呢?
咱们继续回到 runOnce 方法。
public List<clientresponse> poll(long timeout, long now) { ensureActive(); if (!abortedSends.isEmpty()) { // If there are aborted sends because of unsupported version exceptions or disconnects, // handle them immediately without waiting for Selector#poll. List<clientresponse> responses = new ArrayList<>(); handleAbortedSends(responses); completeResponses(responses); return responses; } long metadataTimeout = metadataUpdater.maybeUpdate(now); // @1 try { this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); // @2 } catch (IOException e) { log.error("Unexpected error during I/O", e); } // process completed actions long updatedNow = this.time.milliseconds(); List<clientresponse> responses = new ArrayList<>(); // @3 handleCompletedSends(responses, updatedNow); handleCompletedReceives(responses, updatedNow); handleDisconnections(responses, updatedNow); handleConnections(); handleInitiateApiVersionRequests(updatedNow); handleTimedOutRequests(responses, updatedNow); completeResponses(responses); // @4 return responses; }
本文并不会详细深刻探讨其网络实现部分,Kafka 的 网络通信后续我会专门详细的介绍,在这里先点出其关键点。 代码@1:尝试更新云数据。 代码@2:触发真正的网络通信,该方法中会经过收到调用 NIO 中的 Selector#select() 方法,对通道的读写就绪事件进行处理,当写事件就绪后,就会将通道中的消息发送到远端的 broker。 代码@3:而后会消息发送,消息接收、断开链接、API版本,超时等结果进行收集。 代码@4:并依次对结果进行唤醒,此时会将响应结果设置到 KafkaProducer#send 方法返回的凭证中,从而唤醒发送客户端,完成一次完整的消息发送流程。
Sender 发送线程的流程就介绍到这里了,接下来首先给出一张流程图,而后对上述流程中一些关键的方法再补充深刻探讨一下。
根据上面的源码分析得出上述流程图,图中对重点步骤也详细标注了其关键点。下面咱们对上述流程图中 Sender 线程依赖的相关类的核心方法进行解读,以便加深 Sender 线程的理解。
因为在讲解 Sender 发送流程中,大部分都是调用 RecordAccumulator 方法来实现其特定逻辑,故接下来重点对上述涉及到RecordAccumulator 的方法进行一个详细剖析,增强对 Sender 流程的理解。
该方法主要就是根据缓存区中的消息,判断哪些分区已经达到发送条件。
RecordAccumulator#ready
public ReadyCheckResult ready(Cluster cluster, long nowMs) { Set<node> readyNodes = new HashSet<>(); long nextReadyCheckDelayMs = Long.MAX_VALUE; Set<string> unknownLeaderTopics = new HashSet<>(); boolean exhausted = this.free.queued() > 0; for (Map.Entry<topicpartition, deque<producerbatch>> entry : this.batches.entrySet()) { // @1 TopicPartition part = entry.getKey(); Deque<producerbatch> deque = entry.getValue(); Node leader = cluster.leaderFor(part); // @2 synchronized (deque) { if (leader == null && !deque.isEmpty()) { // @3 // This is a partition for which leader is not known, but messages are available to send. // Note that entries are currently not removed from batches when deque is empty. unknownLeaderTopics.add(part.topic()); } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) { // @4 ProducerBatch batch = deque.peekFirst(); if (batch != null) { long waitedTimeMs = batch.waitedTimeMs(nowMs); boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs; long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; boolean full = deque.size() > 1 || batch.isFull(); boolean expired = waitedTimeMs >= timeToWaitMs; boolean sendable = full || expired || exhausted || closed || flushInProgress(); if (sendable && !backingOff) { // @5 readyNodes.add(leader); } else { long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); // Note that this results in a conservative estimate since an un-sendable partition may have // a leader that will later be found to have sendable data. However, this is good enough // since we'll just wake up and then sleep again for the remaining time. nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } } } } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics); }
代码@1:对生产者缓存区 ConcurrentHashMap<topicpartition, deque< producerbatch>> batches 遍历,从中挑选已准备好的消息批次。 代码@2:从生产者元数据缓存中尝试查找分区(TopicPartition) 的 leader 信息,若是不存在,当将该 topic 添加到 unknownLeaderTopics (代码@3),稍后会发送元数据更新请求去 broker 端查找分区的路由信息。 代码@4:若是不在 readyNodes 中就须要判断是否知足条件,isMuted 与顺序消息有关,本文暂时不关注,在后面的顺序消息部分会重点探讨。 代码@5:这里就是判断是否准备好的条件,先一个一个来解读局部变量的含义。
RecordAccumulator#drain
public Map<integer, list<producerbatch>> drain(Cluster cluster, Set<node> nodes, int maxSize, long now) { // @1 if (nodes.isEmpty()) return Collections.emptyMap(); Map<integer, list<producerbatch>> batches = new HashMap<>(); for (Node node : nodes) { List<producerbatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now); // @2 batches.put(node.id(), ready); } return batches; }
代码@1:咱们首先来介绍该方法的参数:
代码@2:遍历全部节点,调用 drainBatchesForOneNode 方法抽取数据,组装成 Map<integer ** brokerid * , list< producerbatch>> batches。
接下来重点来看一下 drainBatchesForOneNode。 RecordAccumulator#drainBatchesForOneNode
private List<producerbatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) { int size = 0; List<partitioninfo> parts = cluster.partitionsForNode(node.id()); // @1 List<producerbatch> ready = new ArrayList<>(); int start = drainIndex = drainIndex % parts.size(); // @2 do { // @3 PartitionInfo part = parts.get(drainIndex); TopicPartition tp = new TopicPartition(part.topic(), part.partition()); this.drainIndex = (this.drainIndex + 1) % parts.size(); if (isMuted(tp, now)) continue; Deque<producerbatch> deque = getDeque(tp); // @4 if (deque == null) continue; synchronized (deque) { // invariant: !isMuted(tp,now) && deque != null ProducerBatch first = deque.peekFirst(); // @5 if (first == null) continue; // first != null boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs; // @6 // Only drain the batch if it is not during backoff period. if (backoff) continue; if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { // @7 break; } else { if (shouldStopDrainBatchesForPartition(first, tp)) break; // 这里省略与事务消息相关的代码,后续会重点学习。 batch.close(); // @8 size += batch.records().sizeInBytes(); ready.add(batch); batch.drained(now); } } } while (start != drainIndex); return ready; }
代码@1:根据 brokerId 获取该 broker 上的全部主分区。 代码@2:初始化 start。这里首先来阐述一下 start 与 drainIndex 。
代码@3:循环从缓存区抽取对应分区中累积的数据。 代码@4:根据 topic + 分区号从生产者发送缓存区中获取已累积的双端Queue。 代码@5:从双端队列的头部获取一个元素。(消息追加时是追加到队列尾部)。 代码@6:若是当前批次是重试,而且还未到阻塞时间,则跳过该分区。 代码@7:若是当前已抽取的消息总大小 加上新的消息已超过 maxRequestSize,则结束抽取。 代码@8:将当前批次加入到已准备集合中,并关闭该批次,即不在容许向该批次中追加消息。
关于消息发送就介绍到这里,NetworkClient 的 poll 方法内部会调用 Selector 执行就绪事件的选择,并将抽取的消息经过网络发送到 Broker 服务器,关于网络后面的具体实现,将在后续文章中单独介绍。
做者介绍:丁威,《RocketMQ技术内幕》做者,RocketMQ 社区布道师,公众号:中间件兴趣圈 维护者,目前已陆续发表源码分析Java集合、Java 并发包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源码专栏。能够点击连接:中间件知识星球,一块儿探讨高并发、分布式服务架构,交流源码。
</producerbatch></producerbatch></partitioninfo></producerbatch></integer></producerbatch></integer,></node></integer,></topicpartition,></producerbatch></topicpartition,></string></node></clientresponse></clientresponse></clientresponse></integer,></integer,></producerbatch></producerbatch></topicpartition,></producerbatch></producerbatch></producerbatch></integer,></producerbatch></integer,>