netty源码分析之pipeline(一)

经过前面的源码系列文章中的netty reactor线程三部曲,咱们已经知道,netty的reactor线程就像是一个发动机,驱动着整个netty框架的运行,而服务端的绑定新链接的创建正是发动机的导火线,将发动机点燃html

netty在服务端端口绑定和新链接创建的过程当中会创建相应的channel,而与channel的动做密切相关的是pipeline这个概念,pipeline像是能够看做是一条流水线,原始的原料(字节流)进来,通过加工,最后输出java

本文,我将以新链接的创建为例分为如下几个部分给你介绍netty中的pipeline是怎么玩转起来的react

  • pipeline 初始化
  • pipeline 添加节点
  • pipeline 删除节点

pipeline 初始化

新链接的创建这篇文章中,咱们已经知道了建立NioSocketChannel的时候会将netty的核心组件建立出来spring

channel中的核心组件

pipeline是其中的一员,在下面这段代码中被建立bootstrap

AbstractChannel缓存

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
复制代码

AbstractChannel性能优化

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}
复制代码

DefaultChannelPipeline微信

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
复制代码

pipeline中保存了channel的引用,建立完pipeline以后,整个pipeline是这个样子的多线程

pipeline默认结构

pipeline中的每一个节点是一个ChannelHandlerContext对象,每一个context节点保存了它包裹的执行器 ChannelHandler 执行操做所须要的上下文,其实就是pipeline,由于pipeline包含了channel的引用,能够拿到全部的context信息并发

默认状况下,一条pipeline会有两个节点,head和tail,后面的文章咱们具体分析这两个特殊的节点,今天咱们重点放在pipeline

pipeline添加节点

下面是一段很是常见的客户端代码

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new Spliter())
         p.addLast(new Decoder());
         p.addLast(new BusinessHandler())
         p.addLast(new Encoder());
     }
});
复制代码

首先,用一个spliter未来源TCP数据包拆包,而后将拆出来的包进行decoder,传入业务处理器BusinessHandler,业务处理完encoder,输出

整个pipeline结构以下

pipeline结构

我用两种颜色区分了一下pipeline中两种不一样类型的节点,一个是 ChannelInboundHandler,处理inBound事件,最典型的就是读取数据流,加工处理;还有一种类型的Handler是 ChannelOutboundHandler, 处理outBound事件,好比当调用writeAndFlush()类方法时,就会通过该种类型的handler

不论是哪一种类型的handler,其外层对象 ChannelHandlerContext 之间都是经过双向链表链接,而区分一个 ChannelHandlerContext究竟是in仍是out,在添加节点的时候咱们就能够看到netty是怎么处理的

DefaultChannelPipeline

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}
复制代码
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    for (ChannelHandler h: handlers) {
        addLast(executor, null, h);
    }
    return this;
}
复制代码
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1.检查是否有重复handler
        checkMultiplicity(handler);
        // 2.建立节点
        newCtx = newContext(group, filterName(name, handler), handler);
        // 3.添加节点
        addLast0(newCtx);
    }
   
    // 4.回调用户方法
    callHandlerAdded0(handler);
    
    return this;
}
复制代码

这里简单地用synchronized方法是为了防止多线程并发操做pipeline底层的双向链表

咱们仍是逐步分析上面这段代码

1.检查是否有重复handler

在用户代码添加一条handler的时候,首先会查看该handler有没有添加过

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}
复制代码

netty使用一个成员变量added标识一个channel是否已经添加,上面这段代码很简单,若是当前要添加的Handler是非共享的,而且已经添加过,那就抛出异常,不然,标识该handler已经添加

因而可知,一个Handler若是是sharable的,就能够无限次被添加到pipeline中,咱们客户端代码若是要让一个Handler被共用,只须要加一个@Sharable标注便可,以下

@Sharable
public class BusinessHandler {
    
}
复制代码

而若是Handler是sharable的,通常就经过spring的注入的方式使用,不须要每次都new 一个

isSharable() 方法正是经过该Handler对应的类是否标注@Sharable来实现的

ChannelHandlerAdapter

public boolean isSharable() {
   Class<?> clazz = getClass();
    Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
    Boolean sharable = cache.get(clazz);
    if (sharable == null) {
        sharable = clazz.isAnnotationPresent(Sharable.class);
        cache.put(clazz, sharable);
    }
    return sharable;
}
复制代码

这里也能够看到,netty为了性能优化到极致,还使用了ThreadLocal来缓存Handler的状态,高并发海量链接下,每次有新链接添加Handler都会建立调用此方法

2.建立节点

回到主流程,看建立上下文这段代码

newCtx = newContext(group, filterName(name, handler), handler);
复制代码

这里咱们须要先分析 filterName(name, handler) 这段代码,这个函数用于给handler建立一个惟一性的名字

private String filterName(String name, ChannelHandler handler) {
    if (name == null) {
        return generateName(handler);
    }
    checkDuplicateName(name);
    return name;
}
复制代码

显然,咱们传入的name为null,netty就给咱们生成一个默认的name,不然,检查是否有重名,检查经过的话就返回

netty建立默认name的规则为 简单类名#0,下面咱们来看些具体是怎么实现的

private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
        new FastThreadLocal<Map<Class<?>, String>>() {
    @Override
    protected Map<Class<?>, String> initialValue() throws Exception {
        return new WeakHashMap<Class<?>, String>();
    }
};

private String generateName(ChannelHandler handler) {
    // 先查看缓存中是否有生成过默认name
    Map<Class<?>, String> cache = nameCaches.get();
    Class<?> handlerType = handler.getClass();
    String name = cache.get(handlerType);
    // 没有生成过,就生成一个默认name,加入缓存 
    if (name == null) {
        name = generateName0(handlerType);
        cache.put(handlerType, name);
    }

    // 生成完了,还要看默认name有没有冲突
    if (context0(name) != null) {
        String baseName = name.substring(0, name.length() - 1);
        for (int i = 1;; i ++) {
            String newName = baseName + i;
            if (context0(newName) == null) {
                name = newName;
                break;
            }
        }
    }
    return name;
}
复制代码

netty使用一个 FastThreadLocal(后面的文章会细说)变量来缓存Handler的类和默认名称的映射关系,在生成name的时候,首先查看缓存中有没有生成过默认name(简单类名#0),若是没有生成,就调用generateName0()生成默认name,而后加入缓存

接下来还须要检查name是否和已有的name有冲突,调用context0(),查找pipeline里面有没有对应的context

private AbstractChannelHandlerContext context0(String name) {
    AbstractChannelHandlerContext context = head.next;
    while (context != tail) {
        if (context.name().equals(name)) {
            return context;
        }
        context = context.next;
    }
    return null;
}
复制代码

context0()方法链表遍历每个 ChannelHandlerContext,只要发现某个context的名字与待添加的name相同,就返回该context,最后抛出异常,能够看到,这个实际上是一个线性搜索的过程

若是context0(name) != null 成立,说明现有的context里面已经有了一个默认name,那么就从 简单类名#1 往上一直找,直到找到一个惟一的name,好比简单类名#3

若是用户代码在添加Handler的时候指定了一个name,那么要作到事仅仅为检查一下是否有重复

private void checkDuplicateName(String name) {
    if (context0(name) != null) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

复制代码

处理完name以后,就进入到建立context的过程,由前面的调用链得知,group为null,所以childExecutor(group)也返回null

DefaultChannelPipeline

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

private EventExecutor childExecutor(EventExecutorGroup group) {
    if (group == null) {
        return null;
    }
    //..
}

复制代码

DefaultChannelHandlerContext

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}
复制代码

构造函数中,DefaultChannelHandlerContext将参数回传到父类,保存Handler的引用,进入到其父类

AbstractChannelHandlerContext

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
}
复制代码

netty中用两个字段来表示这个channelHandlerContext属于inBound仍是outBound,或者二者都是,两个boolean是经过下面两个小函数来判断(见上面一段代码)

DefaultChannelHandlerContext

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}
复制代码

经过instanceof关键字根据接口类型来判断,所以,若是一个Handler实现了两类接口,那么他既是一个inBound类型的Handler,又是一个outBound类型的Handler,好比下面这个类

ChannelDuplexHandler

经常使用的,将decode操做和encode操做合并到一块儿的codec,通常会继承 MessageToMessageCodec,而MessageToMessageCodec就是继承ChannelDuplexHandler

MessageToMessageCodec

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {

    protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out) throws Exception;

    protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out) throws Exception;
 }

复制代码

context 建立完了以后,接下来终于要将建立完毕的context加入到pipeline中去了

3.添加节点

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev; // 1
    newCtx.next = tail; // 2
    prev.next = newCtx; // 3
    tail.prev = newCtx; // 4
}
复制代码

用下面这幅图可见简单的表示这段过程,说白了,其实就是一个双向链表的插入操做

添加节点过程

操做完毕,该context就加入到pipeline中

添加节点以后

到这里,pipeline添加节点的操做就完成了,你能够根据此思路掌握全部的addxxx()系列方法

4.回调用户方法

AbstractChannelHandlerContext

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    ctx.handler().handlerAdded(ctx);
    ctx.setAddComplete();
}

复制代码

到了第四步,pipeline中的新节点添加完成,因而便开始回调用户代码 ctx.handler().handlerAdded(ctx);,常见的用户代码以下

AbstractChannelHandlerContext

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 节点被添加完毕以后回调到此
        // do something
    }
}
复制代码

接下来,设置该节点的状态

AbstractChannelHandlerContext

final void setAddComplete() {
    for (;;) {
        int oldState = handlerState;
        if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return;
        }
    }
}
复制代码

用cas修改节点的状态至:REMOVE_COMPLETE(说明该节点已经被移除) 或者 ADD_COMPLETE

pipeline删除节点

netty 有个最大的特性之一就是Handler可插拔,作到动态编织pipeline,好比在首次创建链接的时候,须要经过进行权限认证,在认证经过以后,就能够将此context移除,下次pipeline在传播事件的时候就就不会调用到权限认证处理器

下面是权限认证Handler最简单的实现,第一个数据包传来的是认证信息,若是校验经过,就删除此Handler,不然,直接关闭链接

public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
        if (verify(authDataPacket)) {
            ctx.pipeline().remove(this);
        } else {
            ctx.close();
        }
    }

    private boolean verify(ByteBuf byteBuf) {
        //...
    }
}
复制代码

重点就在 ctx.pipeline().remove(this) 这段代码

@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    
    return this;
}

复制代码

remove操做相比add简单很多,分为三个步骤:

1.找到待删除的节点 2.调整双向链表指针删除 3.回调用户函数

1.找到待删除的节点

DefaultChannelPipeline

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    if (ctx == null) {
        throw new NoSuchElementException(handler.getClass().getName());
    } else {
        return ctx;
    }
}

@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }

    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {

        if (ctx == null) {
            return null;
        }

        if (ctx.handler() == handler) {
            return ctx;
        }

        ctx = ctx.next;
    }
}
复制代码

这里为了找到Handler对应的context,照样是经过依次遍历双向链表的方式,直到某一个context的Handler和当前Handler相同,便找到了该节点

2.调整双向链表指针删除

DefaultChannelPipeline

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    assert ctx != head && ctx != tail;

    synchronized (this) {
        // 2.调整双向链表指针删除
        remove0(ctx);
    }
    // 3.回调用户函数
    callHandlerRemoved0(ctx);
    return ctx;
}

private static void remove0(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next; // 1
    next.prev = prev; // 2
}

复制代码

经历的过程要比添加节点要简单,能够用下面一幅图来表示

删除节点过程

最后的结果为

删除节点以后

结合这两幅图,能够很清晰地了解权限验证Handler的工做原理,另外,被删除的节点由于没有对象引用到,果过段时间就会被gc自动回收

3.回调用户函数

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.handler().handlerRemoved(ctx);
    } finally {
        ctx.setRemoved();
    }
}
复制代码

到了第三步,pipeline中的节点删除完成,因而便开始回调用户代码 ctx.handler().handlerRemoved(ctx);,常见的代码以下

AbstractChannelHandlerContext

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 节点被删除完毕以后回调到此,可作一些资源清理
        // do something
    }
}
复制代码

最后,将该节点的状态设置为removed

final void setRemoved() {
    handlerState = REMOVE_COMPLETE;
}
复制代码

removexxx系列的其余方法族大同小异,你能够根据上面的思路展开其余的系列方法,这里再也不赘述

总结

1.以新链接建立为例,新链接建立的过程当中建立channel,而在建立channel的过程当中建立了该channel对应的pipeline,建立完pipeline以后,自动给该pipeline添加了两个节点,即ChannelHandlerContext,ChannelHandlerContext中有用pipeline和channel全部的上下文信息。

2.pipeline是双向个链表结构,添加和删除节点均只须要调整链表结构

3.pipeline中的每一个节点包着具体的处理器ChannelHandler,节点根据ChannelHandler的类型是ChannelInboundHandler仍是ChannelOutboundHandler来判断该节点属于in仍是out或者二者都是

下一篇文章将继续pipeline的分析,敬请期待!

若是你想系统地学Netty,个人小册《Netty 入门与实战:仿写微信 IM 即时通信系统》能够帮助你,若是你想系统学习Netty原理,那么你必定不要错过个人Netty源码分析系列视频:coding.imooc.com/class/230.h…

相关文章
相关标签/搜索