上篇文章说到,消息是怎么被生产出来,简单来讲,就是消息被追加到了RecordAccumulator中,以ByteBuffer的形式存了下来。java
那么在消息添加到ByteBuffer中后,后续的步骤又是怎么样的呢?node
首先上篇文章提到了Kafka追加消息时加锁的粒度,是一个Deque<RecordBatch> dq,咱们根据topic(tp)来获取到要发往这个Topic的队列。这个队列中的元素 RecordBatch ,即是即将发往Broker的消息的载体,一个RecordBatch能够包含多条消息,咱们的消息即是追加到了最底层指向的一个ByteBuffer中。json
Deque<RecordBatch> dq = getOrCreateDeque(tp); synchronized (dq) { if (closed) { throw new IllegalStateException("Cannot send after the producer is closed."); } RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); if (appendResult != null) { return appendResult; } }
当咱们的RecordBatch可能满了后(只要新的消息放不下,就会被认为是满了)数据结构
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) { // 获取deque中最后一个 RecordBatch last = deque.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds()); if (future == null) { last.records.close(); } else { return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false); } } return null; }
满了之后,在最底层的Buffer的引用会被赋给 RecordBatch 中 MemoryRecords 的 ByteBuffer,而后flip一下,变成读模式。app
public void close() { if (writable) { // close the compressor to fill-in wrapper message metadata if necessary compressor.close(); // flip the underlying buffer to be ready for reads // flip 基础buffer来供读 buffer = compressor.buffer(); buffer.flip(); // reset the writable flag writable = false; } }
那么何时会发送呢?在咱们的RecordBatch满了,或者建立了新的RecordBatch时,会唤醒sender => NetworkClient => Selector => nioSelectoride
// batch满了或者建立了新的batch,就去唤醒sender if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); }
唤醒nioSelector有什么用呢?若是对java的Selector有所了解,那么就应该知道Selector在select()或select(long time)的时候,是会阻塞的。oop
在建立kafkaProducer时,变回初始化一个Sender线程,这个Sender线程会循环地拉取数据发往broker。其中就有一个步骤,进行了select() 的操做。fetch
当咱们装填好了一个ByteBuffer,就会去唤醒Selector,不要阻塞了,继续执行,进行下一个循环,咱们来看看这个循环。这个循环实际上就是咱们的消息运输和销毁的流水线。this
/** * Run a single iteration of sending * * 发送消息的核心方法 * * @param now The current POSIX time in milliseconds */ void run(long now) { /** 一、从metadata获取元数据 */ Cluster cluster = metadata.fetch(); /** 二、从Accumulator选出能够发送的node节点 */ // get the list of partitions with data ready to send // 获取待发送的带数据的分区列表 // 复符合发送消息条件的节点会被返回 RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); /** 三、若是ReadyCheckResult 中有 unknownLeader 的node,更新一下元数据 */ // if there are any partitions whose leaders are not known yet, force metadata update // 若是有任何分区还没选举出leader,强制metadata进行更新 if (result.unknownLeadersExist) { this.metadata.requestUpdate(); } /** 四、循环调用client(NetworkClient)中的ready方法,从io层面检查消息是否符合发送条件 */ /* * 移除尚未准备好要发送的节点 */ // remove any nodes we aren't ready to send to Iterator<Node> iter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } } // create produce requests // 建立produce请求 Map<Integer/* nodeId */, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); if (guaranteeMessageOrder) { // Mute all the partitions drained for (List<RecordBatch> batchList : batches.values()) { for (RecordBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } } /** 六、处理RecordAccumulator中超时的消息*/ List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches); /** 七、将待发送的消息封装成ClientRequest */ List<ClientRequest> requests = createProduceRequests(batches, now); // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes // with sendable data that aren't ready to send since they would cause busy looping. long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (result.readyNodes.size() > 0) { log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); pollTimeout = 0; } /** 八、将ClientRequest写入KafkaChannel中的send字段 */ for (ClientRequest request : requests) client.send(request, now); /** 九、真正的把消息发送出去,并处理客户端的ack,处理超时请求,调用用户自定义的Callback等。*/ // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; // otherwise the select time will be the time difference between now and the metadata expiry time; this.client.poll(pollTimeout, now); }
获取以前先进行前置判断:.net
简单地获取元数据、判断每一个tp的leader选出来了没之类的,若是存在未选出leader的tp,告诉元数据,等下要更新一下。
检验经过的节点(就是leader所在的那个节点),将会准备翻牌,将RecordBatch(封装了ByteBuffer)从RecordAccumulator中提取出来。
这里会循环全部的node(leader节点),根据这个节点所属的Topic分区(TopicPartition)信息,去获取Deque,咱们知道Deque是能够根据 Deque<RecordBatch> deque = getDeque(tp)获取的。
讲道理是要获取全部可发送的消息的,但kafka限定了一次request的大小,若是一次request的大小过大,则一次io须要的时间就越长(固然从宏观上来看,一次发的越多,效率可能越高),但总不可能为了提高整体效率,致使一条消息要发几秒吧?
the maximum request size to attempt to send to the server
因此当RecordBatch累加的大小超过必定限制后,循环会break,最终返回给sender一个 Map<Integer/* nodeId */, List<RecordBatch>> batches
for (Node node : nodes){ //.... do{ // Only drain the batch if it is not during backoff period. if (!backoff) { if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { // there is a rare case that a single batch size is larger than the request size due // to compression; in this case we will still eventually send this batch in a single // request // 数据量已经满了,须要结束循环 break; } else { // 数据量没满,那么取出每一个deque的第一个元素,关闭memoryRecord(关闭Compressor,并将MemoryRecords设置为只读) RecordBatch batch = deque.pollFirst(); batch.records.close(); size += batch.records.sizeInBytes(); ready.add(batch); batch.drainedMs = now; } } }while xxxx }
从RecordAccumulator中获取出来的这个Map,将会被封装成可发送对象 List<ClientRequest>。
咱们能够看到 List<ClientRequest> 被循环分红了两个Map:
Map<TopicPartition, ByteBuffer> produceRecordsByPartition Map<TopicPartition, RecordBatch> recordsByPartition
produceRecordsByPartition 的构成十分简单,即是key为tp,val为ByteBuffer的Map。 这个Map会被封装成Struct,Struct能够理解为相似于json的一种数据结构。在Kafka中,数据的传输都是用的Struct这种数据结构。
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) { // 将batches从新整理成两个map Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size()); final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size()); for (RecordBatch batch : batches) { TopicPartition tp = batch.topicPartition; produceRecordsByPartition.put(tp, batch.records.buffer()); recordsByPartition.put(tp, batch); } // 组装 request ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition); // TODO:建立request,这个requestSend就是真正的发送对象 RequestSend send = new RequestSend(Integer.toString(destination), this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct()); // 封装回调 RequestCompletionHandler callback = new RequestCompletionHandler() { public void onComplete(ClientResponse response) { handleProduceResponse(response, recordsByPartition, time.milliseconds()); } }; return new ClientRequest(now, acks != 0, send, callback); }
recordsByPartition 则用于封装回调,好比说失败了后的重试、ByteBuffer的释放等等。并调用发送消息时的那个回调。
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
数据的发送前面的文章已经说过,这个封装好的ClientRequest会被丢到KafkaSelector,最终被nioSelector发送出去,抵达Broker。浅析KafkaChannel、NetworkReceive、Send,以及底层优秀的实现:KafkaSelector的实现。
参考书籍: 《Kafka技术内幕》 郑奇煌著 《Apache Kafka源码剖析》 徐郡明著