Netty源码分析第4章(pipeline)---->第7节: 前章节内容回顾

 

Netty源码分析第四章: pipelinehtml

 

第七节: 前章节内容回顾promise

 

咱们在第一章和第三章中, 遗留了不少有关事件传输的相关逻辑, 这里带你们一一回顾ide

首先看两个问题:oop

1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为何会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法源码分析

2.客户端handler是何时被添加的?学习

首先看第一个问题:this

1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为何会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法?spa

咱们首先看这段代码:线程

public void read() { //必须是NioEventLoop方法调用的, 不能经过外部线程调用
    assert eventLoop().inEventLoop(); //服务端channel的config
    final ChannelConfig config = config(); //服务端channel的pipeline
    final ChannelPipeline pipeline = pipeline(); //处理服务端接入的速率
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //设置配置
 allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //建立jdk底层的channel //readBuf用于临时承载读到连接
                int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //分配器将读到的连接进行计数
 allocHandle.incMessagesRead(localRead); //链接数是否超过最大值
            } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍历每一条客户端链接
        for (int i = 0; i < size; i ++) { readPending = false; //传递事件, 将建立NioSokectChannel进行传递 //最终会调用ServerBootstrap的内部类ServerBootstrapAcceptor的channelRead()方法  pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //代码省略
    } finally { //代码省略
 } }

重点看pipeline.fireChannelRead(readBuf.get(i))code

首先, 这里pipeline是服务端channel的pipeline, 也就是NioServerSocketChannel的pipeline

咱们学习过pipeline以后, 对这种写法并不陌生, 就是传递channelRead事件, 这里经过传递channelRead事件走到了ServerBootstrapAcceptor的channelRead()方法, 说明在这步以前, ServerBootstrapAcceptor做为一个handler添加到了服务端channel的pipeline中, 那么这个handler何时添加的呢?

咱们回顾下第一章, 初始化NioServerSocketChannel的时候, 调用了ServerBootstrap的init方法:

void init(Channel channel) throws Exception { //获取用户定义的选项(1)
    final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } //获取用户定义的属性(2)
    final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //获取channel的pipline(3)
    ChannelPipeline p = channel.pipeline(); //work线程组(4)
    final EventLoopGroup currentChildGroup = childGroup; //用户设置的Handler(5)
    final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; //选项转化为Entry对象(6)
    synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } //属性转化为Entry对象(7)
    synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //添加服务端handler(8)
    p.addLast(new ChannelInitializer<Channel>() { //初始化channel
 @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }

这个方法比较长, 咱们重点关注第8步, 添加服务端channel, 这里的pipeline, 是服务服务端channel的pipeline, 也就是NioServerSocketChannel绑定的pipeline, 这里添加了一个ChannelInitializer类型的handler

咱们看一下ChannelInitializer这个类的继承关系:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter { //省略类体
}

咱们看到其继承了ChannelInboundHandlerAdapter, 说明是一个inbound类型的handler

这里咱们可能会想到, 添加完handler会执行handlerAdded, 而后再handlerAdded方法中作了添加ServerBootstrapAcceptor这个handler

可是, 实际上并非这样的, 当程序执行到这里, 并无立刻执行handlerAdded, 咱们紧跟addLast方法

最后会跟到DefualtChannelPipeline的一个addLast方法中去:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { //判断handler是否被重复添加(1)
 checkMultiplicity(handler); //建立一个HandlerContext并添加到列表(2)
        newCtx = newContext(group, filterName(name, handler), handler); //添加HandlerContext(3)
 addLast0(newCtx); //是否已注册
        if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); //回调用户事件
            executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } //回调添加事件(4)
 callHandlerAdded0(newCtx); return this; }

首先完成了handler的添加, 可是并无立刻执行回调

这里咱们重点关注if (!registered)这个条件判断, 其实在注册完成, registered会变成true, 可是走到这一步的时候NioServerSockeChannel并无完成注册(能够回顾第一章看注册在哪一步), 因此会进到if里并返回自身

咱们重点关注callHandlerCallbackLater这个方法, 咱们跟进去:

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) { assert !registered; //判断是否已添加, 未添加, 进行添加, 已添加进行删除
    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx); //获取第一个Callback任务
    PendingHandlerCallback pending = pendingHandlerCallbackHead; //若是第一个Callback任务为空
    if (pending == null) { //将第一个任务设置为刚建立的任务
        pendingHandlerCallbackHead = task; } else { while (pending.next != null) { pending = pending.next; } pending.next = task; } }

因咱们调用这个方法的时候added传的true, 因此PendingHandlerCallback task赋值为new PendingHandlerAddedTask(ctx)

PendingHandlerAddedTask这个类, 咱们从名字能够看出, 这是一个handler添加的延迟任务, 用于执行handler延迟添加的操做, 一样也对应一个名字为PendingHandlerRemovedTask的类, 用于执行延迟删除handler的操做, 这两个类都继承抽象类PendingHandlerCallback

 

咱们看PendingHandlerAddedTask类构造方法:

PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) { super(ctx); }

这里调用了父类的构造方法, 再跟进去:

PendingHandlerCallback(AbstractChannelHandlerContext ctx) { this.ctx = ctx; }

在父类中, 保存了要添加的context, 也就是ChannelInitializer类型的包装类

回到callHandlerCallbackLater方法中:

PendingHandlerCallback pending = pendingHandlerCallbackHead;

这表示获取第一个PendingHandlerCallback的任务, 其实PendingHandlerCallback是一个单向链表, 自身维护一个PendingHandlerCallback类型的next, 指向下一个任务, 在DefaultChannelPipeline这个类中, 定义了个PendingHandlerCallback类型的引用pendingHandlerCallbackHead, 用来指向延迟回调任务的中的第一个任务

 

以后判断这个任务是为空, 若是是第一次添加handler, 那么这里就是空, 因此将第一个任务赋值为咱们刚建立的添加任务

若是不是第一次添加handler, 则将咱们新建立的任务添加到链表的尾部, 由于这里咱们是第一次添加, 因此第一个回调任务就指向了咱们建立的添加handler的任务

完成这一系列操做以后, addLast方法返归, 此时并无完成添加操做

而何时完成添加操做的呢?

在服务端channel注册时候的会走到AbstractChannel的register0方法:

private void register0(ChannelPromise promise) { try { //作实际的注册(1)
 doRegister(); neverRegistered = false; registered = true; //触发事件(2)
 pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); //触发注册成功事件(3)
 pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { //传播active事件(4)
 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { //省略代码
 } }

重点关注第二步pipeline.invokeHandlerAddedIfNeeded(), 这里已经经过doRegister()方法完成了实际的注册, 咱们跟到该方法中:

final void invokeHandlerAddedIfNeeded() { assert channel.eventLoop().inEventLoop(); if (firstRegistration) { firstRegistration = false; callHandlerAddedForAllHandlers(); } }

这里会判断是否第一次注册, 这里返回true, 而后会执行callHandlerAddedForAllHandlers()方法, 咱们跟进去:

private void callHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { assert !registered; registered = true; pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; this.pendingHandlerCallbackHead = null; } //获取task
    PendingHandlerCallback task = pendingHandlerCallbackHead; while (task != null) { //执行添加handler方法
 task.execute(); task = task.next; } }

这里拿到第一个延迟执行handler添加的task其实就是咱们以前剖析过的, 延迟执行handler添加的task, 就是PendingHandlerAddedTask对象

在while循环中, 经过执行execute()方法将handler添加

咱们跟到PendingHandlerAddedTask的execute()方法中:

void execute() { //获取当前eventLoop线程
    EventExecutor executor = ctx.executor(); //是当前执行的线程
    if (executor.inEventLoop()) { callHandlerAdded0(ctx); } else { try { //添加到队列
            executor.execute(this); } catch (RejectedExecutionException e) { //代码省略
 } } }

终于在这里, 咱们看到了执行回调的方法

再回到init方法中:

void init(Channel channel) throws Exception { //获取用户定义的选项(1)
    final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } //获取用户定义的属性(2)
    final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //获取channel的pipline(3)
    ChannelPipeline p = channel.pipeline(); //work线程组(4)
    final EventLoopGroup currentChildGroup = childGroup; //用户设置的Handler(5)
    final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; //选项转化为Entry对象(6)
    synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } //属性转化为Entry对象(7)
    synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //添加服务端handler(8)
    p.addLast(new ChannelInitializer<Channel>() { //初始化channel
 @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }

咱们继续看第8步添加服务端handler

由于这里的handler是ChannelInitializer, 因此完成添加以后会调用ChannelInitializer的handlerAdded方法

跟到handlerAdded方法:

public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //默认状况下, 会返回true
    if (ctx.channel().isRegistered()) { initChannel(ctx); } }

由于执行到这步服务端channel已经完成注册, 因此会执行到initChannel方法

跟到initChannel方法:

private boolean initChannel(ChannelHandlerContext ctx) throws Exception { //这段代码是否被执行过
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { //调用以后会删除当前节点
 remove(ctx); } return true; } return false; }

咱们关注initChannel这个方法, 这个方法是在ChannelInitializer的匿名内部来实现的, 这里咱们注意, 在initChannel方法执行完毕以后会调用remove(ctx)删除当前节点

咱们继续跟进initChannel方法:

@Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }

这里首先添加用户自定义的handler, 这里若是用户没有定义, 则添加不成功, 而后, 会调用addLast将ServerBootstrapAcceptor这个handler添加了进去, 一样这个handler也继承了ChannelInboundHandlerAdapter, 在这个handler中, 重写了channelRead方法, 因此, 这就是第一个问题的答案

紧接着咱们看第二个问题:

2.客户端handler是何时被添加的?

咱们这里看ServerBootstrapAcceptor的channelRead方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; //添加channelHadler, 这个channelHandler, 就是用户代码添加的ChannelInitializer
 child.pipeline().addLast(childHandler); //代码省略

    try { //work线程注册channel
        childGroup.register(child).addListener(new ChannelFutureListener() { //代码省略
 }); } catch (Throwable t) { forceClose(child, t); } }

这里真相能够大白了, 服务端再建立完客户端channel以后, 将新建立的NioSocketChannel做为参数触发channelRead事件(能够回顾NioMessageUnsafe的read方法, 代码这里就不贴了), 因此这里的参数msg就是NioSocketChannel

拿到channel时候再将客户端的handler添加进去, 咱们回顾客户端handler的添加过程:

.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ServerHandler()); } });

和服务端channel的逻辑同样, 首先会添加ChannelInitializer这个handler可是没有注册因此没有执行添加handler的回调, 将任务保存到一个延迟回调的task中

等客户端channel注册完毕, 会将执行添加handler的回调, 也就是handlerAdded方法, 在回调中执行initChannel方法将客户端handler添加进去, 而后删除ChannelInitializer这个handler

由于在服务端channel中这块逻辑已经进行了详细的剖析, 因此这边就不在赘述, 同窗们能够本身跟进去走一遍流程

这里注意, 由于每建立一个NioSoeketChannel都会调用服务端ServerBootstrapAcceptor的channelRead方法, 因此这里会将每个NioSocketChannel的handler进行添加

 

第四章总结

        本章剖析了事件传输的相关逻辑, 包括handler的添加, 删除, inbound和outbound以及异常事件的传输, 最后结合第一章和第三章, 剖析了服务端channel和客户端channel的添加过程, 同窗们能够课后跟进源码, 将这些功能本身再走一遍以加深印象.其余的有关事件传输的逻辑, 能够结合这一章的知识点进行自行剖析

 

上一节: 传播异常事件

下一节: AbstractByteBuf

相关文章
相关标签/搜索