kafka生产者分区优化

通过前面几篇kafka生产者专题讲解,咱们还能够找出哪些地方进一步来对它进行优化的吗?答案是确定的,这里咱们介绍一个kafka当前最新版本2.4.0合入的一个KIP-480,它的核心逻辑就是当存在无key的序列消息时,咱们消息发送的分区优先保持粘连,若是当前分区下的batch已经满了或者 linger.ms延迟时间已到开始发送,就会从新启动一个新的分区(逻辑仍是按照Round-Robin模式),咱们先把两种模式的示意图整理以下:
图片描述算法

那咱们也来看下这种模式的源码实现:
它的源码实现是从Partitioner的接口开始修改的,以前的版本这个接口只有两个方法:缓存

public interface Partitioner extends Configurable, Closeable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();

}

最新的Partitioner接口添加了一个onNewBatch方法,用来在新建了一个Batch的场景下进行触发,它的源码以下:app

public interface Partitioner extends Configurable, Closeable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();


    /**
     * Notifies the partitioner a new batch is about to be created. When using the sticky partitioner,
     * this method can change the chosen sticky partition for the new batch. 
     * @param topic The topic name
     * @param cluster The current cluster metadata
     * @param prevPartition The partition previously selected for the record that triggered a new batch
     */
    default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}

老的分区模式,咱们在以前已经讲解过,这边主要讲解下这个新的方法实现:dom

public class StickyPartitionCache {
    private final ConcurrentMap<String, Integer> indexCache;
    public StickyPartitionCache() {
        //用来缓存全部的分区信息
        this.indexCache = new ConcurrentHashMap<>();
    }

    public int partition(String topic, Cluster cluster) {
        //若是缓存能够获取,说明以前已经有过该topic的分区信息
        Integer part = indexCache.get(topic);
        if (part == null) {
        //不然触发获取新的分区算法
            return nextPartition(topic, cluster, -1);
        }
        return part;
    }

    public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;
        // 因为该方法有两种触发场景,一种是该topic下没有任何分区缓存信息(例如新增topic);另一种就是新的Batch产生了,须要触发新的分区,因此它的进入条件也是这两种模式
        if (oldPart == null || oldPart == prevPartition) {
            //接下来全部分区逻辑采起的是和老的Roud-Robin模式一致,逻辑不一样的地方是在于这里都是无Key的场景
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                newPart = availablePartitions.get(0).partition();
            } else {
                while (newPart == null || newPart.equals(oldPart)) {
                    Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            // 当时新增topic分区场景,那就直接添加,不然就是更换分区场景,将新的分区替换老的分区
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
            return indexCache.get(topic);
        }
        return indexCache.get(topic);
    }
}

了解完新的分区模式逻辑以后,咱们会有一个疑问,那是在何时触发的新分区逻辑呢?是在KafkaProducer的doSend方法里面有以下一段逻辑:优化

//尝试向以前的分区里面append消息
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, true);
//因为须要新建立Batch append没有成功
if (result.abortForNewBatch) {
                int prevPartition = partition;
                //触发新的分区
                partitioner.onNewBatch(record.topic(), cluster, prevPartition);
                //再次获取新的分区值
                partition = partition(record, serializedKey, serializedValue, cluster);
                //封装TopicPartition
                tp = new TopicPartition(record.topic(), partition);
                if (log.isTraceEnabled()) {
                    log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
                }
                // producer callback will make sure to call both 'callback' and interceptor callback
                interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
                //再次append消息
                result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, false);
            }

这种模式一个最大的优点在于能够最大限度的保障每一个batch的消息足够多,而且不至于会有过多的空batch提早申请,由于默认分区模式下,一组序列消息老是会被分散到各个分区中,会致使每一个batch的消息不够大,最终会致使客户端请求频次过多,而Sticky的模式能够下降请求频次,提高总体发送迟延。以下两个图示官方压测时延对比:
图片描述this

图片描述

图片描述

相关文章
相关标签/搜索