Reactor模式

什么是Reactor模式

Reactor模式是一种设计模式,它是基于事件驱动的,能够并发的处理多个服务请求,当请求抵达后,依据多路复用策略,同步的派发这些请求至相关的请求处理程序。vue

Reactor模式角色构成

在早先的论文An Object Behavioral Pattern for
Demultiplexing and Dispatching Handles for Synchronous Events
中Reactor模式主要有五大角色组成,分别以下:java

Handle:操做系统提供的一种资源,用于表示一个个的事件,在网络编程中能够是一个链接事件,一个读取事件,一个写入事件,Handle是事件产生的发源地
Synchronous Event Demultiplexer:本质上是一个系统调用,用于等待事件的发生,调用方在调用它的时候会被阻塞,一直阻塞到同步事件分离器上有事件产生为止
Initiation Dispatcher:定义了一些用于控制事件的调度方式的规范,提供对事件管理。它自己是整个事件处理器的核心所在,Initiation Dispatcher会经过Synchronous Event Demultiplexer来等待事件的发生。一旦事件发生,Initiation Dispatcher首先会分离出每个事件,而后调用事件处理器,最后调用相关的回调方法来处理这些事件
Event Handler:定义事件处理方法以供InitiationDispatcher回调使用
Concrete Event Handler:是事件处理器的实现。它自己实现了事件处理器所提供的各类回调方法,从而实现了特定于业务的逻辑。它本质上就是咱们所编写的一个个的处理器实现。react

img

Reactor模式实现流程

  1. 初始化 Initiation Dispatcher,而后将若干个Concrete Event Handler注册到 Initiation Dispatcher中,应用会标识出该事件处理器但愿Initiation Dispatcher在某些事件发生时向其发出通知
  2. Initiation Dispatcher 会要求每一个事件处理器向其传递内部的Handle,该Handle向操做系统标识了事件处理器
  3. 当全部的Concrete Event Handler都注册完毕后,就会启动 Initiation Dispatcher的事件循环,使用Synchronous Event Demultiplexer同步阻塞的等待事件的发生
  4. 当与某个事件源对应的Handle变为ready状态时,Synchronous Event Demultiplexer就会通知 Initiation Dispatcher
  5. Initiation Dispatcher会触发事件处理器的回调方法响应这个事件

img

Java NIO对Reactor的实现

在Java的NIO中,对Reactor模式有无缝的支持,即便用Selector类封装了操做系统提供的Synchronous Event Demultiplexer功能。Doug Lea(Java concurrent包的做者)在Scalable IO in Java中对此有很是详细的描述。概况来讲其主要流程以下:git

  1. 服务器端的Reactor线程对象会启动事件循环,并使用Selector来实现IO的多路复用
  2. 注册Acceptor事件处理器到Reactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样Reactor会监听客户端向服务器端发起的链接请求事件
  3. 客户端向服务器端发起链接请求,Reactor监听到了该ACCEPT事件的发生并将该ACCEPT事件派发给相应的Acceptor处理器来进行处理。Acceptor处理器经过accept()方法获得与这个客户端对应的链接(SocketChannel),而后将该链接所关注的READ/WRITE事件以及对应的READ/WRITE事件处理器注册到Reactor中,这样一来Reactor就会监听该链接的READ/WRITE事件了。
  4. 当Reactor监听到有读或者写事件发生时,将相关的事件派发给对应的处理器进行处理
  5. 每当处理完全部就绪的感兴趣的I/O事件后,Reactor线程会再次执行select()阻塞等待新的事件就绪并将其分派给对应处理器进行处理

Doug Lea 在Scalable IO in Java中分别描述了单线程的Reactor,多线程模式的Reactor以及多Reactor线程模式。数据库

单线程的Reactor,主要依赖Java NIO中的Channel,Buffer,Selector,SelectionKey。在单线程Reactor模式中,不只I/O操做在该Reactor线程上,连非I/O的业务操做也在该线程上进行处理了,这可能会大大延迟I/O请求的响应npm

img

在多线程Reactor中添加了一个工做线程池,将非I/O操做从Reactor线程中移出转交给工做者线程池来执行。这样可以提升Reactor线程的I/O响应,不至于由于一些耗时的业务逻辑而延迟对后面I/O请求的处理,可是全部的I/O操做依旧由一个Reactor来完成,包括I/O的accept()、read()、write()以及connect()操做编程

img

多Reactor线程模式将“接受客户端的链接请求”和“与该客户端的通讯”分在了两个Reactor线程来完成。mainReactor完成接收客户端链接请求的操做,它不负责与客户端的通讯,而是将创建好的链接转交给subReactor线程来完成与客户端的通讯,这样一来就不会由于read()数据量太大而致使后面的客户端链接请求得不到即时处理的状况。而且多Reactor线程模式在海量的客户端并发请求的状况下,还能够经过实现subReactor线程池来将海量的链接分发给多个subReactor线程,在多核的操做系统中这能大大提高应用的负载和吞吐量设计模式

img

代码示例:服务器

// NIO selector 多路复用reactor线程模型
public class NIOReactor {

  // 处理业务操做的线程池
  private static ExecutorService workPool = Executors.newCachedThreadPool();

  // 封装了selector.select()等事件轮询的代码
  abstract class ReactorThread extends Thread {

    Selector selector;
    LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

    volatile boolean running = false;

    private ReactorThread() throws IOException {
      selector = Selector.open();
    }

    // Selector监听到有事件后,调用这个方法
    public abstract void handler(SelectableChannel channel) throws Exception;

    @Override
    public void run() {
      // 轮询Selector事件
      while (running) {
        try {
          // 执行队列中的任务
          Runnable task;
          while ((task = taskQueue.poll()) != null) {
            task.run();
          }
          selector.select(1000);
          // 获取查询结果
          Set<SelectionKey> selectionKeys = selector.selectedKeys();
          // 遍历查询结果
          Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
          while (keyIterator.hasNext()) {
            // 被封装的查询结果
            SelectionKey selectionKey = keyIterator.next();
            keyIterator.remove();
            int readyOps = selectionKey.readyOps();
            // 关注 Read 和 Accept两个事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                || readyOps == 0) {
              try {
                SelectableChannel channel = (SelectableChannel) selectionKey.attachment();
                channel.configureBlocking(false);
                handler(channel);
                // 若是关闭了,就取消这个KEY的订阅
                if (!channel.isOpen()) {
                  selectionKey.cancel();
                }

              } catch (Exception e) {
                // 若是有异常,就取消这个KEY的订阅
                selectionKey.cancel();
                e.printStackTrace();
              }
            }
          }

        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }

    private SelectionKey register(SelectableChannel channel) throws Exception {
      // 为何register要以任务提交的形式,让reactor线程去处理?
      // 由于线程在执行channel注册到selector的过程当中,会和调用selector.select()方法的线程争用同一把锁
      // 而select()方法实在eventLoop中经过while循环调用的,争抢的可能性很高,
      // 为了让register能更快的执行,就放到同一个线程来处理
      FutureTask<SelectionKey> futureTask =
          new FutureTask<>(() -> channel.register(selector, 0, channel));
      taskQueue.add(futureTask);
      return futureTask.get();
    }

    private void doStart() {
      if (!running) {
        running = true;
        start();
      }
    }
  }

  private ServerSocketChannel serverSocketChannel;

  // 一、建立多个线程 - accept处理reactor线程 (accept线程)
  private ReactorThread[] mainReactorThreads = new ReactorThread[1];

  // 二、建立多个线程 - io处理reactor线程  (I/O线程)
  private ReactorThread[] subReactorThreads = new ReactorThread[8];

  // 初始化线程组
  private void newGroup() throws IOException {
    // 建立mainReactor线程, 只负责处理serverSocketChannel
    for (int i = 0; i < mainReactorThreads.length; i++) {
      mainReactorThreads[i] =
          new ReactorThread() {
            AtomicInteger incr = new AtomicInteger(0);

            @Override
            public void handler(SelectableChannel channel) throws Exception {
              // 只作请求分发,不作具体的数据读取
              ServerSocketChannel ch = (ServerSocketChannel) channel;
              SocketChannel socketChannel = ch.accept();
              socketChannel.configureBlocking(false);
              // 收到链接创建的通知以后,分发给I/O线程继续去读取数据
              int index = incr.getAndIncrement() % subReactorThreads.length;
              ReactorThread workEventLoop = subReactorThreads[index];
              workEventLoop.doStart();
              SelectionKey selectionKey = workEventLoop.register(socketChannel);
              selectionKey.interestOps(SelectionKey.OP_READ);
              System.out.println(
                  Thread.currentThread().getName() + "收到新链接 : " + socketChannel.getRemoteAddress());
            }
          };
    }

    // 建立IO线程,负责处理客户端链接之后socketChannel的IO读写
    for (int i = 0; i < subReactorThreads.length; i++) {
      subReactorThreads[i] =
          new ReactorThread() {

            @Override
            public void handler(SelectableChannel channel) throws Exception {
              // work线程只负责处理IO处理,不处理accept事件
              SocketChannel ch = (SocketChannel) channel;
              ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
              while (ch.isOpen() && ch.read(requestBuffer) != -1) {
                // 长链接状况下,须要手动判断数据有没有读取结束 (此处作一个简单的判断: 超过0字节就认为请求结束了)
                if (requestBuffer.position() > 0) break;
              }
              if (requestBuffer.position() == 0) return; // 若是没数据了, 则不继续后面的处理
              requestBuffer.flip();
              byte[] content = new byte[requestBuffer.limit()];
              requestBuffer.get(content);
              System.out.println(new String(content));
              System.out.println(
                  Thread.currentThread().getName() + "收到数据,来自:" + ch.getRemoteAddress());

              // TODO 业务操做 数据库、接口...
              workPool.submit(() -> {});

              // 响应结果 200
              String response =
                  "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "Hello World";
              ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
              while (buffer.hasRemaining()) {
                ch.write(buffer);
              }
            }
          };
    }
  }

  // 始化channel,而且绑定一个eventLoop线程
  private void initAndRegister() throws Exception {
    // 一、 建立ServerSocketChannel
    serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    // 二、 将serverSocketChannel注册到selector
    int index = new Random().nextInt(mainReactorThreads.length);
    mainReactorThreads[index].doStart();
    SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
    selectionKey.interestOps(SelectionKey.OP_ACCEPT);
  }

  // 绑定端口
  private void bind() throws IOException {
    //  一、 正式绑定端口,对外服务
    serverSocketChannel.bind(new InetSocketAddress(8080));
    System.out.println("启动完成,端口8080");
  }

  public static void main(String[] args) throws Exception {
    NIOReactor nioReactor = new NIOReactor();
    // 一、 建立main和sub两组线程
    nioReactor.newGroup();
    // 二、 建立serverSocketChannel,注册到mainReactor线程上的selector上
    nioReactor.initAndRegister();
    // 三、 为serverSocketChannel绑定端口
    nioReactor.bind();
  }
}

相关文章
相关标签/搜索