Picking up where the previous article on how the Kafka Producer sends messages left off, let's dig deeper. First, a look at ProducerRecord. It is not just the bare message: it bundles several attributes. The class is defined as follows:
public class ProducerRecord<K, V> {
private final String topic; // topic
private final Integer partition; // partition number
private final Headers headers; // message headers
private final K key; // key
private final V value; // value
private final Long timestamp; // message timestamp
// remaining members omitted
}
The key is not just extra information attached to a message: it is also used to compute the partition number and thereby route the message to a specific partition (more on this below). ProducerRecord has quite a few constructors; the sample code uses the simplest one. For reference:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)

// The constructor used in the sample code; every other attribute is effectively set to null
public ProducerRecord(String topic, V value)
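For instance (the topic name and values below are made up for illustration):

// Only topic and value: partition, timestamp, key, and headers are all left null,
// so the partitioner and client defaults fill them in later.
ProducerRecord<String, String> simple =
        new ProducerRecord<>("demo-topic", "hello kafka");

// With a key: records carrying the same key hash to the same partition.
ProducerRecord<String, String> keyed =
        new ProducerRecord<>("demo-topic", "user-42", "profile updated");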
Now let's look at the send(ProducerRecord, Callback) method:
public class KafkaProducer<K, V> implements Producer<K, V> {
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
// 1.1
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
// ... other members omitted
}
Before a message is sent, org.apache.kafka.clients.producer.ProducerInterceptor#onSend is invoked so the record can be given any custom processing.
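As a sketch of what an interceptor can do, here is a hypothetical implementation that prefixes every value before it is serialized (the class name and prefix are invented for illustration):

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class PrefixInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Runs on the main thread before serialization and partitioning.
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), "prefix-" + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Invoked when the send is acknowledged or fails; keep this lightweight,
        // since it runs on the Sender's I/O thread.
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It would be registered with props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, PrefixInterceptor.class.getName()). Next, the doSend(ProducerRecord, Callback) method: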
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// make sure the producer instance has not been closed
throwIfProducerClosed();
// first make sure the topic's metadata is available
ClusterAndWaitTime clusterAndWaitTime;
try {
// 2.1
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
// 2.2
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
// 2.3
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
// 2.4
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
// 2.5
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
// 2.6
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
// 2.7
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// ... remainder of the method (catch/finally) omitted
}
KafkaProducer#waitOnMetadata triggers an update of the Kafka cluster metadata and blocks the calling thread until the update completes; under the hood it wakes the Sender thread, which refreshes the cluster metadata held in metadata.
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
// fetch the current Cluster info
Cluster cluster = metadata.fetch();
// if the cluster's invalid-topic list contains this topic, throw InvalidTopicException
if (cluster.invalidTopics().contains(topic))
throw new InvalidTopicException(topic);
// register the topic (covered below)
metadata.add(topic);
// number of partitions for the topic
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// if the partition count is known, and either no partition was requested or the requested one is in range, return a ClusterAndWaitTime immediately
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
long elapsed;
do {
if (partition != null) {
log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
} else {
log.trace("Requesting metadata update for topic {}.", topic);
}
metadata.add(topic);
// mark that an update is needed and record the current metadata version
int version = metadata.requestUpdate();
// wake up the Sender thread
sender.wakeup();
try {
// block until the metadata has been updated
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException(
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs));
}
// fetch the cluster again
cluster = metadata.fetch();
elapsed = time.milliseconds() - begin;
// if updating the metadata took longer than the max wait time, throw a TimeoutException
if (elapsed >= maxWaitMs) {
throw new TimeoutException(partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs) :
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs));
}
// after the update, verify that the topic is valid; throw if it is not
metadata.maybeThrowExceptionForTopic(topic);
remainingWaitMs = maxWaitMs - elapsed;
// refresh the partition count for the topic
partitionsCount = cluster.partitionCountForTopic(topic);
// loop while the partition count is still unknown, or a specific partition was requested but is out of range
} while (partitionsCount == null || (partition != null && partition >= partitionsCount));
return new ClusterAndWaitTime(cluster, elapsed);
}
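The maxWaitMs used above comes from the producer's max.block.ms setting. A minimal configuration sketch (the broker address and the 30-second value are arbitrary choices, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class MaxBlockDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Upper bound on how long send() may block in waitOnMetadata (and on
        // buffer allocation); past this, a TimeoutException is thrown.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send records here
        }
    }
}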
// register the topic with the metadata
metadata.add(topic);
org.apache.kafka.clients.producer.internals.ProducerMetadata#add(String)
public synchronized void add(String topic) {
Objects.requireNonNull(topic, "topic cannot be null");
// if this topic was not already being tracked, request a metadata update for it
if (topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE) == null) {
requestUpdateForNewTopics();
}
}
public synchronized void requestUpdateForNewTopics() {
// Reset the last-refresh timestamp so the next poll triggers an immediate update.
this.lastRefreshMs = 0;
this.requestVersion++;
requestUpdate();
}
public synchronized int requestUpdate() {
this.needUpdate = true; // flag that a forced metadata update is needed
return this.updateVersion; // return the current update version
}
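To make the version handshake concrete, here is a simplified, self-contained sketch of the idea behind requestUpdate()/awaitUpdate(); this is not the actual Metadata code, just the same wait/notify pattern:

public class VersionedMetadata {
    private boolean needUpdate = false;
    private int updateVersion = 0;

    public synchronized int requestUpdate() {
        this.needUpdate = true;    // force the next metadata poll to refresh
        return this.updateVersion; // the caller waits for a version beyond this one
    }

    // Called from the background (Sender/network) thread once fresh metadata arrives.
    public synchronized void update() {
        this.needUpdate = false;
        this.updateVersion++;
        notifyAll(); // wake any thread blocked in awaitUpdate
    }

    public synchronized void awaitUpdate(int lastVersion, long maxWaitMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (updateVersion <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                throw new RuntimeException("Metadata not updated after " + maxWaitMs + " ms");
            wait(remaining); // loop guards against spurious wakeups
        }
    }
}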
Steps 2.2 and 2.3 serialize record.key() and record.value() with the configured key and value serializers.
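To show what a serializer actually does, here is a hypothetical value serializer (the class name and the upper-casing are invented; in practice the built-in StringSerializer usually suffices):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

public class UpperCaseSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        if (data == null)
            return null;
        // Whatever transformation happens here must produce bytes the
        // consumer-side deserializer expects.
        return data.toUpperCase().getBytes(StandardCharsets.UTF_8);
    }
}

It would be registered with props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UpperCaseSerializer.class.getName()).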
During sending, if the ProducerRecord specifies the partition field, no partitioner is involved: that field already names the target partition. If partition is not set, the partitioner must compute one, based on the key field. In other words, the partitioner's job is to assign a partition to each message. Kafka's default partitioner is org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
// if the key is non-null, hash it to compute the partition; messages with the same key are written to the same partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
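When the default strategy does not fit, a custom Partitioner can be plugged in. A hypothetical sketch (the class name and the route-keyless-traffic-to-partition-0 rule are invented for illustration):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class FallbackPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null)
            return 0; // all keyless traffic lands on partition 0
        // Same murmur2 hashing as the default partitioner for keyed messages.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It would be enabled with props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, FallbackPartitioner.class.getName()).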
Back in doSend, step 2.5 calls ensureValidRecordSize to reject messages that are too large:
private void ensureValidRecordSize(int size) {
if (size > this.maxRequestSize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than the maximum request size you have configured with the " +
ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
" configuration.");
if (size > this.totalMemorySize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than the total memory buffer you have configured with the " +
ProducerConfig.BUFFER_MEMORY_CONFIG +
" configuration.");
}
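Both limits checked above map directly to producer configs. A small sketch (the helper name and the values are illustrative, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class SizeLimitsConfig {
    public static Properties withSizeLimits(Properties props) {
        // ensureValidRecordSize throws RecordTooLargeException past either limit.
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1048576"); // 1 MB per request
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");   // 32 MB accumulator pool
        return props;
    }
}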
RecordAccumulator buffers messages so that the Sender thread can send them in batches, reducing the overhead of network transmission. Its main fields:
public final class RecordAccumulator {
// size of the ByteBuffer backing each ProducerBatch
private final int batchSize;
// compression type
private final CompressionType compression;
// the BufferPool
private final BufferPool free;
// maps each TopicPartition to its Deque of ProducerBatch objects
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
// unacknowledged batches, both in-flight and not yet sent; backed by a Set<ProducerBatch>
private final IncompleteBatches incomplete;
private int drainIndex;
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending threads to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// 2.6.1
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
// 2.6.2
buffer = free.allocate(size, maxTimeToBlock);
// 2.6.3
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
return appendResult;
}
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
2.6.1 Look up the Deque for the TopicPartition; if none exists, create one and add it to the batches map. Then, holding the deque's lock, call tryAppend() to append the message; on success, return the RecordAppendResult.
2.6.2 If that append failed, allocate a new ByteBuffer from the BufferPool. The allocation may block, which is why it is done without holding the lock, so several threads can be allocating ByteBuffers at once.
2.6.3 Re-acquire the lock and call tryAppend() again; the retry exists because multiple threads may have allocated space concurrently, and appending to an existing batch first avoids internal fragmentation. If the retry succeeds, return. Otherwise, use the ByteBuffer allocated in 2.6.2 to create a new ProducerBatch, append the message to it, add the batch to the Deque and to the incomplete set, and return the RecordAppendResult.
// peek at the last ProducerBatch in the Deque and try to append the message to it
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque) {
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
if (future == null)
last.closeForRecordAppends();
else
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
}
return null;
}
The Sender thread is woken up when the last ProducerBatch of the message's queue is full, when the queue contains more than one ProducerBatch, or when a brand-new ProducerBatch has just been created.
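How quickly batches fill up is governed by batch.size, together with linger.ms on the draining side. A hedged configuration sketch (helper name and values are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class BatchingConfig {
    public static Properties withBatching(Properties props) {
        // A ProducerBatch is considered full at batch.size bytes; linger.ms lets
        // the accumulator wait briefly for more records before a batch is drained.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); // 16 KB per batch buffer
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5");      // wait up to 5 ms to fill a batch
        return props;
    }
}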
The Sender thread itself will be covered in detail in the next article.
The producer client as a whole runs on two coordinating threads: the main thread and the Sender thread (the one that actually does the sending). The main thread wraps business data into ProducerRecord objects, runs them through the interceptors, serializers, and partitioner, and places the messages into the RecordAccumulator (also known as the message collector). The Sender thread assembles the accumulated messages into requests and performs the network I/O, pulling batches from the RecordAccumulator and sending them to Kafka.
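Tying the whole path together, here is a minimal send-with-callback sketch (the broker address, topic, and values are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("demo-topic", "key-1", "hello");
            // The callback runs on the Sender's I/O thread after the broker responds.
            producer.send(record, (metadata, exception) -> {
                if (exception != null)
                    exception.printStackTrace();
                else
                    System.out.printf("sent to %s-%d@%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
            });
        }
    }
}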
If anything here is unclear or could be written better, leave a comment or reach me by email at creazycoder@sina.com.
References:
Images are from 《深入理解Kafka核心设计与实践原理》
《Apache Kafka 源码剖析》
《深入理解Kafka核心设计与实践原理》