在前两篇《快速理解Linux网络I_O》、《java的I_O模型-BIO&NIO&AIO》两边中介绍了Linux下的I/O模型和java中的I/O模型,今天咱们介绍Reactor模型,并探究Netty的实现html
在互联网时代,咱们使用的软件基本上全是C/S架构,C/S架构的软件一个明显的好处就是:只要有网络,你能够在任何地方干同一件事。C/S架构能够抽象为以下模型:java
那服务器如何能快速的处理用户的请求呢?在我看来高性能服务器至少要知足以下几个需求:react
而知足如上需求的一个基础就是高性能的IO!编程
什么是Reactor模式?数组
两种I/O多路复用模式:Reactor和Proactor,两个与事件分离器有关的模式是Reactor和Proactor。Reactor模式采用同步IO,而Proactor采用异步IO。浏览器
在Reactor中,事件分离器负责等待文件描述符或socket为读写操做准备就绪,而后将就绪事件传递给对应的处理器,最后由处理器负责完成实际的读写工做。服务器
在Proactor模式中,处理器--或者兼任处理器的事件分离器,只负责发起异步读写操做。IO操做自己由操做系统来完成。传递给操做系统的参数须要包括用户定义的数据缓冲区地址和数据大小,操做系统才能从中获得写出操做所需数据,或写入从socket读到的数据。事件分离器捕获IO操做完成事件,而后将事件传递给对应处理器。网络
说人话的方式理解:多线程
Doug Lea是这样类比的架构
单线程版本Java NIO的支持:
Channels:与支持非阻塞读取的文件,套接字等的链接
Buffers:相似于数组的对象,可由Channels直接读取或写入
Selectors:通知一组通道中哪个有IO事件
SelectionKeys:维护IO事件状态和绑定
Reactor 代码以下
public class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocketChannel; public Reactor(int port) throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); key.attach(new Acceptor()); } @Override public void run() { while (!Thread.interrupted()) { try { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeys) { dispatch(selectionKey); } selectionKeys.clear(); } catch (IOException e) { e.printStackTrace(); } } } private void dispatch(SelectionKey selectionKey) { Runnable run = (Runnable) selectionKey.attachment(); if (run != null) { run.run(); } } class Acceptor implements Runnable { @Override public void run() { try { SocketChannel channel = serverSocketChannel.accept(); if (channel != null) { new Handler(selector, channel); } } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException { new Thread( new Reactor(1234) ).start(); } }
public class Handler implements Runnable{ private final static int DEFAULT_SIZE = 1024; private final SocketChannel socketChannel; private final SelectionKey seletionKey; private static final int READING = 0; private static final int SENDING = 1; private int state = READING; ByteBuffer inputBuffer = ByteBuffer.allocate(DEFAULT_SIZE); ByteBuffer outputBuffer = ByteBuffer.allocate(DEFAULT_SIZE); public Handler(Selector selector, SocketChannel channel) throws IOException { this.socketChannel = channel; socketChannel.configureBlocking(false); this.seletionKey = socketChannel.register(selector, 0); seletionKey.attach(this); seletionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } @Override public void run() { if (state == READING) { read(); } else if (state == SENDING) { write(); } } private void write() { try { socketChannel.write(outputBuffer); } catch (IOException e) { e.printStackTrace(); } while (outIsComplete()) { seletionKey.cancel(); } } private void read() { try { socketChannel.read(inputBuffer); if (inputIsComplete()) { process(); System.out.println("接收到来自客户端(" + socketChannel.socket().getInetAddress().getHostAddress() + ")的消息:" + new String(inputBuffer.array())); seletionKey.attach(new Sender()); seletionKey.interestOps(SelectionKey.OP_WRITE); seletionKey.selector().wakeup(); } } catch (IOException e) { e.printStackTrace(); } } public boolean inputIsComplete() { return true; } public boolean outIsComplete() { return true; } public void process() { // do something... } class Sender implements Runnable { @Override public void run() { try { socketChannel.write(outputBuffer); } catch (IOException e) { e.printStackTrace(); } if (outIsComplete()) { seletionKey.cancel(); } } } }
这个模型和上面的NIO流程很相似,只是将消息相关处理独立到了Handler中去了!虽说到NIO一个线程就能够支持全部的IO处理。可是瓶颈也是显而易见的!若是这个客户端屡次进行请求,若是在Handler中的处理速度较慢,那么后续的客户端请求都会被积压,致使响应变慢!因此引入了Reactor多线程模型!
Reactor多线程模型就是将Handler中的IO操做和非IO操做分开,操做IO的线程称为IO线程,非IO操做的线程称为工做线程!这样的话,客户端的请求会直接被丢到线程池中,客户端发送请求就不会堵塞!
Reactor保持不变,仅须要改动Handler代码:
public class Handler implements Runnable{ private final static int DEFAULT_SIZE = 1024; private final SocketChannel socketChannel; private final SelectionKey seletionKey; private static final int READING = 0; private static final int SENDING = 1; private int state = READING; ByteBuffer inputBuffer = ByteBuffer.allocate(DEFAULT_SIZE); ByteBuffer outputBuffer = ByteBuffer.allocate(DEFAULT_SIZE); private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime() .availableProcessors()); private static final int PROCESSING = 3; private Selector selector; public Handler(Selector selector, SocketChannel channel) throws IOException { this.selector = selector; this.socketChannel = channel; socketChannel.configureBlocking(false); this.seletionKey = socketChannel.register(selector, 0); seletionKey.attach(this); seletionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } @Override public void run() { if (state == READING) { read(); } else if (state == SENDING) { write(); } } private void write() { try { socketChannel.write(outputBuffer); } catch (IOException e) { e.printStackTrace(); } while (outIsComplete()) { seletionKey.cancel(); } } private void read() { try { socketChannel.read(inputBuffer); if (inputIsComplete()) { process(); executorService.execute(new Processer()); } } catch (IOException e) { e.printStackTrace(); } } public boolean inputIsComplete() { return true; } public boolean outIsComplete() { return true; } public void process() { // do something... } class Sender implements Runnable { @Override public void run() { try { socketChannel.write(outputBuffer); } catch (IOException e) { e.printStackTrace(); } if (outIsComplete()) { seletionKey.cancel(); } } } synchronized void processAndHandOff() { process(); // or rebind attachment state = SENDING; seletionKey.interestOps(SelectionKey.OP_WRITE); selector.wakeup(); } class Processer implements Runnable { @Override public void run() { processAndHandOff(); } } }
主从Reactor多线程模型是将Reactor分红两部分,mainReactor负责监听server socket,accept新链接,并将创建的socket分派给subReactor。subReactor负责多路分离已链接的socket,读写网络数据,对业务处理功能,其扔给worker线程池完成。一般,subReactor个数上可与CPU个数等同:
Handler保持不变,仅须要改动Reactor代码:
public class Reactor { // also create threads Selector[] selectors; AtomicInteger next = new AtomicInteger(0); final ServerSocketChannel serverSocketChannel; private static ExecutorService sunReactors = Executors.newFixedThreadPool(Runtime.getRuntime() .availableProcessors()); private static final int PROCESSING = 3; public Reactor(int port) throws IOException { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); selectors = new Selector[4]; for (int i = 0; i < selectors.length; i++) { Selector selector = selectors[i]; serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); key.attach(new Acceptor()); new Thread(()->{ while (!Thread.interrupted()) { try { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeys) { dispatch(selectionKey); } selectionKeys.clear(); } catch (IOException e) { e.printStackTrace(); } } }).start(); } } private void dispatch(SelectionKey selectionKey) { Runnable run = (Runnable) selectionKey.attachment(); if (run != null) { run.run(); } } class Acceptor implements Runnable { @Override public void run() { try { SocketChannel channel = serverSocketChannel.accept(); if (channel != null) { sunReactors.execute(new Handler(selectors[next.getAndIncrement() % selectors.length], channel)); } } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException { new Reactor(1234); } }
以上是三种不一样的设计思路,接下来看一下Netty这个一个高性能NIO框架,其是如何实现Reactor模型的!
public final class EchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(serverHandler); } }); ChannelFuture f = b.bind(PORT).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
咱们从Netty服务器代码来看,与Reactor模型进行对应!