Netty中的三种Reactor(反应堆)

目录:html

Reactor(反应堆)和Proactor(前摄器)java

I/O模型之三:两种高性能 I/O 设计模式 Reactor 和 Proactorreact

【转】第8章 前摄器(Proactor):用于为异步事件多路分离和分派处理器的对象行为模式数据库

Java NIO系列教程(八)JDK AIO编程》-- java AIO的proactor模式编程

Java NIO系列教程(七) selector原理 Epoll版的Selector》--java NIO的Reactor模式后端

Netty中的三种Reactor(反应堆)》 设计模式

 

Netty的I/O线程NioEventLoop因为聚合了多路复用器Selector,能够同时并发处理成百上千个客户端SocketChannel。因为读写操做都是非阻塞的,这就能够充分提高I/O线程的运行效率,避免由频繁的I/O阻塞致使的线程挂起。另外,因为Netty采用了异步通讯模式,一个I/O线程能够并发处理N个客户端链接和读写操做,这从根本上解决了传统同步阻塞I/O一链接一线程模型,架构的性能、弹性伸缩能力和可靠性都获得了极大的提高。安全

高效的Reactor线程模型

常见的Reactor线程模型有三种,分别以下:网络

  1. Reactor单线程模型;
  2. Reactor多线程模型;
  3. 主从Reactor多线程模型;

Netty是典型的Reactor模型结构,关于Reactor的详尽阐释,可参考POSA2,这里不作概念性的解释。而应用Java NIO构建Reactor模式,Doug Lea(就是那位让人无限景仰的大爷)在“Scalable IO in Java”中给了很好的阐述。这里截取其PPT中经典的图例说明 Reactor模式的典型实现:多线程

一、Reactor单线程模型

Reactor单线程模型,指的是全部的I/O操做都在同一个NIO线程上面完成,NIO线程的职责以下:

  1. 做为NIO服务端,接收客户端的TCP链接;
  2. 做为NIO客户端,向服务端发起TCP链接;
  3. 读取通讯对端的请求或者应答消息;
  4. 向通讯对端发送消息请求或者应答消息;

Reactor线程是个多面手,负责多路分离套接字,Accept新链接,并分派请求处处理器链中。该模型 适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,因此实际使用的很少。

对于一些小容量应用场景,可使用单线程模型,可是对于高负载、大并发的应用却不合适,主要缘由以下:

  1. 一个NIO线程同时处理成百上千的链路,性能上没法支撑。即使NIO线程的CPU负荷达到100%,也没法知足海量消息的编码、解码、读取和发送;
  2. 当NIO线程负载太重以后,处理速度将变慢,这会致使大量客户端链接超时,超时以后每每进行重发,这更加剧了NIO线程的负载,最终致使大量消息积压和处理超时,NIO线程会成为系统的性能瓶颈;
  3. 可靠性问题。一旦NIO线程意外跑飞,或者进入死循环,会致使整个系统通信模块不可用,不能接收和处理外部信息,形成节点故障。

为了解决这些问题,演进出了Reactor多线程模型,下面咱们一块儿学习下Reactor多线程模型。

二、Reactor多线程模型

Reactor多线程模型与单线程模型最大区别就是有一组NIO线程处理I/O操做,它的特色以下:

  1. 有一个专门的NIO线程--acceptor新城用于监听服务端,接收客户端的TCP链接请求;
  2. 网络I/O操做--读、写等由一个NIO线程池负责,线程池能够采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送;
  3. 1个NIO线程能够同时处理N条链路,可是1个链路只对应1个NIO线程,防止发生并发操做问题。

在绝大多数场景下,Reactor多线程模型均可以知足性能需求;可是,在极特殊应用场景中,一个NIO线程负责监听和处理全部的客户端链接可能会存在性能问题。例如百万客户端并发链接,或者服务端须要对客户端的握手信息进行安全认证,认证自己很是损耗性能。这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型--主从Reactor多线程模型。

三、主从Reactor多线程模型

特色是:服务端用于接收客户端链接的再也不是1个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP链接请求处理完成后(可能包含接入认证等),将新建立的SocketChannel注册到I/O线程池(sub reactor线程池)的某个I/O线程上,由它负责SocketChannel的读写和编解码工做。

Acceptor线程池只用于客户端的登陆、握手和安全认证,一旦链路创建成功,就将链路注册到后端subReactor线程池的I/O线程上,有I/O线程负责后续的I/O操做。

第三种模型比起第二种模型,是将Reactor分红两部分,mainReactor负责监听server socket,accept新链接,并将创建的socket分派给subReactor。subReactor负责多路分离已链接的socket,读写网 络数据,对业务处理功能,其扔给worker线程池完成。一般,subReactor个数上可与CPU个数等同。

 

NioEventLoopGroup 与 Reactor 线程模型的对应

Netty的线程模型并发固定不变,经过在启动辅助类中建立不一样的EventLoopGroup实例并进行适当的参数配置,就能够支持上述三种Reactor线程模型。

Netty单线程模型服务端代码示例以下:

/**
     * Netty单线程模型服务端代码示例
     * @param port
     */
    public void bind(int port) {
        EventLoopGroup reactorGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(reactorGroup, reactorGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("http-codec", new HttpServerCodec());
                        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                        //后面代码省略
                    }
                });
        
            Channel ch = b.bind(port).sync().channel();
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            reactorGroup.shutdownGracefully();
        }
    }

Netty多线程模型代码以下:

/**
     * Netty多线程模型代码
     * @param port
     */
    public void bind2(int port) {
        EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(acceptorGroup, ioGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("http-codec", new HttpServerCodec());
                        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                        //后面代码省略
                    }
                });
        
            Channel ch = b.bind(port).sync().channel();
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }

Netty主从线程模型代码以下:

/**
     * Netty主从线程模型代码
     * @param port
     */
    public void bind3(int port) {
        EventLoopGroup acceptorGroup = new NioEventLoopGroup();
        NioEventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(acceptorGroup, ioGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("http-codec", new HttpServerCodec());
                        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                        //后面代码省略
                    }
                });
        
            Channel ch = b.bind(port).sync().channel();
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }

 

说完Reacotr模型的三种形式,那么Netty是哪一种呢?其实,我还有一种Reactor模型的变种没说,那就是去掉线程池的第三种形式的变种,这也 是Netty NIO的默认模式。在实现上,Netty中的Boss类充当mainReactor,NioWorker类充当subReactor(默认 NioWorker的个数是Runtime.getRuntime().availableProcessors())。在处理新来的请求 时,NioWorker读完已收到的数据到ChannelBuffer中,以后触发ChannelPipeline中的ChannelHandler流。

Netty是事件驱动的,能够经过ChannelHandler链来控制执行流向。由于ChannelHandler链的执行过程是在 subReactor中同步的,因此若是业务处理handler耗时长,将严重影响可支持的并发数。这种模型适合于像Memcache这样的应用场景,但 对须要操做数据库或者和其余模块阻塞交互的系统就不是很合适。Netty的可扩展性很是好,而像ChannelHandler线程池化的须要,能够经过在 ChannelPipeline中添加Netty内置的ChannelHandler实现类–ExecutionHandler实现,对使用者来讲只是 添加一行代码而已。对于ExecutionHandler须要的线程池模型,Netty提供了两种可 选:1) MemoryAwareThreadPoolExecutor 可控制Executor中待处理任务的上限(超过上限时,后续进来的任务将被阻 塞),并可控制单个Channel待处理任务的上限;2) OrderedMemoryAwareThreadPoolExecutor 是  MemoryAwareThreadPoolExecutor 的子类,它还能够保证同一Channel中处理的事件流的顺序性,这主要是控制事件在异步处 理模式下可能出现的错误的事件顺序,但它并不保证同一Channel中的事件都在一个线程中执行(一般也不必)。通常来 说,OrderedMemoryAwareThreadPoolExecutor 是个很不错的选择,固然,若是有须要,也能够DIY一个。