The core source code of the send path is shown below.
```java
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // Run the producer interceptors
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    // Fetch the cluster metadata (blocking for up to max.block.ms)
    ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
    long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
    Cluster cluster = clusterAndWaitTime.cluster;
    // Serialize the key and value
    byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
    byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
    // Determine the partition.
    // If the record specifies none, the key's hash modulo the topic's partition count is used;
    // if the key is also null, the client uses an incrementing counter (seeded with a random
    // integer) modulo the partition count.
    int partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    Header[] headers = record.headers().toArray();
    // Check that the serialized record does not exceed the size limits
    int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
            compressionType, serializedKey, serializedValue, headers);
    ensureValidRecordSize(serializedSize);
    // Timestamp: defaults to the current time if the record does not carry one
    long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
    // Wrap the user callback and the interceptors into a single callback
    Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    // Append the record to the record accumulator
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs);
    if (result.batchIsFull || result.newBatchCreated) {
        // Wake the Sender thread when a batch fills up or a new batch was created
        this.sender.wakeup();
    }
    return result.future;
}
```
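From the caller's side, all of this is hidden behind the asynchronous send(). A minimal usage sketch follows; the broker address and topic name are assumptions for illustration:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // send() returns immediately; the callback fires once the batch gets a broker response
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null)
                    exception.printStackTrace();
                else
                    System.out.printf("sent to %s-%d@%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
            });
        } // close() flushes the accumulator before shutting down
    }
}
```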
The core source code for fetching metadata is shown below.
```java
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    // Fetch the cached cluster metadata
    Cluster cluster = metadata.fetch();
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // Return cached metadata if we have it, and if the record's partition is either undefined
    // or within the known partition range
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);

    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    long elapsed;
    do {
        // Mark the metadata for update (needUpdate = true) and grab the current version
        int version = metadata.requestUpdate();
        sender.wakeup(); // wake up the Sender thread
        try {
            metadata.awaitUpdate(version, remainingWaitMs); // wait for the update
        } catch (TimeoutException ex) {
            // swallowed here; the total elapsed time is checked below
        }
        cluster = metadata.fetch(); // re-fetch the metadata
        elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs) // exceeded the maximum wait time
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
        partitionsCount = cluster.partitionCountForTopic(topic);
    } while (partitionsCount == null); // keep looping while the partition count is still unknown

    // An explicitly specified partition must be smaller than the partition count
    if (partition != null && partition >= partitionsCount) {
        throw new KafkaException(String.format(
                "Invalid partition given with record: %d is not in the range [0...%d).",
                partition, partitionsCount));
    }
    return new ClusterAndWaitTime(cluster, elapsed);
}

// Wait for a metadata update
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    // version <= lastVersion means no update has arrived yet, so keep waiting
    while ((this.version <= lastVersion) && !isClosed()) {
        if (remainingWaitMs != 0)
            wait(remainingWaitMs); // block until notified or timed out, then re-check
        long elapsed = System.currentTimeMillis() - begin;
        if (elapsed >= maxWaitMs) // exceeded the maximum wait time
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
}
```
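The requestUpdate/awaitUpdate pair is a plain monitor-based version handshake: the producer thread records the version it saw, and the Sender bumps the version after a metadata response and notifies the waiters. A self-contained sketch of that pattern (VersionedMetadata is a hypothetical stand-in, not Kafka's Metadata class):

```java
// A minimal sketch of the version-based wait behind waitOnMetadata/awaitUpdate;
// VersionedMetadata is a hypothetical stand-in for Kafka's Metadata class.
public class VersionedMetadata {
    private int version = 0;
    private boolean needUpdate = false;

    // Producer side: request an update and remember the version we saw
    public synchronized int requestUpdate() {
        this.needUpdate = true;
        return this.version;
    }

    // Producer side: block until the version moves past what we saw
    public synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (this.version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                throw new RuntimeException("Failed to update metadata after " + maxWaitMs + " ms.");
            wait(remaining);
        }
    }

    // Sender side: a metadata response arrived; bump the version and wake the waiters
    public synchronized void update() {
        this.needUpdate = false;
        this.version++;
        notifyAll();
    }
}
```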
Kafka uses a buffer pool to allocate heap byte buffers (HeapByteBuffer) for messages. The pool keeps released buffers in a free deque, and an allocation first tries to pop a buffer straight off its head. When the pool cannot satisfy a request, a wait queue built on ReentrantLock + Condition makes the allocating thread block until enough memory has been returned.
When Kafka handles the produce response, it releases the allocated memory and returns the buffer to the free deque.
```java
// The buffer pool
public class BufferPool {
    // Total available memory (buffer.memory)
    private final long totalMemory;
    // Size of a single batch (batch.size)
    private final int poolableSize;
    private final ReentrantLock lock;
    // Free list of pooled buffers
    private final Deque<ByteBuffer> free;
    // Queue of threads waiting for memory
    private final Deque<Condition> waiters;
    // Unallocated memory; the total available memory is
    // nonPooledAvailableMemory + free.size() * poolableSize
    private long nonPooledAvailableMemory;
}

// Allocate a byte buffer
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate more than the total memory of the pool");
    ByteBuffer buffer = null;
    this.lock.lock();
    try {
        // Fast path: serve a batch-sized request straight from the free list
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // Total size of the free list
        int freeListSize = this.free.size() * this.poolableSize;
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
            // We have enough unallocated or pooled memory to immediately
            // satisfy the request, but need to allocate the buffer
            freeUp(size);
            this.nonPooledAvailableMemory -= size;
        } else {
            // Not enough memory: block until enough has been freed
            int accumulated = 0;
            Condition moreMemory = this.lock.newCondition();
            try {
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory); // join the wait queue
                // Loop over and over until we have a buffer or have reserved
                // enough memory to allocate one
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        // Wait until signaled by deallocate(), or until the timeout expires
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } finally {
                        timeNs = Math.max(0L, time.nanoseconds() - startWaitNs);
                    }
                    if (waitingTimeElapsed)
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time");
                    remainingTimeToBlockNs -= timeNs;
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        // A pooled buffer became available: take it directly
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        // Reserve whatever memory is currently available and keep accumulating
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                        this.nonPooledAvailableMemory -= got;
                        accumulated += got;
                    }
                }
                accumulated = 0; // fully reserved; nothing to give back
            } finally {
                // Return any partially accumulated memory and leave the wait queue
                this.nonPooledAvailableMemory += accumulated;
                this.waiters.remove(moreMemory);
            }
        }
    } finally {
        try {
            // Pass the signal on if memory remains and other threads are still waiting
            if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                this.waiters.peekFirst().signal();
        } finally {
            this.lock.unlock();
        }
    }

    if (buffer == null)
        // No pooled buffer was taken: allocate a fresh heap buffer
        return ByteBuffer.allocate(size);
    else
        return buffer;
}

private void freeUp(int size) {
    // Drain pooled buffers into the unallocated pool until the request fits
    while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
        this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}

// Release the allocated memory when the produce response has been handled
public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer); // return the buffer to the free list
        } else {
            this.nonPooledAvailableMemory += size; // give the memory back to the unallocated pool
        }
        // Wake up the first waiter, if any
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}
```
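To see the two halves interact in isolation, the sketch below drives allocate/deallocate directly. BufferPool is an internal producer class, so the constructor signature (taken from the 2.x sources) should be treated as illustrative rather than stable API:

```java
import java.nio.ByteBuffer;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;

public class BufferPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        // 64 KiB total, 16 KiB batches (buffer.memory / batch.size in miniature)
        BufferPool pool = new BufferPool(64 * 1024, 16 * 1024, new Metrics(), Time.SYSTEM, "demo");

        ByteBuffer batchSized = pool.allocate(16 * 1024, 1000); // carved from unpooled memory (free list starts empty)
        ByteBuffer oddSized   = pool.allocate(10 * 1024, 1000); // non-batch-sized: always from unpooled memory

        pool.deallocate(batchSized, batchSized.capacity());     // exactly poolableSize: recycled into the free list
        pool.deallocate(oddSized, oddSized.capacity());         // only added back to the unpooled byte count
    }
}
```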
The accumulator caches records in a CopyOnWriteMap. The key is the topic-partition and the value is a double-ended queue whose elements are compressed record batches.
```java
// The accumulator's cache
ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();
```
CopyOnWriteMap is a thread-safe, copy-on-write Map implemented by Kafka itself. It wraps a volatile Map: reads take no lock and go straight to the current map, while writes take a lock, copy the map, apply the write to the copy, and then point the volatile reference at the modified copy.
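A minimal sketch of the same copy-on-write technique (CowMap is an illustrative name; Kafka's real class is org.apache.kafka.common.utils.CopyOnWriteMap):

```java
import java.util.HashMap;
import java.util.Map;

// A minimal sketch of the copy-on-write idea behind Kafka's CopyOnWriteMap.
public class CowMap<K, V> {
    // volatile: readers always see the latest fully built map
    private volatile Map<K, V> map = new HashMap<>();

    // Lock-free read: just dereference the current snapshot
    public V get(K key) {
        return map.get(key);
    }

    // Writes are serialized; each one works on a private copy and then swaps
    // the volatile reference, so readers never observe a half-applied write
    public synchronized V putIfAbsent(K key, V value) {
        V existing = map.get(key);
        if (existing != null)
            return existing;
        Map<K, V> copy = new HashMap<>(this.map);
        copy.put(key, value);
        this.map = copy;
        return null;
    }
}
```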
The Deque is in fact an ArrayDeque, which is not thread safe, so all access is manually synchronized. Using a double-ended queue means that when a send fails, the batch can be put straight back at the head of the queue for retry, as shown in the sketch below.
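A sketch of that retry path, simplified from RecordAccumulator.reenqueue() in the 2.x sources:

```java
// Simplified from RecordAccumulator.reenqueue(): a failed batch goes back to
// the head of its partition's deque, so it is retried before newer batches.
public void reenqueue(ProducerBatch batch, long now) {
    batch.reenqueued(now); // update retry bookkeeping (simplified)
    Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
    synchronized (deque) { // ArrayDeque is not thread safe; lock manually
        deque.addFirst(batch);
    }
}
```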
```java
// Append a record to the accumulator cache
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value,
                                 Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException {
    ByteBuffer buffer = null;
    try {
        // Look up this partition's deque in batches, creating it if absent
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) { // append under the deque's lock
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult; // the last batch still had room
        }

        // No batch with room: compute the size of a new one
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize,
                AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        buffer = free.allocate(size, maxTimeToBlock); // allocate a byte buffer from the BufferPool

        synchronized (dq) { // re-check under the lock
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult; // another thread created a batch while we were allocating

            // Build the compressed batch object ProducerBatch on top of the buffer
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(
                    batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
            dq.addLast(batch); // enqueue at the tail of the deque
            buffer = null;     // ownership transferred to the batch; don't deallocate below
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer); // return the buffer if it ended up unused
    }
}
```
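getOrCreateDeque itself is a small putIfAbsent dance over the copy-on-write map; essentially (simplified, matching the fields shown above):

```java
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    Deque<ProducerBatch> d = this.batches.get(tp); // lock-free read in the common case
    if (d != null)
        return d;
    d = new ArrayDeque<>();
    // Only one writer wins the race; everyone else adopts the winner's deque
    Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
    return previous != null ? previous : d;
}
```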