上篇文章说道KafkaSelector在建立一个链接的时候和普通的nioSelector并无什么不一样,它是基于nioSelector的封装。咱们知道建立链接的一系列操做都是由Channel去完成,而KafkaChannel实际上就是对它的进一步封装: KafkaChannel不只封装了SocketChannel,还封装了Kafka本身的认证器Authenticator,和读写相关的NetworkReceive、Send。NetworkReceive和Send的底层都是经过ByteBuffer来实现的。java
实际上基本等同于KafkaSelector的建立:node
按照普通的方式建立完通道后,将其注册到NioSelector上,并关注OP_CONNECT,再以节点Id,SelectionKey来建立KafkaChannel,这里先不详细说明KafkaChannel,它是对通道的进一步封装。在建立完KafkaChannel后,将KafkaChannel与SelectionKey、节点ID作进一步绑定。segmentfault
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);// 将当前这个socketChannel注册到nioSelector上,并关注OP_CONNECT事件 KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);// 建立KafkaChannel key.attach(channel);// 将channel绑定到key上 this.channels.put(id, channel);// 将 nodeId 和 Channel绑定
这样有一个好处,首先KafkaChannel中包含了节点ID与SelectionKey,而咱们也能够根据节点ID来拿到KafkaChannel,一样能够根据SelectionKey来拿到KafkaChannel,这就意味着,咱们只要拿到了KafkaChannel、SelectionKey、节点ID中的任意一个,均可以经过这些引用关系拿到彼此,从而进行相关操做。app
实际上就是将要发送的ByteBuffer扔进KafkaChannel,此时并未进行IO操做,这里的Send对象,实际上就是对ByteBuffer的进一步封装,它主要包含了将要发往的节点ID、ByteBuffer大小、是否发送完毕等信息。咱们这里根据节点ID,从咱们刚才的channels中,取出KafkaChannel。socket
public void send(Send send) { // 看看send要发的这个nodeId在不在 KafkaChannel channel = channelOrFail(send.destination()); try { // 把数据扔进KafkaChannel中(只能放一个,放多个会报错),并关注write事件 channel.setSend(send); } catch (CancelledKeyException e) { // 失败了加一条node_id的失败记录 this.failedSends.add(send.destination()); close(channel); } }
这个KafkaChannel的setSend方法实际上很是简单,就是将要发送的send对象的引用交给KafkaChannel中的send。而且使这个channel的SelectionKey去关注OP_WRITE事件。ide
this.send = send; this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
在上篇文章里,咱们知道KafkaSelector也是经过轮询器去进行IO操做,看一下原始的nioSelector是如何进行io操做的:ui
public class NioEchoServer { private static final int BUF_SIZE = 256; private static final int TIMEOUT = 3000; public static void main(String args[]) throws Exception { // 打开服务端 Socket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打开 Selector Selector selector = Selector.open(); // 服务端 Socket 监听8080端口, 并配置为非阻塞模式 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); serverSocketChannel.configureBlocking(false); // 将 channel 注册到 selector 中. // 一般咱们都是先注册一个 OP_ACCEPT 事件, 而后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ // 注册到 Selector 中. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 经过调用 select 方法, 阻塞地等待 channel I/O 可操做 if (selector.select(TIMEOUT) == 0) { System.out.print("."); continue; } // 获取 I/O 操做就绪的 SelectionKey, 经过 SelectionKey 能够知道哪些 Channel 的哪类 I/O 操做已经就绪. Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); // 当获取一个 SelectionKey 后, 就要将它删除, 表示咱们已经对这个 IO 事件进行了处理. keyIterator.remove(); if (key.isAcceptable()) { // 当 OP_ACCEPT 事件到来时, 咱们就有从 ServerSocketChannel 中获取一个 SocketChannel, // 表明客户端的链接 // 注意, 在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel. // 而在 OP_WRITE 和 OP_READ 中, 从 key.channel() 返回的是 SocketChannel. SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); //在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中. // 注意, 这里咱们若是没有设置 OP_READ 的话, 即 interest set 仍然是 OP_CONNECT 的话, 那么 select 方法会一直直接返回. clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE)); } if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clientChannel.read(buf); if (bytesRead == -1) { clientChannel.close(); } else if (bytesRead > 0) { key.interestOps(OP_READ | SelectionKey.OP_WRITE); System.out.println("Get data length: " + bytesRead); } } if (key.isValid() && key.isWritable()) { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clientChannel = (SocketChannel) key.channel(); clientChannel.write(buf); if (!buf.hasRemaining()) { key.interestOps(OP_READ); } buf.compact(); } } } } }
####一、读操做 首先,进行是否能够开始读操做的判断。一、channel.ready(),这里作了两个判断,一个是Kafka的认证器是否定证经过,另外一个则是是否握手成功。二、key.isReadable(),selectionKey是否关注了OP_READ。三、!hasStagedReceive(channel),判断该channel是否在hasStagedReceive这个map里面,若是该channel正在读,那么它会在这个map里面,直到读取完成。this
// channel是否已经准备好从链接中读取任何可读数据 /* if channel is ready read from any connections that have readable data */ if (channel.ready() // 链接的三次握手完成,而且 todo 权限验证经过 && key.isReadable() // key是否关注了read事件 && !hasStagedReceive(channel)) {// todo 这个通道不能是正在读数据的,由于在读的时候,会把这个channel扔进stagedReceives里面 NetworkReceive networkReceive; /** * 实际上这里就是分屡次去一个channel取数据,直到取完,并将其保存在key:channel value:new ArrayDeque<NetworkReceive> 中 */ while ((networkReceive = channel.read()) != null) { // 将屡次接收的数据放进stagedReceives下channel的Deque里面 addToStagedReceives(channel, networkReceive); } }
剩下的channel.read()就比较简单了,KafkaChannel里面封装了一个NetworkReceives,而NetworkReceives主要就是对ByteBuffer的封装。spa
咱们将该NioChannel传入,调用channel.read(size)方法,这个size,其实就是一个ByteBuffer,它是kafka协议中用来判断包体有多长的包头。rest
第一步,先判断byteBuffer(size)中是否还有剩余空间
第二步,从nioChannel中将数据读到byteBuffer中
第三步,判断byteBuffer是否是装满了
第四步,若是装满了,证实size这个bytebuffer已经拿到了包体的长度,调用readInt获取其capacity,再用这个capacity去申请一个用于接收包体的byteBuffer(buffer)。
第五步,正式地将channel中的数据中读取到byteBuffer(buffer)
public long readFromReadableChannel(ReadableByteChannel channel) throws IOException { int read = 0; if (size.hasRemaining()) { int bytesRead = channel.read(size); if (bytesRead < 0) { throw new EOFException(); } read += bytesRead; if (!size.hasRemaining()) { size.rewind(); int receiveSize = size.getInt(); if (receiveSize < 0) { throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")"); } if (maxSize != UNLIMITED && receiveSize > maxSize) { throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); } this.buffer = ByteBuffer.allocate(receiveSize); } } if (buffer != null) { int bytesRead = channel.read(buffer); if (bytesRead < 0) { throw new EOFException(); } read += bytesRead; } return read; }
读取完成以后,再作一下校验:就会返回了,也就是上面while ((networkReceive = channel.read()) != null)拿到的这个networkReceives,里面装着包头和包体。这里Kafka有一个小操做,就是将kafkaChannel内的networkReceive的引用赋值给外面的这个networkReceive后,会将kafkaChannel内的networkReceive的引用置为空。
/** * 接收数据,将数据保存在 NetworkReceive */ public NetworkReceive read() throws IOException { NetworkReceive result = null; if (receive == null) { receive = new NetworkReceive(maxReceiveSize, id); } receive(receive);// 这个方法就是上面说了一大堆第一步第二步第三步的那个方法。 if (receive.complete()) { receive.payload() .rewind(); result = receive; receive = null; } return result; }
####二、写操做 写操做要比读操做更加简单,上面有一个预发送操做,就是将要send的对象Send
/** * 发送时其实也有一次没发送完的状况,每发送完的话,就不会出如今completedSends里面 */ /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ // 若是channel已经ready 而且 咱们有数据来准备好写sockets if (channel.ready() && key.isWritable()) { Send send = channel.write(); // 这里会将KafkaChannel的send字段发送出去, // 若是未完成发送,或者没发完,则返回null // 发送成功则返回send对象 if (send != null) { this.completedSends.add(send);// 添加到completedSends集合 this.sensors.recordBytesSent(channel.id(), send.size()); } }
主要的发送方法就是channel.write();
public Send write() throws IOException { Send result = null; if (send != null && send(send)) { result = send; send = null; } return result; }
而write方法中最核心的方法则是send(send),这个send对象也是一个byteBuffer对象。底层中的底层仍是调用了channel.write(byteBuffer方法)
@Override public long writeTo(GatheringByteChannel channel) throws IOException { long written = channel.write(buffers); if (written < 0) { throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); } remaining -= written; // This is temporary workaround. As Send , Receive interfaces are being used by BlockingChannel. // Once BlockingChannel is removed we can make Send, Receive to work with transportLayer rather than // GatheringByteChannel or ScatteringByteChannel. // 这是一个临时工做区,当发送时,接收数据的接口一直被BlockingChannel使用着。 // 一旦BlockingChannel 被移除,咱们就能够开始咱们的发送操做,接收经过 transportLayer 来工做而不是 GatheringByteChannel 或 ScatteringByteChannel if (channel instanceof TransportLayer) { pending = ((TransportLayer) channel).hasPendingWrites(); } return written; }
参考: Java NIO 的前生今世 之四 NIO Selector 详解 《Apache Kafka 源码剖析》 - 徐郡明著 Apache Kafka 源码 0.10.0.1