A picture up front makes for a happier reader, so without further ado, here is the architecture diagram.
We introduced this diagram in the previous article, 《kafka生产者的蓄水池机制》 (the Kafka producer's "reservoir" mechanism), which walked through the message-accumulation half of the picture (what we called the "reservoir"). This article covers the other half: the message-sending mechanism.
All message sending starts from the Sender thread, which is a daemon thread. So the first thing to look at is Sender's run method: the outermost run is a main loop that keeps invoking the inner run method holding the per-iteration logic. Here is that inner run method:
```java
void run(long now) {
    // Producer transaction handling; it gets its own article later, so this is
    // shown only for completeness
    if (transactionManager != null) {
        try {
            if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                // Check if the previous run expired batches which requires a reset of the producer state.
                transactionManager.resetProducerId();
            if (!transactionManager.isTransactional()) {
                // this is an idempotent producer, so make sure we have a producer id
                maybeWaitForProducerId();
            } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                        "some previously sent messages and can no longer retry them. It isn't safe to continue."));
            } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                // as long as there are outstanding transactional requests, we simply wait for them to return
                client.poll(retryBackoffMs, now);
                return;
            }

            // do not continue sending if the transaction manager is in a failed state or if there
            // is no producer id (for the idempotent case).
            if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, now);
                return;
            } else if (transactionManager.hasAbortableError()) {
                accumulator.abortUndrainedBatches(transactionManager.lastError());
            }
        } catch (AuthenticationException e) {
            // This is already logged as error, but propagated here to perform any clean ups.
            log.trace("Authentication exception while processing transactional request: {}", e);
            transactionManager.authenticationFailed(e);
        }
    }

    // The actual data send, plus handling of the broker's responses
    long pollTimeout = sendProducerData(now);
    client.poll(pollTimeout, now);
}
```
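As an aside, the outer daemon-loop shape is worth isolating on its own. Below is a minimal sketch of that pattern, assuming illustrative names (SenderLoop, runOnce) rather than Kafka's actual classes; the real client starts its Sender via a KafkaThread helper, but the effect is the same:

```java
// A minimal sketch of the daemon main-loop pattern used by the Sender thread.
// SenderLoop and runOnce are illustrative names, not the actual Kafka identifiers.
public class SenderLoop implements Runnable {

    private volatile boolean running = true;

    @Override
    public void run() {
        // Outer main loop: keeps invoking the per-iteration logic until shutdown.
        while (running) {
            runOnce(System.currentTimeMillis());
        }
    }

    // Per-iteration logic, analogous to Sender#run(long now).
    private void runOnce(long now) {
        // drain the accumulator, send requests, poll for responses ...
    }

    public void shutdown() {
        running = false;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(new SenderLoop(), "kafka-producer-network-thread");
        t.setDaemon(true); // a daemon thread never keeps the JVM alive on its own
        t.start();
        Thread.sleep(100);
    }
}
```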
Next we will look at this from two angles: sending the messages, and handling the responses that come back.
Message sending
First, the concrete logic of sendProducerData:
```java
private long sendProducerData(long now) {
    // Fetch the cluster metadata
    Cluster cluster = metadata.fetch();

    // Get the list of partitions that are ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // If some partitions have no known leader, force a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // A leader can be unknown during leader election, or when the topic has expired;
        // re-add those topics and request an update from the brokers, since we still
        // need to send messages to them
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // Walk all the ready nodes and, based on connection state, drop any that are unusable
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        // Connection-state check; if connecting is permitted, a new connection is initiated
        if (!this.client.ready(node, now)) {
            // Remove nodes that are not ready
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // Collect all pending batches, grouped by their leader node
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
            this.maxRequestSize, now);

    // If strict ordering is required, cache the target topic partitions so that no second
    // unfinished batch is sent to the same topic partition at the same time
    if (guaranteeMessageOrder) {
        // Add each batch's partition to the mute set; it is a Set, so duplicate
        // topic-partition entries are ignored
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // Collect locally expired batches: they fail with a TimeoutException and their memory is released
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
    // Handle the expired batches
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.
            transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
        }
    }

    // Update metrics
    sensors.updateProduceRequestMetrics(batches);

    // Set pollTimeout; if there are messages waiting to be sent, use 0 so the request
    // goes out immediately, shortening how long the remaining messages sit in the
    // cache and avoiding a backlog
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        pollTimeout = 0;
    }

    // Hand the batches to NetworkClient to send to the brokers
    sendProduceRequests(batches, now);
    return pollTimeout;
}
```
Summed up, the core flow of sendProducerData is as follows:
1. Call accumulator.ready to get the list of partitions that are ready to send;
2. Call client.ready to check connectivity to every network node obtained;
3. Call accumulator.drain to collect all pending batches together with their leader nodes;
4. When strict per-partition ordering must be guaranteed, call accumulator.mutePartition to add the partitions to the mute set;
5. Call sendProduceRequests to send the produce requests.
Let's go through these steps one by one:
Getting the list of sendable partitions via accumulator.ready:
```java
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    // Nodes that can accept messages
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    // Topics whose leader replica is unknown
    Set<String> unknownLeaderTopics = new HashSet<>();

    // Whether any thread is blocked waiting for the BufferPool to allocate space
    boolean exhausted = this.free.queued() > 0;
    // Walk every partition that has pending batches and evaluate its leader
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<ProducerBatch> deque = entry.getValue();

        // Node currently hosting this topic partition's leader replica
        Node leader = cluster.leaderFor(part);
        synchronized (deque) {
            if (leader == null && !deque.isEmpty()) {
                // The leader is unknown but there are messages for this partition: record it,
                // so that later in the flow a forced metadata update is sent to the brokers
                unknownLeaderTopics.add(part.topic());
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                // To preserve ordering, a partition in the mute set cannot send more data
                // until its in-flight batch completes
                ProducerBatch batch = deque.peekFirst();
                if (batch != null) {
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    // Is the batch still inside its retry backoff window?
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    boolean full = deque.size() > 1 || batch.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    // Mark whether this leader is sendable
                    boolean sendable = full    // 1. the deque has more than one batch, or the first batch is full
                        || expired             // 2. the batch has waited long enough
                        || exhausted           // 3. threads are waiting on the BufferPool, i.e. the local cache is full
                        || closed              // 4. the producer is closing
                        || flushInProgress();  // 5. a thread is waiting for a flush to complete
                    if (sendable && !backingOff) {
                        // Sendable and not backing off: add the leader to the ready set
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Update the delay before the next ready check
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    // Return the check result
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
```
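Several of the sendable conditions above map directly onto producer configuration. Here is a minimal sketch, assuming a placeholder broker address and topic name (it needs a running broker to actually send), of the knobs that drive the ready() verdict:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ReadyTuningDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // "expired": a batch becomes sendable once it has waited linger.ms
        props.put("linger.ms", "5");
        // "full": a batch becomes sendable as soon as it reaches batch.size bytes
        props.put("batch.size", "16384");
        // "backingOff": a failed batch waits retry.backoff.ms before being retried
        props.put("retry.backoff.ms", "100");
        // "exhausted": total accumulator memory; when threads block on it, ready() fires early
        props.put("buffer.memory", "33554432");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "k", "v")); // placeholder topic
        }
    }
}
```

In other words, linger.ms trades latency for batching: with the default linger.ms=0, a batch counts as "expired" immediately and is sent on the first sender pass that sees it.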
Checking connectivity of each candidate node via client.ready:
```java
public boolean ready(Node node, long now) {
    if (node.isEmpty())
        throw new IllegalArgumentException("Cannot connect to empty node " + node);

    // connectionStates says the node is ready: usable right away
    if (isReady(node, now))
        return true;

    // The connection state allows connecting
    if (connectionStates.canConnect(node.idString(), now))
        // Initiate the connection via the selector
        initiateConnect(node, now);

    return false;
}
```
Collecting all pending batches and their leader nodes via accumulator.drain:
```java
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    // Result: node id -> batches to send to that node
    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    // Walk every reachable node
    for (Node node : nodes) {
        int size = 0;
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        List<ProducerBatch> ready = new ArrayList<>();
        /* drainIndex records where the previous drain stopped, and this drain resumes
         * from that position; always starting at 0 could starve the partitions at the
         * tail of the list, so this is a simple load-balancing strategy */
        int start = drainIndex = drainIndex % parts.size();
        do {
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // When strict ordering is required, muted partitions are skipped;
            // draining them would risk reordering their messages
            if (!muted.contains(tp)) {
                // The RecordBatch deque for this partition
                Deque<ProducerBatch> deque = getDeque(tp);
                if (deque != null) {
                    synchronized (deque) {
                        ProducerBatch first = deque.peekFirst();
                        if (first != null) {
                            // Is the first batch still inside its retry backoff window?
                            boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
                            // Never retried, or its backoff has elapsed
                            if (!backoff) {
                                if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // The size cap for a single drain is reached; stop here.
                                    // It normally matches one request's size, keeping any
                                    // single request from growing too large
                                    break;
                                } else {
                                    // Transactional handling in the retry path is omitted here

                                    // Nodes are visited round-robin, each node's partition list
                                    // starts from a rotating index, and only the first batch of
                                    // each deque is taken per pass; all of this balances sending
                                    // and keeps it fair across partitions
                                    ProducerBatch batch = deque.pollFirst();
                                    // close() seals the batch: it can be read but no longer appended to
                                    batch.close();
                                    size += batch.records().sizeInBytes();
                                    ready.add(batch);
                                    batch.drained(now);
                                }
                            }
                        }
                    }
                }
            }
            // Advance drainIndex for the next round
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        batches.put(node.id(), ready);
    }
    return batches;
}
```
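The drainIndex bookkeeping is a small fairness mechanism that is easy to miss. The standalone sketch below (illustrative names, not Kafka code) shows why resuming from the previous stop position matters once each round is size-capped:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: mimics how drainIndex remembers where the previous drain
// stopped, so a size-capped drain does not always favor the head of the list.
public class DrainIndexDemo {
    private int drainIndex = 0;

    // Drain at most maxCount entries per round, resuming from the last position.
    List<String> drain(List<String> partitions, int maxCount) {
        List<String> picked = new ArrayList<>();
        int start = drainIndex = drainIndex % partitions.size();
        do {
            if (picked.size() >= maxCount)
                break; // budget reached, like the maxSize check in drain()
            picked.add(partitions.get(drainIndex));
            drainIndex = (drainIndex + 1) % partitions.size();
        } while (start != drainIndex);
        return picked;
    }

    public static void main(String[] args) {
        DrainIndexDemo demo = new DrainIndexDemo();
        List<String> parts = Arrays.asList("p0", "p1", "p2", "p3");
        System.out.println(demo.drain(parts, 2)); // [p0, p1]
        System.out.println(demo.drain(parts, 2)); // [p2, p3] -- the tail is not starved
    }
}
```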
Adding partitions to the mute set via accumulator.mutePartition is straightforward: while iterating over the batches about to be sent, if strict message ordering is enabled, each batch's partition is recorded in the mute set. Before every send the producer checks whether a partition is already in the set, and if so skips it for this round; after each send completes, the partition is removed from the set so the next message for it can go out.
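Conceptually the mute set is just a concurrent set keyed by partition. Here is a minimal sketch of the idea, with illustrative names (in the real client, guaranteeMessageOrder is derived from max.in.flight.requests.per.connection = 1):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the mute-set idea: a partition with an unfinished in-flight batch
// is "muted" and skipped by ready()/drain(), then unmuted when its response arrives.
public class MuteSetDemo {
    private final Set<String> muted = ConcurrentHashMap.newKeySet();

    boolean tryBeginSend(String topicPartition) {
        // false means the partition already has an unfinished batch in flight
        return muted.add(topicPartition);
    }

    void completeSend(String topicPartition) {
        muted.remove(topicPartition); // response handled, partition may send again
    }

    public static void main(String[] args) {
        MuteSetDemo demo = new MuteSetDemo();
        System.out.println(demo.tryBeginSend("demo-topic-0")); // true: first batch goes out
        System.out.println(demo.tryBeginSend("demo-topic-0")); // false: must wait
        demo.completeSend("demo-topic-0");
        System.out.println(demo.tryBeginSend("demo-topic-0")); // true again
    }
}
```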
Sending the produce requests via sendProduceRequests:
```java
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;

    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

    // Walk all batches and find the lowest magic (message format version)
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }

    // Walk the RecordBatch collection and build produceRecordsByPartition and recordsByPartition
    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();

        // Down-convert for backward compatibility: e.g. when a partition has migrated
        // from a newer broker to an older one, the MemoryRecords must be rebuilt
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
        produceRecordsByPartition.put(tp, records);
        recordsByPartition.put(tp, batch);
    }

    String transactionalId = null;
    if (transactionManager != null && transactionManager.isTransactional()) {
        transactionalId = transactionManager.transactionalId();
    }
    // Build the ProduceRequest builder; produceRecordsByPartition feeds the request
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    // Build the callback that handles the response; recordsByPartition feeds the callback
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    // Build the ClientRequest; acks != 0 means we expect a response from the broker
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
    // Hand the request to NetworkClient
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
```
Next we need to understand NetworkClient's send path; in the end everything goes through the doSend method:
```java
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    // Destination node id
    String nodeId = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    // Logging omitted
    Send send = request.toSend(nodeId, header);
    // Wrap the request in an InFlightRequest and enqueue it
    InFlightRequest inFlightRequest = new InFlightRequest(
            header,
            clientRequest.createdTimeMs(),
            clientRequest.destination(),
            clientRequest.callback(),
            clientRequest.expectResponse(),
            isInternalRequest,
            request,
            send,
            now);
    this.inFlightRequests.add(inFlightRequest);
    // Put the bytes on the wire
    selector.send(inFlightRequest.send);
}
```
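The inFlightRequests structure used here is essentially a per-node deque: requests are pushed at the head when sent and completed from the tail as responses arrive in order on the connection. A simplified sketch of that bookkeeping, with illustrative types:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of per-node in-flight request bookkeeping, mirroring what
// InFlightRequests does: queue on send, dequeue when the response comes back.
public class InFlightTracker<R> {
    private final Map<String, Deque<R>> inFlight = new HashMap<>();

    // Called on send: remember the request, newest at the head.
    void add(String nodeId, R request) {
        inFlight.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).addFirst(request);
    }

    // Called when a response arrives: complete the oldest outstanding request.
    R completeNext(String nodeId) {
        return inFlight.get(nodeId).pollLast();
    }

    // Used by handleCompletedSends: peek at the most recently sent request.
    R lastSent(String nodeId) {
        return inFlight.get(nodeId).peekFirst();
    }
}
```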
That completes the walkthrough of message sending; next, let's look at how responses are pulled back and processed.
Response handling starts from NetworkClient's poll method, whose logic breaks down as follows:
```java
public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();

    if (!abortedSends.isEmpty()) {
        // Sends aborted by a disconnect or an unsupported version are handled first
        List<ClientResponse> responses = new ArrayList<>();
        handleAbortedSends(responses);
        completeResponses(responses);
        return responses;
    }

    // Metadata update handling
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        // This poll drives all network work: connects, disconnects, initiating new sends,
        // and the sends/receives in progress; completed receives end up in completedReceives
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // Process all completed actions and responses
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    completeResponses(responses);

    return responses;
}
```
响应操做的核心处理几个函数就是handle*的几个函数,咱们分别介绍一下:
handleCompletedSends iterates over all completed sends and, for requests that do not expect a response, builds the response locally and adds it to the local response list:
```java
private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // Walk all completed Send objects
    for (Send send : this.selector.completedSends()) {
        // Find the most recently sent request for this destination in inFlightRequests
        InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
        // For a successful send that expects no broker response, complete it locally
        if (!request.expectResponse) {
            // Entries are added to inFlightRequests on send and removed on completion
            this.inFlightRequests.completeLastSent(send.destination());
            // Add to the local response list
            responses.add(request.completed(null, now));
        }
    }
}
```
handleCompletedReceives fetches the broker responses and dispatches them by type: metadata responses, API-version responses, and everything else (such as produce responses):
```java
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    // Walk completedReceives, which was populated by the selector.poll call above
    for (NetworkReceive receive : this.selector.completedReceives()) {
        // Node id that produced this response
        String source = receive.source();
        // Pop the cached request object from the inFlightRequests collection
        InFlightRequest req = inFlightRequests.completeNext(source);
        // Parse the response
        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
                throttleTimeSensor, now);
        // Logging omitted
        AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
        if (req.isInternalRequest && body instanceof MetadataResponse)
            // Handle a metadata update response
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            // An API versions response: refresh the locally cached API versions
            // supported by the target node
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            // Everything else goes onto the local response list
            responses.add(req.completed(body, now));
    }
}
```
handleDisconnections ultimately calls Selector#disconnected to get the set of node IDs whose connections dropped, updates those nodes' connection state to DISCONNECTED, and clears all locally cached data related to them; finally it creates a disconnected-type ClientResponse for each and adds it to the result list. If this step did find dropped connections, a refresh of the locally cached node metadata is flagged.
handleConnections calls Selector#connected to get the set of node IDs whose connections are healthy. If a node has just established its connection for the first time, its supported API versions still need to be fetched: the method sets that node's connection state to CHECKING_API_VERSIONS and adds its ID to the NetworkClient#nodesNeedingApiVersionsFetch set; for all other nodes, the connection state is updated to READY.
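Put together, the per-node connection lifecycle described by these two methods is a small state machine. Here is a simplified sketch; the state names mirror Kafka's ConnectionStates, but the transition logic is reduced to the happy path:

```java
// Simplified sketch of the connection-state transitions described above.
public class ConnectionStateDemo {
    enum State { DISCONNECTED, CONNECTING, CHECKING_API_VERSIONS, READY }

    private State state = State.DISCONNECTED;

    void onConnectInitiated() { state = State.CONNECTING; }

    // First successful connect: the broker's supported API versions must be
    // learned before the channel is used for produce requests.
    void onConnected(boolean firstTime) {
        state = firstTime ? State.CHECKING_API_VERSIONS : State.READY;
    }

    void onApiVersionsReceived() { state = State.READY; }

    void onDisconnected() { state = State.DISCONNECTED; }

    State state() { return state; }

    public static void main(String[] args) {
        ConnectionStateDemo node = new ConnectionStateDemo();
        node.onConnectInitiated();
        node.onConnected(true);           // fresh connection: versions unknown yet
        node.onApiVersionsReceived();
        System.out.println(node.state()); // READY
    }
}
```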
handleInitiateApiVersionRequests handles the nodes flagged in NetworkClient#handleConnections as needing their supported API versions fetched, i.e. those recorded in the NetworkClient#nodesNeedingApiVersionsFetch set. It iterates over them and, whenever a target node is able to accept a request, builds an ApiVersionsRequest to fetch the node's supported API versions; the request is wrapped in a ClientRequest and goes out on the next Selector#poll.
handleTimedOutRequests iterates over the nodes that have requests sitting in inFlightRequests past their timeout and treats each such node as if its connection had dropped. The method creates a disconnected-type ClientResponse for it, adds it to the result list, and flags a refresh of the locally cached cluster metadata.
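The timeout handling is deliberately simple: a request that has been in flight longer than the request timeout is treated exactly like a broken connection. A standalone sketch of that scan, with illustrative names and a hard-coded 30s timeout:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the timeout scan described above: any node whose
// oldest in-flight request has waited longer than requestTimeoutMs is handled
// the same way as a disconnected node. Names are illustrative, not Kafka's.
public class TimeoutScanDemo {
    private final long requestTimeoutMs = 30_000;
    // nodeId -> send time of the oldest in-flight request
    private final Map<String, Long> oldestInFlightSendTime = new HashMap<>();

    void scan(long now) {
        for (Map.Entry<String, Long> e : oldestInFlightSendTime.entrySet()) {
            if (now - e.getValue() > requestTimeoutMs) {
                // same handling as a broken connection: fail the node's in-flight
                // requests with a disconnected response and refresh metadata
                disconnect(e.getKey());
            }
        }
    }

    private void disconnect(String nodeId) {
        System.out.println("node " + nodeId + " timed out, treating as disconnected");
    }

    public static void main(String[] args) {
        TimeoutScanDemo demo = new TimeoutScanDemo();
        demo.oldestInFlightSendTime.put("1", 0L);
        demo.scan(31_000); // node "1" has waited 31s > 30s -> treated as disconnected
    }
}
```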
The last one is completeResponses. Its flow is simple: invoke the producer-side callbacks so the client learns the broker's response:
```java
private void completeResponses(List<ClientResponse> responses) {
    for (ClientResponse response : responses) {
        try {
            // Walk the responses collected by all the handle* stages above and invoke
            // each callback; this is how the producer receives the broker's response
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
}
```
Note: all the Kafka architecture and source-code analysis articles in this series are based on version 1.1.2; any exception will be called out explicitly.