Selector是对java nio中Selector的封装,额外提供了对kafka请求或者响应的处理方法java
Selector实现了Selectable接口。下面列举它的定义框架
public interface Selectable { public void close(String id); // 发送数据 public void send(Send send); // 等待timeout时间,而后处理就绪的链接 public void poll(long timeout) throws IOException; // 返回已经成功发送的数据 public List<Send> completedSends(); // 返回已经成功接收的数据 public List<NetworkReceive> completedReceives(); // 返回关闭的链接,ChannelState表示链接关闭前最后的状态,在KafkaChannel会有介绍 public Map<String, ChannelState> disconnected(); // 返回已经成功的链接 public List<String> connected(); // 取消监听读事件 public void mute(String id); // 开启监听读事件 public void unmute(String id); // 链接是否已经就绪(好比ssl握手过程是否完成) public boolean isChannelReady(String id); }
register方法由Processor调用,负责注册该链接socket
public class Selector implements Selectable, AutoCloseable { // nioSelector负责链接的事件监听器 private final java.nio.channels.Selector nioSelector; // channels集合 private final Map<String, KafkaChannel> channels; public void register(String id, SocketChannel socketChannel) throws ClosedChannelException { // 监听链接的读事件 SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ); // 构造KafkaChannel,而且绑定到该链接。KafkaChannel负责请求的解析和响应的发送 KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); key.attach(channel); // 更新channels this.channels.put(id, channel); } }
poll方法,是Selector最重要的方法。它负责监听事件,并作相应的处理ui
public void poll(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("timeout should be >= 0"); // 清除 clear(); // hasStagedReceives返回是否还有receive处理。若是有,则设置timeout为0,表示不等待。 if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty()) timeout = 0; // 等待socket事件就绪 int readyKeys = select(timeout); if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { // 处理就绪的链接 pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect); // 处理immediatelyConnectedKeys的链接 pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); } // 更新请求 addToCompletedReceives(); // we use the time at the end of select to ensure that we don't close any connections that // have just been processed in pollSelectionKeys maybeCloseOldestConnection(endSelect); }
clear方法,清除上一次的链接列表this
public class Selector implements Selectable, AutoCloseable { // 完成发送的列表 private final List<Send> completedSends; // 已经成功接收完的请求列表 private final List<NetworkReceive> completedReceives; // 关闭的链接列表 private final Map<String, ChannelState> disconnected; // 已经创建的链接列表 private final List<String> connected; // 准备关闭的链接列表,当这个链接的请求处理完后,就会移除 private final Map<String, KafkaChannel> closingChannels; // 发送send失败的channel列表。用来更新disconnected的状态 private final List<String> failedSends; private void clear() { this.completedSends.clear(); this.completedReceives.clear(); this.connected.clear(); this.disconnected.clear(); // 遍历准备关闭的链接,当异常关闭的链接,其请求已所有读取完,就关闭 for (Iterator<Map.Entry<String, KafkaChannel>> it = closingChannels.entrySet().iterator(); it.hasNext(); ) { KafkaChannel channel = it.next().getValue(); Deque<NetworkReceive> deque = this.stagedReceives.get(channel); // 是否发送send失败 boolean sendFailed = failedSends.remove(channel.id()); if (deque == null || deque.isEmpty() || sendFailed) { // 关闭链接,而且从closingChannels删除 doClose(channel, true); it.remove(); } } // 遍历发送失败的链接 for (String channel : this.failedSends) { KafkaChannel failedChannel = closingChannels.get(channel); if (failedChannel != null) // 更新channel的state failedChannel.state(ChannelState.FAILED_SEND); // 添加到disconnected列表里 this.disconnected.put(channel, ChannelState.FAILED_SEND); } this.failedSends.clear(); } }
pollSelectionKeys方法,处理链接相关的事件code
public class Selector implements Selectable, AutoCloseable { private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { // 遍历Keys Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); // 获取与key绑定的KafkaChannel KafkaChannel channel = channel(key); try { // connect事件 if (isImmediatelyConnected || key.isConnectable()) { if (channel.finishConnect()) { // 必须等到KafkaChannel完成链接,才加入到connected列表 this.connected.add(channel.id()); } else continue; } // 若是channel尚未ready if (channel.isConnected() && !channel.ready()) channel.prepare(); //读事件 if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { // channel已经准备好,而且尚未没有读取完的Receive NetworkReceive networkReceive; while ((networkReceive = channel.read()) != null) // channel.read若是返回null,表示一条请求的数据还没接收完整,须要继续监听读事件 // 加入到未处理的Receive队列 addToStagedReceives(channel, networkReceive); } // 写事件 if (channel.ready() && key.isWritable()) { // channel.write若是发送完,将会send。如没有,则会返回null Send send = channel.write(); if (send != null) { // 更新completedSends列表 this.completedSends.add(send); } } if (!key.isValid()) //若是链接失效,则关闭链接 close(channel, true); } catch (Exception e) { // 关闭链接 close(channel, true); } finally { ....... } } }
addToCompletedReceives方法,从stagedReceives取出Receive到completedReceives列表里接口
private void addToCompletedReceives() { if (!this.stagedReceives.isEmpty()) { // 遍历stagedReceives map, Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator(); while (iter.hasNext()) { Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next(); KafkaChannel channel = entry.getKey(); if (!channel.isMute()) { // 若是channel还在监听读事件,则将Receive添加到completedReceives队列 Deque<NetworkReceive> deque = entry.getValue(); addToCompletedReceives(channel, deque); if (deque.isEmpty()) iter.remove(); } } } } // 从stagedDeque取出一个Receive,添加到completedReceives里面 private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) { NetworkReceive networkReceive = stagedDeque.poll(); this.completedReceives.add(networkReceive); }
close方法队列
private void close(KafkaChannel channel, boolean processOutstanding) { // 关闭链接 channel.disconnect(); // 获取channel有哪些未处理的Receive Deque<NetworkReceive> deque = this.stagedReceives.get(channel); if (processOutstanding && deque != null && !deque.isEmpty()) { // processOutstanding表示是否须要处理未完成的请求 if (!channel.isMute()) { // 若是链接还在监听读事件 addToCompletedReceives(channel, deque); if (deque.isEmpty()) this.stagedReceives.remove(channel); } // 添加到closingChannels列表,表示即将关闭 closingChannels.put(channel.id(), channel); } else // 不然关闭channel doClose(channel, processOutstanding); this.channels.remove(channel.id()); }
send 方法事件
public void send(Send send) { String connectionId = send.destination(); if (closingChannels.containsKey(connectionId)) this.failedSends.add(connectionId); else { KafkaChannel channel = channelOrFail(connectionId, false); try { // 调用channel setSend方法 channel.setSend(send); } catch (CancelledKeyException e) { this.failedSends.add(connectionId); close(channel, false); } } }
Selector实现了Selectable的接口。图片
Processor会从Selector获取 请求列表completedReceives,发送列表completedSends, disconnected 链接关闭列表.
当链接被意外关掉后,Selector仍然会处理完全部的请求