html
疯狂创客圈,倾力推出:面试必备 + 面试必备 + 面试必备 的基础原理+实战 书籍 《Netty Zookeeper Redis 高并发实战》 java
你们好,我是 高并发的实战社群【疯狂创客圈】尼恩。Reactor模式很是重要,不管开发、仍是面试。react
本文的内容,在《Netty Zookeeper Redis 高并发实战》一书时,进行内容的完善和更新,而且进行的源码的升级。 博客和书不同,书更加层层升入、井井有条,请你们以书的内容为准。 具体请参考书的第四章 —— 鼎鼎大名的Reactor反应器模式 。面试
基础篇:netty源码 死磕3-编程
传说中神同样的Reactor反应器模式设计模式
1. 为何是Reactor模式
2. Reactor模式简介
3. 多线程IO的致命缺陷
4. 单线程Reactor模型
4.1. 什么是单线程Reactor呢?
4.2. 单线程Reactor的参考代码
4.3. 单线程模式的缺点:
5. 多线程的Reactor
5.1. 基于线程池的改进
5.2. 改进后的完整示意图
5.3. 多线程Reactor的参考代码
6. Reactor持续改进
7. Reactor编程的优势和缺点
7.1. 优势
7.2. 缺点tomcat
写多了代码的兄弟们都知道,JAVA代码因为处处面向接口及高度抽象,用到继承多态和设计模式,程序的组织不是按照正常的理解顺序来的,对代码跟踪非常个问题。因此,在阅读别人的源码时,若是不了解代码的组织方式,每每是晕头转向,不知在何处。尤为是阅读经典代码的时候,更是如此。服务器
反过来,若是先了解代码的设计模式,再来去代码,就会阅读的很轻松,不会那么难懂。网络
像netty这样的精品中的极品,确定也是须要先从设计模式入手的。netty的总体架构,基于了一个著名的模式——Reactor模式。Reactor模式,是高性能网络编程的必知必会模式。多线程
首先熟悉Reactor模式,必定是磨刀不误砍柴工。
Netty是典型的Reactor模型结构,关于Reactor的详尽阐释,本文站在巨人的肩膀上,借助 Doug Lea(就是那位让人无限景仰的大爷)的“Scalable IO in Java”中讲述的Reactor模式。
“Scalable IO in Java”的地址是:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
Reactor模式也叫反应器模式,大多数IO相关组件如Netty、Redis在使用的IO模式,为何须要这种模式,它是如何设计来解决高性能并发的呢?
最最原始的网络编程思路就是服务器用一个while循环,不断监听端口是否有新的套接字链接,若是有,那么就调用一个处理函数处理,相似:
while(true){ socket = accept(); handle(socket) }
这种方法的最大问题是没法并发,效率过低,若是当前的请求没有处理完,那么后面的请求只能被阻塞,服务器的吞吐量过低。
以后,想到了使用多线程,也就是很经典的connection per thread,每个链接用一个线程处理,相似:
package com.crazymakercircle.iodemo.base; import com.crazymakercircle.config.SystemConfig; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; class BasicModel implements Runnable { public void run() { try { ServerSocket ss = new ServerSocket(SystemConfig.SOCKET_SERVER_PORT); while (!Thread.interrupted()) new Thread(new Handler(ss.accept())).start(); //建立新线程来handle // or, single-threaded, or a thread pool } catch (IOException ex) { /* ... */ } } static class Handler implements Runnable { final Socket socket; Handler(Socket s) { socket = s; } public void run() { try { byte[] input = new byte[SystemConfig.INPUT_SIZE]; socket.getInputStream().read(input); byte[] output = process(input); socket.getOutputStream().write(output); } catch (IOException ex) { /* ... */ } } private byte[] process(byte[] input) { byte[] output=null; /* ... */ return output; } } }
对于每个请求都分发给一个线程,每一个线程中都独自处理上面的流程。
tomcat服务器的早期版本确实是这样实现的。
必定程度上极大地提升了服务器的吞吐量,由于以前的请求在read阻塞之后,不会影响到后续的请求,由于他们在不一样的线程中。这也是为何一般会讲“一个线程只能对应一个socket”的缘由。另外有个问题,若是一个线程中对应多个socket链接不行吗?语法上确实能够,可是实际上没有用,每个socket都是阻塞的,因此在一个线程里只能处理一个socket,就算accept了多个也没用,前一个socket被阻塞了,后面的是没法被执行到的。
缺点在于资源要求过高,系统中建立线程是须要比较高的系统资源的,若是链接数过高,系统没法承受,并且,线程的反复建立-销毁也须要代价。
改进方法是:
采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。使用Reactor模式,对线程的数量进行控制,一个线程处理大量的事件。
Java的NIO模式的Selector网络通信,其实就是一个简单的Reactor模型。能够说是Reactor模型的朴素原型。
static class Server { public static void testServer() throws IOException { // 一、获取Selector选择器 Selector selector = Selector.open(); // 二、获取通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 3.设置为非阻塞 serverSocketChannel.configureBlocking(false); // 四、绑定链接 serverSocketChannel.bind(new InetSocketAddress(SystemConfig.SOCKET_SERVER_PORT)); // 五、将通道注册到选择器上,并注册的操做为:“接收”操做 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 六、采用轮询的方式,查询获取“准备就绪”的注册过的操做 while (selector.select() > 0) { // 七、获取当前选择器中全部注册的选择键(“已经准备就绪的操做”) Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { // 八、获取“准备就绪”的时间 SelectionKey selectedKey = selectedKeys.next(); // 九、判断key是具体的什么事件 if (selectedKey.isAcceptable()) { // 十、若接受的事件是“接收就绪” 操做,就获取客户端链接 SocketChannel socketChannel = serverSocketChannel.accept(); // 十一、切换为非阻塞模式 socketChannel.configureBlocking(false); // 十二、将该通道注册到selector选择器上 socketChannel.register(selector, SelectionKey.OP_READ); } else if (selectedKey.isReadable()) { // 1三、获取该选择器上的“读就绪”状态的通道 SocketChannel socketChannel = (SocketChannel) selectedKey.channel(); // 1四、读取数据 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int length = 0; while ((length = socketChannel.read(byteBuffer)) != -1) { byteBuffer.flip(); System.out.println(new String(byteBuffer.array(), 0, length)); byteBuffer.clear(); } socketChannel.close(); } // 1五、移除选择键 selectedKeys.remove(); } } // 七、关闭链接 serverSocketChannel.close(); } public static void main(String[] args) throws IOException { testServer(); } }
实际上的Reactor模式,是基于Java NIO的,在他的基础上,抽象出来两个组件——Reactor和Handler两个组件:
(1)Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理;新的事件包含链接创建就绪、读就绪、写就绪等。
(2)Handler:将自身(handler)与事件绑定,负责事件的处理,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。
以下图所示:
这是最简单的单Reactor单线程模型。Reactor线程是个多面手,负责多路分离套接字,Accept新链接,并分派请求到Handler处理器中。
下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差很少。Reactor和Hander 处于一条线程执行。
顺便说一下,能够将上图的accepter,看作是一种特殊的handler。
“Scalable IO in Java”,实现了一个单线程Reactor的参考代码,Reactor的代码以下:
package com.crazymakercircle.ReactorModel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket; Reactor(int port) throws IOException { //Reactor初始化 selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); //非阻塞 serverSocket.configureBlocking(false); //分步处理,第一步,接收accept事件 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); //attach callback object, Acceptor sk.attach(new Acceptor()); } public void run() { try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { //Reactor负责dispatch收到的事件 dispatch((SelectionKey) (it.next())); } selected.clear(); } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); //调用以前注册的callback对象 if (r != null) { r.run(); } } // inner class class Acceptor implements Runnable { public void run() { try { SocketChannel channel = serverSocket.accept(); if (channel != null) new Handler(selector, channel); } catch (IOException ex) { /* ... */ } } } }
Handler的代码以下:
package com.crazymakercircle.ReactorModel; import com.crazymakercircle.config.SystemConfig; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; class Handler implements Runnable { final SocketChannel channel; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE); ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE); static final int READING = 0, SENDING = 1; int state = READING; Handler(Selector selector, SocketChannel c) throws IOException { channel = c; c.configureBlocking(false); // Optionally try first read now sk = channel.register(selector, 0); //将Handler做为callback对象 sk.attach(this); //第二步,注册Read就绪事件 sk.interestOps(SelectionKey.OP_READ); selector.wakeup(); } boolean inputIsComplete() { /* ... */ return false; } boolean outputIsComplete() { /* ... */ return false; } void process() { /* ... */ return; } public void run() { try { if (state == READING) { read(); } else if (state == SENDING) { send(); } } catch (IOException ex) { /* ... */ } } void read() throws IOException { channel.read(input); if (inputIsComplete()) { process(); state = SENDING; // Normally also do first write now //第三步,接收write就绪事件 sk.interestOps(SelectionKey.OP_WRITE); } } void send() throws IOException { channel.write(output); //write完就结束了, 关闭select key if (outputIsComplete()) { sk.cancel(); } } }
这两段代码,是创建在JAVA NIO的基础上的,这两段代码建议必定要看懂。能够在IDE中去看源码,这样直观感受更佳。
若是对NIO的Seletor不彻底了解,影响到上面的代码阅读,请阅读疯狂创客圈的Java NIO死磕 文章。
一、 当其中某个 handler 阻塞时, 会致使其余全部的 client 的 handler 都得不到执行, 而且更严重的是, handler 的阻塞也会致使整个服务不能接收新的 client 请求(由于 acceptor 也被阻塞了)。 由于有这么多的缺陷, 所以单线程Reactor 模型用的比较少。这种单线程模型不能充分利用多核资源,因此实际使用的很少。
二、所以,单线程模型仅仅适用于handler 中业务处理组件能快速完成的场景。
在线程Reactor模式基础上,作以下改进:
(1)将Handler处理器的执行放入线程池,多线程进行业务处理。
(2)而对于Reactor而言,能够仍为单个线程。若是服务器为多核的CPU,为充分利用系统资源,能够将Reactor拆分为两个线程。
一个简单的图以下:
下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差很少,只是更加详细。Reactor是一条独立的线程,Hander 处于线程池中执行。
“Scalable IO in Java”,的多线程Reactor的参考代码,是基于单线程作一个线程池的改进,改进的Handler的代码以下:
package com.crazymakercircle.ReactorModel; import com.crazymakercircle.config.SystemConfig; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; class MthreadHandler implements Runnable { final SocketChannel channel; final SelectionKey selectionKey; ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE); ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE); static final int READING = 0, SENDING = 1; int state = READING; ExecutorService pool = Executors.newFixedThreadPool(2); static final int PROCESSING = 3; MthreadHandler(Selector selector, SocketChannel c) throws IOException { channel = c; c.configureBlocking(false); // Optionally try first read now selectionKey = channel.register(selector, 0); //将Handler做为callback对象 selectionKey.attach(this); //第二步,注册Read就绪事件 selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } boolean inputIsComplete() { /* ... */ return false; } boolean outputIsComplete() { /* ... */ return false; } void process() { /* ... */ return; } public void run() { try { if (state == READING) { read(); } else if (state == SENDING) { send(); } } catch (IOException ex) { /* ... */ } } synchronized void read() throws IOException { // ... channel.read(input); if (inputIsComplete()) { state = PROCESSING; //使用线程pool异步执行 pool.execute(new Processer()); } } void send() throws IOException { channel.write(output); //write完就结束了, 关闭select key if (outputIsComplete()) { selectionKey.cancel(); } } synchronized void processAndHandOff() { process(); state = SENDING; // or rebind attachment //process完,开始等待write就绪 selectionKey.interestOps(SelectionKey.OP_WRITE); } class Processer implements Runnable { public void run() { processAndHandOff(); } } }
Reactor 类没有大的变化,参考前面的代码。
对于多个CPU的机器,为充分利用系统资源,将Reactor拆分为两部分。代码以下:
package com.crazymakercircle.ReactorModel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
class MthreadReactor implements Runnable
{
//subReactors集合, 一个selector表明一个subReactor
Selector[] selectors=new Selector[2];
int next = 0;
final ServerSocketChannel serverSocket;
MthreadReactor(int port) throws IOException
{ //Reactor初始化
selectors[0]=Selector.open();
selectors[1]= Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
//非阻塞
serverSocket.configureBlocking(false);
//分步处理,第一步,接收accept事件
SelectionKey sk =
serverSocket.register( selectors[0], SelectionKey.OP_ACCEPT);
//attach callback object, Acceptor
sk.attach(new Acceptor());
}
public void run()
{
try
{
while (!Thread.interrupted())
{
for (int i = 0; i <2 ; i++)
{
selectors[i].select();
Set selected = selectors[i].selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
{
//Reactor负责dispatch收到的事件
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
}
} catch (IOException ex)
{ /* ... */ }
}
void dispatch(SelectionKey k)
{
Runnable r = (Runnable) (k.attachment());
//调用以前注册的callback对象
if (r != null)
{
r.run();
}
}
class Acceptor { // ...
public synchronized void run() throws IOException
{
SocketChannel connection =
serverSocket.accept(); //主selector负责accept
if (connection != null)
{
new Handler(selectors[next], connection); //选个subReactor去负责接收到的connection
}
if (++next == selectors.length) next = 0;
}
}
}
1)响应快,没必要为单个同步时间所阻塞,虽然Reactor自己依然是同步的;
2)编程相对简单,能够最大程度的避免复杂的多线程及同步问题,而且避免了多线程/进程的切换开销;
3)可扩展性,能够方便的经过增长Reactor实例个数来充分利用CPU资源;
4)可复用性,reactor框架自己与具体事件处理逻辑无关,具备很高的复用性;
1)相比传统的简单模型,Reactor增长了必定的复杂性,于是有必定的门槛,而且不易于调试。
2)Reactor模式须要底层的Synchronous Event Demultiplexer支持,好比Java中的Selector支持,操做系统的select系统调用支持,若是要本身实现Synchronous Event Demultiplexer可能不会有那么高效。
3) Reactor模式在IO读写数据时仍是在同一个线程中实现的,即便使用多个Reactor机制的状况下,那些共享一个Reactor的Channel若是出现一个长时间的数据读写,会影响这个Reactor中其余Channel的相应时间,好比在大文件传输时,IO操做就会影响其余Client的相应时间,于是对这种操做,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用改进版的Reactor模式如Proactor模式。
在开启Netty源码前,上面的经典代码,必定要看懂哦!
Netty 亿级流量 高并发 IM后台 开源项目实战
Netty 源码、原理、JAVA NIO 原理
Java 面试题 一网打尽
疯狂创客圈 【 博客园 总入口 】