Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
Let's start with one of the official Netty demos, TelnetServer.

public final class TelnetServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", SSL ? "8992" : "8023"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new TelnetServerInitializer(sslCtx));

            b.bind(PORT).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
From the code above we can summarize the demo as follows:

- SSL is optional: if the ssl system property is set, a self-signed certificate is used to build an SslContext.
- Two NioEventLoopGroups are created: bossGroup (with exactly one thread) accepts connections, and workerGroup handles the I/O of the accepted connections.
- ServerBootstrap is configured with the channel type (NioServerSocketChannel), a handler for the server channel itself (LoggingHandler) and a childHandler for accepted channels (TelnetServerInitializer).
- b.bind(PORT).sync() starts the server and the main thread then blocks on closeFuture(); both groups are shut down gracefully in the finally block.
The bossGroup in TelnetServer is created with exactly one thread, which raises a question: what would happen if we gave it more than one? To answer that, let's look at the Netty source for listening and channel registration. Everything starts at b.bind(PORT).sync().channel().closeFuture().sync(), so we begin with b.bind(PORT):

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    ...
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        ...
    }
}
Of the three methods above, the important ones are initAndRegister() and doBind0(). Let's look at initAndRegister() first:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    }
    ...
    ChannelFuture regFuture = config().group().register(channel);
    ...
    return regFuture;
}
Here channelFactory.newChannel() creates a NioServerSocketChannel instance, which is exactly what .channel(NioServerSocketChannel.class) in the demo configured. The two calls worth focusing on are init(channel) and config().group().register(channel). First, init(), which lives in ServerBootstrap:

void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();
    System.out.println("handler names is :" + p.names());

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });

    System.out.println("handler names is :" + p.names());
}
From this code we can see that init() mainly does the following:

- copies the configured ChannelOptions and attributes onto the new server channel;
- captures the child group, child handler, child options and child attributes for later use;
- adds a ChannelInitializer to the server channel's pipeline which, once the channel is registered, adds the configured handler (the LoggingHandler in the demo) and then, on the channel's event loop, adds a ServerBootstrapAcceptor that hands accepted child channels over to the worker group and the childHandler (see the sketch below).
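The childHandler in the demo, TelnetServerInitializer, is exactly such a ChannelInitializer for the accepted child channels. As a rough, hypothetical sketch of the shape such an initializer usually has for a line-based protocol (this is not the demo's actual source; class name and handlers are illustrative):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class SimpleTelnetInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        // Every accepted child channel gets its own pipeline populated here.
        ch.pipeline()
          .addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
          .addLast(new StringDecoder())
          .addLast(new StringEncoder())
          .addLast(new SimpleChannelInboundHandler<String>() {
              @Override
              protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                  // Echo each received line back to the client.
                  ctx.writeAndFlush("echo: " + msg + "\r\n");
              }
          });
    }
}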
Now for config().group().register(channel). The group returned by config().group() is the bossGroup from the demo, and its register() implementation is:

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
next() picks one NioEventLoop from the group (the details of how NioEventLoops are created and how threads are assigned are worth exploring on your own in NioEventLoopGroup). Next, the register() methods reached on that NioEventLoop:

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
promise.channel().unsafe().register() is implemented in AbstractUnsafe:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        }
        ...
    }
}
The line AbstractChannel.this.eventLoop = eventLoop binds this unsafe object to the NioEventLoop handed out by the NioEventLoopGroup — in other words, it binds the NioServerSocketChannel to its event loop, so that everything concerning this NioServerSocketChannel runs on that event loop's dedicated thread. This also answers the question from the beginning ("what if bossGroup has more than one thread?"): a channel is served by exactly one EventLoop, so only one thread in bossGroup will ever serve the single NioServerSocketChannel; configuring more simply leaves the extra threads unused. Now let's look at register0(); note that by this point it is already running on the event loop's own thread:

private void register0(ChannelPromise promise) {
    try {
        ...
        doRegister();
        ...
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();

        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    }
    ...
}
The two interesting calls here are doRegister() and isActive(). doRegister() first:

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        }
        ...
    }
}
javaChannel().register() registers the channel with the underlying JDK selector; we won't go deeper into that here. Now the isActive() method mentioned above:

public boolean isActive() {
    return javaChannel().socket().isBound();
}
It checks whether the port is bound. We haven't bound yet, so it returns false. Now back to the doBind0() method of AbstractBootstrap mentioned earlier:

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
The channel.bind call above ends up in AbstractChannel.bind():

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}
which goes on to the bind() method of DefaultChannelPipeline:

public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}
tail is a TailContext; the bind() it executes is the one inherited from AbstractChannelHandlerContext:

public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}
findContextOutbound() walks toward head; in this trace next resolves to the HeadContext. Because we are already on the event loop, next.invokeBind(localAddress, promise) executes directly; its source:

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}
That ends in ((ChannelOutboundHandler) handler()).bind(...), so let's look at this handler's bind() method:

public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    unsafe.bind(localAddress, promise);
}
It delegates to the unsafe again; continuing:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    }
    ...
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}
The core call is doBind(), which lives in NioServerSocketChannel:

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
doBind() finally calls the JDK API that binds the port. That completes the NIO server startup flow; to summarize:

- b.bind(PORT) creates and initializes a NioServerSocketChannel (initAndRegister), then registers it with one NioEventLoop of the bossGroup;
- registration runs register0() on that event loop's thread, which registers the JDK channel with the selector (initially with interest ops 0) and fires channelRegistered;
- once registration succeeds, doBind0() schedules the actual bind on the same event loop; the bind travels along the pipeline from tail toward head until HeadContext calls unsafe.bind(), which performs the JDK bind and then fires channelActive.
Design patterns that show up in the startup flow include the factory pattern (channelFactory.newChannel() creating the channel type configured via .channel(...)) and the observer pattern (ChannelFuture/ChannelPromise with listeners such as ChannelFutureListener.CLOSE_ON_FAILURE).
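To make the observer/listener point concrete: instead of blocking on sync() as the demo does, the bind result can be observed asynchronously with a listener. A minimal, self-contained sketch (class name and port are made up for illustration):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class BindListenerExample {
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap()
                .group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new LoggingHandler(LogLevel.INFO));

        // Observer pattern: register a callback on the ChannelFuture instead of blocking.
        ChannelFuture f = b.bind(8023);
        f.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                System.out.println("bound to " + future.channel().localAddress());
            } else {
                future.cause().printStackTrace();
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        });
    }
}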
We already ran into ChannelPipeline while analysing server startup; this chapter looks at it in detail. First, the class hierarchy of ChannelPipeline: as the class diagram shows, the inheritance relationship is simple, and the pipeline object we actually use is always a DefaultChannelPipeline. Next, consider how the pipeline relates to the other important objects.

From that relationship we can see the following points:

- every Channel owns exactly one ChannelPipeline;
- the pipeline holds a doubly linked list of AbstractChannelHandlerContext nodes, with two sentinel nodes, head and tail;
- each context wraps exactly one ChannelHandler (a one-to-one relationship);
- the handlers we add with addLast() and friends become new context nodes linked in between head and tail.
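A quick way to see the context/handler pairing and the implicit head/tail sentinels is to poke at a pipeline directly. A small sketch using EmbeddedChannel (handler names and output are purely illustrative):

import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;

public class PipelineLayoutExample {
    public static void main(String[] args) {
        EmbeddedChannel ch = new EmbeddedChannel();
        // Each addLast() wraps the handler in a new context node and links it
        // into the doubly linked list between head and tail.
        ch.pipeline()
          .addLast("in", new ChannelInboundHandlerAdapter())
          .addLast("out", new ChannelOutboundHandlerAdapter());
        System.out.println(ch.pipeline().names());
        ch.finish();
    }
}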
Let's start with a question: why a doubly linked list — wouldn't a singly linked list do? First, the DefaultChannelPipeline constructor:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
When a DefaultChannelPipeline is created it builds two contexts, tail and head, and links them into a doubly linked list; every context/handler pair added later by the application is inserted into this list. TailContext first:

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, true, false);
        setAddComplete();
    }
}
The constructor mainly delegates to the parent class constructor:

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}
Note that tail's outbound flag is false and its inbound flag is true: taken literally, tail handles inbound events and cannot handle outbound ones. Reality is not quite that simple, though, and head will turn out to be the exception. Also note that head and tail are HandlerContexts and, at the same time, the handler associated with that context:

public ChannelHandler handler() {
    return this;
}
Now the HeadContext source:

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }
}
Head's inbound flag is false and its outbound flag is true, so by the earlier reasoning head should only be able to handle outbound events — but that is not actually the case. Notice one implementation difference between head and tail: head implements both ChannelOutboundHandler and ChannelInboundHandler, while tail implements only ChannelInboundHandler. Let's verify this with an inbound event, starting from fireChannelRegistered() in DefaultChannelPipeline:

public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}
The method calls a static method of AbstractChannelHandlerContext, passing head as the argument:

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}
This runs head's invokeChannelRegistered() on the event loop:

private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}
That lands in head's channelRegistered() method. We'll stop the trace here; it confirms the conclusion above: head can handle inbound events as well as outbound ones.
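User code can participate in both directions the same way head does by extending ChannelDuplexHandler. A minimal, hypothetical sketch (not part of the demo):

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

public class DirectionLoggingHandler extends ChannelDuplexHandler {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("inbound : " + msg);
        ctx.fireChannelRead(msg);   // keep the inbound event moving toward tail
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("outbound: " + msg);
        ctx.write(msg, promise);    // keep the outbound event moving toward head
    }
}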
The ChannelRegistered event we just traced is a typical inbound event. Now let's look at inbound and outbound events in general. The diagram below, taken from the Netty javadoc, shows the ordering of inbound and outbound events: inbound events are fired from the transport at the bottom and travel up through the inbound handlers from head to tail, while outbound operations issued via the Channel or a ChannelHandlerContext travel down through the outbound handlers from tail to head and end at the socket operations.
                                                 I/O Request
                                            via Channel or
                                        ChannelHandlerContext
                                                      |
  +---------------------------------------------------+---------------+
  |                           ChannelPipeline         |               |
  |                                                   \|/             |
  |    +---------------------+            +-----------+----------+    |
  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  |               |
  |               |                                  \|/              |
  |    +----------+----------+            +-----------+----------+    |
  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  .               |
  |               .                                   .               |
  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
  |        [ method call]                       [method call]         |
  |               .                                   .               |
  |               .                                  \|/              |
  |    +----------+----------+            +-----------+----------+    |
  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  |               |
  |               |                                  \|/              |
  |    +----------+----------+            +-----------+----------+    |
  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  |               |
  +---------------+-----------------------------------+---------------+
                  |                                  \|/
  +---------------+-----------------------------------+---------------+
  |               |                                   |               |
  |       [ Socket.read() ]                    [ Socket.write() ]     |
  |                                                                   |
  |  Netty Internal I/O Threads (Transport Implementation)            |
  +-------------------------------------------------------------------+
Let's dig into the inbound-event source in detail. First, the full set of inbound events:

fireChannelRegistered;
fireChannelUnregistered;
fireChannelActive;
fireChannelInactive;
fireChannelRead(Object msg);
fireChannelReadComplete;
fireUserEventTriggered(Object event);
fireChannelWritabilityChanged;
fireExceptionCaught(Throwable cause);
There are nine inbound events in total, all exposed as fire... methods. Let's walk through fireChannelRead; the flow starts in NioByteUnsafe.read():

public final void read() {
    ...
    try {
        do {
            byteBuf = allocHandle.allocate(allocator);
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            ...
            allocHandle.incMessagesRead(1);
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    }
    ...
}
Every time Netty reads data from the underlying socket it calls pipeline.fireChannelRead(), one of the inbound methods we just listed:

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}
The pipeline delegates to AbstractChannelHandlerContext.invokeChannelRead(), passing head and the message that was read:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
which first calls head's invokeChannelRead() so head gets to process the event:

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}
The flow reaches head's channelRead():

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}
The ctx here is still head itself; head's fireChannelRead() is:

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}
This goes through the invokeChannelRead() we already saw, but this time targeting the next inbound context after head; we won't trace any further. To summarize inbound event handling:

- an inbound event enters the pipeline at head (pipeline.fireXxx always starts with head);
- it is dispatched to each context's handler in turn, moving from head toward tail via findContextInbound();
- a handler keeps the event moving only by calling ctx.fireXxx(...); if it doesn't, propagation stops at that handler;
- anything that reaches tail unhandled is dealt with by TailContext, which for unhandled messages simply logs and releases them.
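A minimal sketch of how a user handler participates in that propagation (hypothetical handler, not from the demo):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ForwardingReadHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("saw: " + msg);
        // Without this call the event stops here and never reaches the next
        // inbound context (or tail); with it, propagation continues toward tail.
        ctx.fireChannelRead(msg);
    }
}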
Now the outbound events. They look more complicated than the inbound ones because there are more public entry points, but they boil down to the following operations:

bind;
connect;
disconnect;
close;
deregister;
read;
write;
flush;
The entry points for outbound events are also public pipeline methods, for example the write path:

public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}
which calls tail's writeAndFlush(). The write flow gets its own chapter later, so we won't expand on it here.
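Outbound events entered on the Channel start at tail, while those entered on a ChannelHandlerContext start at the next outbound context before the current one. A small, hypothetical sketch of an outbound handler that decorates every write:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class PrefixingWriteHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // ctx.write(...) hands the (possibly transformed) message to the next
        // outbound context toward head; channel.write(...) would start from tail again.
        ctx.write("[out] " + msg, promise);
    }
}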
Now that we've seen how inbound and outbound events are processed, what does ChannelPipeline do when that processing throws an exception? The handling differs depending on where the exception occurs: exceptions raised while handling inbound events propagate along the pipeline as exceptionCaught events, whereas failures of outbound operations are reported through the ChannelPromise. Let's look at the inbound case first.
Let's use channelActive as the example, starting from fireChannelActive() in DefaultChannelPipeline:

public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}
and follow it down:

static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}
The static method goes straight into next.invokeChannelActive(), where the ChannelHandlerContext is head:

private void invokeChannelActive() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelActive();
    }
}
Suppose the try block above throws; the flow then enters notifyHandlerException():

private void notifyHandlerException(Throwable cause) {
    ...
    invokeExceptionCaught(cause);
}
and from there straight into the important part, invokeExceptionCaught():

private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
            handler().exceptionCaught(this, cause);
        } catch (Throwable error) {
            ...
        }
    }
    ...
}
This calls exceptionCaught() on the handler belonging to the current context — which is still head at this point:

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.fireExceptionCaught(cause);
}
and then we are back in AbstractChannelHandlerContext:

public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
    invokeExceptionCaught(next, cause);
    return this;
}
Note the next in the method above: it is head's next node. invokeExceptionCaught() once more:

static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
    ObjectUtil.checkNotNull(cause, "cause");
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeExceptionCaught(cause);
    }
    ...
}
So the exception is handed to the next context's invokeExceptionCaught() and keeps travelling until it reaches a handler that actually handles it, at which point propagation stops. Netty therefore recommends putting the exception-handling context last, i.e. right before tail. If no handler deals with the exception, it eventually reaches tail, whose handling simply logs a warning.
To summarize inbound exception handling:

- an exception thrown by an inbound handler is caught in invokeXxx() and turned into an exceptionCaught event on the same context;
- the event then travels along the pipeline, context by context, toward tail;
- the first handler that handles it (without re-firing it) terminates the propagation; otherwise tail logs it;
- hence the advice to register a dedicated exception-handling handler as the last user handler in the pipeline.
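A minimal sketch of such a last-resort handler (hypothetical, intended to be added as the last user handler, right before tail):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class LastResortExceptionHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Any inbound exception that nobody else handled ends up here.
        cause.printStackTrace();
        ctx.close();   // fail fast: close the offending connection
    }
}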
Outbound exceptions (ChannelPromise failures) are not propagated along the context chain. Because the caller has to be notified through the ChannelPromise, the code ends up in PromiseNotificationUtil.tryFailure():

public static void tryFailure(Promise<?> p, Throwable cause, InternalLogger logger) {
    if (!p.tryFailure(cause) && logger != null) {
        Throwable err = p.cause();
        if (err == null) {
            logger.warn("Failed to mark a promise as failure because it has succeeded already: {}", p, cause);
        } else {
            logger.warn(
                    "Failed to mark a promise as failure because it has failed already: {}, unnotified cause: {}",
                    p, ThrowableUtil.stackTraceToString(err), cause);
        }
    }
}
If marking the promise as failed succeeds, the method simply returns; otherwise (the promise has already completed) it logs a warning.
To summarize outbound exception handling (ChannelPromise):

- failures of outbound operations are not forwarded from handler to handler along the pipeline;
- they are delivered by failing the ChannelPromise, so callers observe them on the ChannelFuture returned by bind/connect/write/etc., typically via a listener.
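A minimal sketch of observing an outbound failure on the returned future (ch stands for any connected Channel; the helper class is made up for illustration):

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;

public final class WriteFailureLogging {
    private WriteFailureLogging() { }

    // Outbound failures surface on the returned future rather than via exceptionCaught,
    // so attach a listener to see them.
    public static void writeLogged(Channel ch, String line) {
        ChannelFuture f = ch.writeAndFlush(line + "\r\n");
        f.addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                System.err.println("write failed: " + future.cause());
            }
        });
    }
}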
Finally, a summary of inbound versus outbound events: inbound events originate from I/O (or from an explicit fireXxx call) and travel forward through the list from head to tail, while outbound events originate from user operations such as bind, connect or write and travel backward from tail to head, where HeadContext hands them to the unsafe and ultimately to the JDK channel. This also answers the earlier question about the list structure: inbound propagation follows the next pointers and outbound propagation follows the prev pointers, so the list must be traversable in both directions — hence a doubly linked list.
The main design pattern at work here is the chain of responsibility: each AbstractChannelHandlerContext decides whether to consume an event or pass it on to its neighbour.
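For readers less familiar with that pattern, here is a stripped-down, Netty-free sketch of the same idea (all names are hypothetical); each node either consumes the message or forwards it, mirroring ctx.fireChannelRead(...):

import java.util.function.BiConsumer;

public class TinyChain {
    static final class Node {
        final BiConsumer<Node, String> handler;
        Node next;
        Node(BiConsumer<Node, String> handler) { this.handler = handler; }
        // Pass the message to the next node in the chain, if any.
        void fire(String msg) {
            if (next != null) {
                next.handler.accept(next, msg);
            }
        }
    }

    public static void main(String[] args) {
        Node head = new Node((n, m) -> n.fire(m));                          // just forwards
        Node logger = new Node((n, m) -> { System.out.println(m); n.fire(m); });
        Node tail = new Node((n, m) -> { /* unhandled: drop */ });
        head.next = logger;
        logger.next = tail;
        head.handler.accept(head, "hello");
    }
}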
Because the write flow is comparatively complex, it gets a chapter of its own. We'll again use the Netty 4 telnet demo, this time the client, to walk through the write flow.

Classes involved: TelnetClient, AbstractChannel, DefaultChannelPipeline, TailContext, AbstractChannelHandlerContext, SingleThreadEventExecutor, NioEventLoop, AbstractEventExecutor, AbstractChannelHandlerContext.WriteAndFlushTask.

The call sequence is: TelnetClient -> AbstractChannel -> DefaultChannelPipeline -> TailContext (AbstractChannelHandlerContext) -> NioEventLoop (SingleThreadEventExecutor) -> NioEventLoop (run method) -> AbstractEventExecutor (safeExecute method) -> WriteAndFlushTask (run method) -> AbstractChannelHandlerContext (handler is StringEncoder) -> StringEncoder (write method) -> HeadContext (invokeWrite method) -> NioSocketChannelUnsafe (write).
The flow starts in TelnetClient:

lastWriteFuture = ch.writeAndFlush(line + "\r\n");
Here ch is a NioSocketChannel; TelnetClient ends up calling writeAndFlush() in AbstractChannel, an (indirect) ancestor of NioSocketChannel:

public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);
}
which simply delegates to DefaultChannelPipeline.writeAndFlush(), i.e. the outbound event starts propagating through the pipeline:

public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}
That calls TailContext's writeAndFlush(), which is really the method inherited from AbstractChannelHandlerContext:

public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }
    if (isNotValidPromise(promise, true)) {
        ReferenceCountUtil.release(msg);
        // cancelled
        return promise;
    }
    write(msg, true, promise);
    return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        } else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}
The last method above, write(Object, boolean, ChannelPromise), ends up being invoked twice in this flow. On the first invocation, next is the context whose handler is io.netty.handler.codec.string.StringEncoder (contexts and handlers map one to one). Because executor.inEventLoop() is false — the calling thread is not the thread dedicated to this channel's I/O — the else branch runs: a WriteAndFlushTask is created and handed to safeExecute():

private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
    try {
        executor.execute(runnable);
    } catch (Throwable cause) {
        try {
            promise.setFailure(cause);
        } finally {
            if (msg != null) {
                ReferenceCountUtil.release(msg);
            }
        }
    }
}
safeExecute() calls execute() on the NioEventLoop (SingleThreadEventExecutor):

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
The important part is addTask(); here are the details:

protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (!offerTask(task)) {
        reject(task);
    }
}

final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}
So the task created earlier ends up in taskQueue, a LinkedBlockingQueue. At this point the business thread has handed the write operation over to the NioEventLoop thread via the queue. Now let's see how NioEventLoop processes the task:

protected void run() {
    for (;;) {
        try {
            ...
            if (ioRatio == 100) {
                ...
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        }
        ...
    }
}
The queued tasks are processed through runAllTasks():

protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    ...
    for (;;) {
        safeExecute(task);
        ...
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

protected static void safeExecute(Runnable task) {
    try {
        task.run();
    } catch (Throwable t) {
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}
Through safeExecute() in the parent class AbstractEventExecutor, this finally calls run() on the WriteAndFlushTask created earlier. The relevant code in WriteAndFlushTask and its parent is:

public final void run() {
    try {
        // Check for null as it may be set to null if the channel is closed already
        if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
            ctx.pipeline.decrementPendingOutboundBytes(size);
        }
        write(ctx, msg, promise);
    } finally {
        // Set to null so the GC can collect them directly
        ctx = null;
        msg = null;
        promise = null;
        handle.recycle(this);
    }
}

protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    ctx.invokeWrite(msg, promise);
}

public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    super.write(ctx, msg, promise);
    ctx.invokeFlush();
}
These methods end with ctx.invokeWrite(msg, promise), which brings us back into AbstractChannelHandlerContext (the context whose handler is StringEncoder):

private void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
    } else {
        System.out.println("not invoke write.");
        write(msg, promise);
    }
}

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
which eventually executes ((ChannelOutboundHandler) handler()).write(this, msg, promise), i.e. StringEncoder's write() method:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    CodecOutputList out = null;
    try {
        if (acceptOutboundMessage(msg)) {
            out = CodecOutputList.newInstance();
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            try {
                encode(ctx, cast, out);
            }
        }
        ...
    } finally {
        if (out != null) {
            final int sizeMinusOne = out.size() - 1;
            if (sizeMinusOne == 0) {
                ctx.write(out.get(0), promise);
            }
            ...
        }
    }
}
This encodes the String and then calls ctx.write(); ctx here is the context associated with StringEncoder. Its write() method:

public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }

    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return promise;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }
    write(msg, false, promise);
    return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        } else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}
We are back in the write() method analysed before, except that this time next is the HeadContext — the last outbound context for a write. Since we are now on the event loop, the code goes to next.invokeWrite(m, promise):

private void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
    } else {
        write(msg, promise);
    }
}

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
Both methods end in ((ChannelOutboundHandler) handler()).write(this, msg, promise). The context is now HeadContext — so what is HeadContext's handler()?

public ChannelHandler handler() {
    return this;
}

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}
HeadContext's handler() is itself, and its write() delegates to the unsafe, which here is a NioSocketChannelUnsafe:

public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    ...
    outboundBuffer.addMessage(msg, size, promise);
}
The message is stored in the outboundBuffer. Remember that WriteAndFlushTask.run() ends with a flush: once the message has been added to the outbound buffer, the unsafe's flush() is invoked:

public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();
    flush0();
}

protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    ...
    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        ...
    } finally {
        inFlush0 = false;
    }
}
which in turn calls this unsafe's doWrite():

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    int writeSpinCount = config().getWriteSpinCount();
    do {
        if (in.isEmpty()) {
            // All written so clear OP_WRITE
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }

        // Ensure the pending writes are made of ByteBufs only.
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();

        // Always us nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // Only one ByteBuf so use non-gathering write
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                // We limit the max amount to int above so cast is safe
                long attemptedBytes = in.nioBufferSize();
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                        maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);

    incompleteWrite(writeSpinCount < 0);
}
Finally, doWrite() performs the actual write through the JDK NIO API.
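For reference, the JDK-level operation that doWrite() ultimately delegates to is java.nio.channels.SocketChannel.write. A bare-bones JDK sketch, with no Netty involved, just to show what sits at the bottom of the stack (host and port are illustrative):

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class RawNioWrite {
    public static void main(String[] args) throws Exception {
        try (SocketChannel ch = SocketChannel.open(new InetSocketAddress("localhost", 8023))) {
            ByteBuffer buf = ByteBuffer.wrap("hello\r\n".getBytes(StandardCharsets.US_ASCII));
            while (buf.hasRemaining()) {
                ch.write(buf);   // may write fewer bytes than requested, hence the loop
            }
        }
    }
}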
From this walk through the Netty 4 source we can summarize the write flow as follows:

- a writeAndFlush() issued from a business thread does not write directly; it is wrapped in a WriteAndFlushTask and queued to the channel's event loop;
- the event loop thread runs the task, and the message travels through the outbound contexts from tail toward head, being transformed by encoders such as StringEncoder along the way;
- HeadContext hands the message to the unsafe, which appends it to the ChannelOutboundBuffer; the subsequent flush triggers doWrite(), which performs the JDK NIO write;
- every step on the channel therefore runs on the channel's single event loop thread.
In this article we analysed, in turn, the Netty server startup flow, the ChannelPipeline mechanism through which information flows, and the Netty 4 write flow in detail. Together these cover how a server comes up, how events and exceptions propagate through the pipeline, and how a write issued by a business thread ends up on the wire.