Netty channelRegisteredChannelActive---源码分析
通过下面的分析咱们能够了解netty读数据的一个过程,以及为何channelActive方法、channelReadComplete方法会回调ChannelOutboundHandler的read方法。segmentfault
上文说到在HeadContext的channelActive方法中会调用readIfIsAutoRead();该方法一样会在HeadContext的channelReadComplete(xxx)中调用。 readIfIsAutoRead();源码以下:多线程
private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { channel.read(); } }
channel.config().isAutoRead()能够经过ChannelOption.AUTO_READ设置。若是设置为false,那么channel便不会主动读数据,除非显示的调用ChannelHandlerContext的read()ide
AbstractChannel的read()以下oop
@Override public Channel read() { pipeline.read(); return this; }
在read()方法中调用了pipeline的read()方法源码分析
DefaultChannelPipeline的read()方法this
@Override public final ChannelPipeline read() { tail.read(); return this; }
那么接下来的重点就是DefaultChannelPipeline的tail及tail.read()方法了。先看一下tail对应的TailContext类,TailContext是DefaultChannelPipeline的内部类线程
DefaultChannelPipelinerest
final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; ...省略代码... protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
TailContextnetty
// A special catch-all handler that handles both bytes and messages. final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); } @Override public ChannelHandler handler() { return this; } ...省略代码...
TailContext继承自AbstractChannelHandlerContext,同时实现了ChannelInboundHandler,也是多重身份。
TailContext的read()方法是继承自AbstractChannelHandlerContext,TailContext没有重写。code
AbstractChannleHandlerContext的read()以下:
@Override public ChannelHandlerContext read() { final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeRead(); } else { Runnable task = next.invokeReadTask; if (task == null) { next.invokeReadTask = task = new Runnable() { @Override public void run() { next.invokeRead(); } }; } executor.execute(task); } return this; } private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
其中findContextOutbound()是找到下一个Outbound的ChannelHandlerContext。那么由tail.read()所表明的含义即是从pipeline中的尾部的最后一个ChannelInboundHandler开始往前查找是Outbound的HandlerContext.
而后该HandlerContext的invokeRead()方法被调用。
如下分析和read过程没多大关系也能够跳过
AbstractChannleHandlerContext的read()方法中的
if (executor.inEventLoop()) { next.invokeRead(); } else { Runnable task = next.invokeReadTask; if (task == null) { next.invokeReadTask = task = new Runnable() { @Override public void run() { next.invokeRead(); } }; } executor.execute(task); }
AbstractEventExecutor的inEventLoop()
@Override public boolean inEventLoop() { return inEventLoop(Thread.currentThread()); }
上面代码的含义是若是调用ChannelHandlerContext read() 所在的线程和executor是同一个线程,那么直接执行AbstractChannelHandlerContext的invokeRead()方法,不然封装成任务,放到executor的任务队列,去等待执行。 这种相似的代码在netty中很常见,这是netty中不用考虑多线程问题的缘由。netty用这种方式很好的规避了多线程所带来的问题,很值得咱们借鉴
那么这个executor怎么来的呢?看一下AbstractChannelHandlerContext的executor()方法
@Override public EventExecutor executor() { if (executor == null) { return channel().eventLoop(); } else { return executor; } }
若是executor 为null,就返回channel().eventLoop()。这里channel().eventLoop()就是每一个channel所对应的EventLoop,专门用来处理IO事件,所以不能被阻塞,不能执行耗时任务,该eventLoop会在channel建立时会和channel绑定,ChannelInboundHandler的channelRegistered()也就会被回调。咱们建立ServerBootstrap是会指定一个WokerGroup例如NioEventLoopGroup,那么这个eventLoop便会是其中的一员。
那若是executor不为null,executor是怎么来的呢?
AbstractChannelHandlerContext的构造方法
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. ordered = executor == null || executor instanceof OrderedEventExecutor; }
executor是经过构造方法传进来的。pipeline在添加handler时能够指定EventExecutorGroup(能够查看ChannelPipeline接口的API),即是这么传进来的,具体的分析过程此处略去(可查看netty 耗时任务如何处理去查看具体分析过程),由于不是此篇文章的重点。
这样咱们就能处理耗时任务,而不阻塞IO线程了。
第2小节分析到AbstractChannelHandlerContext的invokeRead()方法会被调用,那么invokeRead()实现了什么功能?
private void invokeRead() { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).read(this); } catch (Throwable t) { notifyHandlerException(t); } } else { read(); } }
该方法所表达的含义很简单就是回调ChannelOutboundHandler的read(xxx)方法。若是咱们的自定义的ChannelOutboundHandler继承自ChannelOutboundHandlerAdapter,而且没有重写该方法,或者在重写的方法中调用了super.read(ctx); 那就会重复调用ChannelHandlerContext的read(),即AbstractChannelHandlerContext的read()方法。这样read(xxx)回调便会在ChannelHandlerContext的做用下从pipleline的ChannelOutboundHandler中的尾部传递到头部,直到DefaultChannelPipeline的DefaultChannelPipeline的HeadContext.
HeadContext的read(xxx)方法以下,HeadContext自己也是ChannelOutboundHandler
@Override public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); }
以NioChannel为例,unsafe.beginRead();最终会调用到AbstractNioChannel的doBeginRead()方法,其对应的源码以下:
@Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
该方法里就是Java Nio的相关操做,SelectionKey的性趣集中添加OP_READ,最终实现读数据。