深刻理解 Netty-Channel架构体系

全文围绕下图,Netty-Channel的简化版架构体系图展开,从顶层Channel接口开始入手,往下递进,闲言少叙,直接开撸java

概述: 从图中能够看到,从顶级接口Channel开始,在接口中定义了一套方法看成规范,紧接着的是来两个抽象的接口实现类,在这个抽象类中对接口中的方法,进行了部分实现,而后开始根据不一样的功能分支,分红服务端的Channel和客户端的Channelgit

1

回顾

Channel的分类

根据服务端和客户端,Channel能够分红两类(这两大类的分支见上图):github

  • 服务端: NioServerSocketChannel
  • 客户端: NioSocketChannel

什么是Channel?

channel是一个管道,用于链接字节缓冲区Buf和另外一端的实体,这个实例能够是Socket,也能够是File, 在Nio网络编程模型中, 服务端和客户端进行IO数据交互(获得彼此推送的信息)的媒介就是Channel编程

Netty对Jdk原生的ServerSocketChannel进行了封装和加强封装成了NioXXXChannel, 相对于原生的JdkChannel, Netty的Channel增长了以下的组件promise

  • id 标识惟一身份信息
  • 可能存在的parent Channel
  • 管道 pepiline
  • 用于数据读写的unsafe内部类
  • 关联上相伴终生的NioEventLoop

本篇博客,会追溯上图中的体系关系,找出NioXXXChannel的相对于jdk原生channel在哪里添加的上面的新组件服务器

源码开始-Channel

2

如今来到上图的Channel部分, 他是一个接口, netty用它规定了一个Channel应该具备的功能,在它的文档对Channel的是什么,以及对各个组件进行了描述网络

  • 阐述了channel是什么,有啥用
  • Channel经过ChannelPipeline中的多个Handler处理器,Channel使用它处理IO数据
  • Channel中的全部Io操做都是异步的,一经调用就立刻返回,因而Netty基于Jdk原生的Future进行了封装, ChannelFuture, 读写操做会返回这个对象,实现自动通知IO操做已完成
  • Channel是能够有parent的, 以下
// 建立客户端channel时,会把服务端的Channel设置成本身的parent
// 因而就像下面:
  服务端的channel = 客户端的channel.parent();
  服务的channel.parent()==null;

此外,Channel还定义了大量的抽象方法, 以下:多线程

/**
 * todo 返回一个仅供内部使用的unsafe对象, Chanel上 IO数据的读写都是借助这个类完成的
 */
Unsafe unsafe();
// 返回Channel的管道
ChannelPipeline pipeline();

ByteBufAllocator alloc();

@Override // todo 进入第一个实现 , 读取Channel中的 IO数据
Channel read();

// 返回Channel id
ChannelId id();  

// todo 返回channel所注册的 eventLoop
EventLoop eventLoop();

// 返回当前Channel的父channel
Channel parent();

// todo 描述了 关于channel的 一些列配置信息
ChannelConfig config();

// 检查channel是否开启
boolean isOpen();

// 检查channel是否注册
boolean isRegistered();

 // todo 什么是active 他说的是channel状态, 什么状态呢? 当前channel 若和Selector正常的通讯就说明 active
boolean isActive();

// 返回channel的元数据
ChannelMetadata metadata();

//  服务器的ip地址
SocketAddress localAddress();

// remoteAddress 客户端的ip地址
SocketAddress remoteAddress();

ChannelFuture closeFuture();
boolean isWritable();
long bytesBeforeUnwritable();
long bytesBeforeWritable();
@Override
Channel flush();

Channel重要的内部接口 unsafe

Netty中,真正帮助Channel完成IO读写操做的是它的内部类unsafe, 源码以下, 不少重要的功能在这个接口中定义, 下面列举的经常使用的方法架构

interface Unsafe {
//  把channel注册进EventLoop
void register(EventLoop eventLoop, ChannelPromise promise);
 
 // todo 给channel绑定一个 adress,
void bind(SocketAddress localAddress, ChannelPromise promise);

// 把channel注册进Selector
void deregister(ChannelPromise promise);

// 从channel中读取IO数据
void beginRead();

// 往channe写入数据
void write(Object msg, ChannelPromise promise);
...
...

AbstractChanel

3

接着往下看,下面来到Channel接口的直接实现类,AbstractChannel 他是个抽象类, AbstractChannel重写部分Channel接口预约义的方法, 它的抽象内部类AbstractUnsafe实现了Channel的内部接口unsafe并发

咱们如今是从上往下看,可是当咱们建立对象使用的时候实际上是使用的特化的对象,建立特化的对象就不免会调层层往上调用父类的构造方法, 因此咱们看看AbstractChannel的构造方法干了什么活? 源码以下:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    // todo channelId 表明Chanel惟一的身份标志
    id = newId();
    // todo 建立一个unsafe对象
    unsafe = newUnsafe();
    // todo 在这里初始化了每个channel都会有的pipeline组件
    pipeline = newChannelPipeline();
}

咱们看,AbstractChannel构造函数, 接受的子类传递进来的参数只有一个parent CHannel,并且,还不有可能为空, 因此在AbstractChannel是没有维护jdk底层的Channel的, 相反他会维护着Channel关联的EventLoop,我是怎么知道的呢? 首先,它的属性中存在这个字段,并且,将channel注册进selector的Register()方法是AbastractChannel重写的,Selector在哪呢? 在EventLoop里面,它怎么获得的呢? 它的子类传递了给了它

终于看出来点眉目,构造方法作了四件事

  • 设置parent
    • 若是当前建立的channel是客户端的channel,把parent初始化为他对应的parent
    • 若是为服务端的channel,这就是null
  • 建立惟一的id
  • 建立针对channel进行io读写的unsafe
  • 建立channel的处理器handler链 channelPipeline

AbstractChannel中维护着EventLoop

AbstractChanel的重要抽象内部类AbstractUnsafe 继承了Channel的内部接口Unsafe

他的源码以下,我贴出来了两个重要的方法, 关于这两个方法的解析,我写在代码的下面

protected abstract class AbstractUnsafe implements Unsafe {

@Override
// todo 入参 eventLoop == SingleThreadEventLoop   promise == NioServerSocketChannel + Executor
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }
    // todo 赋值给本身的 事件循环, 把当前的eventLoop赋值给当前的Channel上  做用是标记后续的全部注册的操做都得交给我这个eventLoop处理, 正好对应着下面的判断
    // todo 保证了 即使是在多线程的环境下一条channel 也只能注册关联上惟一的eventLoop,惟一的线程
    AbstractChannel.this.eventLoop = eventLoop;

    // todo 下面的分支判断里面执行的代码是同样的!!, 为何? 这是netty的重点, 它大量的使用线程, 线程之间就会产生同步和并发的问题
    // todo 下面的分支,目的就是把线程可能带来的问题降到最低限度
    // todo 进入inEventLoop() --> 判断当前执行这行代码的线程是否就是 SingleThreadEventExecutor里面维护的那条惟一的线程
    // todo 解释下面分支的必要性, 一个eventLoop能够注册多个channel, 可是channel的整个生命周期中全部的IO事件,仅仅和它关联上的thread有关系
    // todo 并且,一个eventLoop在他的整个生命周期中,只和惟一的线程进行绑定,
    //
    // todo 当咱们注册channel的时候就得确保给他专属它的thread,
    // todo 若是是新的链接到了,
    if (eventLoop.inEventLoop()) {
        // todo 进入regist0()
        register0(promise);
    } else {
        try {
            // todo 若是不是,它以一个任务的形式提交  事件循环 , 新的任务在新的线程开始,  规避了多线程的并发
            // todo 他是SimpleThreadEventExucutor中execute()实现的,把任务添加到执行队列执行
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // todo 进入这个方法doRegister()
        // todo 它把系统建立的ServerSocketChannel 注册进了选择器
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        // todo 确保在 notify the promise前调用 handlerAdded(...)
        // todo 这是必需的,由于用户可能已经经过ChannelFutureListener中的管道触发了事件。
        // todo 若是须要的话,执行HandlerAdded()方法
        // todo 正是这个方法, 回调了前面咱们添加 Initializer 中添加 Accpter的重要方法
        pipeline.invokeHandlerAddedIfNeeded();

        // todo  !!!!!!!  观察者模式!!!!!!  通知观察者,谁是观察者?  暂时理解ChannelHandler 是观察者
        safeSetSuccess(promise);

        // todo 传播行为, 传播什么行为呢?   在head---> ServerBootStraptAccptor ---> tail传播事件ChannelRegistered  , 也就是挨个调用它们的ChannelRegisted函数
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        // todo 对于服务端:  javaChannel().socket().isBound(); 即  当Channel绑定上了端口   isActive()才会返回true
        // todo 对于客户端的链接 ch.isOpen() && ch.isConnected(); 返回true , 就是说, Channel是open的 打开状态的就是true
        if (isActive()) {
            if (firstRegistration) {
                // todo 在pipeline中传播ChannelActive的行为,跟进去
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                // todo 能够接受客户端的数据了
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    // todo 因为端口的绑定未完成,因此 wasActive是 false

    try {
        // todo 绑定端口, 进去就是NIO原生JDK绑定端口的代码
        doBind(localAddress);
        // todo 端口绑定完成  isActive()是true
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    // todo 根据上面的逻辑判断, 结果为 true
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            // todo 来到这里很重要, 向下传递事件行为, 传播行为的时候, 从管道的第一个节点开始传播, 第一个节点被封装成 HeadContext的对象
           // todo 进入方法, 去 HeadContext里面查看作了哪些事情
            // todo 她会触发channel的read, 最终从新为 已经注册进selector 的 chanel, 二次注册添加上感性趣的accept事件
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    // todo 观察者模式, 设置改变状态, 通知观察者的方法回调
    safeSetSuccess(promise);
}

AbstractChannel抽象内部类的register(EventLoop,channelPromise)方法

这个方法,是将channel注册进EventLoop的Selector, 它的调用顺序以下:

本类方法 regist()--> 本类方法 register0() --> 本类抽象方法doRegister()

doRegister() 在这里设计成抽象方法,等着子类去具体的实现, 为啥这样作呢?

刚才说了,AbstractChannel自己就是个模板,并且它仅仅维护了EventLoop,没有拿到channel引用的它根本不可能进行注册的逻辑,那谁有jdk原生channel的引用呢? 它的直接子类AbstractNioChannel下面是AbstractNioChannel的构造方法, 它本身维护jdk原生的Channel,因此由他重写doRegister(),

*/ // todo 不管是服务端的channel 仍是客户端的channel都会使用这个方法进行初始化
// // TODO: 2019/6/23                null        ServerSocketChannel       accept
// todo  若是是在建立NioSocketChannel  parent==NioServerSocketChannel  ch == SocketChanel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);// todo  继续向上跟,建立基本的组件
    // todo 若是是建立NioSocketChannel   这就是在保存原生的jdkchannel
    // todo 若是是建立NioServerSocketChannel   这就是在保存ServerSocketChannel
    this.ch = ch;
    // todo 设置上感兴趣的事件
    this.readInterestOp = readInterestOp;
    try {
        // todo 做为服务端, ServerSocketChannel 设置为非阻塞的
        // todo 做为客户端   SocketChannel 设置为非阻塞的
        ch.configureBlocking(false);
    } catch (IOException e) {

AbstractChannel抽象内部类的bind()方法

bind()方法的调用顺序, 本类方法 bind()--> 本类的抽象方法 dobind()

方法的目的是给Channel绑定上属于它的端口,一样有一个抽象方法,等着子类去实现,由于咱们已经知道了AbstractChannel不维护channel的引用,因而我就去找dobind()这个抽象函数的实现, 结果发现,AbstractChannel的直接子类AbstractNioChannel中根本不没有他的实现,这是被容许的,由于AbstractNioChannel自己也是抽象类, 究竟是谁实现呢? 以下图:在NioServerSocketChannel中获取出 Jdk原生的channel, 客户端和服务端的channel又不一样,因此绑定端口这中特化的任务,交给他们本身实现

4

AbstractChannel的beginRead()()方法

上面完成注册以后,就去绑定端口,当端口绑定完成,就会channel处于active状态,下一步就是执行beginRead() ,执行的流程以下

本类抽象方法 beginRead() --> 本类抽象方法doBeginRead()

这个read() 就是从已经绑定好端口的channel中读取IO数据,和上面的方法同样,对于没有channel引用的AbstractChannel来讲,netty把它设计成抽象方法,交给拥有jdk 原生channel引用的AbstractNioChannel实现

小结:

AbstractChannel做为Channel的直接实现类,自己又是抽象类,因而它实现了Channel的预留的一些抽象方法, 初始化了channel的四个组件 id pipeline unsafe parent, 更为重要的是它的抽象内部类 实现了 关于nettyChannel的注册,绑定,读取数据的逻辑,并且以抽象类的方法,挖好了填空题等待子类的特化实现


递进AbstractNioChannel

5

跟进构造方法

依然是来到AbstractNioChannel的构造方法,发现它作了以下的构造工做:

  • 把parent传递给了AbstractChannel
  • 把子类传递过来的Channel要告诉Selector的感兴趣的选项保存
  • 设置channel为非阻塞
// todo 不管是服务端的channel 仍是客户端的channel都会使用这个方法进行初始化
// // TODO: 2019/6/23                null        ServerSocketChannel       accept
// todo  若是是在建立NioSocketChannel  parent==NioServerSocketChannel  ch == SocketChanel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);// todo  继续向上跟,建立基本的组件
    // todo 若是是建立NioSocketChannel   这就是在保存原生的jdkchannel
    // todo 若是是建立NioServerSocketChannel   这就是在保存ServerSocketChannel
    this.ch = ch;
    // todo 设置上感兴趣的事件
    this.readInterestOp = readInterestOp;
    try {
        // todo 做为服务端, ServerSocketChannel 设置为非阻塞的
        // todo 做为客户端   SocketChannel 设置为非阻塞的
        ch.configureBlocking(false);
    } catch (IOException e) {

重写了它父类的doRegister()

AbstractNioChannel维护channel的引用,真正的实现把 jdk 原生的 channel注册进 Selector中

@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
    // todo  javaChannel() -- 返回SelectableChanel 可选择的Channel,换句话说,能够和Selector搭配使用,他是channel体系的顶级抽象类, 实际的类型是 ServerSocketChannel
    // todo  eventLoop().unwrappedSelector(), -- >  获取选择器, 如今在AbstractNioChannel中 获取到的eventLoop是BossGroup里面的
    // todo  到目前看, 他是把ServerSocketChannel(系统建立的) 注册进了 EventLoop的选择器
    // todo 这里的 最后一个参数是  this是当前的channel , 意思是把当前的Channel当成是一个 attachment(附件) 绑定到selector上 做用???
    // todo  如今知道了attachment的做用了
     //    todo 1. 当channel在这里注册进 selector中返回一个selectionKey, 这个key告诉selector 这个channel是本身的
     //    todo 2. 当selector轮询到 有channel出现了本身的感兴趣的事件时, 须要从成百上千的channel精确的匹配出 出现Io事件的channel,
    //     todo     因而seleor就在这里提早把channel存放入 attachment中, 后来使用
    // todo 最后一个 this 参数, 若是是服务启动时, 他就是NioServerSocketChannel   若是是客户端他就是 NioSocketChannel
    // todo 到目前为止, 虽然注册上了,可是它不关心任何事件
    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
    return;
} catch (CancelledKeyException e) {

新增内部接口

AbstractNioChannel新添加了一个内部接口,做为原Channel的扩展,源码以下, 咱们着重关心的就是这个新接口的 read()方法, 它的做用是从channel去读取IO数据,做为接口的抽象方法,它规范服务端和客户端根据本身需求去不一样的实现这个read()

怎么特化实现这个read方法呢? 如果服务端,它read的结果就是一个新的客户端的链接, 若是是客户端,它read的结果就是 客户端发送过来的数据,因此这个read()颇有必要去特化

/**
 * Read from underlying {@link SelectableChannel}
 */
// todo 两个实现类, NioByteUnsafe , 处理关于客户端发来的信息
// todo NioMessageUnsafe   处理客户端新进来的链接
void read();


/**
* Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel}
*/
public interface NioUnsafe extends Unsafe {
/**
 * Return underlying {@link SelectableChannel}
 */
SelectableChannel ch();

/**
 * Finish connect
 */
void finishConnect();


void forceFlush();
}

AbstractNioChannel抽象内部内同类时继承了它父类的AbstractUnsafe实现了当前的NioUnsafe, 再日后看, 问题来了, 服务端和客户端在的针对read的特化实如今哪里呢? 想一想看确定在它子类的unsafe内部类中,以下图,紫框框

6

一会再具体看这两个 内部类是如何特化read的 注意啊,再也不是抽象的了

再进一步 AbstractNioMessageChannel

它的构造函数以下, 只是调用父类的构造函数,传递参数

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    // todo 在进去
    // todo  null  ServerSocketChannel   accept
    super(parent, ch, readInterestOp);
}

AbstractNioMessageChannelMessageNioUnsaferead()特化实现

在read方法中,咱们能够看到,他调用是本类的抽象方法doReadMessages(List<Object> buf), 方法的实现类是继承体系的最底层的NioServerSocketChannel, 由于他就是那个特化的服务端channel

固然若是咱们一开始跟进read()时,来到的客户端的AbstractNioByteChannel,如今咱们找到的doReadMessage()就是由 客户端的channelNioSocketChannel完成的doReadBytes()

// todo 用于处理新连接进来的内部类
private final class NioMessageUnsafe extends AbstractNioUnsafe {

// todo 这个容器用于存放临时读到的链接
private final List<Object> readBuf = new ArrayList<Object>();

// todo 接受新连接的 read来到这里
@Override
public void read() {
    ...
    doBeginRead(buf);
    ...
}

// todo 处理新的链接 是在 NioServerSocketChannel中实现的, 进入查看
protected abstract int doReadMessages(List<Object> buf) throws Exception;

最终,特化的channel实现

如今咱们就来到了最底层,整张继承图就所有展示在眼前了,下面就去看看,特化的服务端Channel NioServerSocketChannelNioSocketChanneldoReadMessages()doReadBytes()的各自实现

服务端, 咱们看到了,它的特化read()是在建立新的 Jdk远程channel, 由于它在建立新的链接chanel

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // todo java Nio底层在这里 建立jdk底层的 原生channel
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            // todo  把java原生的channel, 封装成 Netty自定义的封装的channel , 这里的buf是list集合对象,由上一层传递过来的
            // todo  this  --  NioServerSocketChannel
            // todo  ch --     SocketChnnel
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
        ...

客户端, 读取客户端发送过来的IO数据

@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

小结:

Netty的channel继承体系,到如今就完成了, 相信,当咱们如今再从 NioServerEventLoop入手,看他的初始化过程应该很简单了, 其中我但愿本身能够牢记几个点

  • AbstractChannel维护NioChannelEventLoop
  • AbstractNioChannel维护jdk原生 channel
  • AbstractChannel中的AbstractUnsafe主要是定义了一套模板,给子类提供了填空题,下面的三个填空
    • 注册 把chanel注册进Selector
    • 绑定 把chanel绑定上端口
    • 添加感兴趣的事件, 给建立出来的channel二次注册上netty能够处理的感兴趣的事件
  • channel的io操做是unsafe内部类完成的
    • 服务端从channel,读取出新链接NioMessageUnsafe
    • 客户端从channel,读取出数据NioByteUnsafe
相关文章
相关标签/搜索