EventLoop和EventLoopGroup

Netty框架的主要线程就是I/O线程,线程模型设计的好坏,决定了系统的吞吐量、并发性和安全性等架构质量属性。Netty的线程模型被精心地设计,既提高了框架的并发性能,又能在很大程度避免锁,局部实现了无锁化设计。java

线程模型

通常首先会想到的是经典的Reactor线程模型,尽管不一样的NIO框架对于Reactor模式的实现存在差别,但本质上仍是遵循了Reactor的基础线程模型。react

Reactor单线程模型

Reactor单线程模型,是指全部的I/O操做都在同一个NIO线程上面完成。数据库

NIO线程的职责以下:编程

  1. 做为NIO服务端,接收客户端的TCP链接;
  2. 做为NIO客户端,向服务端发起TCP链接;
  3. 读取通讯对端的请求或者应答消息;
  4. 向通讯对端发送消息请求或者应答消息。

因为Reactor模式使用的是异步非阻塞I/O,全部的I/O操做都不会致使阻塞,理论上一个线程能够独立处理全部I/O相关的操做。从架构层面看,一个NIO线程确实能够完成其承担的职责。例如,经过Acceptor类接收客户端的TCP链接请求消息,当链路创建成功以后,经过Dispatch将对应的ByteBuffer派发到指定的Handler上,进行消息解码。用户线程消息编码后经过NIO线程将消息发送给客户端。后端

在一些小容量应用场景下,可使用单线程模型。可是这对于高负载、大并发的应用场景却不合适,主要缘由以下:安全

  1. 一个NIO线程同时处理成百上千的链路,性能上没法支撑,即使NIO线程的CPU负荷达到100%,也没法知足海量消息的编码、解码、读取和发送。
  2. 当NIO线程负载太重以后,处理速度将变慢,这会致使大量客户端链接超时,超时以后每每会进行重发,这更加剧了NIO线程的负载,最终会致使大量消息积压和处理超时,成为系统的性能瓶颈。
  3. 可靠性问题:一旦NIO线程意外跑飞,或者进入死循环,会致使整个系统通讯模块不可用,不能接收和处理外部消息,形成节点故障。

Reactor多线程模型

Rector多线程模型与单线程模型最大的区别就是有一组NIO线程来处理I/O操做。网络

Reactor多线程模型的特色以下:多线程

  1. 有专门一个NIO线程——Acceptor线程用于监听服务端,接收客户端的TCP链接请求。
  2. 网络I/O操做——读、写等由一个NIO线程池负责,线程池能够采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送。
  3. 一个NIO线程能够同时处理N条链路,可是一个链路只对应一个NIO线程,防止发生并发操做问题。

在绝大多数场景下,Reactor多线程模型能够知足性能需求。可是,在个别特殊场景中,一个NIO线程负责监听和处理全部的客户端链接可能会存在性能问题。例如并发百万客户端链接,或者服务端须要对客户端握手进行安全认证,可是认证自己很是损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足的问题,为了解决性能问题,产生了第三种Reactor线程模型——主从Reactor多线程模型。架构

主从Reactor多线程模型

主从Reactor线程模型的特色是:服务端用于接收客户端链接的再也不是一个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP链接请求并处理完成后(可能包含接入认证等),将新建立的SocketChannel注册到I/O线程池(sub reactor线程池)的某个I/O线程上,由它负责SocketChannel的读写和编解码工做。Acceptor线程池仅仅用于客户端的登陆、握手和安全认证,一旦链路创建成功,就将链路注册到后端subReactor线程池的I/O线程上,由I/O线程负责后续的I/O操做。并发

利用主从NIO线程模型,能够解决一个服务端监听线程没法有效处理全部客户端链接的性能不足问题。所以,在Netty的官方demo中,推荐使用该线程模型。

Netty的线程模型

Netty的线程模型并非一成不变的,它实际取决于用户的启动参数配置。经过设置不一样的启动参数,Netty能够同时支持Reactor单线程模型、多线程模型和主从Reactor多线层模型。

下面让咱们经过一张原理图(图18-4)来快速了解Netty的线程模型。

能够经过Netty服务端启动代码来了解它的线程模型:

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer() {
                    @Override
                    public void initChannel(Channel ch)throws IOException{
                        ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
                        ch.pipeline().addLast(new NettyMessageEncoder());
                        ch.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(50));
                        ch.pipeline().addLast(new LoginAuthRespHandler());
                        ch.pipeline().addLast("HeartBeatHandler",new HeartBeatRespHandler());
                    }
                });

        // 绑定端口,同步等待成功
        b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();

服务端启动的时候,建立了两个NioEventLoopGroup,它们实际是两个独立的Reactor线程池。一个用于接收客户端的TCP链接另外一个用于处理I/O相关的读写操做,或者执行系统Task、定时任务Task等

Netty用于接收客户端请求的线程池职责以下。

(1)接收客户端TCP链接,初始化Channel参数;

(2)将链路状态变动事件通知给ChannelPipeline。

Netty处理I/O操做的Reactor线程池职责以下。

(1)异步读取通讯对端的数据报,发送读事件到ChannelPipeline;

(2)异步发送消息到通讯对端,调用ChannelPipeline的消息发送接口;

(3)执行系统调用Task;

(4)执行定时任务Task,例如链路空闲状态监测定时任务。

经过调整线程池的线程个数、是否共享线程池等方式,Netty的Reactor线程模型能够在单线程、多线程和主从多线程间切换,这种灵活的配置方式能够最大程度地知足不一样用户的个性化定制。

为了尽量地提高性能,Netty在不少地方进行了无锁化的设计,例如在I/O线程内部进行串行操做,避免多线程竞争致使的性能降低问题。表面上看,串行化设计彷佛CPU利用率不高,并发程度不够。可是,经过调整NIO线程池的线程参数,能够同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列—多个工做线程的模型性能更优。

它的设计原理如图:

Netty的NioEventLoop读取到消息以后,直接调用ChannelPipeline的fireChannelRead (Object msg)。只要用户不主动切换线程,一直都是由NioEventLoop调用用户的Handler,期间不进行线程切换。这种串行化处理方式避免了多线程操做致使的锁的竞争,从性能角度看是最优的。

最佳实践

Netty的多线程编程最佳实践以下。

(1)建立两个NioEventLoopGroup,用于逻辑隔离NIO Acceptor和NIO I/O线程。

(2)尽可能不要在ChannelHandler中启动用户线程(解码后用于将POJO消息派发到后端业务线程的除外)。

(3)解码要放在NIO线程调用的解码Handler中进行,不要切换到用户线程中完成消息的解码。

(4)若是业务逻辑操做很是简单,没有复杂的业务逻辑计算,没有可能会致使线程被阻塞的磁盘操做、数据库操做、网路操做等,能够直接在NIO线程上完成业务逻辑编排,不须要切换到用户线程。

(5)若是业务逻辑处理复杂,不要在NIO线程上完成,建议将解码后的POJO消息封装成Task,派发到业务线程池中由业务线程执行,以保证NIO线程尽快被释放,处理其余的I/O操做。

NioEventLoop

Netty的NioEventLoop并非一个纯粹的I/O线程,它除了负责I/O的读写以外,还兼顾处理如下两类任务:

  1. 系统Task:经过调用NioEventLoop的execute(Runnable task)方法实现,Netty有不少系统Task,建立它们的主要缘由是:I/O线程和用户线程同时操做网络资源时,为了防止并发操做致使的锁竞争,将用户线程的操做封装成Task放入消息队列中,由I/O线程负责执行,这样就实现了局部无锁化。
  2. 定时任务:经过调用NioEventLoop的schedule(Runnable command, long delay, TimeUnit unit)方法实现。

正是由于NioEventLoop具有多种职责,因此它的实现比较特殊,它并非个简单的Runnable。

它实现了EventLoop接口、EventExecutorGroup接口和ScheduledExecutorService接口,正是由于这种设计,致使NioEventLoop和其父类功能实现很是复杂。

NioEventLoop源码分析

Selector的初始化

做为NIO框架的Reactor线程,NioEventLoop须要处理网络I/O读写事件,所以它必须聚合一个多路复用器对象。

Selector的初始化很是简单,直接调用Selector.open()方法就能建立并打开一个新的Selector。Netty对Selector的selectedKeys进行了优化,用户能够经过io.netty.noKeySetOptimization开关决定是否启用该优化项。默认不打开selectedKeys优化功能。

    Selector selector;
    private SelectedSelectionKeySet selectedKeys;

    private final SelectorProvider provider;

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) {
        super(parent, executor, false);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        provider = selectorProvider;
        selector = openSelector();
    }

    private Selector openSelector() {
        final Selector selector;
        try {
            //经过provider.openSelector()建立并打开多路复用器
            selector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        //若是没有开启selectedKeys优化开关,就当即返回。
        if (DISABLE_KEYSET_OPTIMIZATION) {
            return selector;
        }
        //若是开启了优化开关,须要经过反射的方式从Selector实例中获取selectedKeys和publicSelectedKeys,
        //将上述两个成员变量设置为可写,经过反射的方式使用Netty构造的selectedKeys包装类selectedKeySet将原JDK的selectedKeys替换掉。
        try {
            SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

            Class<?> selectorImplClass =
                    Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader());

            // Ensure the current selector implementation is what we can instrument.
            if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
                return selector;
            }

            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);

            selectedKeysField.set(selector, selectedKeySet);
            publicSelectedKeysField.set(selector, selectedKeySet);

            selectedKeys = selectedKeySet;
            logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
        } catch (Throwable t) {
            selectedKeys = null;
            logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
        }

        return selector;
    }

run方法的实现

    @Override
    protected void run() {
        //全部的逻辑操做都在for循环体内进行,只有当NioEventLoop接收到退出指令的时候,才退出循环,不然一直执行下去,这也是通用的NIO线程实现方式。
        for (;;) {
            //首先须要将wakenUp还原为false,并将以前的wakeup状态保存到oldWakenUp变量中。
            oldWakenUp = wakenUp.getAndSet(false);
            try {
                //经过hasTasks()方法判断当前的消息队列中是否有消息还没有处理
                if (hasTasks()) {
                    //若是有则调用selectNow()方法当即进行一次select操做,看是否有准备就绪的Channel须要处理。
                    //Selector的selectNow()方法会当即触发Selector的选择操做,若是有准备就绪的Channel,则返回就绪Channel的集合,不然返回0。
                    //选择完成以后,再次判断用户是否调用了Selector的wakeup方法,若是调用,则执行selector.wakeup()操做。
                    selectNow();
                } else {
                    //执行select()方法,由Selector多路复用器轮询,看是否有准备就绪的Channel。
                    //取当前系统的纳秒时间,调用delayNanos()方法计算得到NioEventLoop中定时任务的触发时间。
                    //计算下一个将要触发的定时任务的剩余超时时间,将它转换成毫秒,为超时时间增长0.5毫秒的调整值。
                    //对剩余的超时时间进行判断,若是须要当即执行或者已经超时,则调用selector.selectNow()进行轮询操做,将selectCnt设置为1,并退出当前循环。
                    //将定时任务剩余的超时时间做为参数进行select操做,每完成一次select操做,对select计数器selectCnt加1。
                    //Select操做完成以后,须要对结果进行判断,若是存在下列任意一种状况,则退出当前循环:
                    //if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {break}
                    //1.有Channel处于就绪状态,selectedKeys不为0,说明有读写事件须要处理;
                    //2.oldWakenUp为true;
                    //3.系统或者用户调用了wakeup操做,唤醒当前的多路复用器;
                    //4.消息队列中有新的任务须要处理。
                    //若是本次Selector的轮询结果为空,也没有wakeup操做或是新的消息须要处理,则说明是个空轮询.
                    //有可能触发了JDK的epoll bug,它会致使Selector的空轮询,使I/O线程一直处于100%状态。
                    //截止到当前最新的JDK7版本,该bug仍然没有被彻底修复。因此Netty须要对该bug进行规避和修正。
                    //该Bug的修复策略以下:
                    //(1)对Selector的select操做周期进行统计;
                    //(2)每完成一次空的select操做进行一次计数;
                    //(3)在某个周期(例如100ms)内若是连续发生N次空轮询,说明触发了JDK NIO的epoll()死循环bug。
                    //监测到Selector处于死循环后,须要经过重建Selector的方式让系统恢复正常,重建步骤以下:
                    //(1)首先经过inEventLoop()方法判断是不是其余线程发起的rebuildSelector,
                    //若是由其余线程发起,为了不多线程并发操做Selector和其余资源,须要将rebuildSelector封装成Task,
                    //放到NioEventLoop的消息队列中,由NioEventLoop线程负责调用,这样就避免了多线程并发操做致使的线程安全问题。
                    //(2)调用openSelector方法建立并打开新的Selector
                    //(3)经过循环,将原Selector上注册的SocketChannel从旧的Selector上去注册,从新注册到新的Selector上,并将老的Selector关闭。
                    //过销毁旧的、有问题的多路复用器,使用新建的Selector,就能够解决空轮询Selector致使的I/O线程CPU占用100%的问题。
                    select();
                    //判断用户是否调用了Selector的wakeup方法
                    if (wakenUp.get()) {
                        //若是调用,则执行selector.wakeup()操做。
                        selector.wakeup();
                    }
                }

                cancelledKeys = 0;

                final long ioStartTime = System.nanoTime();
                needsToSelectAgain = false;
                //若是轮询到了处于就绪状态的SocketChannel,则须要处理网络I/O事件
                if (selectedKeys != null) {
                    processSelectedKeysOptimized(selectedKeys.flip());
                } else {
                    //因为默认未开启selectedKeys优化功能,因此会进入processSelectedKeysPlain分支执行。
                    //1.对SelectionKey进行保护性判断,若是为空则返回。
                    //2.获取SelectionKey的迭代器进行循环操做,经过迭代器获取SelectionKey和SocketChannel的附件对象,
                    //3.将已选择的选择键从迭代器中删除,防止下次被重复选择和处理
                    //4.对SocketChannel的附件类型进行判读,
                    //若是是AbstractNioChannel类型,说明它是NioServerSocketChannel或者NioSocketChannel,须要进行I/O读写相关的操做
                    //步骤以下:
                    //--首先从NioServerSocketChannel或者NioSocketChannel中获取其内部类Unsafe,判断当前选择键是否可用,
                    //--若是不可用,则调用Unsafe的close方法,释放链接资源。
                    //--若是选择键可用,则继续对网络操做位进行判断,以下:
                    //----若是是读或者链接操做,则调用Unsafe的read方法。此处Unsafe的实现是个多态
                    //对于NioServerSocketChannel,它的读操做就是接收客户端的TCP链接。
                    //对于NioSocketChannel,它的读操做就是从SocketChannel中读取ByteBuffer。
                    //----若是网络操做位为写,则说明有半包消息还没有发送完成,须要继续调用flush方法进行发送
                    //----若是网络操做位为链接状态,则须要对链接结果进行判读,在进行finishConnect判断以前,须要将网络操做位进行修改,注销掉SelectionKey.OP_CONNECT。
                    //若是它是NioTask,则对其进行类型转换,调用processSelectedKey进行处理。因为Netty自身没实现NioTask接口,因此一般状况下系统不会执行该分支,除非用户自行注册该Task到多路复用器。
                    processSelectedKeysPlain(selector.selectedKeys());
                }
                //因为NioEventLoop须要同时处理I/O事件和非I/O任务,为了保证二者都能获得足够的CPU时间被执行,Netty提供了I/O比例供用户定制。
                //若是I/O操做多于定时任务和Task,则能够将I/O比例调大,反之则调小,默认值为50%。
                //Task的执行时间根据本次I/O操做的执行时间计算得来。
                final long ioTime = System.nanoTime() - ioStartTime;
                final int ioRatio = this.ioRatio;//50%
                //处理完I/O事件以后,NioEventLoop须要执行非I/O操做的系统Task和定时任务
                //首先从定时任务消息队列中弹出消息进行处理,若是消息队列为空,则退出循环。
                //根据当前的时间戳进行判断,若是该定时任务已经或者正处于超时状态,则将其加入到执行Task Queue中,同时从延时队列中删除。
                //定时任务若是没有超时,说明本轮循环不须要处理,直接退出便可。
                //执行Task Queue中原有的任务和从延时队列中复制的已经超时或者正处于超时状态的定时任务
                //因为获取系统纳秒时间是个耗时的操做,每次循环都获取当前系统纳秒时间进行超时判断会下降性能。
                //为了提高性能,每执行60次循环判断一次,若是当前系统时间已经到了分配给非I/O操做的超时时间,则退出循环。
                //这是为了防止因为非I/O任务过多致使I/O操做被长时间阻塞。
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                //判断系统是否进入优雅停机状态,若是处于关闭状态。
                if (isShuttingDown()) {
                    //调用closeAll方法,释放资源。遍历获取全部的Channel,调用它的Unsafe.close()方法关闭全部链路,释放线程池、ChannelPipeline和ChannelHandler等资源。
                    closeAll();
                    //并让NioEventLoop线程退出循环,结束运行。
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }
相关文章
相关标签/搜索