# Kafka发送者源码解析

紧接着Kafka Producer发送消息这一篇文章继续往深刻探索。java

消息体

这里介绍一下ProducerRecord,他不是单纯的消息,它包含了多个属性。类定义以下:apache

public class ProducerRecord<K, V> {

    private final String topic; //主题
    private final Integer partition;//分区号
    private final Headers headers;//消息头部
    private final K key;//键
    private final V value;//值
    private final Long timestamp;//消息时间戳
    //省略其余
    }
复制代码

key用来指定消息的键,它不只是消息的附加信息,还能够用来计算分区号进而让消息发往特定的分区。后面再说 对于的构造函数有不少种,咱们用的是最简单的一种。参考以下:api

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) public ProducerRecord(String topic, V value) //示例代码中用的这个,至关于将其余属性所有设置为null 复制代码

消息发送

再来看看send(ProducerRecord,Callback)方法缓存

public class KafkaProducer<K, V> implements Producer<K, V> {
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        1.1
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }
    ···
}
复制代码

1.1 生产者拦截器

消息在发送前会先调用org.apache.kafka.clients.producer.ProducerInterceptor#onSend方法来对消息作定制化处理。 doSend(ProducerRecord, Callback)方法bash

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // 确认生产者实例没有被关闭
            throwIfProducerClosed();
            // 首先确认主题的元数据是可用的
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                // 2.1
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            try {
                // 2.2
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
                // 2.3
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
            // 2.4 
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);

            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();

            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            // 2.5
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);
            // 2.6
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            // 2.7
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            ···
    }
复制代码

2.1 获取集群信息

KafkaProducer#waitOnMetadata,负责触发Kafka集群元数据的更新,并阻塞主线程等待更新完毕,底层会唤醒sender线程更新metadata保存的Kafka集群元信。网络

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        // 获取Cluster信息
        Cluster cluster = metadata.fetch();

        // 若是cluster中的不合法主题列表包含指定主题,则抛出异常InvalidTopicException
        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);

        // 添加主题,下面会介绍
        metadata.add(topic);

        // 获取Topic中分区数量
        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // 若是分区数不为空,而且partition不为空或者partition小于分区数,则返回一个ClusterAndWaitTime
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        long elapsed;
        do {
            if (partition != null) {
                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                log.trace("Requesting metadata update for topic {}.", topic);
            }
            metadata.add(topic);
            // 获取当前元数据版本号
            int version = metadata.requestUpdate();
            // 唤醒sender线程
            sender.wakeup();
            try {
                // 阻塞等到元数据更新
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs));
            }
            // 再次获取cluster
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - begin;
            // 若是更新元数据的时间超过了最大等待时间,则跑出TimeoutException异常
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException(partitionsCount == null ?
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs) :
                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                partition, topic, partitionsCount, maxWaitMs));
            }
            // 这一步其实就是校验元数据更新后,topic是否合法,若是不合法,则抛出异常
            metadata.maybeThrowExceptionForTopic(topic);
            remainingWaitMs = maxWaitMs - elapsed;
            // 获取获取Topic中分区数量
            partitionsCount = cluster.partitionCountForTopic(topic);
            // 循环条件:分区数等于空或者传进来的partition不等于空而且partition小于分区数
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

        return new ClusterAndWaitTime(cluster, elapsed);
    }
复制代码
// 元数据添加主题
metadata.add(topic);

org.apache.kafka.clients.producer.internals.ProducerMetadata#add(String)
    public synchronized void add(String topic) {
        Objects.requireNonNull(topic, "topic cannot be null");
        // 若是topics里面不包含传入的topic,则更新topic列表
        if (topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE) == null) {
            requestUpdateForNewTopics();
        }
    }

    public synchronized void requestUpdateForNewTopics() {
        // Override the timestamp of last refresh to let immediate update.
        this.lastRefreshMs = 0; // 将最近一次刷新时间置为0
        this.requestVersion++;
        requestUpdate();
    }

    public synchronized int requestUpdate() {
        this.needUpdate = true; // 将更新字段needUpdate设置true,表示须要强制更新
        return this.updateVersion; // 返回更新后版本值
    }
复制代码

2.2 序列化key

序列化record.key()并发

2.3序列化value

序列化record.valueapp

2.4 分区器

消息发送的过程当中吗,若是ProducerRecord中指定了partition字段,那么就不须要分区器的做用,由于该字段表明的就是所要发往的分区号。 若是没有指定partition字段,那么就须要依赖分区器,根据key这个字段来计算partition的值。分区器的做用就是为消息分配分区。 Kafka中默认的分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner#partitionide

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        // 若是key不为空,则会根据key进行哈希算出分区号,具备相同key的消息会被写入同一个分区
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
复制代码

2.5 校验记录大小

private void ensureValidRecordSize(int size) {
        if (size > this.maxRequestSize)
            throw new RecordTooLargeException("The message is " + size +
                    " bytes when serialized which is larger than the maximum request size you have configured with the " +
                    ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
                    " configuration.");
        if (size > this.totalMemorySize)
            throw new RecordTooLargeException("The message is " + size +
                    " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                    ProducerConfig.BUFFER_MEMORY_CONFIG +
                    " configuration.");
    }
复制代码

2.6 将消息添加到RecordAccumulator

RecordAccumulator主要用来缓存消息以便Sender线程能够批量发送,进而减小网络传输的资源消耗。函数

public final class RecordAccumulator {

    // 指定每一个ProducerBatch底层ByteBuffer的大小
    private final int batchSize;
    // 压缩类型
    private final CompressionType compression;

    // BufferPool对象
    private final BufferPool free;

    // TopicPartition与ProducerBatch集合的映射关系
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    // 没有应答的ProducerBatch集合,包括发送与没发送的,底层是一个Set<ProducerBatch>
    private final IncompleteBatches incomplete;
    private int drainIndex;

    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            // 2.6.1
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }

            
            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            // 2.6.2
            buffer = free.allocate(size, maxTimeToBlock);
            // 2.6.3
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    return appendResult;
                }
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);
                buffer = null;
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
        } finally {
            if (buffer != null)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
    }
复制代码

2.6.1

查找TopicPartition对应的Deque,如查不到,则建立新Deque,并添加到batchs集合,加锁调用tryAppend()试图添加消息,若添加成功则返回RecordAppendResult。

2.6.2

2.6.1 追加消息失败后,尝试从BufferPool中申请新的ByteBuffer,可能会致使阻塞,因此有这里没加锁,可能多个线程同时申请ByteBuffer。

2.6.3

从新加锁重试,调用tryAppend方法,是为了防止多个线程并发申请空间后,形成内部碎片。追加成功,则返回,若失败,则使用2.6.2 申请的ByteBuffer建立ProducerBatch,而后将消息添加到新建立的ProducerBatch中,将ProducerBatch添加到Deque,添加到incomplete集合中,返回RecordAppendResult。

// 查找Deque里面最后一个ProducerBatch对象,并将消息追加到ProducerBatch
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque<ProducerBatch> deque) {
        ProducerBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
            if (future == null)
                last.closeForRecordAppends();
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
        }
        return null;
    }
复制代码

2.7 是否唤醒sender线程

唤醒线程的条件是消息所在队列的最后一个ProducerBatch满了或此队列中不止一个ProducerBatch或者这是一个新建立的ProducerBatch Sender线程将在下一篇文章详细介绍

总结

整个生产者客户端有两个线程协调进行,分别是主线程与sender线程(也就是实际的发送线程)。主线程首先将业务数据封装成ProductRecord对象,,而后经过拦截器、序列化器与分区器的做用之后将消息放入RecordAccumulator(也被称为消息收集器)。Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,从RecordAccumulator中获取消息并批量发送到Kafka中。

生产者客户端的发送消息流程

若是有地方有疑惑或者写的有很差,能够评论或者经过邮箱联系我creazycoder@sina.com

相关参考:

图片来自《深刻理解Kafka核心设计与实践原理》

《Apache Kafka 源码剖析》

《深刻理解Kafka核心设计与实践原理》

相关文章
相关标签/搜索