自顶向下深刻分析Netty(六)--Channel总述

自顶向下深刻分析Netty(六)--Channel总述

自顶向下深刻分析Netty(六)--Channel源码实现

6.1 总述

6.1.1 Channel

JDK中的Channel是通信的载体,而Netty中的Channel在此基础上进行封装从而赋予了Channel更多的能力,用户可使用Channel进行如下操做:php

  • 查询Channel的状态。
  • 配置Channel的参数。
  • 进行Channel支持的I/O操做(read,write,connect,bind)。
  • 获取对应的ChannelPipeline,从而能够自定义处理I/O事件或者其余请求。

为了保证在使用Channel或者处理I/O操做时不出现错误,如下几点须要特别注意:前端

  1. 全部的I/O操做都是异步的
    因为采用事件驱动的机制,因此Netty中的全部IO操做都是异步的。这意味着当咱们调用一个IO操做时,方法会当即返回并不保证操做已经完成。由上一章Future的讲解中,咱们知道,这些IO操做会返回一个ChannelFuture对象,咱们须要经过添加监听者的方式执行操做完成后需执行的代码
  2. Channel是有等级的
    若是一个Channel由另外一个Channel建立,那么他们之间造成父子关系。好比说,当ServerSocketChannel经过accept()方法接受一个SocketChannel时,那么SocketChannel的父亲是ServerSocketChannel,调用SocketChannel的parent()方法返回该ServerSocketChannel对象。
  3. 可使用向下转型获取子类的特定操做
    某些子类Channel会提供一些所需的特定操做,能够向下转型到这样的子类,从而得到特定操做。好比说,对于UDP的数据报的传输,有特定的join()和leave()操做,咱们能够向下转型到DatagramChannel从而使用这些操做。
  4. 释放资源
    当一个Channel再也不使用时,须调用close()或者close(ChannelPromise)方法释放资源。

6.1.2 Channel配置参数

(1).通用参数

CONNECT_TIMEOUT_MILLIS
        Netty参数,链接超时毫秒数,默认值30000毫秒即30秒。java

MAX_MESSAGES_PER_READ
        Netty参数,一次Loop读取的最大消息数,对于ServerChannel或者NioByteChannel,默认值为16,其余Channel默认值为1。默认值这样设置,是由于:ServerChannel须要接受足够多的链接,保证大吞吐量,NioByteChannel能够减小没必要要的系统调用select。linux

WRITE_SPIN_COUNT
        Netty参数,一个Loop写操做执行的最大次数,默认值为16。也就是说,对于大数据量的写操做至多进行16次,若是16次仍没有所有写完数据,此时会提交一个新的写任务给EventLoop,任务将在下次调度继续执行。这样,其余的写请求才能被响应不会由于单个大数据量写请求而耽误。git

ALLOCATOR
        Netty参数,ByteBuf的分配器,默认值为ByteBufAllocator.DEFAULT,4.0版本为UnpooledByteBufAllocator,4.1版本为PooledByteBufAllocator。该值也可使用系统参数io.netty.allocator.type配置,使用字符串值:"unpooled","pooled"。程序员

RCVBUF_ALLOCATOR
        Netty参数,用于Channel分配接受Buffer的分配器,默认值为AdaptiveRecvByteBufAllocator.DEFAULT,是一个自适应的接受缓冲区分配器,能根据接受到的数据自动调节大小。可选值为FixedRecvByteBufAllocator,固定大小的接受缓冲区分配器。github

AUTO_READ
        Netty参数,自动读取,默认值为True。Netty只在必要的时候才设置关心相应的I/O事件。对于读操做,须要调用channel.read()设置关心的I/O事件为OP_READ,这样如有数据到达才能读取以供用户处理。该值为True时,每次读操做完毕后会自动调用channel.read(),从而有数据到达便能读取;不然,须要用户手动调用channel.read()。须要注意的是:当调用config.setAutoRead(boolean)方法时,若是状态由false变为true,将会调用channel.read()方法读取数据;由true变为false,将调用config.autoReadCleared()方法终止数据读取。算法

WRITE_BUFFER_HIGH_WATER_MARK
        Netty参数,写高水位标记,默认值64KB。若是Netty的写缓冲区中的字节超过该值,Channel的isWritable()返回False编程

WRITE_BUFFER_LOW_WATER_MARK
        Netty参数,写低水位标记,默认值32KB。当Netty的写缓冲区中的字节超太高水位以后若降低到低水位,则Channel的isWritable()返回True。写高低水位标记使用户能够控制写入数据速度,从而实现流量控制。推荐作法是:每次调用channl.write(msg)方法首先调用channel.isWritable()判断是否可写。promise

MESSAGE_SIZE_ESTIMATOR
        Netty参数,消息大小估算器,默认为DefaultMessageSizeEstimator.DEFAULT。估算ByteBuf、ByteBufHolder和FileRegion的大小,其中ByteBuf和ByteBufHolder为实际大小,FileRegion估算值为0。该值估算的字节数在计算水位时使用,FileRegion为0可知FileRegion不影响高低水位。

SINGLE_EVENTEXECUTOR_PER_GROUP
        Netty参数,单线程执行ChannelPipeline中的事件,默认值为True。该值控制执行ChannelPipeline中执行ChannelHandler的线程。若是为Trye,整个pipeline由一个线程执行,这样不须要进行线程切换以及线程同步,是Netty4的推荐作法;若是为False,ChannelHandler中的处理过程会由Group中的不一样线程执行。

(2).SocketChannel参数

SO_RCVBUF
        Socket参数,TCP数据接收缓冲区大小。该缓冲区即TCP接收滑动窗口,linux操做系统可以使用命令:cat /proc/sys/net/ipv4/tcp_rmem查询其大小。通常状况下,该值可由用户在任意时刻设置,但当设置值超过64KB时,须要在链接到远端以前设置。

SO_SNDBUF
        Socket参数,TCP数据发送缓冲区大小。该缓冲区即TCP发送滑动窗口,linux操做系统可以使用命令:cat /proc/sys/net/ipv4/tcp_smem查询其大小。

TCP_NODELAY
        TCP参数,当即发送数据,默认值为Ture(Netty默认为True而操做系统默认为False)。该值设置Nagle算法的启用,改算法将小的碎片数据链接成更大的报文来最小化所发送的报文的数量,若是须要发送一些较小的报文,则须要禁用该算法。Netty默认禁用该算法,从而最小化报文传输延时。

SO_KEEPALIVE
        Socket参数,链接保活,默认值为False。启用该功能时,TCP会主动探测空闲链接的有效性。能够将此功能视为TCP的心跳机制,须要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能

SO_REUSEADDR
        Socket参数,地址复用,默认值False。有四种状况可使用:(1).当有一个有相同本地地址和端口的socket1处于TIME_WAIT状态时,而你但愿启动的程序的socket2要占用该地址和端口,好比重启服务且保持先前端口。(2).有多块网卡或用IP Alias技术的机器在同一端口启动多个进程,但每一个进程绑定的本地IP地址不能相同。(3).单个进程绑定相同的端口到多个socket上,但每一个socket绑定的ip地址不一样。(4).彻底相同的地址和端口的重复绑定。但这只用于UDP的多播,不用于TCP。

SO_LINGER
         Netty对底层Socket参数的简单封装,关闭Socket的延迟时间,默认值为-1,表示禁用该功能。-1以及全部<0的数表示socket.close()方法当即返回,但OS底层会将发送缓冲区所有发送到对端。0表示socket.close()方法当即返回,OS放弃发送缓冲区的数据直接向对端发送RST包,对端收到复位错误。非0整数值表示调用socket.close()方法的线程被阻塞直到延迟时间到或发送缓冲区中的数据发送完毕,若超时,则对端会收到复位错误。

IP_TOS
        IP参数,设置IP头部的Type-of-Service字段,用于描述IP包的优先级和QoS选项。

ALLOW_HALF_CLOSURE
        Netty参数,一个链接的远端关闭时本地端是否关闭,默认值为False。值为False时,链接自动关闭;为True时,触发ChannelInboundHandler的userEventTriggered()方法,事件为ChannelInputShutdownEvent。

(3).ServerSocketChannel参数

SO_RCVBUF
        已说明,须要注意的是:当设置值超过64KB时,须要在绑定到本地端口前设置。该值设置的是由ServerSocketChannel使用accept接受的SocketChannel的接收缓冲区。

SO_REUSEADDR
        已说明

SO_BACKLOG
        Socket参数,服务端接受链接的队列长度,若是队列已满,客户端链接将被拒绝。默认值,Windows为200,其余为128。

(4).DatagramChannel参数

SO_BROADCAST
        Socket参数,设置广播模式。

SO_RCVBUF
        已说明

SO_SNDBUF
        已说明

SO_REUSEADDR
        已说明

IP_MULTICAST_LOOP_DISABLED
        对应IP参数IP_MULTICAST_LOOP,设置本地回环接口的多播功能。因为IP_MULTICAST_LOOP返回True表示关闭,因此Netty加上后缀_DISABLED防止歧义。

IP_MULTICAST_ADDR
        对应IP参数IP_MULTICAST_IF,设置对应地址的网卡为多播模式。

IP_MULTICAST_IF
        对应IP参数IP_MULTICAST_IF2,同上但支持IPV6。

IP_MULTICAST_TTL
        IP参数,多播数据报的time-to-live即存活跳数。

IP_TOS
        已说明

DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION
        Netty参数,DatagramChannel注册的EventLoop即表示已激活。


6.1.3 Channel接口

Channel接口中含有大量的方法,咱们先对这些方法分类:

  1. 状态查询
    boolean isOpen(); // 是否开放
    boolean isRegistered(); // 是否注册到一个EventLoop
    boolean isActive(); // 是否激活
    boolean isWritable();   // 是否可写

 

open表示Channel的开放状态,True表示Channel可用,False表示Channel已关闭再也不可用。registered表示Channel的注册状态,True表示已注册到一个EventLoop,False表示没有注册到EventLoop。active表示Channel的激活状态,对于ServerSocketChannel,True表示Channel已绑定到端口;对于SocketChannel,表示Channel可用(open)且已链接到对端。Writable表示Channel的可写状态,当Channel的写缓冲区outboundBuffer非null且可写时返回True。
一个正常结束的Channel状态转移有如下两种状况:

REGISTERED->CONNECT/BIND->ACTIVE->CLOSE->INACTIVE->UNREGISTERED REGISTERED->ACTIVE->CLOSE->INACTIVE->UNREGISTERED

其中第一种是服务端用于绑定的Channel或者客户端用于发起链接的Channel,第二种是服务端接受的SocketChannel。一个异常关闭的Channel则不会服从这样的状态转移。

  1. getter方法
    EventLoop eventLoop();  // 注册到的EventLoop
    Channel parent();   // 父类Channel
    ChannelConfig config(); // 配置参数
    ChannelMetadata metadata(); // 元数据
    SocketAddress localAddress();   // 本地地址
    SocketAddress remoteAddress();  // 远端地址
    Unsafe unsafe();    // Unsafe对象
    ChannelPipeline pipeline(); // 事件管道,用于处理IO事件
    ByteBufAllocator alloc();   // 字节缓存分配器
    ChannelFuture closeFuture();    // Channel关闭时的异步结果
    ChannelPromise voidPromise();   

 

  1. 异步结果生成
    ChannelPromise newPromise();
    ChannelFuture newSucceededFuture();
    ChannelFuture newFailedFuture(Throwable cause);

 

  1. I/O事件处理
    ChannelFuture bind(SocketAddress localAddress);
    ChannelFuture connect(SocketAddress remoteAddress);
    ChannelFuture disconnect();
    ChannelFuture close();
    ChannelFuture deregister();
    Channel read();
    ChannelFuture write(Object msg);
    Channel flush();
    ChannelFuture writeAndFlush(Object msg);

 

这里的I/O事件都是outbound出站事件,表示由用户发起,即用户能够调用这些方法产生响应的事件。对应地,有inbound入站事件,将在ChnanelPipeline一节中详述。


6.1.4 Unsafe

Unsafe?直译中文为不安全,这曾给我带来极大的困扰。若是你是第一次遇到这种接口,必定会和我感同身受。一个Unsafe对象是不安全的?这里说的不安全,是相对于用户程序员而言的,也就是说,用户程序员使用Netty进行编程时不会接触到这个接口和相关类。为何不会接触到呢?由于相似的接口和类是Netty的大量内部实现细节,不会暴露给用户程序员。然而咱们的目标是自顶向下深刻分析Netty,因此有必要深刻Unsafe雷区。咱们先看Unsafe接口中的方法:

    SocketAddress localAddress();   // 本地地址
    SocketAddress remoteAddress();  // 远端地址
    ChannelPromise voidPromise();   // 不关心结果的异步Promise?
    ChannelOutboundBuffer outboundBuffer(); // 写缓冲区
    void register(EventLoop eventLoop, ChannelPromise promise);
    void bind(SocketAddress localAddress, ChannelPromise promise);
    void connect(SocketAddress remoteAddress, SocketAddress localAddress, 
                              ChannelPromise promise);
    void disconnect(ChannelPromise promise);
    void close(ChannelPromise promise);
    void closeForcibly();
    void deregister(ChannelPromise promise);
    void beginRead();
    void write(Object msg, ChannelPromise promise);
    void flush();

 

也许你已经发现Unsafe接口和Channel接口中都有register、bind等I/O事件相关的方法,它们有什么区别呢?回忆一下EventLoop线程实现,当一个selectedKey就绪时,对I/O事件的处理委托给unsafe对象实现,代码相似以下:

    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        k.interestOps(k.interestOps() & ~SelectionKey.OP_CONNECT); 
        unsafe.finishConnect(); 
    }
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                  || readyOps == 0) {
        unsafe.read(); 
    }
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        ch.unsafe().forceFlush();
    }

 

也就是说,Unsafe的子类做为Channel的内部类,负责处理底层NIO相关的I/O事件。Channel则使用责任链的方式经过ChannelPipeline将事件提供给用户自定义处理。

 

6.2 Channel实现

![Netty_Channel类图][2]

Channel的类图比较清晰。咱们主要分析NioSocketChannel和NioServerSocketChannel这两条线。

6.2.1 AbstractChannel

首先看其中的字段:

    private final Channel parent;   // 父Channel
    private final Unsafe unsafe;    
    private final DefaultChannelPipeline pipeline;  // 处理通道
    private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
    private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final CloseFuture closeFuture = new CloseFuture(this);

    private volatile SocketAddress localAddress;    // 本地地址
    private volatile SocketAddress remoteAddress;   // 远端地址
    private volatile EventLoop eventLoop;   // EventLoop线程
    private volatile boolean registered;    // 是否注册到EventLoop

 而后,咱们看其中的构造方法:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

 newUnsafe()和newChannelPipeline()可由子类覆盖实现。在Netty的实现中每个Channel都有一个对应的Unsafe内部类:AbstractChannel--AbstractUnsafe,AbstractNioChannel--AbstractNioUnsafe等等,newUnsafe()方法正好用来生成这样的对应关系。ChannelPipeline将在以后讲解,这里先了解它的功能:做为用户处理器Handler的容器为用户提供自定义处理I/O事件的能力即为用户提供业务逻辑处理。AbstractChannel中对I/O事件的处理,都委托给ChannelPipeline处理,代码都一模一样:

    public ChannelFuture bind(SocketAddress localAddress) {
        return pipeline.bind(localAddress);
    }

AbstractChannel其余方法都比较简单,主要关注状态断定的方法:

    public boolean isRegistered() {
        return registered;
    }

    public boolean isWritable() {
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        return buf != null && buf.isWritable(); // 写缓冲区不为null且可写
    }

 对于Channel的实现来讲,其中的内部类Unsafe才是关键,由于其中含有I/O事件处理的细节。AbstractUnsafe做为AbstractChannel的内部类,定义了I/O事件处理的基本框架,其中的细节留给子类实现。咱们将依次对各个事件框架进行分析。

  1. register事件框架

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (isRegistered()) {
            promise.setFailure(...);    // 已经注册则失败
            return;
        }
        if (!isCompatible(eventLoop)) { // EventLoop不兼容当前Channel
            promise.setFailure(...);
            return;
        }
        AbstractChannel.this.eventLoop = eventLoop;
        // 当前线程为EventLoop线程直接执行;不然提交任务给EventLoop线程
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(() -> { register0(promise); });
            } catch (Throwable t) {
                closeForcibly();    // 异常时关闭Channel
                closeFuture.setClosed();    
                safeSetFailure(promise, t);
            }
        }
    }

 

12-22行相似的代码结构,Netty使用了不少次,这是为了保证I/O事件以及用户定义的I/O事件处理逻辑(业务逻辑)在一个线程中处理。咱们看提交的任务register0():

    private void register0(ChannelPromise promise) {
        try {
            // 确保Channel没有关闭
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister();   // 模板方法,细节由子类完成
            neverRegistered = false;
            registered = true;
            pipeline.invokeHandlerAddedIfNeeded();  // 将用户Handler添加到ChannelPipeline
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();   // 触发Channel注册事件
            if (isActive()) {
                // ServerSocketChannel接受的Channel此时已被激活
                if (firstRegistration) {
                    // 首次注册且激活触发Channel激活事件
                    pipeline.fireChannelActive();   
                } else if (config().isAutoRead()) {
                    beginRead();   // 可视为模板方法 
                }
            }
        } catch (Throwable t) {
            closeForcibly();     // 可视为模板方法
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

 

register0()方法定义了注册到EventLoop的总体框架,整个流程以下:
(1).注册的具体细节由doRegister()方法完成,子类中实现。
(2).注册后将处理业务逻辑的用户Handler添加到ChannelPipeline
(3).异步结果设置为成功,触发Channel的Registered事件。
(4).对于服务端接受的客户端链接,若是首次注册,触发Channel的Active事件若是已设置autoRead,则调用beginRead()开始读取数据
对于(4)的是由于fireChannelActive()中也根据autoRead配置,调用了beginRead()方法。beginRead()方法其实也是一个框架,细节由doBeginRead()方法在子类中实现:

    public final void beginRead() {
        assertEventLoop();
        if (!isActive()) {
            return;
        }
        try {
            doBeginRead();
        } catch (final Exception e) {
            invokeLater(() -> { pipeline.fireExceptionCaught(e); });
            close(voidPromise());
        }
    }

异常处理的closeForcibly()方法也是一个框架,细节由doClose()方法在子类中实现:

    public final void closeForcibly() {
        assertEventLoop();
        try {
            doClose();
        } catch (Exception e) {
            logger.warn("Failed to close a channel.", e);
        }
    }

 register框架中有一对safeSetXXX()方法,将未完成的Promise标记为完成且成功或失败,其实现以下:

    protected final void safeSetSuccess(ChannelPromise promise) {
        if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
            logger.warn(...);
        }
    }

至此,register事件框架分析完毕。

  1. bind事件框架

    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        assertEventLoop();
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return; // 确保Channel没有关闭
        }
        boolean wasActive = isActive();
        try {
            doBind(localAddress);   // 模板方法,细节由子类完成
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
        if (!wasActive && isActive()) { 
            invokeLater(() -> { pipeline.fireChannelActive(); });   // 触发Active事件
        }
        safeSetSuccess(promise);
    }

bind事件框架较为简单,主要完成在Channel绑定完成后触发Channel的Active事件。其中的invokeLater()方法向Channel注册到的EventLoop提交一个任务:

    private void invokeLater(Runnable task) {
        try {
            eventLoop().execute(task);
        } catch (RejectedExecutionException e) {
            logger.warn("Can't invoke task later as EventLoop rejected it", e);
        }
    }

closeIfClosed()方法当Channel再也不打开时关闭Channel,代码以下:

    protected final void closeIfClosed() {
        if (isOpen()) {
            return;
        }
        close(voidPromise());
    }

close()也是一个框架,以后会进行分析。

  1. disconnect事件框架

    public final void disconnect(final ChannelPromise promise) {
        assertEventLoop();
        if (!promise.setUncancellable()) {
            return;
        }
        boolean wasActive = isActive();
        try {
            doDisconnect(); // 模板方法,细节由子类实现
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
        if (wasActive && !isActive()) {
            invokeLater(() ->{ pipeline.fireChannelInactive(); });  // 触发Inactive事件
        }
        safeSetSuccess(promise);
        closeIfClosed(); // disconnect框架可能会调用close框架
    }

 

  1. close事件框架

    public final void close(final ChannelPromise promise) {
        assertEventLoop();
        close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
    }

    private void close(final ChannelPromise promise, final Throwable cause,
                       final ClosedChannelException closeCause, final boolean notify) {
        if (!promise.setUncancellable()) {
            return;
        }
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;   
        if (outboundBuffer == null) {   // outboundBuffer做为一个标记,为空表示Channel正在关闭
            if (!(promise instanceof VoidChannelPromise)) {
                // 当Channel关闭时,将这次close异步请求结果也设置为成功
                closeFuture.addListener( (future) -> { promise.setSuccess(); });
            }
            return;
        }
        if (closeFuture.isDone()) {
            safeSetSuccess(promise);    // 已经关闭,保证底层close只执行一次
            return;
        }
        final boolean wasActive = isActive();
        this.outboundBuffer = null; // 设置为空禁止write操做,同时做为标记字段表示正在关闭
        Executor closeExecutor = prepareToClose();
        if (closeExecutor != null) {
            closeExecutor.execute(() -> {
                try {
                    doClose0(promise);  // prepareToClose返回的executor执行
                } finally {
                    invokeLater( () -> { // Channel注册的EventLoop执行
                        // 写缓冲队列中的数据所有设置失败
                        outboundBuffer.failFlushed(cause, notify);
                        outboundBuffer.close(closeCause);
                        fireChannelInactiveAndDeregister(wasActive);
                    });
                }
            });
        } else {    // 当前调用线程执行
            try {
                doClose0(promise);
            } finally {
                outboundBuffer.failFlushed(cause, notify);
                outboundBuffer.close(closeCause);
            }
            if (inFlush0) {
                invokeLater( () -> { fireChannelInactiveAndDeregister(wasActive); });
            } else {
                fireChannelInactiveAndDeregister(wasActive);
            }
        }
    }
    
    private void doClose0(ChannelPromise promise) {
        try {
            doClose();  // 模板方法,细节由子类实现
            closeFuture.setClosed();
            safeSetSuccess(promise);
        } catch (Throwable t) {
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

 close事件框架保证只有一个线程执行了真正关闭的doClose()方法,prepareToClose()作一些关闭前的清除工做并返回一个Executor,若是不为空,须要在该Executor里执行doClose0()方法;为空,则在当前线程执行(为何这样设计?)。写缓冲区outboundBuffer同时也做为一个标记字段,为空表示Channel正在关闭此时禁止写操做。fireChannelInactiveAndDeregister()方法须要invokeLater()使用EventLoop执行,是由于其中会调用deRegister()方法触发Inactive事件,而事件执行须要在EventLoop中执行。

    private void fireChannelInactiveAndDeregister(final boolean wasActive) {
        deregister(voidPromise(), wasActive && !isActive());
    }

 

  1. deregister事件框架

    public final void deregister(final ChannelPromise promise) {
        assertEventLoop();
        deregister(promise, false);
    }

    private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
        if (!promise.setUncancellable()) {
            return;
        }
        if (!registered) {
            safeSetSuccess(promise);    // 已经deregister
            return;
        }
        invokeLater( () -> {
            try {
                doDeregister(); // 模板方法,子类实现具体细节
            } catch (Throwable t) {
                logger.warn(...);
            } finally {
                if (fireChannelInactive) {
                    pipeline.fireChannelInactive(); // 根据参数触发Inactive事件
                }
                if (registered) {
                    registered = false;
                    pipeline.fireChannelUnregistered(); // 首次调用触发Unregistered事件
                }
                safeSetSuccess(promise);
            }
        });
    }

deregister事件框架的处理流程很清晰,其中,使用invokeLater()方法是由于:用户可能会在ChannlePipeline中将当前Channel注册到新的EventLoop,确保ChannelPipiline事件和doDeregister()在同一个EventLoop完成([?][3])。

须要注意的是:事件之间可能相互调用,好比:disconnect->close->deregister。

  1. write事件框架

    public final void write(Object msg, ChannelPromise promise) {
        assertEventLoop();
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            // 联系close操做,outboundBuffer为空表示Channel正在关闭,禁止写数据
            safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
            ReferenceCountUtil.release(msg); // 释放msg 防止泄露
            return;
        }
        int size;
        try {
            msg = filterOutboundMessage(msg);
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            ReferenceCountUtil.release(msg);
            return;
        }
        outboundBuffer.addMessage(msg, size, promise);
    }

事实上,这是Netty定义的write操做的所有代码,完成的功能是将要写的消息Msg加入到写缓冲区。其中的filterOutboundMessage()可对消息进行过滤整理,例如把HeapBuffer转为DirectBuffer,具体实现由子类负责:

    protected Object filterOutboundMessage(Object msg) throws Exception {
        return msg; // 默认实现
    }

 

  1. flush事件框架

    public final void flush() {
        assertEventLoop();
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return; // Channel正在关闭直接返回
        }
        outboundBuffer.addFlush();  // 添加一个标记
        flush0();
    }

    protected void flush0() {
        if (inFlush0) {
            return;     // 正在flush返回防止屡次调用
        }
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return; // Channel正在关闭或者已没有须要写的数据
        }
        inFlush0 = true;
        if (!isActive()) {
            // Channel已经非激活,将全部进行中的写请求标记为失败
            try {
                if (isOpen()) {
                    outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                } else {
                    outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            } finally {
                inFlush0 = false;
            }
            return;
        }
        try {
            doWrite(outboundBuffer);    // 模板方法,细节由子类实现
        } catch (Throwable t) {
            if (t instanceof IOException && config().isAutoClose()) {
                close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            } else {
                outboundBuffer.failFlushed(t, true);
            }
        } finally {
            inFlush0 = false;
        }
    }

flush事件中执行真正的底层写操做,Netty对于写的处理引入了一个写缓冲区ChannelOutboundBuffer,由该缓冲区控制Channel的可写状态,其具体实现,将会在缓冲区一章中分析。

至此,Unsafe中的事件方法已经分析完7个,但还有connect和read没有引入,下一节将进行分析。

6.2.2 AbstractNioChannel

Netty的实现中,Unsafe的I/O事件框架中的细节实现方法doXXX()放到了Channel子类中而不是Unsafe子类中,因此咱们先分析Unsafe,而后分析Channel。
AbstractNioChannel从名字能够看出是对NIO的抽象,咱们自顶向下一步一步深刻细节,该类中定义了一个NioUnsafe接口:

    public interface NioUnsafe extends Unsafe {
        SelectableChannel ch(); // 对应NIO中的JDK实现的Channel
        void finishConnect();   // 链接完成
        void read();    // 从JDK的Channel中读取数据
        void forceFlush(); 
    }

回忆NIO的三大概念:Channel、Buffer、Selector,Netty的Channel包装了JDK的Channel从而实现更为复杂的功能。Unsafe中可使用ch()方法,NioChannel中可使用javaChannel()方法得到JDK的Channel。接口中定义了finishConnect()方法是由于:SelectableChannel设置为非阻塞模式时,connect()方法会当即返回,此时链接操做可能没有完成,若是没有完成,则须要调用JDK的finishConnect()方法完成链接操做。也许你已经注意到,AbstractUnsafe中并无connect事件框架,这是由于并非全部链接都有标准的connect过程,好比Netty的LocalChannel和EmbeddedChannel。可是NIO中的链接操做则有较为标准的流程,在介绍Connect事件框架前,先介绍一下其中使用到的相关字段,这些字段定义在AbstractNioChannel中:

    private ChannelPromise connectPromise;  // 链接异步结果
    private ScheduledFuture<?> connectTimeoutFuture;    // 链接超时检测任务异步结果
    private SocketAddress requestedRemoteAddress;   // 链接的远端地址

Connect事件框架:

    public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
                                            final ChannelPromise promise) {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return; // Channel已被关闭
        }
        try {
            if (connectPromise != null) {
                throw new ConnectionPendingException(); // 已有链接操做正在进行
            }
            boolean wasActive = isActive();
            // 模板方法,细节子类完成
            if (doConnect(remoteAddress, localAddress)) {
                fulfillConnectPromise(promise, wasActive);  // 链接操做已完成
            } else {
                // 链接操做还没有完成
                connectPromise = promise;
                requestedRemoteAddress = remoteAddress;
                // 这部分代码为Netty的链接超时机制
                int connectTimeoutMillis = config().getConnectTimeoutMillis();
                if (connectTimeoutMillis > 0) {
                    connectTimeoutFuture = eventLoop().schedule(() -> {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause = new ConnectTimeoutException("...");
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                }

                promise.addListener((ChannelFutureListener) (future) -> {
                    if (future.isCancelled()) {
                        // 链接操做取消则链接超时检测任务取消
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                });
            }
        } catch (Throwable t) {
            promise.tryFailure(annotateConnectException(t, remoteAddress));
            closeIfClosed();
        }
    }

Connect事件框架中包含了Netty的链接超时检测机制:向EventLoop提交一个调度任务,设定的超时时间已到则向链接操做的异步结果设置失败而后关闭链接。fulfillConnectPromise()设置异步结果为成功并触发Channel的Active事件:

    private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
        if (promise == null) {
            return; // 操做已取消或Promise已被通知?
        }
        boolean active = isActive();
        boolean promiseSet = promise.trySuccess();  // False表示用户取消操做
        if (!wasActive && active) { // 此时用户没有取消Connect操做
            pipeline().fireChannelActive(); // 触发Active事件
        }
        if (!promiseSet) {
            close(voidPromise()); // 操做已被用户取消,关闭Channel
        }
    }

 

FinishConnect事件框架:

    public final void finishConnect() {
        assert eventLoop().inEventLoop();
        try {
            boolean wasActive = isActive();
            doFinishConnect();  // 模板方法
            fulfillConnectPromise(connectPromise, wasActive);   // 首次Active触发Active事件
        } catch (Throwable t) {
            fulfillConnectPromise(connectPromise, annotateConnectException(...));
        } finally {
            if (connectTimeoutFuture != null) {
                connectTimeoutFuture.cancel(false); // 链接完成,取消超时检测任务
            }
            connectPromise = null;
        }
    }

finishConnect()只由EventLoop处理就绪selectionKey的OP_CONNECT事件时调用,从而完成链接操做。注意:链接操做被取消或者超时不会使该方法被调用。

Flush事件细节:

    protected final void flush0() {
        if (isFlushPending()) {
            return; // 已经有flush操做,返回
        }
        super.flush0(); // 调用父类方法
    }

    public final void forceFlush() {
        super.flush0(); // 调用父类方法
    }

    private boolean isFlushPending() {
        SelectionKey selectionKey = selectionKey();
        return selectionKey.isValid() && 
                    (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
    }

forceFlush()方法由EventLoop处理就绪selectionKey的OP_WRITE事件时调用,将缓冲区中的数据写入Channel。isFlushPending()方法容易致使困惑:为何selectionKey关心OP_WRITE事件表示正在Flush呢?OP_WRITE表示通道可写,而通常状况下通道均可写,若是selectionKey一直关心OP_WRITE事件,那么将不断从select()方法返回从而致使死循环。Netty使用一个写缓冲区,write操做将数据放入缓冲区中,flush时设置selectionKey关心OP_WRITE事件,完成后取消关心OP_WRITE事件。因此,若是selectionKey关心OP_WRITE事件表示此时正在Flush数据。

AbstractNioUnsafe还有最后一个方法removeReadOp():

    protected final void removeReadOp() {
        SelectionKey key = selectionKey();
        if (!key.isValid()) {
            return; // selectionKey已被取消
        }
        int interestOps = key.interestOps();
        if ((interestOps & readInterestOp) != 0) {
            key.interestOps(interestOps & ~readInterestOp); // 设置为再也不感兴趣
        }
    }

Netty中将服务端的OP_ACCEPT和客户端的Read统一抽象为Read事件,在NIO底层I/O事件使用bitmap表示,一个二进制位对应一个I/O事件。当一个二进制位为1时表示关心该事件,readInterestOp的二进制表示只有1位为1,因此体会interestOps & ~readInterestOp的含义,可知removeReadOp()的功能是设置SelectionKey再也不关心Read事件。相似的,还有setReadOp()、removeWriteOp()、setWriteOp()等等。

分析完AbstractNioUnsafe,咱们再分析AbstractNioChannel,首先看其中还没讲解的字段:

    private final SelectableChannel ch; // 包装的JDK Channel
    protected final int readInterestOp; // Read事件,服务端OP_ACCEPT,其余OP_READ
    volatile SelectionKey selectionKey; // JDK Channel对应的选择键
    private volatile boolean inputShutdown; // Channel的输入关闭标记
    private volatile boolean readPending;   // 底层读事件进行标记

再看一下构造方法:

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);    // 设置非阻塞模式
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                // log
            }
            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

其中的ch.configureBlocking(false)方法设置Channel为非阻塞模式,从而为Netty提供非阻塞处理I/O事件的能力。

对于AbstractNioChannel的方法,咱们主要分析它实现I/O事件框架细节部分的doXXX()方法。

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // 选择键取消从新selectNow(),清除因取消操做而缓存的选择键
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }
    
    protected void doDeregister() throws Exception {
        eventLoop().cancel(selectionKey()); // 设置取消选择键
    }

对于Register事件,当Channel属于NIO时,已经能够肯定注册操做的所有细节:将Channel注册到给定NioEventLoop的selector上便可。注意,其中第二个参数0表示注册时不关心任何事件,第三个参数为Netty的NioChannel对象自己。对于Deregister事件,选择键执行cancle()操做,选择键表示JDK Channel和selctor的关系,调用cancle()终结这种关系,从而实现从NioEventLoop中Deregister。须要注意的是:cancle操做调用后,注册关系不会当即生效,而会将cancle的key移入selector的一个取消键集合,当下次调用select相关方法或一个正在进行的select调用结束时,会从取消键集合中移除该选择键,此时注销才真正完成。一个Cancle的选择键为无效键,调用它相关的方法会抛出CancelledKeyException。

    protected void doBeginRead() throws Exception {
        if (inputShutdown) {
            return; // Channel的输入关闭?什么状况下发生?
        }
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return; // 选择键被取消而再也不有效
        }
        readPending = true; // 设置底层读事件正在进行
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            // 选择键关心Read事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

对于NioChannel的beginRead事件,只需将Read事件设置为选择键所关心的事件,则以后的select()调用若是Channel对应的Read事件就绪,便会触发Netty的read()操做。

    protected void doClose() throws Exception {
        ChannelPromise promise = connectPromise;
        if (promise != null) {
            // 链接操做还在进行,但用户调用close操做
            promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
            connectPromise = null;
        }
        ScheduledFuture<?> future = connectTimeoutFuture;
        if (future != null) {
            // 若是有链接超时检测任务,则取消
            future.cancel(false);
            connectTimeoutFuture = null;
        }
    }

此处的doClose操做主要处理了链接操做相关的后续处理。并无实际关闭Channel,因此须要子类继续增长细节实现。AbstractNioChannel中还有关于建立DirectBuffer的方法,将在之后必要时进行分析。其余的方法则较为简单,不在列出。最后提一下isCompatible()方法,说明NioChannel只在NioEventLoop中可用。

    protected boolean isCompatible(EventLoop loop) {
        return loop instanceof NioEventLoop;
    }

 

AbstractNioChannel的子类实现分为服务端AbstractNioMessageChannel和客户端AbstractNioByteChannel,咱们将首先分析服务端AbstractNioMessageChannel。

6.2.3 AbstractNioMessageChannel

AbstractNioMessageChannel是底层数据为消息的NioChannel。在Netty中,服务端Accept的一个Channel被认为是一条消息,UDP数据报也是一条消息。该类主要完善flush事件框架的doWrite细节和实现read事件框架(在内部类NioMessageUnsafe完成)。首先看read事件框架:

    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        if (!config.isAutoRead() && !isReadPending()) {
            // 此时读操做不被容许,既没有配置autoRead也没有底层读事件进行
            removeReadOp(); // 清除read事件,再也不关心
            return;
        }
        
        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        final ChannelPipeline pipeline = pipeline();
        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                for (;;) {
                    int localRead = doReadMessages(readBuf); // 模板方法,读取消息
                    if (localRead == 0) { // 没有数据可读
                        break;  
                    }
                    if (localRead < 0) { // 读取出错
                        closed = true;  
                        break;
                    }
                    if (!config.isAutoRead()) { //没有设置AutoRead
                        break;
                    }
                    if (readBuf.size() >= maxMessagesPerRead) { // 达到最大可读数
                        break;
                    }
                }
            } catch (Throwable t) {
                exception = t;
            }
            
            setReadPending(false);  // 已没有底层读事件
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                pipeline.fireChannelRead(readBuf.get(i));   //触发ChannelRead事件,用户处理
            }
            readBuf.clear(); // ChannelReadComplete事件中若是配置autoRead则会调用beginRead,从而不断进行读操做
            pipeline.fireChannelReadComplete(); // 触发ChannelReadComplete事件,用户处理

            if (exception != null) {
                if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
                    // ServerChannel异常也不能关闭,应该恢复读取下一个客户端
                    closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
                }
                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                if (isOpen()) {
                    close(voidPromise());   // 非serverChannel且打开则关闭
                }
            }
        } finally {
            if (!config.isAutoRead() && !isReadPending()) {
                // 既没有配置autoRead也没有底层读事件进行
                removeReadOp();
            }
        }
    }

 

read事件框架的流程已在代码中注明,须要注意的是读取消息的细节doReadMessages(readBuf)方法由子类实现。
咱们主要分析NioServerSocketChannel,它不支持doWrite()操做,因此咱们再也不分析本类的flush事件框架的doWrite细节方法,直接转向下一个目标:NioServerSocketChannel。

6.2.4 NioServerSocketChannel

你确定已经使用过NioServerSocketChannel,做为处于Channel最底层的子类,NioServerSocketChannel会实现I/O事件框架的底层细节。首先须要注意的是:NioServerSocketChannel只支持bind、read和close操做

   protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) { // JDK版本1.7以上
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
    
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();
        try {
            if (ch != null) {
                // 一个NioSocketChannel为一条消息
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);
            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
        return 0;
    }
    
    protected void doClose() throws Exception {
        javaChannel().close();
    }

其中的实现,都是调用JDK的Channel的方法,从而实现了最底层的细节。须要注意的是:此处的doReadMessages()方法每次最多返回一个消息(客户端链接),由此可知NioServerSocketChannel的read操做一次至多处理的链接数为config.getMaxMessagesPerRead(),也就是参数值MAX_MESSAGES_PER_READ。此外doClose()覆盖了AbstractNioChannel的实现,由于NioServerSocketChannel不支持connect操做,因此不须要链接超时处理。

最后,咱们再看关键构造方法:

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

其中的SelectionKey.OP_ACCEPT最为关键,Netty正是在此处将NioServerSocketChannel的read事件定义为NIO底层的OP_ACCEPT,统一完成read事件的抽象。

至此,咱们已分析完两条线索中的服务端部分,下面分析客户端部分。首先是AbstractNioChannel的另外一个子类AbstractNioByteChannel。

6.2.5 AbstractNioByteChannel

从字面可推知,AbstractNioByteChannel的底层数据为Byte字节。首先看构造方法:

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

 

其中的SelectionKey.OP_READ,说明AbstractNioByteChannel的read事件为NIO底层的OP_READ事件。
而后咱们看read事件框架:

    public final void read() {
        final ChannelConfig config = config();
        if (!config.isAutoRead() && !isReadPending()) {
            // 此时读操做不被容许,既没有配置autoRead也没有底层读事件进行
            removeReadOp();
            return;
        }

        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
        if (allocHandle == null) {
            this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
        }

        ByteBuf byteBuf = null;
        int messages = 0;
        boolean close = false;
        try {
            int totalReadAmount = 0;
            boolean readPendingReset = false;
            do {
                byteBuf = allocHandle.allocate(allocator);  // 建立一个ByteBuf
                int writable = byteBuf.writableBytes(); 
                int localReadAmount = doReadBytes(byteBuf); // 模板方法,子类实现细节
                if (localReadAmount <= 0) { // 没有数据可读
                    byteBuf.release();
                    byteBuf = null;
                    close = localReadAmount < 0; // 读取数据量为负数表示对端已经关闭
                    break;
                }
                if (!readPendingReset) {
                    readPendingReset = true;
                    setReadPending(false);  // 没有底层读事件进行
                    // 此时,若autoRead关闭则必须调用beginRead,read操做才会读取数据
                }
                pipeline.fireChannelRead(byteBuf);  // 触发ChannelRead事件,用户处理
                byteBuf = null;

                if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {   // 防止溢出
                    totalReadAmount = Integer.MAX_VALUE;
                    break;
                }
                totalReadAmount += localReadAmount;

                if (!config.isAutoRead()) { // 没有配置AutoRead
                    break;
                }
                if (localReadAmount < writable) {   // 读取数小于可写数,可能接受缓冲区已彻底耗尽
                    break;
                }
            } while (++ messages < maxMessagesPerRead);

            // ReadComplete结束时,若是开启autoRead则会调用beginRead,从而能够继续read
            pipeline.fireChannelReadComplete();
            allocHandle.record(totalReadAmount);

            if (close) {
                closeOnRead(pipeline);
                close = false;
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close);
        } finally {
            if (!config.isAutoRead() && !isReadPending()) {
                // 既没有配置autoRead也没有底层读事件进行
                removeReadOp(); 
            }
        }
    }

AbstractNioByteChannel的read事件框架处理流程与AbstractNioMessageChannel的稍有不一样:AbstractNioMessageChannel依次读取Message,最后统一触发ChannelRead事件;而AbstractNioByteChannel每读取到必定字节就触发ChannelRead事件。这是由于,AbstractNioMessageChannel需求高吞吐量,特别是ServerSocketChannel须要尽量多地接受链接;而AbstractNioByteChannel需求快响应,要尽量快地响应远端请求

read事件的具体流程请参考代码和代码注释进行理解,再也不分析。注意到代码中有关于接收缓冲区的代码,这一部分咱们单独使用一节讲述,以后会分析。当读取到的数据小于零时,表示远端链接已关闭,这时会调用closeOnRead(pipeline)方法:

    private void closeOnRead(ChannelPipeline pipeline) {
        SelectionKey key = selectionKey();
        setInputShutdown(); // 远端关闭此时设置Channel的输入源关闭
        if (isOpen()) {
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
                // 取消关心Read事件并触发UserEvent事件ChannelInputShutdownEvent
                key.interestOps(key.interestOps() & ~readInterestOp);   
                pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
            } else {
                close(voidPromise());   // 直接关闭
            }
        }
    }

 这段代码正是Channel参数ALLOW_HALF_CLOSURE的意义描述,该参数为True时,会触发用户事件ChannelInputShutdownEvent,不然,直接关闭该Channel。

抛出异常时,会调用handleReadException(pipeline, byteBuf, t, close)方法:

    private void handleReadException(ChannelPipeline pipeline,
                                         ByteBuf byteBuf, Throwable cause, boolean close) {
        if (byteBuf != null) {  // 已读取到数据
            if (byteBuf.isReadable()) { // 数据可读
                setReadPending(false);
                pipeline.fireChannelRead(byteBuf);  
            } else {    // 数据不可读
                byteBuf.release();
            }
        }
        pipeline.fireChannelReadComplete();
        pipeline.fireExceptionCaught(cause);
        if (close || cause instanceof IOException) {
            closeOnRead(pipeline);
        }
    }

可见,抛出异常时,若是读取到可用数据和正常读取同样触发ChannelRead事件,只是最后会统一触发ExceptionCaught事件由用户进行处理。

至此,read事件框架分析完毕,下面咱们分析write事件的细节实现方法doWrite()。在此以前,先看filterOutboundMessage()方法对须要写的数据进行过滤。

    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;
            }
            return newDirectBuffer(buf); // 非DirectBuf转为DirectBuf
        }
        if (msg instanceof FileRegion) {
            return msg;
        }
        throw new UnsupportedOperationException("...");
    }

 

可知,Netty支持的写数据类型只有两种:DirectBufferFileRegion。咱们再看这些数据怎么写到Channel上,也就是doWrite()方法:

    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = -1;
        boolean setOpWrite = false;
        for (;;) {
            Object msg = in.current();
            if (msg == null) {  // 数据已所有写完
                clearOpWrite();     // 清除OP_WRITE事件
                return;
            }

            if (msg instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) msg;
                int readableBytes = buf.readableBytes();
                if (readableBytes == 0) {
                    in.remove();
                    continue;
                }

                boolean done = false;
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }
                for (int i = writeSpinCount - 1; i >= 0; i --) {
                    int localFlushedAmount = doWriteBytes(buf); // 模板方法,子类实现细节
                    if (localFlushedAmount == 0) {
                        // NIO在非阻塞模式下写操做可能返回0表示未写入数据
                        setOpWrite = true;
                        break;
                    }

                    flushedAmount += localFlushedAmount;
                    if (!buf.isReadable()) {
                        // ByteBuf不可读,此时数据已写完
                        done = true;
                        break;
                    }
                }
                
                in.progress(flushedAmount); // 记录进度
                if (done) {
                    in.remove();    // 完成时,清理缓冲区
                } else {
                    break;  // 跳出循环执行incompleteWrite()
                }
            } else if (msg instanceof FileRegion) {
                // ....
            } else {
                throw new Error();  // 其余类型不支持
            }
        }
        incompleteWrite(setOpWrite);
    }

代码中省略了对FileRegion的处理,FileRegion是Netty对NIO底层的FileChannel的封装,负责将File中的数据写入到WritableChannel中。FileRegion的默认实现是DefaultFileRegion,若是你很感兴趣它的实现,能够自行查阅。

咱们主要分析对ByteBuf的处理。doWrite的流程简洁明了,核心操做是模板方法doWriteBytes(buf),将ByteBuf中的数据写入到Channel,因为NIO底层的写操做返回已写入的数据量,在非阻塞模式下该值可能为0,此时会调用incompleteWrite()方法:

    protected final void incompleteWrite(boolean setOpWrite) {
        if (setOpWrite) {
            setOpWrite();   // 设置继续关心OP_WRITE事件
        } else {
            // 此时已进行写操做次数writeSpinCount,但并无写完
            Runnable flushTask = this.flushTask;
            if (flushTask == null) {
                flushTask = this.flushTask = (Runnable) () -> { flush(); };
            }
            // 再次提交一个flush()任务
            eventLoop().execute(flushTask);
        }
    }

 

该方法分两种状况处理,在上文提到的第一种状况(实际写0数据)下,设置SelectionKey继续关心OP_WRITE事件从而继续进行写操做;第二种状况下,也就是写操做进行次数达到配置中的writeSpinCount值但还没有写完,此时向EventLoop提交一个新的flush任务,此时能够响应其余请求,从而提交响应速度。这样的处理,不会使大数据的写操做占用所有资源而使其余请求得不到响应,可见这是一个较为公平的处理。这里引出一个问题:使用Netty如何搭建高性能文件服务器?
至此,已分析完对于Byte数据的read事件和doWrite细节的处理,接下里,继续分析NioSocketChannel,从而完善各事件框架的细节部分。

6.2.6 NioSocketChannel

NioSocketChannel做为Channel的最末端子类,实现了NioSocket相关的最底层细节实现,首先看doBind():

    protected void doBind(SocketAddress localAddress) throws Exception {
        doBind0(localAddress);
    }

    private void doBind0(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress);   // JDK版本1.7以上
        } else {
            javaChannel().socket().bind(localAddress);
        }
    }

 

这部分代码与NioServerSocketChannel中相同,委托给JDK的Channel进行绑定操做。
接着再看doConnect()和doFinishConnect()方法:

    protected boolean doConnect(SocketAddress remoteAddress, 
                                        SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = javaChannel().connect(remoteAddress);
            if (!connected) {
                // 设置关心OP_CONNECT事件,事件就绪时调用finishConnect()
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

    protected void doFinishConnect() throws Exception {
        if (!javaChannel().finishConnect()) {
            throw new Error();
        }
    }

 

JDK中的Channel在非阻塞模式下调用connect()方法时,会当即返回结果:成功创建链接返回True,操做还在进行时返回False。返回False时,须要在底层OP_CONNECT事件就绪时,调用finishConnect()方法完成链接操做。
再看doDisconnect()和doClose()方法:

    protected void doDisconnect() throws Exception {
        doClose();
    }

    protected void doClose() throws Exception {
        super.doClose();    // AbstractNioChannel中关于链接超时的处理
        javaChannel().close();
    }

 

而后看核心的doReadBytes()和doWriteXXX()方法:

    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
    }

    protected int doWriteBytes(ByteBuf buf) throws Exception {
        final int expectedWrittenBytes = buf.readableBytes();
        return buf.readBytes(javaChannel(), expectedWrittenBytes);
    }

    protected long doWriteFileRegion(FileRegion region) throws Exception {
        final long position = region.transfered();
        return region.transferTo(javaChannel(), position);
    }

 

对于read和write操做,委托给ByteBuf处理,咱们将使用专门的一章,对这一部分细节进行完善,将在后面介绍。
NioSocketChannel最重要的部分是覆盖了父类的doWrite()方法,使用更高效的方式进行写操做,其代码以下:

    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            int size = in.size();
            if (size == 0) {
                clearOpWrite(); // 全部数据已写完,再也不关心OP_WRITE事件
                break;
            }
            long writtenBytes = 0;
            boolean done = false;
            boolean setOpWrite = false;

            ByteBuffer[] nioBuffers = in.nioBuffers();
            int nioBufferCnt = in.nioBufferCount();
            long expectedWrittenBytes = in.nioBufferSize();
            SocketChannel ch = javaChannel();

            switch (nioBufferCnt) {
                case 0: // 没有ByteBuffer,也就是只有FileRegion
                    super.doWrite(in);  // 使用父类方法进行普通处理
                    return;
                case 1: // 只有一个ByteBuffer,此时的处理等效于父类方法的处理
                    ByteBuffer nioBuffer = nioBuffers[0];
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final int localWrittenBytes = ch.write(nioBuffer);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
                default: // 多个ByteBuffer,采用gathering方法处理
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        // gathering方法,此时一次写多个ByteBuffer
                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
            }
            in.removeBytes(writtenBytes);   // 清理缓冲区
            if (!done) {
                incompleteWrite(setOpWrite);    // 写操做并无完成
                break;
            }
        }
    }

 

在明白了父类的doWrite方法后,这段代码便容易理解,本段代码作的优化是:当输出缓冲区中有多个buffer时,采用Gathering Writes将数据从这些buffer写入到同一个channel。
在AbstractUnsafe对close事件框架的分析中,有一个prepareToClose()方法,进行关闭的必要处理并在必要时返回一个Executor执行doClose()操做,默认方法返回null,NioSocketChannelUnsafe覆盖了父类的实现,代码以下:

    protected Executor prepareToClose() {
            try {
                if (javaChannel().isOpen() && config().getSoLinger() > 0) {
                    doDeregister(); // 取消选择键selectionKey
                    return GlobalEventExecutor.INSTANCE;
                }
            } catch (Throwable ignore) {
                //
            }
            return null;
        }

 

SO_LINGER表示Socket关闭的延时时间,在此时间内,内核将继续把TCP缓冲区的数据发送给对端且执行close操做的线程将阻塞直到数据发送完成。Netty的原则是I/O线程不能被阻塞,因此此时返回一个Executor用于执行阻塞的doClose()操做。doDeregister()取消选择键selectionKey是由于:延迟关闭期间, 若是selectionKey仍然关心OP_WRITE事件,而输出缓冲区又为null,这样write操做直接返回,不会再执行clearOpWrite()操做取消关心OP_WRITE事件,而Channel通常是可写的,这样OP_WRITE事件会不断就绪从而耗尽CPU,因此须要取消选择键删除注册的事件。
[1]: //upload-images.jianshu.io/upload_images/3288959-5a4be2f31620177d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240
[2]: http://img.blog.csdn.net/20160928165809260
[3]: https://github.com/netty/netty/issues/4435

 


做者:Hypercube
连接:https://www.jianshu.com/p/9258af254e1d
来源:简书
简书著做权归做者全部,任何形式的转载都请联系做者得到受权并注明出处。
做者:Hypercube 连接:https://www.jianshu.com/p/fffc18d33159 来源:简书 简书著做权归做者全部,任何形式的转载都请联系做者得到受权并注明出处。
相关文章
相关标签/搜索