Kafka 源码解析之 Producer 单 Partition 顺序性实现及配置说明(五)

欢迎你们关注 github.com/hsfxuebao/j… ,但愿对你们有所帮助,要是以为能够的话麻烦给点一下Star哈html

今天把 Kafka Producer 最后一部分给讲述一下,Producer 大部份内容都已经在前面几篇文章介绍过了,这里简单作个收尾,但并非对前面的总结,本文从两块来说述:RecordAccumulator 类的实现、Kafka Producer 如何保证其顺序性以及 Kafka Producer 的配置说明,每一个 Producer 线程都会有一个 RecordAccumulator 对象,它负责缓存要发送 RecordBatch、记录发送的状态而且进行相应的处理,这里会详细讲述 Kafka Producer 如何保证单 Partition 的有序性。最后,简单介绍一下 Producer 的参数配置说明,只有正确地理解 Producer 相关的配置参数,才能更好地使用 Producer,发挥其相应的做用。java

RecordAccumulator

这里再看一下 RecordAccumulator 的数据结构,以下图所示,每一个 topic-partition 都有一个对应的 deque,deque 中存储的是 RecordBatch,它是发送的基本单位,只有这个 topic-partition 的 RecordBatch 达到大小或时间要求才会触发发送操做(但并非只有达到这两个条件之一才会被发送,这点要理解清楚)。node

RecordAccumulator 模型RecordAccumulator 模型git

再看一下 RecordAccumulator 类的主要方法介绍,以下图所示。github

RecordAccumulator 主要方法及其说明RecordAccumulator 主要方法及其说明apache

这张图基本上涵盖了 RecordAccumulator 的主要方法,下面会选择其中几个方法详细讲述,会围绕着 Kafka Producer 如何实现单 Partition 顺序性这个主题来说述。缓存

mutePartition() 与 unmutePartition()

先看下 mutePartition()unmutePartition() 这两个方法,它们是保证有序性关键之一,其主要作用就是将指定的 topic-partition 从 muted 集合中加入或删除,后面会看到它们的做用。markdown

private final Set<TopicPartition> muted;

public void mutePartition(TopicPartition tp) {
    muted.add(tp);
}

public void unmutePartition(TopicPartition tp) {
    muted.remove(tp);
}
复制代码

这里先说一下这两个方法调用的条件,这样的话,下面在介绍其余方法时才会更容易理解:数据结构

  • mutePartition():若是要求保证顺序性,那么这个 tp 对应的 RecordBatch 若是要开始发送,就将这个 tp 加入到 muted 集合中;
  • unmutePartition():若是 tp 对应的 RecordBatch 发送完成,tp 将会从 muted 集合中移除。

也就是说,muted 是用来记录这个 tp 是否有还有未完成的 RecordBatch。less

ready()

ready() 是在 Sender 线程中调用的,其做用选择那些能够发送的 node,也就是说,若是这个 tp 对应的 batch 能够发送(达到时间或大小要求),就把 tp 对应的 leader 选出来。

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();

        Node leader = cluster.leaderFor(part);
        synchronized (deque) {
            if (leader == null && !deque.isEmpty()) {
                // This is a partition for which leader is not known, but messages are available to send.
                // Note that entries are currently not removed from batches when deque is empty.
                unknownLeaderTopics.add(part.topic());
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {//note: part 若是 mute 就不会遍历
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                    //note: 是不是在重试
                    long waitedTimeMs = nowMs - batch.lastAttemptMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    boolean full = deque.size() > 1 || batch.isFull(); //note: batch 满了
                    boolean expired = waitedTimeMs >= timeToWaitMs; //note: batch 超时
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);// note: 将能够发送的 leader 添加到集合中
                    } else {
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }

    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
复制代码

能够看到这一行 (!readyNodes.contains(leader) && !muted.contains(part)),若是 muted 集合包含这个 tp,那么在遍历时将不会处理它对应的 deque,也就是说,若是一个 tp 加入了 muted 集合中,即便它对应的 RecordBatch 能够发送了,也不会触发引发其对应的 leader 被选择出来。

drain()

drain() 是用来遍历可发送请求的 node,而后再遍历在这个 node 上全部 tp,若是 tp 对应的 deque 有数据,将会被选择出来直到超过一个请求的最大长度(max.request.size)为止,也就说说即便 RecordBatch 没有达到条件,但为了保证每一个 request 尽快多地发送数据提升发送效率,这个 RecordBatch 依然会被提早选出来并进行发送。

//note: 返回该 node 对应的能够发送的 RecordBatch 的 batches,并从 queue 中移除(最大的大小为maxSize,超过的话,下次再发送)
public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
                                             Set<Node> nodes,
                                             int maxSize,
                                             long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        int size = 0;
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        List<RecordBatch> ready = new ArrayList<>();
        /* to make starvation less likely this loop doesn't start at 0 */
        int start = drainIndex = drainIndex % parts.size();
        do {
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // Only proceed if the partition has no in-flight batches.
            if (!muted.contains(tp)) {//note: 被 mute 的 tp 依然不会被遍历
                Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                if (deque != null) {
                    synchronized (deque) {
                        RecordBatch first = deque.peekFirst();
                        if (first != null) {
                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                            // Only drain the batch if it is not during backoff period.
                            if (!backoff) {
                                if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // there is a rare case that a single batch size is larger than the request size due
                                    // to compression; in this case we will still eventually send this batch in a single
                                    // request
                                    break;
                                } else {
                                    RecordBatch batch = deque.pollFirst();
                                    batch.close();
                                    size += batch.sizeInBytes();
                                    ready.add(batch);
                                    batch.drainedMs = now;
                                }
                            }
                        }
                    }
                }
            }
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        batches.put(node.id(), ready);
    }
    return batches;
}
复制代码

在遍历 node 的全部 tp 时,能够看到是有条件的 —— !muted.contains(tp),若是这个 tp 被添加到 muted 集合中,那么它将不会被遍历,也就不会做为 request 一部分被发送出去,这也就保证了 tp 若是还有未完成的 RecordBatch,那么其对应 deque 中其余 RecordBatch 即便达到条件也不会被发送,就保证了 tp 在任什么时候刻只有一个 RecordBatch 在发送。

顺序性如何保证?

是否保证顺序性,仍是在 Sender 线程中实现的,mutePartition()unmutePartition() 也都是在 Sender 中调用的,这里看一下 KafkaProducer 是如何初始化一个 Sender 对象的。

// from KafkaProducer
this.sender = new Sender(client,
                         this.metadata,
                         his.accumulator,
                         config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                         config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                         (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                         config.getInt(ProducerConfig.RETRIES_CONFIG),
                         this.metrics,
                         Time.SYSTEM,
                         this.requestTimeoutMs);//NOTE: Sender 实例,发送请求的后台线程

// from Sender
public Sender(KafkaClient client,
              Metadata metadata,
              RecordAccumulator accumulator,
              boolean guaranteeMessageOrder,
              int maxRequestSize,
              short acks,
              int retries,
              Metrics metrics,
              Time time,
              int requestTimeout) {
        this.client = client;
        this.accumulator = accumulator;
        this.metadata = metadata;
        this.guaranteeMessageOrder = guaranteeMessageOrder;
        this.maxRequestSize = maxRequestSize;
        this.running = true; //note: 默认为 true
        this.acks = acks;
        this.retries = retries;
        this.time = time;
        this.sensors = new SenderMetrics(metrics);
        this.requestTimeout = requestTimeout;
}
复制代码

对于上述过程能够这样进行解读

this.guaranteeMessageOrder = (config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1)

若是 KafkaProducer 的 max.in.flight.requests.per.connection 设置为1,那么就能够保证其顺序性,不然的话,就不保证顺序性,从下面这段代码也能够看出。

//from Sender
//note: max.in.flight.requests.per.connection 设置为1时会保证
if (guaranteeMessageOrder) {
    // Mute all the partitions draine
    for (List<RecordBatch> batchList : batches.values()) {
         for (RecordBatch batch : batchList)
             this.accumulator.mutePartition(batch.topicPartition);
    }
}
复制代码

也就是说,若是要保证单 Partition 的顺序性,须要在 Producer 中配置 max.in.flight.requests.per.connection=1,而其实现机制则是在 RecordAccumulator 中实现的。

Producer Configs

这里是关于 Kafka Producer 一些配置的说明,内容来自官方文档Producer Configs以及本身的一些我的理解,这里以官方文档保持一致,按其重要性分为三个级别进行讲述(涉及到权限方面的参数,这里先不介绍)。

high importance

medium importance

下面的这些参数虽然被描述为 medium,但实际上对 Producer 的吞吐量等影响也一样很大,在实践中跟 high 参数的重要性基本同样。

low importance

至此,Kafka Producer 部分的源码分析已经结束,从下周开始将开始对 Kafka Consumer 部分进行分析。对于不一样的场景,合理配置相应的 Kafka Producer 参数。

kafka源码注释分析

转自:Kafka 源码分析系列

相关文章
相关标签/搜索