你绝对能看懂的Kafka源代码分析-NetworkClient类代码分析

上一节《Sender类代码分析》

通过前文的学习,我们知道Sender最终把消息发送出去,依靠的是NetWorkClient。它是Kafka的一个重要组件,负责网络IO,包括连接的建立,读数据、写数据等等。Kafka网络IO的实现是通过java的NIO,Kafka对NIO进行了封装。在学习Kafka网络IO相关之前,大家先参考网上文章对NIO有简单的了解(后续我可能也会写一篇入门教程),再继续阅读本篇文章。

Kafka IO部分设计

Kafka IO部分涉及的主要类和依赖关系见下图:

上半部分是Kafka的类,下半部是java nio的类。Kafka的类讲解如下:

1、NetWorkClient,顾名思义,这是Kafka IO对外暴露的客户端。IO操作都是通过它来对外暴露方法调用。实际上它是通过Kafka的KSelector来实现。

2、KSelector,其实此类名称也是Selector,为了区分nio的selector,故称之为KSelector。他拥有nio selector的引用。此外他维护了所有的KafkaChannel。

3、KafkaChannel,他对应nio中的Channel概念,它通过TransportLayer间接持有SocketChannel和SelectionKey这两个nio中的核心对象。另外他还维护了发送和接收的数据对象:Send实现及NetWorkReceive。另外请注意唯一一个从下往上的箭头,KafkaChannel还会把自己attach到自己对应的SelectionKey中。这样可以通过SelectionKey方便取到对应KafkaChannel。

4、TransportLayer,从名称可以看出这个类实现传输层功能,而传输是通过nio实现,所以他持有SocketChannel和Selector这两个nio的核心对象。他所做的事情就是通过这两个对象实现网络IO。

5、Send,这是一个接口,有多个实现,目的就是封装要发送的数据,底层是nio的ByteBuffer。

6、NetWorkReceive,接收数据的对象,底层是nio的ByteBuffer。

流程分析

NetWorkClient实现通道的建立,读取消息、发送消息等功能。这些功能上原理是相同的,我们继续从KafkaProducer发送消息为入口点,继续分析发送消息的流程。

前文讲到,Sender最终通过NetWorkClient的两个方法完成消息发送,如下:

 

client.send(clientRequest, now);
client.poll(pollTimeout, now);

那么我们就从这两个方法开始分析。

send()方法

我们回忆一下sender发送消息流程,sender把batch按照要发往的node分好类,分装为ClientRequest,然后调用NetWorkClient的send方法。在这个方法里并没有真正网络IO,而只是准备好了要发送的请求对象。

Sender的send方法中实际调用的是doSend(ClientRequest clientRequest, boolean isInternalRequest, long now)方法。

代码如下:

ensureActive();
    String nodeId = clientRequest.destination();
    if (!isInternalRequest) {
        // If this request came from outside the NetworkClient, validate
        // that we can send data.  If the request is internal, we trust
        // that internal code has done this validation.  Validation
        // will be slightly different for some internal requests (for
        // example, ApiVersionsRequests can be sent prior to being in
        // READY state.)
        if (!canSendRequest(nodeId, now))
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    }
    AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
    try {
        NodeApiVersions versionInfo = apiVersions.get(nodeId);
        short version;
        // Note: if versionInfo is null, we have no server version information. This would be
        // the case when sending the initial ApiVersionRequest which fetches the version
        // information itself.  It is also the case when discoverBrokerVersions is set to false.
        if (versionInfo == null) {
            version = builder.latestAllowedVersion();
            if (discoverBrokerVersions && log.isTraceEnabled())
                log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                        "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
        } else {
            version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                    builder.latestAllowedVersion());
        }
        // The call to build may also throw UnsupportedVersionException, if there are essential
        // fields that cannot be represented in the chosen version.
        doSend(clientRequest, isInternalRequest, now, builder.build(version));
    } catch (UnsupportedVersionException unsupportedVersionException) {
        // If the version is not supported, skip sending the request over the wire.
        // Instead, simply add it to the local queue of aborted requests.
        log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
                clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
        ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
                clientRequest.callback(), clientRequest.destination(), now, now,
                false, unsupportedVersionException, null, null);
        abortedSends.add(clientResponse);
    }
}

 

 

此方法逻辑如下:

1、检查NetWorkClient状态为激活。

2、取得请求发送目的地的nodeId。

3、如果是非内部请求,检查connectionState是否ready、Channel是否ready、是否达到发送中上限

4、通过ClientRequest携带的AbstractRequest.Builder对象获取的version以及目的地node的api version,来取得最终的version

5、通过builder.build(version)方法,来初始化request,这里实际生成的是ProduceRequest。

6、最后调用doSend(clientRequest, isInternalRequest, now, builder.build(version));

我们继续看doSend(clientRequest, isInternalRequest, now, builder.build(version))方法。

doSend()方法

此方法核心代码如下:

String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    Send send = request.toSend(destination, header);
    InFlightRequest inFlightRequest = new InFlightRequest(
            clientRequest,
            header,
            isInternalRequest,
            request,
            send,
            now);
    this.inFlightRequests.add(inFlightRequest);
    selector.send(send);

逻辑如下:

1、获取destination,实际上就是要发往的node的id。

2、生成Requestheader对象,包括apiKey 、version、clientId、 correlation这些属性。

3、生成待发送的send对象,这个send对象封装了目的地和header生成的ByteBuffer对象

4、生成InFlightRequest读喜庆。它持有ClientRequest,request,send等对象。

5、把InFlightRequest添加到inFlightRequests中,InFlightRequests中按照node的id存储InFlightRequest的队列。

6、最后调用通过selector的send(send)方法做IO前的最后准备工作。

Selector的send()方法

代码如下:

public void send(Send send) {
    String connectionId = send.destination();
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    if (closingChannels.containsKey(connectionId)) {
        // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
        this.failedSends.add(connectionId);
    } else {
        try {
            channel.setSend(send);
        } catch (Exception e) {
            // update the state for consistency, the channel will be discarded after `close`
            channel.state(ChannelState.FAILED_SEND);
            // ensure notification via `disconnected` when `failedSends` are processed in the next poll
            this.failedSends.add(connectionId);
            close(channel, CloseMode.DISCARD_NO_NOTIFY);
            if (!(e instanceof CancelledKeyException)) {
                log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                        connectionId, e);
                throw e;
            }
        }
    }
}

 

这个方法逻辑如下:

1、我们知道Selector维护着所有的channel,这个方法先通过目标node的id,获取KafkaChannel或者新建Channel。

2、我们还知道KakkaChannel中维护着要发送的send对象,所以获取channel后,需要设置send对象。

 

至此,整个sender发送流程中的第一步才完整走完,也就是sender的run 方法中第一步sendProducerData(now):

此时还没有进行网络IO,只是做好了IO的准备。channel、要发送的数据,已经就绪,只待被发送出去。

整个准备IO请求从run方法进入,完整调用链如下:

整个流程最终的目的,就是把累积的消息按照node分组,生成请求对象,把数据设置到相应的channel中,并关注写事件。

接下来就是真正调用Client.poll方法进行网络IO的流程分析。

poll()方法

此方法主要代码如下:

........
try {
    this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
    log.error("Unexpected error during I/O", e);
}

// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses);
..........

此方法中通过selector的poll做网络IO,然后处理响应

主要逻辑如下:

1、调用selector的poll方法做IO

2、处理完成的发送响应

3、处理完成的接收响应

4、处理关闭的响应

5、处理连接状态变化

6、处理初始化ApiVersion请求

7、处理所有Response的callback

selector的poll方法

从这个方法开始,终于进入了真正的网络IO。我们逐层进行分析。

poll方法的主要逻辑如下:

1、清理各种状态及内部缓存

2、通过nio selector的selectedKeys方法获取已经准备好的SelectionKey的集合readyKeys。

3、如果keysWithBufferedRead不为空,则移除它和readyKeys的交集,得到toPoll集合。这样确保不会被处理两次。然后调用pollSelectionKeys(toPoll, false, endSelect);

4、调用pollSelectionKeys方法处理readyKeys。

5、调用 pollSelectionKeys方法处理immediatelyConnectedKeys

6、关闭已经延期并且允许关闭的Channel

7、关闭长期空闲的Channel

8、把stagedReceives中的receive转移到completedReceives列表中

最终重要的逻辑就是3、4、5步中调用的pollSelectionKeys方法处理readyKeys。下面我们对其进行分析。

selector的pollSelectionKeys()方法

这个方法中将会迭代Selectionkey的集合。因为迭代的顺序很可能每次都一样,如果memoryPool所剩过小,则会造成读取饥饿。所以当内存所剩过小时,则先打乱selectionKeys的顺序。

迭代逻辑如下:

1、通过key获取attch上的KafkaChannel。能这样取到,是因为connect的时候把kafaChannel attach到了SelectionKey上。

2、更新此Channel的LRU值

3、如果入参中立即连接为true或者selectionKey是可连接的,那么通过KafkaChannel的finishConnect()方法完成连接

4、如果KafkaChannel是连接上的,但是并不ready。那么调用channel.prepare()作准备

5、尝试做读取操作attemptRead(key, channel)。

首先判断如果channel是读ready的,并且在buffer中有数据可读,并且没有staged的接收数据,才进行处理

networkReceive = channel.read(),读取数据到networkReceive中

把networkReceive对象加入到stagedReceives

6、如果Channel写ready,则进行写操作。调用channel.write(), 内部调用Channel的send(Send send),然后再调用send的writeTo(transportLayer),这个方法里通过socketChannel.write(srcs)最终把数据写出去。最后移除写关注。

7、如果SelectionKey无效,则调用Close方法进行关闭。

总结一下,首先拿到准备好的SelectionKey,然后根据Channel关注事件的不同,进行相应的操作。比如连接或者读写。读写的都是通过nio中的channel进行的最终IO操作。

至此,NetworkClient的Poll方法中第一个重要的步骤,通过selector进行IO操作已经讲解完成,

回头看poll方法的源代码,我们可以看到在调用selector的poll操作后,流程并没有结束。接下来先是生成Response的列表List<ClientResponse> responses = new ArrayList<>();

然后调用了如下方法:

handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses);

下面我们逐个讲解这些方法

handleCompletedSends()方法

循环selector中的completedSends列表,逐个处理完成的Send对象

1、从inFlightRequests中移除该send关联的inFlightRequest

2、通过InFlightRequest的completed方法,生成ClientResponse。

3、把ClientResponse放入List<ClientResponse>中

handleCompletedReceives()方法

循环selector中的completedReceives列表,逐个处理完成的Receive对象。

1、从inFlightRequests中移除该NetworkReceive关联的inFlightRequest

2、通过InFlightRequest的completed方法,生成ClientResponse。

3、把ClientResponse放入List<ClientResponse>中

handleDisconnections()方法

循环selector中的disconnected的Map,

1、取得node的id和对应的状态

2、processDisconnection中做断开的处理,

handleConnections()方法

循环处理selector中的connected列表

1、把需要再次拉去api版本的节点以及对应的ApiVersionsRequest.Builder保存到nodesNeedingApiVersionsFetch

2、把connectionStates中对应的node状态改为已连接

handleInitiateApiVersionRequests()方法

循环处理nodesNeedingApiVersionsFetch。

1、取得node的id和对应的ApiVersionsRequest.Builder

2、生成Client对象

3、通过doSend方法完成发送数据的准备。

handleTimedOutRequests()方法

遍历inFlightRequests,获取超时的node Id列表,循环处理。逻辑和handleDisconnections()一样。

completeResponses()方法

循环ClientResponse列表,调用ClientResponse的onComplete方法。此方法中实际调用了ClientResponse相关联的callback对象的onComplte方法。这个callback对象是我们在生成ClientRequest之前声明的,在Sender的sendProduceRequest方法中。代码如下:

RequestCompletionHandler callback = new RequestCompletionHandler() {

public void onComplete(ClientResponse response) {

handleProduceResponse(response, recordsByPartition, time.milliseconds());

}

};

可以看到最终调用了sender的 handleProduceResponse方法,我们最后再分析下此方法。

Sender的handleProduceResponse()方法

按照不同的情况分别调用completeBatch方法

1、连接被关闭

2、version不匹配

3、正常情况

取得ProduceResponse,迭代它内部的Map<TopicPartition, PartitionResponse> responses

3.1去的主题分区

3.2根据主题分区,取得该主题分区下的所有batches

3.3调用completeBatch

由此可以见正常情况,是按照主题分区分别调用completeBatch

Sender的completeBatch()方法

这是请求最终完成返回处理的方法,逻辑如下:

按照不同情况进行处理

1、如果是batch太大,那么对batch进行切分,然后重新通过追加消息到RecordAccumulator中

2、如果有error,并且可以重试,则再次通过追加消息到RecordAccumulator中

3、如果没有error,则调用completeBatch(batch, response),调用batch的回调方法,最后释放空间。

小结

本篇首先讲解了NetworkClient的设计。然后对两个主要过程进行详细讲解,一个是NetworkClient准备发送请求的过程,另外一个是NetworkClient最终IO的过程。

然后在本文最后介绍了网络请求后的response处理,需要重试的的进行重试,需要回调的则进行回调操作。

Kafka生产者总结

文章写到这里,Kafka生产者相关的源代码已经分析完毕。讲解的只是很多,回过头去看,好像又有些理不清头绪了,没关系,我们再回顾一下。

首先我们看一看关于Producer我们讲解了哪些内容:

1、KafkaProducer

2、RecordAccumulator

3、Sender

4、NetworkClient

 

整个消息的发送流程我做个简单的总结回顾

1、KafkaProducer是生产者的入口,也是主线程,它还维护sender子线程。

2、在主线程中,不断往RecordAccumulator中追加消息。

3、RecordAccumulator是一个消息的仓库,当有消息batch封箱完成时,KafkaProducer会唤醒Sender线程做消息的发送处理。

4、Sender首先把batch按照要发往的node分组,生成ClientRequest请求对象。

5、Sender再通过NetworkClient的send方法,把ClientRequest需要的资源准备好,如Channel,数据等。

6、Sender最后通过NetworkClient的poll方法,底层通过nio把准备好的请求最终发送出去。

7、Sender再统一处理response,进行重试或者回调。

 

短短几句话总结,化作代码却是千行万行。文章里涉及的代码讲解并没有完全覆盖,很多细节,以及一些分支流程,还需要再进一步学习。

真的是学海无涯、代码无边,只能耐着性子一点点的学,一点点的扣,相信坚持必有回报!