org.apache.kafka.common.client.Selector实现了Selectable接口,用于提供符合Kafka网络通信特色的异步的、非阻塞的、面向多个链接的网络I/O.html
这些网络IO包括了链接的建立、断开,请求的发送和接收,以及一些网络相关的metrics统计等功能。java
因此,它实际上应该至少具体如下功能node
首先得谈一下Selector这东西是准备怎么让人用的。这个注释里说了一部分:react
A nioSelector interface for doing non-blocking multi-connection network I/O.
This class works with NetworkSend and NetworkReceive to transmit size-delimited network requests and responses.apacheA connection can be added to the nioSelector associated with an integer id by doing
nioSelector.connect("42", new InetSocketAddress("google.com", server.port), 64000, 64000);
The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating the connection. The successful invocation of this method does not mean a valid connection has been established. Sending requests, receiving responses, processing connection completions, and disconnections on the existing connections are all done using the poll() call.
nioSelector.send(new NetworkSend(myDestination, myBytes));
nioSelector.send(new NetworkSend(myOtherDestination, myOtherBytes));
nioSelector.poll(TIMEOUT_MS);
The nioSelector maintains several lists that are reset by each call to poll() which are available via various getters. These are reset by each call to poll(). This class is not thread safe!编程
首先,Selector的API都是非阻塞或者带有阻塞超时时间的,这个特色直接源于Java NIO的Selector和SocketChannel的特性。这种异步非阻塞的IO带来的问题就是,必须时不时地调用某个方法,来检测IO完成的进度状况,对于NIO的selector,这个方法就是select,对于Kafka的Selector,这个方法就是poll.设计模式
为此,注释里举了一个典型的例子,这是一个发送数据的例子:api
nioSelector.send(new NetworkSend(myDestination, myBytes)); nioSelector.send(new NetworkSend(myOtherDestination, myOtherBytes)); nioSelector.poll(TIMEOUT_MS);
可是Kafka Selector的poll不只检测IO的进度,它还执行IO操做,好比当发现有channel可读了,它就从中读数据出来。那么,是否能够说Kafka的Selector执行的是异步IO呢?下面来谈下这个问题。服务器
异步IO vs 同步非阻塞IO网络
异步IO是说实际的IO动做是由操做系统调用另外的线程或者其它的计算资源来作的。那么,想要肯定Selector执行的是不是异步IO,得先看下它所构建的Channel是哪种,毕竟不是全部的channel都支持异步IO。
Selector建立channel的动做是在#connect(String, InetSocketAddress, int, int)方法中。
SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false);
它是建了一个SocketChannel.而SocketChannel并不能进行异步IO,当它被设为no-blocking模式时,进行的是非阻塞的IO。在Java7中,引入了AsynchronizedSocketChannel,它进行的才是真正的异步IO。
参见
两种高性能I/O设计模式(Reactor/Proactor)的比较
Java Non-Blocking and Asynchronous IO with NIO & NIO.2 (JSR203) - Reactor/Proactor Implementations
An NIO.2 primer, Part 1: The asynchronous channel APIs
因为Selector的各个方法是非阻塞的,所以须要保存每一个操做当前的完成进度。好比,正在写,写完成,读完成,链接创建成功,等。这样在调用者调用了poll方法之后,调用者能够检查各个操做完成的状况。
Selector内部的确有一些集合来保存这些信息:
private final Map<String, KafkaChannel> channels; //有正在链接以及链接成功的channel,注意它的类型是KafkaChannel private final List<Send> completedSends; //已发送完的请求 private final List<NetworkReceive> completedReceives; //已接收完成的响应。注意,这个集合并无包括全部已接收完成的响应,stagedReceives集合也包括了一些接收完成的响应 private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives; //已接收完成,但尚未暴露给用户的响应 private final Set<SelectionKey> immediatelyConnectedKeys; //在调用SocketChannel#connect方法时当即完成的SelectionKey.为何保存的是SelectionKey呢? private final List<String> disconnected; //已断开链接的节点 private final List<String> connected; //新链接成功的节点 private final List<String> failedSends; //发送失败的节点,但并非因为IO异常致使的失败,而是因为SelectionKey被cancel引发的失败,好比对一个已关闭的channel设置interestOps
可是这里的集合有些并非按照channel来组织的。好比:completedSend, completedReceives, disconnected, connected和failedSends。由于这些集合是在一个poll以后,Selector的使用者应该处理的,它们是按照类型组织。在poll执行的最开始,它会调用clear方法,清空这些集合,由于它们是上次poll的结果。因此,在一次poll以后查看这些结果的话,看到的就是此次poll的结果。
/** * Clear the results from the prior poll */ private void clear() { this.completedSends.clear(); this.completedReceives.clear(); this.connected.clear(); this.disconnected.clear(); this.disconnected.addAll(this.failedSends); this.failedSends.clear(); }
这里之因此把failedSends加到disconnected之中,是由于failedSends里保存的失败的send,并非上次poll留下来的,而是上次poll以后,这次poll以前,调用send方法时添加到failedSends集合中的。当有failedSends时,selector就会关闭这个channel,所以在clear过程当中,须要把failedSends里保存的节点加到disconnected之中。
须要注意的是,这些集合里并无包括正在发送以及正在接收的请求。缘由是KafkaChannel对象自己持有正在处理的请求和响应。
public class KafkaChannel { private final String id; private final TransportLayer transportLayer; private final Authenticator authenticator; private final int maxReceiveSize; private NetworkReceive receive; private Send send; ... }
这里须要注意是是它的setSend和read方法
public void setSend(Send send) { if (this.send != null) throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress."); this.send = send; this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); }
public NetworkReceive read() throws IOException { NetworkReceive result = null; if (receive == null) { receive = new NetworkReceive(maxReceiveSize, id); } receive(receive); if (receive.complete()) { receive.payload().rewind(); result = receive; receive = null; } return result; }
当一个send正在发送的过程当中,send != null, 此时调用setSend会抛出IllegalStateException。那么,Selector在能够在一个poll以前能够往一个channel发送多个请求吗?
这个须要须要追溯到哪些方法会调用KafkaChannel#setSend。结果只有NetworkClient的send(ClientRequest, long)方法会最终调到它。
而NetworkClient的send方法是这样的
public void send(ClientRequest request, long now) { String nodeId = request.request().destination(); if (!canSendRequest(nodeId)) throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); doSend(request, now); } private boolean canSendRequest(String node) { return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node); }
这里connectionStates.isConnected用来检测节点是否已经链接上。selector.isChannelReady()用来检测channel是否准备完成。因为Kafka security的一些要求,当socket channel链接创建完成后,可能还须要跟server交换一些认证数据,才能认为channel准备完毕。那么,重点就在于inFlightRequest.canSendMore这个方法了。由于若是它不检测一个channel是否有正在发送的send,就可能会在调用NetworkClient#send时,再试图给这个channel添加一个send,就会引起异常。
InFlightRequest保存了全部已发送,但还没收到响应的请求。
InFlightRequests的canSendMore是这样的:
public boolean canSendMore(String node) { Deque<ClientRequest> queue = requests.get(node); return queue == null || queue.isEmpty() || (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection); }
重点在于queue.peekFirst().request().completed, 即若是发给这个节点的最先的请求尚未发送完成,是不能再往这个节点发送请求的。
可是,从canSendMore方法中也能够看出,只要没有超过maxInFlightRequestsPerConnection,一个node能够有多个in-flight request的。这点,实际上影响到了另外一个集合的数据结构的选择——stagedReceives
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
stagedRecieves用来保存已经接收完成,可是尚未暴露给用户(即没有放在completedReceive列表中)的NetworkReceive(即响应).
这里有两个问题:
第二个问题的答案就是NetworkClient的canSendMore方法并无限制一个node只有在全部已发送请求都收到响应的状况下才能发送新请求。所以,一个node能够有多个in-flight request,也能够有多个已发送的请求。所以,Selector也就可能会收到来自于同一个node的多个响应。所以,selector在每次poll的时候,读取请求的操做是这样的:
/* if channel is ready read from any connections that have readable data */ if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { NetworkReceive networkReceive; while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); }
也就是说,只要有能够完整读出的响应,都会把这些响应放到stagedReceives列表中。这个while循环使得在一次poll中,可能会添加多个NetworkReceive到stagedReceives里。
可是,每次poll,只会把最先的一个NetworkReceive放在completedReceives里。
* checks if there are any staged receives and adds to completedReceives */ private void addToCompletedReceives() { if (!this.stagedReceives.isEmpty()) { Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator(); while (iter.hasNext()) { Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next(); KafkaChannel channel = entry.getKey(); if (!channel.isMute()) { Deque<NetworkReceive> deque = entry.getValue(); NetworkReceive networkReceive = deque.poll(); //从这个channel的stagedReceives队列中取最先的一个 this.completedReceives.add(networkReceive);//把它添加到completedRecievs列表中 this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit()); if (deque.isEmpty()) iter.remove(); } } } }
这个行为比较奇怪。可能的解释是这会简化NetworkClient的实现,形成一种"对每一个channel,poll一次只发送一个请求,只接收一个响应“的假像,使得NetworkClient的用户更容易处理请求和响应之间的对应关系。既然poll是一个非阻塞操做,用户就能够在未收到某个请求的响应时,屡次调用poll,这个也没什么问题。由于poll一次并不保证就能收到刚才发出的请求对应的响应。
至于第一个问题,则是因为性能的考虑。
addToStagedReceives方法用于把一个NetworkReceive加到某个channel的stagedReceivs队列中。
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) { if (!stagedReceives.containsKey(channel)) stagedReceives.put(channel, new ArrayDeque<NetworkReceive>()); Deque<NetworkReceive> deque = stagedReceives.get(channel); deque.add(receive); }
若是这个channel没有stagedReceives队列,会给它建一个,此时new的是ArrayDeque对象。这个ArrayDeque是JDK中性能最高的FIFO队列的实现,优于ArrayList和linkedList.
详见What is the fastest Java collection with the basic functionality of a Queue?
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { if (this.channels.containsKey(id)) throw new IllegalStateException("There is already a connection for id " + id); SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); Socket socket = socketChannel.socket(); socket.setKeepAlive(true); if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) socket.setSendBufferSize(sendBufferSize); if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) socket.setReceiveBufferSize(receiveBufferSize); socket.setTcpNoDelay(true); boolean connected; try { connected = socketChannel.connect(address); } catch (UnresolvedAddressException e) { socketChannel.close(); throw new IOException("Can't resolve address: " + address, e); } catch (IOException e) { socketChannel.close(); throw e; } SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT); KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); key.attach(channel); this.channels.put(id, channel); if (connected) { // OP_CONNECT won't trigger for immediately connected channels log.debug("Immediately connected to node {}", channel.id()); immediatelyConnectedKeys.add(key); key.interestOps(0); } }
虽然在connect方法中,SocketChannel被设为non-blocking, 而后调用socketChannel.connect(address),虽然是非阻塞模式,可是connect方法仍然有可能会直接返回ture,表明链接成功。connect方法的doc是这么说的:
If this channel is in non-blocking mode then an invocation of this method initiates a non-blocking connection operation. If the connection is established immediately, as can happen with a local connection, then this method returns true. Otherwise this method returns false and the connection operation must later be completed by invoking the finishConnect method.
好比,若是是一个本地的链接,就可能在非阻模式下也会当即返回链接成功。也是挺神奇的,想想,若是认为”执行指令“是一种阻塞的话,绝对意义上的非阻塞方法是不存在的,不存在执行时间为零的方法。也就是说,若是进行一个本地链接,OS加上JVM是能够在有限的指令数量和时间段内肯定链接成功,这也能够被认为是在非阻塞状态下进行的。
在前边的connect方法中,socket被配置了keepAlive,能够检测出来链接断开的状况。可是,还有一种状况须要考虑,就是一个链接过久没有用来执行读写操做,为了下降服务器端的压力,须要释放这些的链接。因此Selector有LRU机制,来淘汰这样的链接。
在Java里,实现LRU机制最简单的就是使用LinkedHashMap, Selector也的确是这么作的。
private final Map<String, Long> lruConnections; this.lruConnections = new LinkedHashMap<>(16, .75F, true);
lruConnection的key是node的id, value是上次访问的时间。它的“顺序”被设为access顺序。Selector会用map的put操做来access这个map,当NIO的selector poll出来一批SelectionKey以后,这些key对应的node被从新put进map,以刷新它们的最近访问顺序,同时也把具体的“最近使用时间”做为entry的value放在这个map中。
这发生在会被每次poll调用的pollSelectionKeys方法中
lruConnections.put(channel.id(), currentTimeNanos);
之因此要在value中保存最近使用时间,是由于这个时间会被用于计算空闲时间,当空闲时间超过了connectionMaxIdleMs时,就会关闭这个链接。
在poll的最后,会执行maybeCloseOldestConnection方法,来检测并关闭须要关闭的链接。
private void maybeCloseOldestConnection() { if (currentTimeNanos > nextIdleCloseCheckTime) { if (lruConnections.isEmpty()) { nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; } else { Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next(); Long connectionLastActiveTime = oldestConnectionEntry.getValue(); nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos; if (currentTimeNanos > nextIdleCloseCheckTime) { String connectionId = oldestConnectionEntry.getKey(); if (log.isTraceEnabled()) log.trace("About to close the idle connection from " + connectionId + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis"); disconnected.add(connectionId); close(connectionId); } } } }
这里有几点要注意:
Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next(); Long connectionLastActiveTime = oldestConnectionEntry.getValue(); nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;
这种作法有些不妥,由于这样作的话一个poll并不能关闭全部应该关闭的空闲链接,不能期望用户接下来会主动地多poll几回。
Kafka使用这个抽象出来的Selector的确比直接使用NIO在编程上要好一些,主要是代码会不那么臃肿,由于Selector配合KafkaChannel、Send, NetworkReceive, 处理了NIO网络编程的一些细节。Selector的这些代码写的也的确不错。 不过,poll这个操做被搞得有些教条,被赋予了太多的责任,看起来是为了迎合Kafka的新consumer的特色搞出来的东西。这个东西让人想起了回合制的游戏,设置好下一回合想干啥,点肯定,而后就喝茶等了。