反应器设计模式(Reactor pattern),是一种基于事件驱动的设计模式。Reactor框架是ACE各个框架中最基础的一个框架,其余框架都或多或少地用到了Reactor框架。
在事件驱动的应用中,将一个或多个客户的服务请求分离(demultiplex)和调度(dispatch)给应用程序。在事件驱动的应用中,同步地、有序地处理同时接收的多个服务请求。
reactor模式与外观模式有点像。不过,观察者模式与单个事件源关联,而反应器模式则与多个事件源关联 。当一个主体发生改变时,全部依属体都获得通知。java
初始事件分发器(Initialization Dispatcher):用于管理Event Handler,定义注册、移除EventHandler等。Reactor模式的入口调用Synchronous Event Demultiplexer的select方法以阻塞等待事件返回,当阻塞等待返回时,事件发生时通知Initiation Dispatcher,而后Initiation Dispatcher调用event handler处理事件,即回调EventHandler中的handle_event()方法。react
同步(多路)事件分离器(Synchronous Event Demultiplexer):无限循环等待新事件的到来,一旦发现有新的事件到来,就会通知初始事件分发器去调取特定的事件处理器。编程
系统处理程序(Handles):操做系统中的句柄,是对资源在操做系统层面上的一种抽象,它能够是打开的文件、一个链接(Socket)、Timer等。因为Reactor模式通常使用在网络编程中,于是这里通常指Socket Handle,即一个网络链接(Connection,在Java NIO中的Channel)。这个Channel注册到Synchronous Event Demultiplexer中,以监听Handle中发生的事件,对ServerSocketChannnel能够是CONNECT事件,对SocketChannel能够是READ、WRITE、CLOSE事件等。设计模式
事件处理器(Event Handler): 定义事件处理方法,以供Initialization Dispatcher回调使用。网络
Concrete Event Handler :事件处理器的实际实现,并且绑定了一个Handle。由于在实际状况中,咱们每每不止一种事件处理器,所以这里将事件处理器接口和实现分开,与C++、Java这些高级语言中的多态相似。多线程
1)咱们注册Concrete Event Handler到Initiation Dispatcher中。
2)Initiation Dispatcher调用每一个Event Handler的get_handle接口获取其绑定的Handle。
3)Initiation Dispatcher调用handle_events开始事件处理循环。在这里,Initiation Dispatcher会将步骤2获取的全部Handle都收集起来,使用Synchronous Event Demultiplexer来等待这些Handle的事件发生。
4)当某个(或某几个)Handle的事件发生时,Synchronous Event Demultiplexer通知Initiation Dispatcher。
5)Initiation Dispatcher根据发生事件的Handle找出所对应的Handler。
6)Initiation Dispatcher调用Handler的handle_event方法处理事件。并发
对于客户端的因此请求,都又一个专门的线程去进行处理,这个线程无线循环去监听是否又客户的请求到来,一旦收到客户端的请求,就将其分发给响应的处理器进行处理。框架
考虑到工做线程的复用,将工做线程设计为线程池。socket
1)响应快,没必要为单个同步时间所阻塞,虽然Reactor自己依然是同步的;
2)编程相对简单,能够最大程度的避免复杂的多线程及同步问题,而且避免了多线程/进程的切换开销;
3)可扩展性,能够方便的经过增长Reactor实例个数来充分利用CPU资源;
4)可复用性,reactor框架自己与具体事件处理逻辑无关,具备很高的复用性;ide
1)相比传统的简单模型,Reactor增长了必定的复杂性,于是有必定的门槛,而且不易于调试。
2)Reactor模式须要底层的Synchronous Event Demultiplexer支持,好比Java中的Selector支持,操做系统的select系统调用支持,若是要本身实现Synchronous Event Demultiplexer可能不会有那么高效。
3) Reactor模式在IO读写数据时仍是在同一个线程中实现的,即便使用多个Reactor机制的状况下,那些共享一个Reactor的Channel若是出现一个长时间的数据读写,会影响这个Reactor中其余Channel的相应时间,好比在大文件传输时,IO操做就会影响其余Client的相应时间,于是对这种操做,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用Proactor模式。
reactor模式是javaNIO非堵塞技术的实现原理,咱们不只要知道其原理流程,还要知道其代码实现,固然这个reactor模式不只仅在NIO中实现,并且在redies等其余地方也出现过,说明这个模式仍是比较实用的,尤为是在多线程高并发的状况下使用。
import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.util.Iterator; import java.util.Set; /** * 反应器模式 * 用于解决多用户访问并发问题 * * 举个例子:餐厅服务问题 * * 传统线程池作法:来一个客人(请求)去一个服务员(线程) * 反应器模式作法:当客人点菜的时候,服务员就能够去招呼其余客人了,等客人点好了菜,直接招呼一声“服务员” * */ public class Reactor implements Runnable{ public final Selector selector; public final ServerSocketChannel serverSocketChannel; public Reactor(int port) throws IOException{ selector=Selector.open(); serverSocketChannel=ServerSocketChannel.open(); InetSocketAddress inetSocketAddress=new InetSocketAddress(InetAddress.getLocalHost(),port); serverSocketChannel.socket().bind(inetSocketAddress); serverSocketChannel.configureBlocking(false); //向selector注册该channel SelectionKey selectionKey=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //利用selectionKey的attache功能绑定Acceptor 若是有事情,触发Acceptor selectionKey.attach(new Acceptor(this)); } @Override public void run() { try { while(!Thread.interrupted()){ selector.select(); Set<SelectionKey> selectionKeys= selector.selectedKeys(); Iterator<SelectionKey> it=selectionKeys.iterator(); //Selector若是发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。 while(it.hasNext()){ //来一个事件 第一次触发一个accepter线程 //之后触发SocketReadHandler SelectionKey selectionKey=it.next(); dispatch(selectionKey); selectionKeys.clear(); } } } catch (IOException e) { e.printStackTrace(); } } /** * 运行Acceptor或SocketReadHandler * @param key */ void dispatch(SelectionKey key) { Runnable r = (Runnable)(key.attachment()); if (r != null){ r.run(); } } }
import java.io.IOException; import java.nio.channels.SocketChannel; public class Acceptor implements Runnable{ private Reactor reactor; public Acceptor(Reactor reactor){ this.reactor=reactor; } @Override public void run() { try { SocketChannel socketChannel=reactor.serverSocketChannel.accept(); if(socketChannel!=null)//调用Handler来处理channel new SocketReadHandler(reactor.selector, socketChannel); } catch (IOException e) { e.printStackTrace(); } } }
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; public class SocketReadHandler implements Runnable{ private SocketChannel socketChannel; public SocketReadHandler(Selector selector,SocketChannel socketChannel) throws IOException{ this.socketChannel=socketChannel; socketChannel.configureBlocking(false); SelectionKey selectionKey=socketChannel.register(selector, 0); //将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。 //参看dispatch(SelectionKey key) selectionKey.attach(this); //同时将SelectionKey标记为可读,以便读取。 selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } /** * 处理读取数据 */ @Override public void run() { ByteBuffer inputBuffer=ByteBuffer.allocate(1024); inputBuffer.clear(); try { socketChannel.read(inputBuffer); //激活线程池 处理这些request //requestHandle(new Request(socket,btt)); } catch (IOException e) { e.printStackTrace(); } } }