了解Netty的人多少都会知道Netty的高性能的一个缘由就是它是基于事件驱动的,而这一事件的原型就是Reactor模式。java
因此在学习EventLoop前,颇有必要先搞懂Reactor模式。react
本文目录:git
传统的服务器设计模式:github
先来简单的介绍下传统的服务器设计模式。设计模式
看从图例了解:服务器
传统的服务器设计模式是基于IO实现的。服务器在等待链接,及IO准备就绪前都会被阻塞。网络
代码示例以下:多线程
class Server implements Runnable { public void run() { try { ServerSocket ss = new ServerSocket(PORT); while (!Thread.interrupted()) new Thread(new Handler(ss.accept())).start(); } catch (IOException ex) { /* ... */ } } static class Handler implements Runnable { final Socket socket; Handler(Socket s) { socket = s; } public void run() { try { byte[] input = new byte[MAX_INPUT]; socket.getInputStream().read(input); byte[] output = process(input); socket.getOutputStream().write(output); } catch (IOException ex) { /* ... */ } } private byte[] process(byte[] cmd) { /* ... */ } } }
传统的服务器模式的优点在于实现简便,相对于NIO的服务器,它的代码量更少,更直接。但它最大的缺点就是IO阻塞致使运行效率低下。socket
Reactor模式:ide
Reactor模式是利用NIO的多路复用而设计的一种基于事件驱动的服务器模式。主要的设计目的是经过分而治之的思想让服务器实现可扩容的目标。
Basic Reactor(单线程版本):
Basic Reactor是Reactor模式最基础的版本,能够说是定义了整个Reactor模式的大骨架,其余复杂的版本也是在此基础上演变而来。
深刻了解Basic Reactor是掌握Reactor模式的基本,所以咱们会用最多的内容去理解Basic Reactor。
不管是Reactor模式的哪些变化,基本上都离不开下列三种角色:
Reactor(反应堆):服务器启动的主入口
Acceptor(接收器):主要负责处理IO链接事件
Handler(处理器):负责处理IO读写以及业务逻辑处理等
先结合图例来了解下Reactor:
图中已经明显画出了Reactor和Acceptor的角色,而未画出的Handler部分就是黄色圆圈的部分(read,decode, compute, encode, send 构成了一个Handler的基本职能)
在经过代码来分析下:
1 package com.insaneXs.netty.reactor.basic; 2 3 import java.io.IOException; 4 import java.net.InetSocketAddress; 5 import java.nio.ByteBuffer; 6 import java.nio.channels.SelectionKey; 7 import java.nio.channels.Selector; 8 import java.nio.channels.ServerSocketChannel; 9 import java.nio.channels.SocketChannel; 10 import java.util.Iterator; 11 import java.util.Set; 12 13 /** 14 * @Author: insaneXs 15 * @Description: 16 * @Date: Create at 2018-12-19 17 */ 18 public class Reactor implements Runnable{ 19 20 final Selector selector; 21 22 final ServerSocketChannel serverSocket; 23 24 Reactor(int port) throws Exception{ 25 26 //建立ServerSocketChannel,绑定端口,设置为非阻塞,选择器上注册ACCEPT事件 27 selector = Selector.open(); 28 serverSocket = ServerSocketChannel.open(); 29 30 serverSocket.bind(new InetSocketAddress(port)); 31 serverSocket.configureBlocking(false); 32 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); 33 34 sk.attach(new Acceptor()); 35 } 36 37 @Override 38 public void run() { 39 try { 40 while (!Thread.interrupted()) { 41 //阻塞,直到注册的事件发生 42 selector.select(); 43 Set selected = selector.selectedKeys(); 44 Iterator it = selected.iterator(); 45 while (it.hasNext()){ 46 //任务派发 47 dispatch((SelectionKey)(it.next())); 48 } 49 selected.clear(); 50 } 51 } catch (IOException ex) { 52 ex.printStackTrace(); 53 } 54 55 } 56 57 void dispatch(SelectionKey k) { 58 //经过将不一样的附件绑定到SelectionKey上,实现dispatch统一派发Acceptor和Handler的逻辑 59 Runnable r = (Runnable)(k.attachment()); 60 if (r != null) 61 r.run(); 62 } 63 64 class Acceptor implements Runnable{ 65 @Override 66 public void run() { 67 try { 68 //ACCEPT负责接收连接 69 SocketChannel sc = serverSocket.accept(); 70 if(sc != null) 71 new Handler(selector, sc); 72 } catch (IOException e) { 73 e.printStackTrace(); 74 } 75 } 76 } 77 78 class Handler implements Runnable{ 79 final SocketChannel socket; 80 81 final SelectionKey sk; 82 83 ByteBuffer input = ByteBuffer.allocate(1024); 84 ByteBuffer output = ByteBuffer.allocate(1024); 85 86 static final int READING = 0, SENDING = 1; 87 int state = READING; 88 89 Handler(Selector sel, SocketChannel c) throws IOException{ 90 socket = c; 91 c.configureBlocking(false); 92 // Optionally try first read now 93 //返回了新的SelectionKey,将Handler添加为SelectionKey的附件,先注册READ事件 94 sk = socket.register(sel, 0); 95 sk.attach(this); 96 sk.interestOps(SelectionKey.OP_READ); 97 sel.wakeup(); 98 } 99 100 boolean inputIsComplete() { 101 return true; 102 } 103 boolean outputIsComplete() { 104 return true; 105 } 106 void process() { 107 //DO SOME THING 108 } 109 110 @Override 111 public void run() { 112 try { 113 if (state == READING) read(); 114 else if (state == SENDING) send(); 115 } catch (IOException ex) { 116 ex.printStackTrace(); 117 } 118 } 119 120 void read() throws IOException { 121 socket.read(input); 122 if (inputIsComplete()) { 123 process(); 124 state = SENDING; 125 // Normally also do first write now 126 sk.interestOps(SelectionKey.OP_WRITE); 127 } 128 } 129 130 void send() throws IOException { 131 socket.write(output); 132 if (outputIsComplete()) sk.cancel(); 133 } 134 135 } 136 }
了解完Reactor中的角色分工,再看代码其实并不复杂。代码关键的部分也都加上了注释。
每一个角色的业务处理逻辑都是以run方法为入口,
Reactor中run方法处理的主要逻辑就是监听NIO的多路复用,并经过dispatch方法分发任务。
Acceptor中run方法处理的主要逻辑就是接收链接,并为处理读写作准备。
Handler中run方法处理的主要逻辑就是读写和业务逻辑的处理。
有几点值得注意的:
第一,这段代码最关键的地方就是在Reactor进行任务分发时,利用SelectionKey的Attach添加附件的方法实现了用同一入口分发给Acceptor和Handler(这是设计的比较巧妙的部分)。
第二,不管是哪一个角色都实现了Runnable,这也保证了即便是其余多线程版本,只须要修改部分代码,而不用动整个Reactor模式的骨架。
第三,咱们能够看到上面的代码都是直接调用run方法,而不是经过Thread.start方法来运行,说明Basic Reactor的处理过程确实是单线程下的。
另外提到一点就是Handler的构造函数中先是register的0,而后再设置SelectionKey的interestOps为OP_READ。这点在以前的Netty源码分析中,咱们也了解到,Netty正是这样的过程。
将代码转换成时序图,加深对代码的印象:
Basic Reactor优势与不足:
优势:利用了NIO的特性,能够仅用一条线程处理多个通道的链接处理。相较于传统的服务器模式,这样对资源的消耗更少。
不足:咱们能够看到不只IO的部分由Reactor的线程处理,连业务处理的逻辑一样是放在Reactor的线程中处理,这样可能就会致使Reactor线程积累愈来愈多的请求,致使效率降低。
MultiThreads版本的Reactor模型,正是为了解决上述的问题。
一样先经过图例来了解这个模式下,各个角色的关系:
这个图和Basic Reactor的区别是什么?咱们又该如何理解呢?
咱们能够看到以前的Handler处理的角色被一分为二,read,send(也就是IO的读写)和Basic Reactor中的模式不变,可是decode,compute,encode(也就是业务处理的逻辑)被拆出来,提交给ThreadPool运行。
新的Reactor模式对比Basic Reactor,其余代码不变,只是咱们修改了Handler,增长了一个新的角色,叫作Processor,做为负责处理业务逻辑的单元:
1 public class ThreadPooledHandler implements Runnable{ 2 final SocketChannel socket; 3 final SelectionKey sk; 4 ByteBuffer input = ByteBuffer.allocate(1024); 5 ByteBuffer output = ByteBuffer.allocate(1024); 6 static final int READING = 0, SENDING = 1; 7 static final int PROCESSING = 3; 8 int state = READING; 9 10 // uses util.concurrent thread pool 11 static ExecutorService pool = Executors.newFixedThreadPool(4); 12 13 ThreadPooledHandler(Selector sel, SocketChannel c) throws IOException { 14 socket = c; 15 c.configureBlocking(false); 16 // Optionally try first read now 17 //返回了新的SelectionKey,将Handler添加为SelectionKey的附件,先注册READ事件 18 sk = socket.register(sel, 0); 19 sk.attach(this); 20 sk.interestOps(SelectionKey.OP_READ); 21 sel.wakeup(); 22 } 23 24 boolean inputIsComplete() { 25 return true; 26 } 27 boolean outputIsComplete() { 28 return true; 29 } 30 31 void process() { 32 //DO SOME THING 33 } 34 35 @Override 36 public void run() { 37 try { 38 if (state == READING) read(); 39 else if (state == SENDING) send(); 40 } catch (IOException ex) { 41 ex.printStackTrace(); 42 } 43 } 44 45 synchronized void read() throws IOException { // ... 46 socket.read(input); 47 if (inputIsComplete()) { 48 state = PROCESSING; 49 pool.execute(new Processer()); 50 } 51 } 52 53 void send() throws IOException { 54 socket.write(output); 55 if (outputIsComplete()) sk.cancel(); 56 } 57 synchronized void processAndHandOff() { 58 process(); 59 state = SENDING; // or rebind attachment 60 sk.interestOps(SelectionKey.OP_WRITE); 61 } 62 63 //增长Processer角色,处理业务逻辑 64 class Processer implements Runnable { 65 public void run() { processAndHandOff(); } 66 } 67 68 }
为了方便看出变化,我将两个版本的代码放在一块儿,作了对比图:
最大的区别就是原先Handler中process方法被交给了Processer执行,而且在执行时,是提交给线程池去执行。而Handler负责的IO读写逻辑仍然在Reactor的线程中执行(只是非网络IO的业务逻辑部分在新的线程中执行)。
相对于BasicReactor,这个版本的Reactor能更好的利用现代多核CPU的性能。让一条线程负责处理IO,而其余线程执行业务逻辑。多路复用上监听的阻塞,并不会阻塞业务逻辑的执行。
主从复合的Reactor模型
多线程的Reactor模型处理能力已经很是的高效,可是IO的链接过程仍然多是个耗时的过程(好比SSL认证)。所以引出了一个新的变化——主从复合的Reactor模型。
先看图例:
和上一个版本比较,这个版本的Reactor区别主要是将Reactor拆分一个MainReactor(负责处理Accept事件)和多个SubReactor(负责处理IO读写事件)。
而MainReactor和SubReactor的关联只要是经过Acceptor。
咱们知道Reactor和Selector的关系是一对一的关系。一般一个Reactor由一条独立的线程执行。该线程在Reactor关联的Selector是监听事件。
所以这个模式下,当Accept在为链接进来的SocketChannel绑定Selector时,再也不是绑定到MainReactor对应的Selector中,而是绑定到其余Reactor对应的Selector上(对应其余线程)。
这也所以让MainReactor只负责执行ACCEPT,而SubReactor负责IO读写。也使得ACCEPT上费时的操做将不会影响IO读写和业务逻辑处理。
贴上代码:
增长SubReactor:
1 public class SubReactor implements Runnable{ 2 private final Selector selector; 3 4 public SubReactor() throws IOException { 5 selector = Selector.open(); 6 } 7 8 @Override 9 public void run() { 10 while(!Thread.interrupted()){ 11 try { 12 selector.select(); 13 Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); 14 while(iter.hasNext()){ 15 SelectionKey sk = iter.next(); 16 ((Runnable)sk.attachment()).run(); 17 } 18 } catch (IOException e) { 19 e.printStackTrace(); 20 } 21 } 22 } 23 24 public Selector getSelector(){ 25 return selector; 26 } 27 }
SubReactor的代码和Basic Reactor中Reactor的代码很类似,由于不处理链接部分,因此没有ServerSocketChannel和绑定监听端口的操做。
接下来看MainReactor和Acceptor的代码:
1 package com.insaneXs.netty.reactor.multiple; 2 3 import com.insaneXs.netty.reactor.threadpooled.ThreadPooledHandler; 4 5 import java.io.IOException; 6 import java.net.InetSocketAddress; 7 import java.nio.channels.SelectionKey; 8 import java.nio.channels.Selector; 9 import java.nio.channels.ServerSocketChannel; 10 import java.nio.channels.SocketChannel; 11 import java.util.Iterator; 12 import java.util.Set; 13 14 /** 15 * @Author: insaneXs 16 * @Description: 17 * @Date: Create at 2018-12-21 18 */ 19 public class MainReactor implements Runnable{ 20 final Selector selector; 21 22 final ServerSocketChannel serverSocket; 23 24 private final static int SUB_REACTOR_COUNT = 3; 25 26 private final Selector[] selectors = new Selector[SUB_REACTOR_COUNT]; 27 28 MainReactor(int port) throws Exception{ 29 30 //建立ServerSocketChannel,绑定端口,设置为非阻塞,选择器上注册ACCEPT事件 31 selector = Selector.open(); 32 serverSocket = ServerSocketChannel.open(); 33 34 serverSocket.bind(new InetSocketAddress(port)); 35 serverSocket.configureBlocking(false); 36 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); 37 38 for(int i=0; i<selectors.length; i++){ 39 40 //建立SUB-REACTOR,并保存对应的Selector对象 41 SubReactor subReactor = new SubReactor(); 42 selectors[i] = subReactor.getSelector(); 43 //为SUB-REACTOR启动独立的线程 44 new Thread(subReactor).start(); 45 } 46 47 sk.attach(new Acceptor()); 48 } 49 50 @Override 51 public void run() { 52 try { 53 while (!Thread.interrupted()) { 54 //阻塞,直到注册的事件发生 55 selector.select(); 56 Set selected = selector.selectedKeys(); 57 Iterator it = selected.iterator(); 58 while (it.hasNext()){ 59 //任务派发 60 dispatch((SelectionKey)(it.next())); 61 } 62 selected.clear(); 63 } 64 } catch (IOException ex) { 65 ex.printStackTrace(); 66 } 67 68 } 69 70 void dispatch(SelectionKey k) { 71 //经过将不一样的附件绑定到SelectionKey上,实现dispatch统一派发Acceptor和Handler的逻辑 72 Runnable r = (Runnable)(k.attachment()); 73 if (r != null) 74 r.run(); 75 } 76 77 class Acceptor implements Runnable{ 78 private int idx = 0; 79 @Override 80 public void run() { 81 try { 82 //ACCEPT负责接收连接 83 SocketChannel sc = serverSocket.accept(); 84 if(sc != null)//将SocketChannel与SubReactor的Selector均匀绑定 85 new ThreadPooledHandler(selectors[idx], sc); 86 87 idx++; 88 if(idx == SUB_REACTOR_COUNT) 89 idx = 0; 90 } catch (IOException e) { 91 e.printStackTrace(); 92 } 93 } 94 } 95 96 97 98 }
作个对比图,比较下和以前的版本的差别:
MainReactor:
区别主要在MainReactor内部保存了一些SubReactor,在MainReactor被建立时,同时建立了几个SubReactor。而且建立线程独立的运行SubReactor。
再看看Acceptor:
两者Acceptor的区别就是当把Handler提交给线程池时,非主从复合结构的版本仍然是用一个Selector。而主从复合结构的Handler在处理时,用的多路复用器是SubReactor中的。所以分离出了ACCEPT和IO读写。
本文参考:Scalable IO in Java
本文代码:Github