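After the previous articles in this Kafka producer series, is there anything left in the producer that can be optimized further? The answer is yes. This article introduces KIP-480, which was merged in Kafka 2.4.0, the latest release at the time of writing. Its core idea is that, for a sequence of records without a key, the producer keeps "sticking" to the same partition. Once the batch for that partition is full, or linger.ms has elapsed and the batch is sent, a new partition is selected. Partitions are still rotated much as in the Round-Robin mode, although, as the source further down shows, the next partition is picked at random among the available ones. The two modes are illustrated in the diagrams below: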
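Now let's look at how this mode is implemented in the source.
The change starts at the Partitioner interface. In earlier versions this interface declared only two methods: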
public interface Partitioner extends Configurable, Closeable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();
}
The latest Partitioner interface adds an onNewBatch method, which is invoked when a new batch is about to be created. Its source is as follows:
public interface Partitioner extends Configurable, Closeable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();

    /**
     * Notifies the partitioner a new batch is about to be created. When using the sticky partitioner,
     * this method can change the chosen sticky partition for the new batch.
     * @param topic The topic name
     * @param cluster The current cluster metadata
     * @param prevPartition The partition previously selected for the record that triggered a new batch
     */
    default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}
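Because onNewBatch is declared as a default method with an empty body, a partitioner written against the older interface should keep compiling without changes. The sketch below illustrates that idea without depending on Kafka. `MiniPartitioner`, `LegacyPartitioner` and `StickyLikePartitioner` are hypothetical names, and the cluster metadata is reduced to a plain partition count, so treat it as an illustration of the pattern rather than Kafka's actual API.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical, simplified stand-in for Kafka's Partitioner interface.
interface MiniPartitioner {
    int partition(String topic, int numPartitions);

    // Default no-op hook: existing implementations do not have to override it.
    default void onNewBatch(String topic, int numPartitions, int prevPartition) {
    }
}

// Written against the "old" contract: only partition() is implemented.
class LegacyPartitioner implements MiniPartitioner {
    private int counter = 0;

    @Override
    public int partition(String topic, int numPartitions) {
        // Plain round-robin over the partitions.
        return Math.floorMod(counter++, numPartitions);
    }
}

// Uses the new hook to move to another partition when a new batch is needed.
class StickyLikePartitioner implements MiniPartitioner {
    private int current = 0;

    @Override
    public int partition(String topic, int numPartitions) {
        // Keep returning the same partition until onNewBatch switches it.
        return current;
    }

    @Override
    public void onNewBatch(String topic, int numPartitions, int prevPartition) {
        // Nothing to switch to, or the partition was already switched.
        if (numPartitions < 2 || current != prevPartition) {
            return;
        }
        int next = current;
        while (next == current) {
            next = ThreadLocalRandom.current().nextInt(numPartitions);
        }
        current = next;
    }
}
```

Calling onNewBatch on a `LegacyPartitioner` is a no-op, while on a `StickyLikePartitioner` it moves the sticky partition to a different one.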
The old partitioning mode was covered in an earlier article, so here we focus on the implementation behind this new method:
public class StickyPartitionCache {
    private final ConcurrentMap<String, Integer> indexCache;

    public StickyPartitionCache() {
        // Caches the current sticky partition for each topic
        this.indexCache = new ConcurrentHashMap<>();
    }

    public int partition(String topic, Cluster cluster) {
        // A cache hit means a sticky partition has already been chosen for this topic
        Integer part = indexCache.get(topic);
        if (part == null) {
            // Otherwise run the selection logic to pick a new partition
            return nextPartition(topic, cluster, -1);
        }
        return part;
    }

    public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;
        // This method is triggered in two situations: the topic has no cached partition yet
        // (for example, a new topic), or a new batch was created and a new partition must be
        // chosen. The entry condition below covers both cases.
        if (oldPart == null || oldPart == prevPartition) {
            // As in the old keyless (Round-Robin) path, the choice is made among the available
            // partitions; the difference is that this code only handles records without a key
            // and picks the partition at random.
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                newPart = availablePartitions.get(0).partition();
            } else {
                while (newPart == null || newPart.equals(oldPart)) {
                    Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            // For a topic seen for the first time, simply add the entry; otherwise this is a
            // partition switch, so replace the old partition with the new one.
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
            return indexCache.get(topic);
        }
        return indexCache.get(topic);
    }
}
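One detail worth noting in the cache is the guard `oldPart == prevPartition`: if a caller reports a partition that is no longer the cached one (for example because another thread has already switched it), the call changes nothing. A minimal sketch of that behaviour, with no Kafka dependency, could look like the following. `MiniStickyCache` is a hypothetical class, and a plain partition count stands in for the Cluster metadata, so partition availability is not modelled.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical simplified model of the sticky cache; not Kafka's own class.
class MiniStickyCache {
    private final ConcurrentMap<String, Integer> indexCache = new ConcurrentHashMap<>();

    int partition(String topic, int numPartitions) {
        Integer part = indexCache.get(topic);
        // -1 never matches a real partition, so it only takes effect on a cache miss.
        return part != null ? part : nextPartition(topic, numPartitions, -1);
    }

    int nextPartition(String topic, int numPartitions, int prevPartition) {
        Integer oldPart = indexCache.get(topic);
        // Switch only for a new topic, or when the caller saw the current sticky partition.
        if (oldPart == null || oldPart == prevPartition) {
            int newPart;
            do {
                newPart = ThreadLocalRandom.current().nextInt(numPartitions);
            } while (numPartitions > 1 && oldPart != null && newPart == oldPart);
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                // Atomic compare-and-replace: a concurrent switch wins, ours becomes a no-op.
                indexCache.replace(topic, prevPartition, newPart);
            }
        }
        return indexCache.get(topic);
    }
}
```

In this model, calling `nextPartition` twice with the same stale `prevPartition` switches the partition only once; the second call simply returns the partition chosen by the first.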
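Having seen the new partitioning logic, a natural question is: when is the switch to a new partition actually triggered? It happens in KafkaProducer's doSend method, which contains the following logic: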
// Try to append the record to the previously chosen partition
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, true);

// The append was aborted because a new batch has to be created
if (result.abortForNewBatch) {
    int prevPartition = partition;
    // Let the partitioner switch to a new sticky partition
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    // Ask for the partition again; this now returns the new one
    partition = partition(record, serializedKey, serializedValue, cluster);
    // Wrap it in a TopicPartition
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

    // Append again, this time allowing a new batch to be created
    result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs, false);
}
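The two-step append above (try with the abort flag set, switch partition, append again without it) can be modelled in a few lines. The sketch below is a toy model under stated assumptions, not Kafka's RecordAccumulator: `MiniProducer` is a hypothetical class, batch capacity is counted in records rather than bytes, and a full batch is simply treated as sent when its partition needs a new one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical toy model of the append / onNewBatch / re-append flow.
class MiniProducer {
    private final int numPartitions;
    private final int batchCapacity;                 // records per batch (assumed unit)
    private final Map<Integer, Integer> openBatch = new HashMap<>(); // partition -> records in open batch
    private int sticky = 0;                          // current sticky partition
    int batchesCreated = 0;

    MiniProducer(int numPartitions, int batchCapacity) {
        this.numPartitions = numPartitions;
        this.batchCapacity = batchCapacity;
    }

    // Returns false when a new batch would be needed and the caller asked to abort instead.
    private boolean tryAppend(int partition, boolean abortOnNewBatch) {
        Integer size = openBatch.get(partition);
        if (size != null && size < batchCapacity) {
            openBatch.put(partition, size + 1);
            return true;
        }
        if (abortOnNewBatch) {
            return false;
        }
        openBatch.put(partition, 1);                 // start a new batch with this record
        batchesCreated++;
        return true;
    }

    // Switch the sticky partition, but only if the caller saw the current one.
    private void onNewBatch(int prevPartition) {
        if (numPartitions < 2 || sticky != prevPartition) {
            return;
        }
        int next = sticky;
        while (next == sticky) {
            next = ThreadLocalRandom.current().nextInt(numPartitions);
        }
        sticky = next;
    }

    // Sends one keyless record and returns the partition it ended up in.
    int send() {
        int partition = sticky;
        if (!tryAppend(partition, true)) {
            int prevPartition = partition;
            onNewBatch(prevPartition);
            partition = sticky;
            tryAppend(partition, false);
        }
        return partition;
    }
}
```

With three partitions and a capacity of five records, the first five calls to `send()` in this model land in one partition and the sixth moves to a different one, creating a second batch.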
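The biggest advantage of this mode is that it keeps each batch as full as possible and avoids allocating many nearly empty batches up front. Under the previous default partitioning mode, a sequence of keyless records is always spread across all partitions, so each batch stays small and the client ends up sending requests too frequently. The sticky mode lowers the request rate and reduces overall send latency. The two charts below show the latency comparison from the official benchmarks: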
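The effect on batch counts can be seen with a little arithmetic. The sketch below counts how many batches are needed for a burst of keyless records that all arrive within one linger.ms window. `BatchCountDemo` and its methods are hypothetical, batch capacity is expressed in records rather than bytes, and the round-robin case assumes a perfectly even spread, so the numbers are only illustrative.

```java
// Hypothetical back-of-the-envelope model: batches needed for one burst of keyless records.
class BatchCountDemo {
    // Round-robin: records are spread evenly, and every partition fills its own batches.
    static int roundRobinBatches(int records, int partitions, int capacity) {
        int batches = 0;
        for (int p = 0; p < partitions; p++) {
            // The first (records % partitions) partitions receive one extra record.
            int perPartition = records / partitions + (p < records % partitions ? 1 : 0);
            batches += (perPartition + capacity - 1) / capacity; // ceiling division
        }
        return batches;
    }

    // Sticky: one partition at a time, switching only when the current batch is full.
    static int stickyBatches(int records, int capacity) {
        return (records + capacity - 1) / capacity;              // ceiling division
    }
}
```

For example, with 12 records, 6 partitions and a capacity of 16 records per batch, `roundRobinBatches(12, 6, 16)` gives 6 small batches while `stickyBatches(12, 16)` gives a single batch.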