Kafka源码剖析 —— 网络I/O篇 —— 浅析KafkaSelector

##NioSelector和KafkaSelector有什么区别?java

先说结论,KafkaSelector(org.apache.kafka.common.network.selector)是对NioSelector(java.nio.channels.Selector)的进一步封装。回想一下NioSelector,它参与了IO中的哪些过程?node

一、建立一个通道,并将通道注册到NioSelector上,咱们能够获得一个SelectionKey 二、轮询NioSelector中的ready集合,拿到对应的SelectionKey,并根据这个SelectionKey所关注的事件去执行对应的操做apache

实际上,KafkaSelector也是在调用NioSelector去执行这些操做,待补充……网络

##1、建立链接socket

KafkaSelector建立链接,和普通的NioSelector并无什么不一样,首先建立一个通道,并将其设置为非阻塞式的长链接,设置完毕后,执行链接操做。ui

SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);// 非阻塞模式
        Socket socket = socketChannel.socket();
        socket.setKeepAlive(true);// 设置为长链接
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) {
            socket.setSendBufferSize(sendBufferSize);// 设置SO_SNDBUF 大小
        }
        if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) {
            socket.setReceiveBufferSize(receiveBufferSize);// 设置 SO_RCVBUF 大小
        }
        socket.setTcpNoDelay(true);
        boolean connected;
        try {
            connected = socketChannel.connect(address);// 由于是非阻塞模式,因此方法可能会在链接正式链接以前返回
        } catch (UnresolvedAddressException e) {
            socketChannel.close();
            throw new IOException("Can't resolve address: " + address, e);
        } catch (IOException e) {
            socketChannel.close();
            throw e;
        }

建立完通道后,将其注册到NioSelector上,并关注OP_CONNECT,再以节点Id,SelectionKey来建立KafkaChannel,这里先不详细说明KafkaChannel,它是对通道的进一步封装。在建立完KafkaChannel后,将KafkaChannel与SelectionKey、节点ID作进一步绑定。this

SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);// 将当前这个socketChannel注册到nioSelector上,并关注OP_CONNECT事件
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);// 建立KafkaChannel
        key.attach(channel);// 将channel绑定到key上
        this.channels.put(id, channel);// 将 nodeId 和 Channel绑定

这样有一个好处,首先KafkaChannel中包含了节点ID与SelectionKey,而咱们也能够根据节点ID来拿到KafkaChannel,一样能够根据SelectionKey来拿到KafkaChannel,这就意味着,咱们只要拿到了KafkaChannel、SelectionKey、节点ID中的任意一个,均可以经过这些引用关系拿到彼此,从而进行相关操做。.net

##2、预发送 实际上就是将要发送的ByteBuffer扔进KafkaChannel,此时并未进行IO操做,这里的Send对象,实际上就是对ByteBuffer的进一步封装,它主要包含了将要发往的节点ID、ByteBuffer大小、是否发送完毕等信息。咱们这里根据节点ID,从咱们刚才的channels中,取出KafkaChannel。code

public void send(Send send) {
        // 看看send要发的这个nodeId在不在
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            // 把数据扔进KafkaChannel中(只能放一个,放多个会报错),并关注write事件
            channel.setSend(send);
        } catch (CancelledKeyException e) {

            // 失败了加一条node_id的失败记录
            this.failedSends.add(send.destination());
            close(channel);
        }
    }

##3、进行IO操做 来到了咱们比较熟悉的轮询环节,从NioSelector中取出全部SelectionKey进行轮询。对象

if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false);// 处理I/O的核心方法
            pollSelectionKeys(immediatelyConnectedKeys, true);
}
 
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
		Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            // 建立链接时(connect)将kafkaChannel注册到key上,就是为了在这里获取
            KafkaChannel channel = channel(key);

……………………

#####一、判断一下key 链接好了没有,由于咱们用的是非阻塞链接,因此到了轮询阶段,尚未完成链接是正常的。

if (isImmediatelyConnected || key.isConnectable()) {
                    // finishConnect方法会先检测socketChannel是否创建完成,创建后,会取消对OP_CONNECT事件关注,//TODO 并开始关注OP_READ事件
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());// 将当前channel id 添加到已链接的集合中
                        this.sensors.connectionCreated.record();
                    } else {
                        continue;// 表明链接未完成,则跳过对此Channel的后续处理
                    }
                }

#####二、身份验证(略过) #####三、判断KafkaChannel有没有准备好,有没有关注OP_READ,能不能读之类的,并进行读操做。 这里有一个判断,就是判断当前的KafkaChannel是否是在StagedReceives里。咱们日后看看,在从网络上读取数据时,咱们会将KafkaChannel扔进StagedReceives里,也就是说,若是这个KafkaChannel已经在StagedReceives里了,那么表明它已经在读数据了。

if (channel.ready() // 链接的三次握手完成,而且 todo 权限验证经过
                    && key.isReadable() // key是否关注了read事件
                    && !hasStagedReceive(channel)) {// todo 这个通道不能是正在读数据的,由于在读的时候,会把这个channel扔进stagedReceives里面
                    NetworkReceive networkReceive;

                    /**
                     * 实际上这里就是分屡次去一个channel取数据,直到取完,并将其保存在key:channel  value:new ArrayDeque<NetworkReceive> 中
                     */
                    while ((networkReceive = channel.read()) != null) {
                        addToStagedReceives(channel, networkReceive);
                    }
                }

#####四、判断KafkaChannel有没有准备好,有没有关注OP_WRITE,并进行写操做

if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    // 这里会将KafkaChannel的send字段发送出去,
                    // 若是未完成发送,或者没发完,则返回null
                    // 发送成功则返回send对象
                    if (send != null) {
                        this.completedSends.add(send);// 添加到completedSends集合
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }

##4、关闭空闲链接 在每一次IO操做完毕后,KafkaSelector都会调用一个方法,去关闭掉那些没怎么用的链接,实际上它就是一个基于时间戳的断连机制。 KafkaSelector中维护了一个哈希表,

LinkedHashMap<String, Long> lruConnections (new LinkedHashMap<>(16, .75F, true);

在每次进行IO操做时,将Key:节点ID,Value:当前时间戳扔进哈希表里面,在IO操做进行完毕时,检查一下,最大的那个节点,它的最后一次IO时间+connectionsMaxIdleNanos(建立KafkaSelector时指定),是否超过了当前的时间。 若是是,这个链接就会被关掉。

好比说connectionsMaxIdleNanos被指定成了1分钟,那么若是这个有序哈希表的最后一个节点的时间是一分钟以前,那么这个节点ID的通道将会被关掉。

private void maybeCloseOldestConnection() {
        if (currentTimeNanos > nextIdleCloseCheckTime) {
            if (lruConnections.isEmpty()) {
                nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
            } else {
                Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet()
                                                                              .iterator()
                                                                              .next();
                Long connectionLastActiveTime = oldestConnectionEntry.getValue();
                nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;
                if (currentTimeNanos > nextIdleCloseCheckTime) {
                    String connectionId = oldestConnectionEntry.getKey();
                    if (log.isTraceEnabled()) {
                        log.trace("About to close the idle connection from " + connectionId
                            + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis");
                    }

                    disconnected.add(connectionId);
                    close(connectionId);
                }
            }
        }
    }

参考: 《Apache Kafka 源码剖析》 - 徐郡明著 Apache Kafka 源码 0.10.0.1

相关文章
相关标签/搜索