目录:java
《NetworkClient类代码分析》fetch
-------------------------------------------------------------------ui
前文咱们分析了Kafka生产者端的源代码,了解了生产者产生消息的过程。消息由生产者发布到某个主题的某个分区上,其实最终是被存储在服务端的某个broker上。而消费者由订阅行为来决定它所要消费的主题和分区。消费者经过poll操做,不断的从服务端拉取该主题分区上产生的消息。.net
相信有兴趣看kafka源代码的同窗,确定对kafka的基本概念和原理有所了解。关于消费者,咱们知道在服务端会有GroupCoordinator,负责每一个consumer group的leader的选举,以及分发分区分配结果,而coumer的leader则会根据分区分配策略进行分区分配。这里须要注意,分区分配结果并非由leader分发给同组的consumer,而是leader返回给GroupCoordinator,再有GroupCoordinator进行分发。线程
每当Broker有变化,或者Consumer Group有出入组的变化时,会触发ConsumerGroup的rebalance。也就是上述的分区分配工做。
另外消费者本地保存了它所负责主题分区的消费状态,经过手动和自动的方式提交到服务端的内部主题中。rebalance事后,消费者从新从内部主题获取对应主题分区的消费位置。
关于消费者更多的内容介绍请参考我另一篇文章《Apache Kafka核心组件和流程-协调器(消费者和组协调器)》
上面咱们回顾了Consumer的设计和流程,为咱们进入源代码分析作好铺垫。接下来咱们将从KafkaConsumer入手,进行代码分析。
咱们先看下使用KafkaConsumer进行消费的部分代码:
private final KafkaConsumer<Integer, String> consumer; ......... consumer.subscribe(Collections.singletonList(this.topic)); ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); }
以上代码来自于源代码包中的例子,咱们能够看到KafkaConsumer先订阅topic,而后经过poll方法进行消息拉取。
能够看到KafkaConsumer经过poll方法进行消费,这也是KafkaConsumer最主要的方法。
咱们先看看KafkaConsumer内部的其余组件有哪些,见下图:
上图介绍了KafkaConsumer内部的几个重要组件:
一、前文说过消费者要本身记录消费的位置(但也须要提交到服务端保存,为了rebalance后的消费能衔接上),因此咱们须要SubScriptionState来保存消费的状态。
二、ConsumerCoordinator负责和GroupCoordinator通信,例如在leader选举,入组,分区分配等过程。
三、ConsumerNetworkClient是对NetworkClient的封装,若是你是从producer看过来的,想必对NetworkClient十分了解,他对nio的组件进行封装,实现网络IO。
四、PartitionAssignor,这是分区分配策略,在进行分区分配的时候会用到。
五、Fetcher负责组织拉取消息的请求,以及处理返回。不过须要注意它并不作网络IO,网络IO仍是由ConsumerNetworkClient完成。它其实对应生产者中的Sender。
咱们抛开订阅、rebalance这些流程,先以kafka消费流程为主,进行分析。有些组件在消费流程中是涉及不到的。消费流程主要涉及到Fetcher、SubScriptionState和ConsumerNetworkClient。特别是Fetcher,承担了重要的工做。不过咱们还须要一步步来,先进入poll方法的分析。
这是消息拉取的入口方法,他会从上次消费的位置拉取消息,也能够手动指定消费位置。入参是阻塞的时长,若是有消息将会当即返回,不然会阻塞到超时,若是没有数据则返回空的数据集合。
代码以下:
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) { acquireAndEnsureOpen(); try { if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) { throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); } // poll for new data until the timeout expires do { client.maybeTriggerWakeup(); if (includeMetadataInTimeout) { if (!updateAssignmentMetadataIfNeeded(timer)) { return ConsumerRecords.empty(); } } else { while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) { log.warn("Still waiting for metadata"); } } final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer); if (!records.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches // and avoid block waiting for their responses to enable pipelining while the user // is handling the fetched records. // // NOTE: since the consumed position has already been updated, we must not allow // wakeups or any other errors to be triggered prior to returning the fetched records. if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { client.pollNoWakeup(); } return this.interceptors.onConsume(new ConsumerRecords<>(records)); } } while (timer.notExpired()); return ConsumerRecords.empty(); } finally { release(); } }
逻辑说明:
一、经过acquireAndEnsureOpen()确保本对象是单线程进入,这是由于KafkaConsumer非线程安全。
二、检查是否订阅了topic
三、进入主循环,条件是没有超时
四、在主循环中经过pollForFetches()拉取一次消息。这个方法中先检查是否经存在拉取过的未加工消息,这是由于上一轮次拉取作了提早拉取处理。有可能已经拉取回消息等待处理。若是没有已拉取未加工数据,则准备新的拉取请求,网络IO拉取消息,加工拉取回来的数据。
五、若是上一步拉取到消息,并不会当即返回,而是再一次触发消息拉取,而且使用的是非阻塞方式,调用client.pollNoWakeup()。这样作的目的是,提早网络IO,把消息拉取请求发出去。在网络IO的同时,消息数据返回给consumer的调用者进行业务处理。这样作到了并行处理,提升了效率。等下次调用KafkaConsumer进行poll,当进行到第4步时,有可能直接返回了上轮次提早拉取到的消息,从而省去了网络IO时间。
咱们经过下图帮助理解上面四、5步的设计:
图中带颜色的方框表明在整个拉取消息的流程中,不一样的处理过程,分布于不一样的对象中。图中下半部分展现的是Kafka处理逻辑。能够看到在第一轮次调用了两次ConusmerNetworkClient进行IO处理,第二次IO的同时,调用者已经开始拿到返回的消息进行业务处理,这里实现了并行处理。进入第二轮次,咱们发现kafkaConsumer能够直接取到上轮第二次IO回来的消息进行加工,加工后返回调用者,进行业务处理,同时下一轮次的消息拉取异步进行中。能够看到第二轮次的总时长已经没有了网络IO的时长,由于这部分工做在上一轮次已经异步进行完成。
若是不这样作,会怎么样呢?咱们看图中上半部分,咱们发现每一个轮次都是同样的,网络IO都须要同步等待,从第二轮开始,整个消息拉取处理的时长明显增长了IO部分,会更长。
以上状况比较极端,每次提早IO都会返回数据,而且消息的业务处理时长大于网络IO。这种状况下,能最大发挥出异步IO的优点。
以上这种设计的小细节真的值得咱们来学习。读源代码在了解原理的同时,咱们也要多总结优秀的设计思想,对咱们的工做颇有帮助。
从上面的分析看到,真正消息拉取的代码是:
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
下面咱们继续分析pollForFetches方法。
这个方法完成了从服务端拉取消息的动做,这个过程主要使用了Fetcher和ConsumerNetworkClient两个组件。Fetcher负责准备好拉取消息的request、处理response,而且把消息转化为对调用者友好的格式。ConsumerNetworkClient负责把请求发送出去,接收返回,也就是网络IO工做。
他的主要流程是以下四步:
一、查看是否已经存在拉取回来未加工的消息原始数据,有的话当即调用fetcher.fetchedRecords()加工,而后返回。
二、若是没有未加工的原始数据,那么调用fetcher.sendFetches()准备拉取请求。
三、经过ConsumerNetworkClient发送拉取请求。
四、加工拉取回的原始数据,返回。
其实正常来讲2,3,4步流程就足够了。为何会有第1步呢?那些已经存在的未加工的数据哪里来的?若是你理解了前面所讲的异步拉取设计,那么你应该知道答案。这些已经存在的未加工数据来自于上一轮次的异步IO。正是由于有了异步的IO拉取,才会有第一步的处理可能。
完整代码以下:
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) { long pollTimeout = Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs()); // if data is available already, return it immediately final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); if (!records.isEmpty()) { return records; } // send any new fetches (won't resend pending fetches) fetcher.sendFetches(); // We do not want to be stuck blocking in poll if we are missing some positions // since the offset lookup may be backing off after a failure // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call // updateAssignmentMetadataIfNeeded before this method. if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) { pollTimeout = retryBackoffMs; } Timer pollTimer = time.timer(pollTimeout); client.poll(pollTimer, () -> { // since a fetch might be completed by the background thread, we need this poll condition // to ensure that we do not block unnecessarily in poll() return !fetcher.hasCompletedFetches(); }); timer.update(pollTimer.currentTimeMs()); // after the long poll, we should check whether the group needs to rebalance // prior to returning data so that the group can stabilize faster if (coordinator.rejoinNeededOrPending()) { return Collections.emptyMap(); } return fetcher.fetchedRecords(); }
能够看到以上过程除了IO操做外,都是经过Fetcher完成的,足以体现他的重要。接下来的章节将会重点分析Fetcher。
本篇先是回顾了Kafka消费者的设计,而后从KafkaConsumer的Poll方法入手对拉取的逻辑进行分析。Kafka很巧妙的采用异步IO方式,缩短整个流程的时长。接下来咱们将会进入Fetcher的分析,看其如何准备拉取消息的请求,并完成消息的转化处理。