KafkaChannel负责基于socket的连接、认证以及数据的读取和发送。它包含TransportLayer和Authenticator两个部分:TransportLayer负责数据交互,Authenticator负责安全验证。
ChannelBuilders提供了实例化ChannelBuilder的工厂方法:clientChannelBuilder和serverChannelBuilder。
/**
 * Static factory for {@code ChannelBuilder} instances. Exposes one entry point for clients
 * and one for servers; both delegate to {@code create}, which picks the builder
 * implementation from the security protocol.
 */
public class ChannelBuilders {
    // The constructor is private: this class only offers static methods.
    private ChannelBuilders() { }

    // Creates the ChannelBuilder used on the client side (Mode.CLIENT, no credential cache).
    public static ChannelBuilder clientChannelBuilder(SecurityProtocol securityProtocol,
            JaasContext.Type contextType,
            AbstractConfig config,
            ListenerName listenerName,
            String clientSaslMechanism,
            boolean saslHandshakeRequestEnable) {
        return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, clientSaslMechanism,
                saslHandshakeRequestEnable, null);
    }

    // Creates the ChannelBuilder used on the server side (Mode.SERVER, server JAAS context,
    // no client SASL mechanism, SASL handshake request enabled).
    public static ChannelBuilder serverChannelBuilder(ListenerName listenerName,
            SecurityProtocol securityProtocol,
            AbstractConfig config,
            CredentialCache credentialCache) {
        return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName, null,
                true, credentialCache);
    }

    private static ChannelBuilder create(SecurityProtocol securityProtocol,
            Mode mode,
            JaasContext.Type contextType,
            AbstractConfig config,
            ListenerName listenerName,
            String clientSaslMechanism,
            boolean saslHandshakeRequestEnable,
            CredentialCache credentialCache) {
        // Code elided in this excerpt. NOTE(review): `configs`, used below, is not declared in the
        // visible code -- presumably it is derived from `config` in the elided part; confirm.
        .......
        ChannelBuilder channelBuilder;
        // Choose a different ChannelBuilder depending on the protocol.
        switch (securityProtocol) {
            case SSL:
                // SSL-based
                requireNonNullMode(mode, securityProtocol);
                channelBuilder = new SslChannelBuilder(mode);
                break;
            case SASL_SSL:
            case SASL_PLAINTEXT:
                // SASL-based
                requireNonNullMode(mode, securityProtocol);
                JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs);
                channelBuilder = new SaslChannelBuilder(mode, jaasContext, securityProtocol,
                        clientSaslMechanism, saslHandshakeRequestEnable, credentialCache);
                break;
            case PLAINTEXT:
            case TRACE:
                // No encryption at all
                channelBuilder = new PlaintextChannelBuilder();
                break;
            default:
                throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
        }
        // Every builder is configured before it is handed back to the caller.
        channelBuilder.configure(configs);
        return channelBuilder;
    }
    // NOTE(review): the excerpt stops here -- the class's closing brace and helpers that are
    // called above but not defined in view (e.g. requireNonNullMode) are not shown.
ChannelBuilder是接口,实现该接口的有PlaintextChannelBuilder、SaslChannelBuilder和SslChannelBuilder。其中PlaintextChannelBuilder最为简单,因此这里以它为例。ChannelBuilder中最主要的方法是buildChannel,它会创建transportLayer和authenticator,并用它们来实例化KafkaChannel。
public class PlaintextChannelBuilder implements ChannelBuilder { public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException { try { // 实例化TransportLayer PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key); // 实例化Authenticator Authenticator authenticator = new DefaultAuthenticator(); authenticator.configure(transportLayer, this.principalBuilder, this.configs); // 返回KafkaChannel return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize); } catch (Exception e) { log.warn("Failed to create channel due to ", e); throw new KafkaException(e); } } }
先回到Selector的pollSelectionKeys方法,它表明了KafkaChannel的各个方法是在什么时候被调用的。
// Walks over the given selection keys and, for each key's KafkaChannel, drives the
// connect -> prepare -> read sequence. Parts of the body are elided in this excerpt.
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        KafkaChannel channel = channel(key);
        if (isImmediatelyConnected || key.isConnectable()) {
            // Call the channel's finishConnect method to handle the connection.
            if (channel.finishConnect()) {
                ......
            } else
                continue;
        }
        if (channel.isConnected() && !channel.ready())
            // Then call the channel's prepare method to do the preparation work
            // (for example, the handshake of an SSL connection).
            channel.prepare();
        if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
            // Once the channel's preparation is complete, call its read method to read requests;
            // every completed receive is staged until read() returns null.
            NetworkReceive networkReceive;
            while ((networkReceive = channel.read()) != null)
                addToStagedReceives(channel, networkReceive);
        }
    }
    .......
}
KafkaChannel负责连接、数据读取和数据发送。
/**
 * Channel that handles connecting, preparing (handshake + authentication), reading and
 * writing by delegating to a transport layer and an authenticator.
 * NOTE(review): the fields used below (transportLayer, authenticator, state, receive, send,
 * id, maxReceiveSize) are not declared in this excerpt.
 */
public class KafkaChannel {
    // First, finish the connection. On success the state becomes READY if the channel is
    // already ready, otherwise AUTHENTICATE. Returns whether the connection completed.
    public boolean finishConnect() throws IOException {
        boolean connected = transportLayer.finishConnect();
        if (connected)
            state = ready() ? ChannelState.READY : ChannelState.AUTHENTICATE;
        return connected;
    }

    // Reports whether the transport layer is connected.
    public boolean isConnected() {
        return transportLayer.isConnected();
    }

    // Advances the handshake and the authentication; switches the state to READY once both are done.
    public void prepare() throws IOException {
        // Then perform the handshake.
        if (!transportLayer.ready())
            transportLayer.handshake();
        // Authenticate.
        if (transportLayer.ready() && !authenticator.complete())
            authenticator.authenticate();
        if (ready())
            // If everything has completed, update the state.
            state = ChannelState.READY;
    }

    public boolean ready() {
        // The channel is considered ready only when both the transportLayer and the authenticator are done.
        return transportLayer.ready() && authenticator.complete();
    }

    // Reads a request from the channel. Returns the completed receive, or null while the
    // current receive is still incomplete.
    public NetworkReceive read() throws IOException {
        NetworkReceive result = null;
        if (receive == null) {
            receive = new NetworkReceive(maxReceiveSize, id);
        }
        // Read the request.
        receive(receive);
        if (receive.complete()) {
            // Rewind the payload so the caller reads it from the start, then clear the
            // in-progress receive so the next call starts a new one.
            receive.payload().rewind();
            result = receive;
            receive = null;
        }
        return result;
    }

    private long receive(NetworkReceive receive) throws IOException {
        // Call NetworkReceive's readFrom method.
        return receive.readFrom(transportLayer);
    }

    // Sets the send but does not send it right away; waits for the transportLayer's write event to be ready.
    public void setSend(Send send) {
        if (this.send != null)
            // Only one Send can be in flight at a time.
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send;
        // Register interest in the write event.
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }

    // Returns null if the send has not finished. If it has finished, returns the send and resets this.send to null.
    public Send write() throws IOException {
        Send result = null;
        // Call send(...) to write.
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }

    private boolean send(Send send) throws IOException {
        // Call the Send's writeTo method.
        send.writeTo(transportLayer);
        // Once the send has completed, stop listening for write events.
        if (send.completed())
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
        return send.completed();
    }
    // NOTE(review): the excerpt stops here -- the class's closing brace is not shown.
NetworkReceive表示一个请求,数据格式为:
| size | data |
size表示data的长度,为4个字节的int类型;data则为请求的数据,长度为size。
/**
 * A size-delimited receive: a 4-byte size header followed by a payload of exactly that many bytes.
 * The header is read first; the payload buffer is allocated only after the size is known and validated.
 */
public class NetworkReceive implements Receive {
    // Id of the channel; tells which channel this request belongs to.
    private final String source;
    // Only 4 bytes; used to read the size of the request.
    private final ByteBuffer size;
    // Maximum length of the request data.
    private final int maxSize;
    // The request data.
    private ByteBuffer buffer;

    public NetworkReceive(int maxSize, String source) {
        this.source = source;
        // Only 4 bytes are allocated here.
        this.size = ByteBuffer.allocate(4);
        this.buffer = null;
        this.maxSize = maxSize;
    }

    // Reads from the channel; delegates to readFromReadableChannel.
    public long readFrom(ScatteringByteChannel channel) throws IOException {
        return readFromReadableChannel(channel);
    }

    // Reads as much of the size header and payload as the channel currently provides.
    // Returns the number of bytes read in this call; throws EOFException when the channel
    // reports end-of-stream and InvalidReceiveException when the announced size is invalid.
    public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
        int read = 0;
        // Check whether the size has already been read completely.
        if (size.hasRemaining()) {
            // Read the first 4 bytes of the data, which give the size of the request data.
            int bytesRead = channel.read(size);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
            if (!size.hasRemaining()) {
                // The size has been read completely.
                size.rewind();
                // Get the size of the request data, receiveSize.
                int receiveSize = size.getInt();
                // Sanity-check the data size.
                if (receiveSize < 0)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                // NOTE(review): UNLIMITED is not declared in this excerpt.
                if (maxSize != UNLIMITED && receiveSize > maxSize)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                // Allocate the buffer according to receiveSize.
                this.buffer = ByteBuffer.allocate(receiveSize);
            }
        }
        // The buffer has been allocated, which means the size has been fully read.
        if (buffer != null) {
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
        }
        return read;
    }

    // Returns the request data.
    public ByteBuffer payload() {
        return this.buffer;
    }

    // Returns true once both the size and the buffer have been read completely.
    // NOTE(review): if readFromReadableChannel threw InvalidReceiveException after the size was
    // fully read, buffer is still null and this would throw NullPointerException -- confirm
    // that callers never reuse the instance after such a failure.
    public boolean complete() {
        return !size.hasRemaining() && !buffer.hasRemaining();
    }
    // NOTE(review): the excerpt stops here -- the class's closing brace is not shown.
NetworkSend只是继承了ByteBufferSend,并增加了两个类方法。
public class NetworkSend extends ByteBufferSend { public NetworkSend(String destination, ByteBuffer buffer) { //为buffer添加sizeBuffer,而后初始化父类ByteBufferSend super(destination, sizeDelimit(buffer)); } // 为buffer添加一个size的sizeBuffer,组成ByteBuffer数组 private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) { return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer}; } // 实例化4个字节的ByteBuffer,使用int初始化 private static ByteBuffer sizeBuffer(int size) { ByteBuffer sizeBuffer = ByteBuffer.allocate(4); sizeBuffer.putInt(size); sizeBuffer.rewind(); return sizeBuffer; } } public class ByteBufferSend implements Send { // 发送地址 private final String destination; // 响应数据的总大小 private final int size; protected final ByteBuffer[] buffers; // remaining表示buffer中未写完的数据长度 private int remaining; // 表示是否channel中还有数据未发送 private boolean pending = false; public ByteBufferSend(String destination, ByteBuffer... buffers) { this.destination = destination; this.buffers = buffers; // 计算全部buffer的总大小 for (ByteBuffer buffer : buffers) remaining += buffer.remaining(); this.size = remaining; } @Override public boolean completed() { // 数据首先会从buffer中写入到channel,而后channel再把数据写入到真实的socket中 return remaining <= 0 && !pending; } @Override public long writeTo(GatheringByteChannel channel) throws IOException { // 写入到channel中 long written = channel.write(buffers); if (written < 0) throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); // 更新remaining remaining -= written; // 检查pending状态 pending = TransportLayers.hasPendingWrites(channel); return written; } }
上面的NetworkReceive和NetworkSend调用了TransportLayer的方法,即channel.read和channel.write。TransportLayer是接口,PlaintextTransportLayer是实现TransportLayer的类之一,由于它比较简单,因此这里以它为例。
public class PlaintextTransportLayer implements TransportLayer { private final SelectionKey key; private final SocketChannel socketChannel; public PlaintextTransportLayer(SelectionKey key) throws IOException { this.key = key; this.socketChannel = (SocketChannel) key.channel(); } //调用socketChannel的read方法 public long read(ByteBuffer[] dsts) throws IOException { return socketChannel.read(dsts); } //调用socketChannel的write方法 public int write(ByteBuffer src) throws IOException { return socketChannel.write(src); } }
类之间的关系:ChannelBuilders实例化ChannelBuilder;ChannelBuilder先实例化TransportLayer和Authenticator,然后用它们实例化KafkaChannel;KafkaChannel使用NetworkSend表示发送的数据,使用NetworkReceive表示接收的数据。