你绝对能看懂的Kafka源代码分析-KafkaConsumer类代码分析

目录:java

《Kafka Producer设计分析》安全

《KafkaProducer类代码分析》网络

《RecordAccumulator类代码分析》异步

《Sender类代码分析》学习

《NetworkClient类代码分析》fetch

-------------------------------------------------------------------ui

上一节《NetworkClient类代码分析》this

前文咱们分析了Kafka生产者端的源代码,了解了生产者产生消息的过程。消息由生产者发布到某个主题的某个分区上,其实最终是被存储在服务端的某个broker上。而消费者由订阅行为来决定它所要消费的主题和分区。消费者经过poll操做,不断的从服务端拉取该主题分区上产生的消息。.net

相信有兴趣看kafka源代码的同窗,确定对kafka的基本概念和原理有所了解。关于消费者,咱们知道在服务端会有GroupCoordinator,负责每一个consumer group的leader的选举,以及分发分区分配结果,而coumer的leader则会根据分区分配策略进行分区分配。这里须要注意,分区分配结果并非由leader分发给同组的consumer,而是leader返回给GroupCoordinator,再有GroupCoordinator进行分发。线程

每当Broker有变化,或者Consumer Group有出入组的变化时,会触发ConsumerGroup的rebalance。也就是上述的分区分配工做。

另外消费者本地保存了它所负责主题分区的消费状态,经过手动和自动的方式提交到服务端的内部主题中。rebalance事后,消费者从新从内部主题获取对应主题分区的消费位置。

关于消费者更多的内容介绍请参考我另一篇文章《Apache Kafka核心组件和流程-协调器(消费者和组协调器)》

上面咱们回顾了Consumer的设计和流程,为咱们进入源代码分析作好铺垫。接下来咱们将从KafkaConsumer入手,进行代码分析。

KafkaConsumer

咱们先看下使用KafkaConsumer进行消费的部分代码:

private final KafkaConsumer<Integer, String> consumer;

.........

consumer.subscribe(Collections.singletonList(this.topic));

ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));

for (ConsumerRecord<Integer, String> record : records) {

System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());

}

以上代码来自于源代码包中的例子,咱们能够看到KafkaConsumer先订阅topic,而后经过poll方法进行消息拉取。

能够看到KafkaConsumer经过poll方法进行消费,这也是KafkaConsumer最主要的方法。

咱们先看看KafkaConsumer内部的其余组件有哪些,见下图:

上图介绍了KafkaConsumer内部的几个重要组件:

一、前文说过消费者要本身记录消费的位置(但也须要提交到服务端保存,为了rebalance后的消费能衔接上),因此咱们须要SubScriptionState来保存消费的状态。

二、ConsumerCoordinator负责和GroupCoordinator通信,例如在leader选举,入组,分区分配等过程。

三、ConsumerNetworkClient是对NetworkClient的封装,若是你是从producer看过来的,想必对NetworkClient十分了解,他对nio的组件进行封装,实现网络IO。

四、PartitionAssignor,这是分区分配策略,在进行分区分配的时候会用到。

五、Fetcher负责组织拉取消息的请求,以及处理返回。不过须要注意它并不作网络IO,网络IO仍是由ConsumerNetworkClient完成。它其实对应生产者中的Sender。

咱们抛开订阅、rebalance这些流程,先以kafka消费流程为主,进行分析。有些组件在消费流程中是涉及不到的。消费流程主要涉及到Fetcher、SubScriptionState和ConsumerNetworkClient。特别是Fetcher,承担了重要的工做。不过咱们还须要一步步来,先进入poll方法的分析。

poll()方法

这是消息拉取的入口方法,他会从上次消费的位置拉取消息,也能够手动指定消费位置。入参是阻塞的时长,若是有消息将会当即返回,不然会阻塞到超时,若是没有数据则返回空的数据集合。

代码以下:

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        // poll for new data until the timeout expires
        do {
            client.maybeTriggerWakeup();

            if (includeMetadataInTimeout) {
                if (!updateAssignmentMetadataIfNeeded(timer)) {
                    return ConsumerRecords.empty();
                }
            } else {
                while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                    log.warn("Still waiting for metadata");
                }
            }

            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.pollNoWakeup();
                }

                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
        } while (timer.notExpired());

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}

逻辑说明:

一、经过acquireAndEnsureOpen()确保本对象是单线程进入,这是由于KafkaConsumer非线程安全。

二、检查是否订阅了topic

三、进入主循环,条件是没有超时

四、在主循环中经过pollForFetches()拉取一次消息。这个方法中先检查是否经存在拉取过的未加工消息,这是由于上一轮次拉取作了提早拉取处理。有可能已经拉取回消息等待处理。若是没有已拉取未加工数据,则准备新的拉取请求,网络IO拉取消息,加工拉取回来的数据。

五、若是上一步拉取到消息,并不会当即返回,而是再一次触发消息拉取,而且使用的是非阻塞方式,调用client.pollNoWakeup()。这样作的目的是,提早网络IO,把消息拉取请求发出去。在网络IO的同时,消息数据返回给consumer的调用者进行业务处理。这样作到了并行处理,提升了效率。等下次调用KafkaConsumer进行poll,当进行到第4步时,有可能直接返回了上轮次提早拉取到的消息,从而省去了网络IO时间。

咱们经过下图帮助理解上面四、5步的设计:

图中带颜色的方框表明在整个拉取消息的流程中,不一样的处理过程,分布于不一样的对象中。图中下半部分展现的是Kafka处理逻辑。能够看到在第一轮次调用了两次ConusmerNetworkClient进行IO处理,第二次IO的同时,调用者已经开始拿到返回的消息进行业务处理,这里实现了并行处理。进入第二轮次,咱们发现kafkaConsumer能够直接取到上轮第二次IO回来的消息进行加工,加工后返回调用者,进行业务处理,同时下一轮次的消息拉取异步进行中。能够看到第二轮次的总时长已经没有了网络IO的时长,由于这部分工做在上一轮次已经异步进行完成。

若是不这样作,会怎么样呢?咱们看图中上半部分,咱们发现每一个轮次都是同样的,网络IO都须要同步等待,从第二轮开始,整个消息拉取处理的时长明显增长了IO部分,会更长。

以上状况比较极端,每次提早IO都会返回数据,而且消息的业务处理时长大于网络IO。这种状况下,能最大发挥出异步IO的优点。

以上这种设计的小细节真的值得咱们来学习。读源代码在了解原理的同时,咱们也要多总结优秀的设计思想,对咱们的工做颇有帮助。

从上面的分析看到,真正消息拉取的代码是:

final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);

下面咱们继续分析pollForFetches方法。

pollForFetches()方法

这个方法完成了从服务端拉取消息的动做,这个过程主要使用了Fetcher和ConsumerNetworkClient两个组件。Fetcher负责准备好拉取消息的request、处理response,而且把消息转化为对调用者友好的格式。ConsumerNetworkClient负责把请求发送出去,接收返回,也就是网络IO工做。

他的主要流程是以下四步:

一、查看是否已经存在拉取回来未加工的消息原始数据,有的话当即调用fetcher.fetchedRecords()加工,而后返回。

二、若是没有未加工的原始数据,那么调用fetcher.sendFetches()准备拉取请求。

三、经过ConsumerNetworkClient发送拉取请求。

四、加工拉取回的原始数据,返回。

其实正常来讲2,3,4步流程就足够了。为何会有第1步呢?那些已经存在的未加工的数据哪里来的?若是你理解了前面所讲的异步拉取设计,那么你应该知道答案。这些已经存在的未加工数据来自于上一轮次的异步IO。正是由于有了异步的IO拉取,才会有第一步的处理可能。

完整代码以下:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
    long pollTimeout = Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

    // if data is available already, return it immediately
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty()) {
        return records;
    }

    // send any new fetches (won't resend pending fetches)
    fetcher.sendFetches();

    // We do not want to be stuck blocking in poll if we are missing some positions
    // since the offset lookup may be backing off after a failure

    // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
    // updateAssignmentMetadataIfNeeded before this method.
    if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
        pollTimeout = retryBackoffMs;
    }

    Timer pollTimer = time.timer(pollTimeout);
    client.poll(pollTimer, () -> {
        // since a fetch might be completed by the background thread, we need this poll condition
        // to ensure that we do not block unnecessarily in poll()
        return !fetcher.hasCompletedFetches();
    });
    timer.update(pollTimer.currentTimeMs());

    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    if (coordinator.rejoinNeededOrPending()) {
        return Collections.emptyMap();
    }

    return fetcher.fetchedRecords();
}

能够看到以上过程除了IO操做外,都是经过Fetcher完成的,足以体现他的重要。接下来的章节将会重点分析Fetcher。

小结

本篇先是回顾了Kafka消费者的设计,而后从KafkaConsumer的Poll方法入手对拉取的逻辑进行分析。Kafka很巧妙的采用异步IO方式,缩短整个流程的时长。接下来咱们将会进入Fetcher的分析,看其如何准备拉取消息的请求,并完成消息的转化处理。