样例代码来自于io.netty.example.telnet.TelnetClient
,完整样例请参考NettyExample工程。java
客户端和服务端比较类似,因此本篇会在必定程度上略去重复的部分,以减小篇幅。git
public void run() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new TelnetClientInitializer()); // Start the connection attempt. Channel ch = b.connect(host, port).sync().channel(); // Read commands from the stdin. ChannelFuture lastWriteFuture = null; BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); for (;;) { String line = in.readLine(); if (line == null) { break; } // Sends the received line to the server. lastWriteFuture = ch.writeAndFlush(line + "\r\n"); // If user typed the 'bye' command, wait until the server closes // the connection. if ("bye".equals(line.toLowerCase())) { ch.closeFuture().sync(); break; } } // Wait until all messages are flushed before closing the channel. if (lastWriteFuture != null) { lastWriteFuture.sync(); } } finally { group.shutdownGracefully(); } }
Bootstrap b = new Bootstrap(); //tag0 b.group(group) //tag1 .channel(NioSocketChannel.class) //tag2 .handler(new TelnetClientInitializer());//tag3
tag0代码主要初始化了父类的 options和attrs 属性;代码略。github
tag1设置了group属性promise
@SuppressWarnings("unchecked") public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return (B) this; }
tag2设置了channelFactory属性服务器
public Bootstrap channel(Class<? extends Channel> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new BootstrapChannelFactory<Channel>(channelClass)); }
tag3设置了handler属性app
public B handler(ChannelHandler handler) { if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; return (B) this; }less
下面开始第二段代码分析,依次执行下面的方法。socket
Channel ch = b.connect(host, port) //tag4 .sync().channel(); //tag5 public ChannelFuture connect(String inetHost, int inetPort) { return connect(new InetSocketAddress(inetHost, inetPort)); } public ChannelFuture connect(SocketAddress remoteAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } validate(); return doConnect(remoteAddress, localAddress()); } private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//tag4.1 final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise = channel.newPromise(); if (regFuture.isDone()) { doConnect0(regFuture, channel, remoteAddress, localAddress, promise);//tag4.2 } else { regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doConnect0(regFuture, channel, remoteAddress, localAddress, promise); } }); } return promise; } 分析tag4.1代码,细心的读者注意到,这些和服务端的代码执行过程是同样的。运用模板模式,子类定义独特的实现。 final ChannelFuture AbstractBootstrap.initAndRegister() { Channel channel; try { channel = createChannel();//tag4.1.1 } catch (Throwable t) { return VoidChannel.INSTANCE.newFailedFuture(t); } try { init(channel);//tag4.1.2 } catch (Throwable t) { channel.unsafe().closeForcibly(); return channel.newFailedFuture(t); } ChannelPromise regFuture = channel.newPromise(); channel.unsafe().register(regFuture);//tag4.1.3 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } }
分析 tag4.1.1,里面经过反射来实例化NioSocketChannelide
@Override Channel createChannel() { EventLoop eventLoop = group().next(); return channelFactory().newChannel(eventLoop);//tag4.1.1.1 } public NioSocketChannel(EventLoop eventLoop) { this(eventLoop, newSocket());//调用下面的newSocket()方法 } private static SocketChannel newSocket() { try { return SocketChannel.open(); } catch (IOException e) { throw new ChannelException("Failed to open a socket.", e); } } public NioSocketChannel(EventLoop eventLoop, SocketChannel socket) { this(null, eventLoop, socket); } protected AbstractNioByteChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch) { super(parent, eventLoop, ch, SelectionKey.OP_READ);//调用父类方法 } protected AbstractNioChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) { super(parent, eventLoop);//调用父类方法,tag4.1.1.1 this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false);//tag4.1.1.2 } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } protected AbstractChannel(Channel parent, EventLoop eventLoop) { this.parent = parent; this.eventLoop = validate(eventLoop); unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this);//tag4.1.1.1.1 }
分析tag4.1.1.1.1,里面调用DefaultChannelPipeline构造器,和服务端的逻辑同样,故不做分析。oop
此时系统返回到tag4.1.1.2 继续执行ch.configureBlocking(false);
,此时完成tag4.1.1 方法执行,开始执行tag4.1.2方法
@Override @SuppressWarnings("unchecked") void init(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); p.addLast(handler());//tag4.1.2.1 final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { for (Entry<ChannelOption<?>, Object> e: options.entrySet()) { try { if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + channel, t); } } } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } } }
分析tag4.1.2.1,里面将这个TelnetClientInitializer Handler加入到pipeline中,此时handler链是HeadHandler,TelnetClientInitializer,TailHandler,共计3个。
此时程序返回到tag4.1.3继续执行,
@Override public final void AbstractChannel.register(final ChannelPromise promise) { if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise);//tag4.1.3.1 } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); promise.setFailure(t); } } } private void AbstractChannel.AbstractUnsafe.register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!ensureOpen(promise)) { return; } doRegister();//tag4.1.3.1.1 registered = true; promise.setSuccess(); pipeline.fireChannelRegistered();//tag4.1.3.1.2 if (isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); if (!promise.tryFailure(t)) { logger.warn( "Tried to fail the registration promise, but it is complete already. " + "Swallowing the cause of the registration failure:", t); } } }
tag4.1.3.1.1 代码以下
@Override protected void AbstractNioChannel.doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this);//tag4.1.3.1.2.1 return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
tag4.1.3.1.2.1 把selector注册到javaChannel上;而后程序继续执行tag4.1.3.1.2代码。
@Override public ChannelPipeline fireChannelRegistered() { head.fireChannelRegistered(); return this; } @Override public ChannelHandlerContext fireChannelRegistered() { DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED); next.invoker.invokeChannelRegistered(next); return this; } @Override @SuppressWarnings("unchecked") public final void ChannelInitializer.channelRegistered(ChannelHandlerContext ctx) throws Exception { ChannelPipeline pipeline = ctx.pipeline(); boolean success = false; try { initChannel((C) ctx.channel());//tag4.1.3.1.2.1 pipeline.remove(this);//tag4.1.3.1.2.2 ctx.fireChannelRegistered();//tag4.1.3.1.2.3 success = true; } catch (Throwable t) { logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t); } finally { if (pipeline.context(this) != null) { pipeline.remove(this); } if (!success) { ctx.close(); } } }
后面的逻辑和服务端相似,此时执行的handler是TelnetClientInitializer,并执行ChannelInitializer的channelRegistered方法,channelRegistered方法里面接着调用了initChannel。
标记 tag4.1.3.1.2.1 代码以下
@Override public void TelnetClientInitializer.initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // Add the text line codec combination first, pipeline.addLast("framer", new DelimiterBasedFrameDecoder( 8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", DECODER); pipeline.addLast("encoder", ENCODER); // and then business logic. pipeline.addLast("handler", CLIENTHANDLER); }
在完成tag4.1.3.1.2.2的 pipeline.remove(this);
后,此时handler链以下:HeadHandler,DelimiterBasedFrameDecoder,StringDecoder,StringEncoder,TelnetClientHandler, TailHandler。
接着程序又开始执行下一个handler,最终找到TailHandler的channelRegistered方法。TailHandler的channelRegistered方法是空方法。
此时 tag4.1 的代码执行结束,开始执行 tag4.2的代码
private static void doConnect0( final ChannelFuture regFuture, final Channel channel, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { if (localAddress == null) { channel.connect(remoteAddress, promise);//tag4.2.1 } else { channel.connect(remoteAddress, localAddress, promise); } promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); } @Override public ChannelFuture AbstractChannel.connect(SocketAddress remoteAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, promise); }
通过一番计算,找到HeadHandler,执行unsafe的方法。
@Override public void HeadHandler.connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); }
AbstractNioChannel.AbstractNioUnsafe的 connect方法以下:
@Override public void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (!ensureOpen(promise)) { return; } try { if (connectPromise != null) { throw new IllegalStateException("connection attempt already made"); } boolean wasActive = isActive(); if (doConnect(remoteAddress, localAddress)) {//tag4.2.1.1 fulfillConnectPromise(promise, wasActive);//tag4.2.1.2 } else { connectPromise = promise; requestedRemoteAddress = remoteAddress; // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run() {//tag4.2.1.3 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; close(voidPromise()); } } }); } } catch (Throwable t) { if (t instanceof ConnectException) { Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress); newT.setStackTrace(t.getStackTrace()); t = newT; } promise.tryFailure(t); closeIfClosed(); } }
tag4.2.1.1 代码以下,进行了bind本地端口和connect远程服务器的操做。
@Override protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { javaChannel().socket().bind(localAddress); } boolean success = false; try { boolean connected = javaChannel().connect(remoteAddress);//tag4.2.1.1.1 if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT);//tag4.2.1.1.2 } success = true; return connected; } finally { if (!success) { doClose(); } } }
tag4.2.1.1.2 里面执行了connect远程服务器的操做,,个人机器上该方法返回false(返回值详见connect方法说明)。而后会触发执行selectionKey().interestOps(SelectionKey.OP_CONNECT);
须要额外说明的是,此时服务器会触发channelActivi事件。在本例的服务端代码里,会在客户端链接时,发送消息给客户端。不过先暂时忽略服务端和客户端的数据交互,下文分析。
而后tag4.2.1.1 执行结束,因为此时的返回值是false,因此不会执行tag4.2.1.2的 fulfillConnectPromise(promise, wasActive);
而后程序继续执行tag4.2.1.3 代码,进行链接超时处理:若是设置了超时时间,那么等待指定的超时时间后,再看看是否已经链接上。若是连不上,则设置失败状态。
接着开始下一个事件循环,因为在tag4.2.1.1.2执行了selectionKey().interestOps(SelectionKey.OP_CONNECT)
操做,会进入到下面的代码。这里咱们重点关注tag4.3的代码。
private static void NioEventLoop.processSelectedKey(SelectionKey k, AbstractNioChannel ch) { //略XXXX if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect();//tag4.3 //略XXXX } @Override public void finishConnect() { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. assert eventLoop().inEventLoop(); assert connectPromise != null; try { boolean wasActive = isActive(); doFinishConnect();//tag4.3.1 fulfillConnectPromise(connectPromise, wasActive);//tag4.3.2 } catch (Throwable t) { if (t instanceof ConnectException) { Throwable newT = new ConnectException(t.getMessage() + ": " + requestedRemoteAddress); newT.setStackTrace(t.getStackTrace()); t = newT; } // Use tryFailure() instead of setFailure() to avoid the race against cancel(). connectPromise.tryFailure(t); closeIfClosed(); } finally { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } } @Override protected void NioSocketChannel.doFinishConnect() throws Exception { if (!javaChannel().finishConnect()) { throw new Error(); } }
在执行完下面的boolean promiseSet = promise.trySuccess();
方法后,实例代码中的 Channel ch = b.connect(host, port).sync().channel();
就执行完毕了,而后主线程就阻塞在实例代码中的 String line = in.readLine();
这句代码里了。
private void AbstractNioChannel.AbstractNioUnsafe.fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { // trySuccess() will return false if a user cancelled the connection attempt. boolean promiseSet = promise.trySuccess(); // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, // because what happened is what happened. if (!wasActive && isActive()) { pipeline().fireChannelActive();//tag4.3.2.1 } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). if (!promiseSet) { close(voidPromise()); } } @Override public ChannelPipeline fireChannelActive() { head.fireChannelActive();//tag4.3.2.1.1 if (channel.config().isAutoRead()) { channel.read();//tag4.3.2.1.2 } return this; }
此时,继续执行tag4.3.2.1的代码,进而执行tag4.3.2.1.1的代码,最终执行TailHandler.channelActive方法。因为TailHandler类内部的方法基本都是空实现,因此再也不贴代码了。而后再执行tag4.3.2.1.2的channel.read();
代码,最终执行了AbstractNioChannel.doBeginRead()方法的selectionKey.interestOps(interestOps | readInterestOp);
,等同于执行了selectionKey.interestOps(SelectionKey.OP_READ);
。
此时方法返回,在NioEventLoop.run()通过了一些简单的数据清理后,而后有机会对服务端的channelActive时发送的数据进行处理了(在tag4.2.1.1.2曾经提过)。客户端和服务端交互过程详见下篇。