目录:html
Reactor(反应堆)和Proactor(前摄器)java
《I/O模型之三:两种高性能 I/O 设计模式 Reactor 和 Proactor》react
《【转】第8章 前摄器(Proactor):用于为异步事件多路分离和分派处理器的对象行为模式》数据库
《Java NIO系列教程(八)JDK AIO编程》-- java AIO的proactor模式编程
《Java NIO系列教程(七) selector原理 Epoll版的Selector》--java NIO的Reactor模式后端
《Netty中的三种Reactor(反应堆)》 设计模式
Netty的I/O线程NioEventLoop因为聚合了多路复用器Selector,能够同时并发处理成百上千个客户端SocketChannel。因为读写操做都是非阻塞的,这就能够充分提高I/O线程的运行效率,避免由频繁的I/O阻塞致使的线程挂起。另外,因为Netty采用了异步通讯模式,一个I/O线程能够并发处理N个客户端链接和读写操做,这从根本上解决了传统同步阻塞I/O一链接一线程模型,架构的性能、弹性伸缩能力和可靠性都获得了极大的提高。安全
常见的Reactor线程模型有三种,分别以下:网络
Netty是典型的Reactor模型结构,关于Reactor的详尽阐释,可参考POSA2,这里不作概念性的解释。而应用Java NIO构建Reactor模式,Doug Lea(就是那位让人无限景仰的大爷)在“Scalable IO in Java”中给了很好的阐述。这里截取其PPT中经典的图例说明 Reactor模式的典型实现:多线程
一、Reactor单线程模型
Reactor单线程模型,指的是全部的I/O操做都在同一个NIO线程上面完成,NIO线程的职责以下:
Reactor线程是个多面手,负责多路分离套接字,Accept新链接,并分派请求处处理器链中。该模型 适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,因此实际使用的很少。
对于一些小容量应用场景,可使用单线程模型,可是对于高负载、大并发的应用却不合适,主要缘由以下:
为了解决这些问题,演进出了Reactor多线程模型,下面咱们一块儿学习下Reactor多线程模型。
二、Reactor多线程模型
Reactor多线程模型与单线程模型最大区别就是有一组NIO线程处理I/O操做,它的特色以下:
在绝大多数场景下,Reactor多线程模型均可以知足性能需求;可是,在极特殊应用场景中,一个NIO线程负责监听和处理全部的客户端链接可能会存在性能问题。例如百万客户端并发链接,或者服务端须要对客户端的握手信息进行安全认证,认证自己很是损耗性能。这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型--主从Reactor多线程模型。
三、主从Reactor多线程模型
特色是:服务端用于接收客户端链接的再也不是1个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP链接请求处理完成后(可能包含接入认证等),将新建立的SocketChannel注册到I/O线程池(sub reactor线程池)的某个I/O线程上,由它负责SocketChannel的读写和编解码工做。
Acceptor线程池只用于客户端的登陆、握手和安全认证,一旦链路创建成功,就将链路注册到后端subReactor线程池的I/O线程上,有I/O线程负责后续的I/O操做。
第三种模型比起第二种模型,是将Reactor分红两部分,mainReactor负责监听server socket,accept新链接,并将创建的socket分派给subReactor。subReactor负责多路分离已链接的socket,读写网 络数据,对业务处理功能,其扔给worker线程池完成。一般,subReactor个数上可与CPU个数等同。
Netty的线程模型并发固定不变,经过在启动辅助类中建立不一样的EventLoopGroup实例并进行适当的参数配置,就能够支持上述三种Reactor线程模型。
Netty单线程模型服务端代码示例以下:
/** * Netty单线程模型服务端代码示例 * @param port */ public void bind(int port) { EventLoopGroup reactorGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(reactorGroup, reactorGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-codec", new HttpServerCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); //后面代码省略 } }); Channel ch = b.bind(port).sync().channel(); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { reactorGroup.shutdownGracefully(); } }
Netty多线程模型代码以下:
/** * Netty多线程模型代码 * @param port */ public void bind2(int port) { EventLoopGroup acceptorGroup = new NioEventLoopGroup(1); NioEventLoopGroup ioGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(acceptorGroup, ioGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-codec", new HttpServerCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); //后面代码省略 } }); Channel ch = b.bind(port).sync().channel(); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { acceptorGroup.shutdownGracefully(); ioGroup.shutdownGracefully(); } }
Netty主从线程模型代码以下:
/** * Netty主从线程模型代码 * @param port */ public void bind3(int port) { EventLoopGroup acceptorGroup = new NioEventLoopGroup(); NioEventLoopGroup ioGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(acceptorGroup, ioGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-codec", new HttpServerCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); //后面代码省略 } }); Channel ch = b.bind(port).sync().channel(); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { acceptorGroup.shutdownGracefully(); ioGroup.shutdownGracefully(); } }
说完Reacotr模型的三种形式,那么Netty是哪一种呢?其实,我还有一种Reactor模型的变种没说,那就是去掉线程池的第三种形式的变种,这也 是Netty NIO的默认模式。在实现上,Netty中的Boss类充当mainReactor,NioWorker类充当subReactor(默认 NioWorker的个数是Runtime.getRuntime().availableProcessors())。在处理新来的请求 时,NioWorker读完已收到的数据到ChannelBuffer中,以后触发ChannelPipeline中的ChannelHandler流。
Netty是事件驱动的,能够经过ChannelHandler链来控制执行流向。由于ChannelHandler链的执行过程是在 subReactor中同步的,因此若是业务处理handler耗时长,将严重影响可支持的并发数。这种模型适合于像Memcache这样的应用场景,但 对须要操做数据库或者和其余模块阻塞交互的系统就不是很合适。Netty的可扩展性很是好,而像ChannelHandler线程池化的须要,能够经过在 ChannelPipeline中添加Netty内置的ChannelHandler实现类–ExecutionHandler实现,对使用者来讲只是 添加一行代码而已。对于ExecutionHandler须要的线程池模型,Netty提供了两种可 选:1) MemoryAwareThreadPoolExecutor 可控制Executor中待处理任务的上限(超过上限时,后续进来的任务将被阻 塞),并可控制单个Channel待处理任务的上限;2) OrderedMemoryAwareThreadPoolExecutor 是 MemoryAwareThreadPoolExecutor 的子类,它还能够保证同一Channel中处理的事件流的顺序性,这主要是控制事件在异步处 理模式下可能出现的错误的事件顺序,但它并不保证同一Channel中的事件都在一个线程中执行(一般也不必)。通常来 说,OrderedMemoryAwareThreadPoolExecutor 是个很不错的选择,固然,若是有须要,也能够DIY一个。