kafka生产者的消息发送机制

开篇一张图,读者更幸福,很少说上架构图。
kafka生产者的消息发送机制
这个架构图咱们在前面一篇文章《kafka生产者的蓄水池机制》里面介绍过,上一篇咱们是介绍了这个图里面的消息收集过程(咱们成为“蓄水池”机制),这里咱们就介绍它的另一部分,消息的发送机制。
kafka生产者的消息发送机制 node

Sender运行过程

全部的消息发送,都是从Sender线程开始,它是一个守护线程,因此咱们首先就须要来看一下Sender的run方法,最外层的run方式是一个主循环不断调用具体逻辑运行方法run,咱们看下它的具体逻辑处理run方法:api

void run(long now) {
    //生产者事务管理相关处理,本章节不作具体分析,后面专门章节再作分析,你们先了解一下
        if (transactionManager != null) {
            try {
                if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                    // Check if the previous run expired batches which requires a reset of the producer state.
                    transactionManager.resetProducerId();

                if (!transactionManager.isTransactional()) {
                    // this is an idempotent producer, so make sure we have a producer id
                    maybeWaitForProducerId();
                } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                    transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                            "some previously sent messages and can no longer retry them. It isn't safe to continue."));
                } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                    // as long as there are outstanding transactional requests, we simply wait for them to return
                    client.poll(retryBackoffMs, now);
                    return;
                }

                // do not continue sending if the transaction manager is in a failed state or if there
                // is no producer id (for the idempotent case).
                if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                    RuntimeException lastError = transactionManager.lastError();
                    if (lastError != null)
                        maybeAbortBatches(lastError);
                    client.poll(retryBackoffMs, now);
                    return;
                } else if (transactionManager.hasAbortableError()) {
                    accumulator.abortUndrainedBatches(transactionManager.lastError());
                }
            } catch (AuthenticationException e) {
                // This is already logged as error, but propagated here to perform any clean ups.
                log.trace("Authentication exception while processing transactional request: {}", e);
                transactionManager.authenticationFailed(e);
            }
        }
    //实际的数据发送请求,并处理服务端响应
        long pollTimeout = sendProducerData(now);
        client.poll(pollTimeout, now);
    }

接下来咱们从两个层面来看,一个是消息发送,一个是消息返回响应处理。缓存

kafka生产者的消息发送机制
消息的发送markdown

先看下sendProducerData的具体逻辑:网络

private long sendProducerData(long now) {
    //获取集群信息
        Cluster cluster = metadata.fetch();

        // 获取那些能够发送消息的分区列表信息
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // 若是这些分区没有对应的leader,就须要强制对metadata信息进行更新
        if (!result.unknownLeaderTopics.isEmpty()) {
            // 没有leader的场景例如leader选举,或者topic已失效,这些都须要将topic从新加入,发送到服务端请求更新,由于如今还须要往这些topic发送消息
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }

        // 遍历全部获取到的网络节点,基于网络链接状态来检测这些节点是否可用,若是不可用则剔除
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
      //节点链接状态检查,若是容许链接,则从新建立链接
            if (!this.client.ready(node, now)) {
          //未准备好的节点删除
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // 获取全部待发送的批量消息以及其对应的leader节点集合
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                this.maxRequestSize, now);

    //若是须要保证消息的强顺序性,则缓存对应 topic 分区对象,防止同一时间往同一个 topic 分区发送多条处于未完成状态的消息
        if (guaranteeMessageOrder) {
            // 将每一个batch的分区对象信息加入到mute集合,采起Set实现,重复的topicpartition信息不会被加入
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        // 获取本地过时的消息,返回 TimeoutException,并释放空间
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
        // 过时的batch消息处理
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
            }
        }
    //更新度量信息
        sensors.updateProduceRequestMetrics(batches);

        // 设置pollTimeout,若是存在待发送的消息,则设置 pollTimeout 等于 0,这样能够当即发送请求,从而可以缩短剩余消息的缓存时间,避免堆积
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            pollTimeout = 0;
        }
    //调用NetWorkClient将消息发送到服务端
        sendProduceRequests(batches, now);

        return pollTimeout;
}

概括起来sendProducerData的核心流程以下:架构

1.经过accumulator.ready方法获取可发送的分区列表信息;并发

2.调用client.ready对获取到的全部网络节点进行连通性检测;负载均衡

3.经过.accumulator.drain获取全部待发送的批量消息以及其对应的leader节点集合;ide

4.在须要保障分区消息的强顺序性的场景下调用accumulator.mutePartition将分区信息添加到mute集合;函数

5.调用sendProduceRequests发送生产消息请求。

下面逐个流程讲解:

经过accumulator.ready方法获取可发送的分区列表信息:

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        //可接受消息的节点集合
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
    //记录未找到leader副本的Topic信息集合
        Set<String> unknownLeaderTopics = new HashSet<>();

    // 是否有线程在等待 BufferPool 分配空间
        boolean exhausted = this.free.queued() > 0;
    //遍历待发送的batch里面的每一个分区信息,对其leader执行断定
        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<ProducerBatch> deque = entry.getValue();

      // 获取当前 topic 分区 leader 副本所在的节点
            Node leader = cluster.leaderFor(part);
            synchronized (deque) {
                if (leader == null && !deque.isEmpty()) {
                    // 该分区下的leader未知,可是存在往该分区发送的消息,须要记录下,在后面的流程当发现有未知leader须要强制向服务端发送metadata的信息更新请求
                    unknownLeaderTopics.add(part.topic());
                } 
        //全部可发送的节点须要不能在mute集合里面,保障消息有序性,当mute里面还有消息未发送完成不能继续追加发送
        else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                    ProducerBatch batch = deque.peekFirst();
                    if (batch != null) {
                        long waitedTimeMs = batch.waitedTimeMs(nowMs);
            //是否处于重试操做判断
                        boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        boolean full = deque.size() > 1 || batch.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
            //标记当前leader是否可发送
                        boolean sendable = full // 1. 队列中有多个 RecordBatch,或第一个 RecordBatch 已满
            || expired // 2. 当前等待重试的时间过长
            || exhausted // 3. 有其余线程在等待 BufferPoll 分配空间,即本地消息缓存已满
            || closed // 4. producer 已经关闭
            || flushInProgress();// 5. 有线程正在等待 flush 操做完成
                        if (sendable && !backingOff) {
            //知足可发送状态,而且没有处于重试操做的状态下,将当前leader加入可发送节点
                            readyNodes.add(leader);
                        } else {
                            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
              // 更新下次执行 ready 断定的时间间隔
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
    //返回检查结果
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

调用client.ready对获取到的全部网络节点进行连通性检测:

public boolean ready(Node node, long now) {
        if (node.isEmpty())
            throw new IllegalArgumentException("Cannot connect to empty node " + node);
    //connectionStates已就绪,直接返回可链接
        if (isReady(node, now))
            return true;

    //链接状态显示可链接
        if (connectionStates.canConnect(node.idString(), now))
            // 则调用selector初始化该链接
            initiateConnect(node, now);

        return false;
}

经过.accumulator.drain获取全部待发送的批量消息以及其对应的leader节点集合:

public Map<Integer, List<ProducerBatch>> drain(Cluster cluster,
                                                   Set<Node> nodes,
                                                   int maxSize,
                                                   long now) {
        if (nodes.isEmpty())
            return Collections.emptyMap();

        //返回的nodeid对应的发送消息batch信息
        Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    //遍历每个可连通的node
        for (Node node : nodes) {
            int size = 0;
            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
            List<ProducerBatch> ready = new ArrayList<>();
            /* drainIndex 用于记录上次发送中止的位置,本次继续从当前位置开始发送,
      * 若是每次都是从 0 位置开始,可能会致使排在后面的分区饿死,这是一个简单的负载均衡策略
      */
            int start = drainIndex = drainIndex % parts.size();
            do {
                PartitionInfo part = parts.get(drainIndex);
                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                //若是是须要保障消息的强顺序性,则不能将消息添加进目标分区,不然会致使消息乱序
                if (!muted.contains(tp)) {
          // 获取当前分区对应的 RecordBatch 集合
                    Deque<ProducerBatch> deque = getDeque(tp);
                    if (deque != null) {
                        synchronized (deque) {
                            ProducerBatch first = deque.peekFirst();
                            if (first != null) {
                //当前第一个batch是否处于重试状态或者已重试过
                                boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
                                // 没有重试过,或者重试已超时
                                if (!backoff) {
                                    if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                                         // 单次消息数据量已达到上限,结束循环,通常对应一个请求的大小,防止请求消息过大
                                        break;
                                    } 
                  //处理处于重试状态下的消息
                  else {
                                        //省略在重试状态下的事务处理流程
                    //遍历每一个节点,节点的起始位置也以一个轮训方式来遍历,而且每一个队列里面的batch也都是只取第一个,每一个队列轮训着取,全部这些操做都是为了对消息发送的均衡处理,保障消息公平发送
                    ProducerBatch batch = deque.pollFirst();
                    //close表明着消息batch通道被关闭,只能读取,没法写入
                                        batch.close();
                                        size += batch.records().sizeInBytes();
                                        ready.add(batch);
                                        batch.drained(now);
                                    }
                                }
                            }
                        }
                    }
                }
        //更新本次drainIndex
                this.drainIndex = (this.drainIndex + 1) % parts.size();
            } while (start != drainIndex);
            batches.put(node.id(), ready);
        }
        return batches;
}

调用accumulator.mutePartition将分区信息添加到mute集合,这个过程比较简单就是将遍历待发送的batch消息,若是设置了保障消息时序强一致性,那就将这个分区信息保存在mute集合之中,每次发送消息以前都会去检查这个队列是否包含已有的分区,若是有则本次不作发送,每发送完成以后都会调用mute集合去除所在的分区信息,以即可以放入下一个消息进行发送。

调用sendProduceRequests发送生产消息请求:

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
        if (batches.isEmpty())
            return;

        Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
        final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

        // 遍历全部batch消息,找到最小的版本号信息
        byte minUsedMagic = apiVersions.maxUsableProduceMagic();
        for (ProducerBatch batch : batches) {
            if (batch.magic() < minUsedMagic)
                minUsedMagic = batch.magic();
        }

        // 遍历 RecordBatch 集合,整理成 produceRecordsByPartition 和 recordsByPartition
        for (ProducerBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            MemoryRecords records = batch.records();

            // 进行消息的向下兼容转换操做,例如分区消息的迁移,从一个高版本迁移到低版本,就须要额外从新构造MemoryRecords
            if (!records.hasMatchingMagic(minUsedMagic))
                records = batch.records().downConvert(minUsedMagic, 0, time).records();
            produceRecordsByPartition.put(tp, records);
            recordsByPartition.put(tp, batch);
        }

        String transactionalId = null;
        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionalId = transactionManager.transactionalId();
        }

    // 建立 ProduceRequest 请求构造器,produceRecordsByPartition用于构造请求器
        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
                produceRecordsByPartition, transactionalId);
    // 建立回调对象,用于处理响应,recordsByPartition用于响应回调处理
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };

        String nodeId = Integer.toString(destination);
    // 建立 ClientRequest 请求对象,若是 acks 不等于 0 则表示会等待服务端的响应
        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
    //调用NetWorkClient发送消息
        client.send(clientRequest, now);
        log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

接下来就须要了解一下NetWorkClient的发送流程,它的发送最终都是调用doSend函数完成:

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        //获取目标节点id
    String nodeId = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        //省略日志信息打印
        Send send = request.toSend(nodeId, header);
    //新建InFlightRequest,并将请求添加进去
        InFlightRequest inFlightRequest = new InFlightRequest(
                header,
                clientRequest.createdTimeMs(),
                clientRequest.destination(),
                clientRequest.callback(),
                clientRequest.expectResponse(),
                isInternalRequest,
                request,
                send,
                now);
        this.inFlightRequests.add(inFlightRequest);
    //网络消息发送
        selector.send(inFlightRequest.send);
}

至此,咱们消息发送讲解完成,接下来说解一下消息的响应拉取过程。

kafka生产者的消息发送机制

消息的响应拉取

消息的响应拉取是从NetworkClient的poll方法开始的,它的逻辑解析以下:

public List<ClientResponse> poll(long timeout, long now) {
        ensureActive();

        if (!abortedSends.isEmpty()) {
            // 当链接断开,或者版本不支持,须要优先处理这些响应
            List<ClientResponse> responses = new ArrayList<>();
            handleAbortedSends(responses);
            completeResponses(responses);
            return responses;
        }
    //metada信息的响应处理
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
      //该poll过程处理全部的网络链接、断开链接,初始化新的发送以及处理过程总的发送和接收请求,接收的信息最终会放到completedReceives中
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // 处理全部的完成操做及响应
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleInitiateApiVersionRequests(updatedNow);
        handleTimedOutRequests(responses, updatedNow);
        completeResponses(responses);

        return responses;
}

响应操做的核心处理几个函数就是handle*的几个函数,咱们分别介绍一下:

handleCompletedSends该方法遍历全部发送完成的对象,对于那些不但愿接收响应的请求,建立本地响应队列并添加进去:

private void handleCompletedSends(List<ClientResponse> responses, long now) {
        // 遍历全部的发送完成的send对象
        for (Send send : this.selector.completedSends()) {
        //找出最近一次在inFlightRequests的发送请求信息
            InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
      //对于发送成功,可是不指望服务端响应的请求,建立本地响应队列并将其添加进去
            if (!request.expectResponse) {
        //inFlightRequests在发送的时候添加,接收完成后去除
                this.inFlightRequests.completeLastSent(send.destination());
        // 添加到本地响应队列中
                responses.add(request.completed(null, now));
            }
        }
    }

handleCompletedReceives该方法获取服务端响应,并依据响应分类处理,分别是metadata、apiversion

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
      //从completedReceives中遍历全部的接收信息,completedReceives中的信息是在上一层的selector.poll中添加进去的
        for (NetworkReceive receive : this.selector.completedReceives()) {
        //获取返回响应的节点 ID
            String source = receive.source();
      //从 inFlightRequests 集合中获取缓存的 request 对象
            InFlightRequest req = inFlightRequests.completeNext(source);
      //解析响应信息
            Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
                throttleTimeSensor, now);
           //省略日志
            AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
            if (req.isInternalRequest && body instanceof MetadataResponse)
          //处理metadata的更新响应信息
                metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
            else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
           // 若是是更新 API 版本的响应,则更新本地缓存的目标节点支持的 API 版本信息
                handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
            else
          //添加到本地响应队列
                responses.add(req.completed(body, now));
        }
    }

handleDisconnections该方法会最终调用 Selector#disconnected方法获取断开链接的节点 ID 集合,并更新相应节点的链接状态为 DISCONNECTED,同时会清空本地缓存的与该节点相关的数据,最终建立一个 disconnected 类型的 ClientResponse 对象添加到结果集合中。若是这一步确实发现了已断开的链接,则标记须要更新本地缓存的节点元数据信息。

handleConnections该方法会调用Selector#connected方法获取链接正常的节点 ID 集合,若是当前节点是第一次创建链接,则需获取节点支持的 API 版本信息,方法会将当前节点的链接状态设置为CHECKING_API_VERSIONS,并将节点 ID 添加到 NetworkClient#nodesNeedingApiVersionsFetch 集合中,对于其它节点,则更新相应链接状态为 READY。

handleInitiateApiVersionRequests该方法用于处理NetworkClient#handleConnections 方法中标记的须要获取支持的API版本信息的节点,即记录到 NetworkClient#nodesNeedingApiVersionsFetch 集合中的节点。方法会遍历处理集合中的节点,并在判断目标节点容许接收请求的状况下,构建 ApiVersionsRequest 请求以获取目标节点支持的 API 版本信息,该请求会被包装成 ClientRequest 对象,并在下次 Selector#poll操做时一并送出。

handleTimedOutRequests该方法会遍历缓存在 inFlightRequests 中已经超时的相关请求对应的节点集合,针对此类节点将其视做断开链接进行处理。方法会建立一个 disconnected 类型的 ClientResponse 对象添加到结果集合中,并标记须要更新本地缓存的集群元数据信息。

最后一个是completeResponses,它的流程很简单,触发生产者的回调函数,通知服务端的响应信息:

private void completeResponses(List<ClientResponse> responses) {
        for (ClientResponse response : responses) {
            try {
        //遍历以前全部阶段的handle*处理过程当中添加的response,并回调其callback方法,这样生产者就收到服务端响应信息了
                response.onComplete();
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

Note:本公众号全部kafka系列的架构及源码分析文章都是基于1.1.2版本,若有特殊会进行额外声明。

推荐阅读

kafka系列:

  1. kafka是如何作到百万级高并发低迟延的?
  2. kafka生产者的蓄水池机制

kafka生产者的消息发送机制
kafka生产者的消息发送机制
扫码关注咱们
互联网架构师之路

过滤技术杂质,只为精品呈现

若是喜欢,请关注加星喔

相关文章
相关标签/搜索