##NioSelector和KafkaSelector有什么区别?java
先说结论,KafkaSelector(org.apache.kafka.common.network.selector)是对NioSelector(java.nio.channels.Selector)的进一步封装。回想一下NioSelector,它参与了IO中的哪些过程?node
一、建立一个通道,并将通道注册到NioSelector上,咱们能够获得一个SelectionKey 二、轮询NioSelector中的ready集合,拿到对应的SelectionKey,并根据这个SelectionKey所关注的事件去执行对应的操做apache
实际上,KafkaSelector也是在调用NioSelector去执行这些操做,待补充……网络
##1、建立链接socket
KafkaSelector建立链接,和普通的NioSelector并无什么不一样,首先建立一个通道,并将其设置为非阻塞式的长链接,设置完毕后,执行链接操做。ui
SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false);// 非阻塞模式 Socket socket = socketChannel.socket(); socket.setKeepAlive(true);// 设置为长链接 if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) { socket.setSendBufferSize(sendBufferSize);// 设置SO_SNDBUF 大小 } if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) { socket.setReceiveBufferSize(receiveBufferSize);// 设置 SO_RCVBUF 大小 } socket.setTcpNoDelay(true); boolean connected; try { connected = socketChannel.connect(address);// 由于是非阻塞模式,因此方法可能会在链接正式链接以前返回 } catch (UnresolvedAddressException e) { socketChannel.close(); throw new IOException("Can't resolve address: " + address, e); } catch (IOException e) { socketChannel.close(); throw e; }
建立完通道后,将其注册到NioSelector上,并关注OP_CONNECT,再以节点Id,SelectionKey来建立KafkaChannel,这里先不详细说明KafkaChannel,它是对通道的进一步封装。在建立完KafkaChannel后,将KafkaChannel与SelectionKey、节点ID作进一步绑定。this
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);// 将当前这个socketChannel注册到nioSelector上,并关注OP_CONNECT事件 KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);// 建立KafkaChannel key.attach(channel);// 将channel绑定到key上 this.channels.put(id, channel);// 将 nodeId 和 Channel绑定
这样有一个好处,首先KafkaChannel中包含了节点ID与SelectionKey,而咱们也能够根据节点ID来拿到KafkaChannel,一样能够根据SelectionKey来拿到KafkaChannel,这就意味着,咱们只要拿到了KafkaChannel、SelectionKey、节点ID中的任意一个,均可以经过这些引用关系拿到彼此,从而进行相关操做。.net
##2、预发送 实际上就是将要发送的ByteBuffer扔进KafkaChannel,此时并未进行IO操做,这里的Send对象,实际上就是对ByteBuffer的进一步封装,它主要包含了将要发往的节点ID、ByteBuffer大小、是否发送完毕等信息。咱们这里根据节点ID,从咱们刚才的channels中,取出KafkaChannel。code
public void send(Send send) { // 看看send要发的这个nodeId在不在 KafkaChannel channel = channelOrFail(send.destination()); try { // 把数据扔进KafkaChannel中(只能放一个,放多个会报错),并关注write事件 channel.setSend(send); } catch (CancelledKeyException e) { // 失败了加一条node_id的失败记录 this.failedSends.add(send.destination()); close(channel); } }
##3、进行IO操做 来到了咱们比较熟悉的轮询环节,从NioSelector中取出全部SelectionKey进行轮询。对象
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { pollSelectionKeys(this.nioSelector.selectedKeys(), false);// 处理I/O的核心方法 pollSelectionKeys(immediatelyConnectedKeys, true); } private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) { Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); // 建立链接时(connect)将kafkaChannel注册到key上,就是为了在这里获取 KafkaChannel channel = channel(key); ……………………
#####一、判断一下key 链接好了没有,由于咱们用的是非阻塞链接,因此到了轮询阶段,尚未完成链接是正常的。
if (isImmediatelyConnected || key.isConnectable()) { // finishConnect方法会先检测socketChannel是否创建完成,创建后,会取消对OP_CONNECT事件关注,//TODO 并开始关注OP_READ事件 if (channel.finishConnect()) { this.connected.add(channel.id());// 将当前channel id 添加到已链接的集合中 this.sensors.connectionCreated.record(); } else { continue;// 表明链接未完成,则跳过对此Channel的后续处理 } }
#####二、身份验证(略过) #####三、判断KafkaChannel有没有准备好,有没有关注OP_READ,能不能读之类的,并进行读操做。 这里有一个判断,就是判断当前的KafkaChannel是否是在StagedReceives里。咱们日后看看,在从网络上读取数据时,咱们会将KafkaChannel扔进StagedReceives里,也就是说,若是这个KafkaChannel已经在StagedReceives里了,那么表明它已经在读数据了。
if (channel.ready() // 链接的三次握手完成,而且 todo 权限验证经过 && key.isReadable() // key是否关注了read事件 && !hasStagedReceive(channel)) {// todo 这个通道不能是正在读数据的,由于在读的时候,会把这个channel扔进stagedReceives里面 NetworkReceive networkReceive; /** * 实际上这里就是分屡次去一个channel取数据,直到取完,并将其保存在key:channel value:new ArrayDeque<NetworkReceive> 中 */ while ((networkReceive = channel.read()) != null) { addToStagedReceives(channel, networkReceive); } }
#####四、判断KafkaChannel有没有准备好,有没有关注OP_WRITE,并进行写操做
if (channel.ready() && key.isWritable()) { Send send = channel.write(); // 这里会将KafkaChannel的send字段发送出去, // 若是未完成发送,或者没发完,则返回null // 发送成功则返回send对象 if (send != null) { this.completedSends.add(send);// 添加到completedSends集合 this.sensors.recordBytesSent(channel.id(), send.size()); } }
##4、关闭空闲链接 在每一次IO操做完毕后,KafkaSelector都会调用一个方法,去关闭掉那些没怎么用的链接,实际上它就是一个基于时间戳的断连机制。 KafkaSelector中维护了一个哈希表,
LinkedHashMap<String, Long> lruConnections (new LinkedHashMap<>(16, .75F, true);
在每次进行IO操做时,将Key:节点ID,Value:当前时间戳扔进哈希表里面,在IO操做进行完毕时,检查一下,最大的那个节点,它的最后一次IO时间+connectionsMaxIdleNanos(建立KafkaSelector时指定),是否超过了当前的时间。 若是是,这个链接就会被关掉。
好比说connectionsMaxIdleNanos被指定成了1分钟,那么若是这个有序哈希表的最后一个节点的时间是一分钟以前,那么这个节点ID的通道将会被关掉。
private void maybeCloseOldestConnection() { if (currentTimeNanos > nextIdleCloseCheckTime) { if (lruConnections.isEmpty()) { nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; } else { Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet() .iterator() .next(); Long connectionLastActiveTime = oldestConnectionEntry.getValue(); nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos; if (currentTimeNanos > nextIdleCloseCheckTime) { String connectionId = oldestConnectionEntry.getKey(); if (log.isTraceEnabled()) { log.trace("About to close the idle connection from " + connectionId + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis"); } disconnected.add(connectionId); close(connectionId); } } } }
参考: 《Apache Kafka 源码剖析》 - 徐郡明著 Apache Kafka 源码 0.10.0.1