Java进阶(五)Java I/O模型从BIO到NIO和Reactor模式

原创文章,同步发自做者我的博客http://www.jasongj.com/java/nio_reactor/java

Java I/O模型

同步 vs. 异步

同步I/O 每一个请求必须逐个地被处理,一个请求的处理会致使整个流程的暂时等待,这些事件没法并发地执行。用户线程发起I/O请求后须要等待或者轮询内核I/O操做完成后才能继续执行。react

异步I/O 多个请求能够并发地执行,一个请求或者任务的执行不会致使整个流程的暂时等待。用户线程发起I/O请求后仍然继续执行,当内核I/O操做完成后会通知用户线程,或者调用用户线程注册的回调函数。ios

阻塞 vs. 非阻塞

阻塞 某个请求发出后,因为该请求操做须要的条件不知足,请求操做一直阻塞,不会返回,直到条件知足。缓存

非阻塞 请求发出后,若该请求须要的条件不知足,则当即返回一个标志信息告知条件不知足,而不会一直等待。通常须要经过循环判断请求条件是否知足来获取请求结果。安全

须要注意的是,阻塞并不等价于同步,而非阻塞并不是等价于异步。事实上这两组概念描述的是I/O模型中的两个不一样维度。服务器

同步和异步着重点在于多个任务执行过程当中,后发起的任务是否必须等先发起的任务完成以后再进行。而无论先发起的任务请求是阻塞等待完成,仍是当即返回经过循环等待请求成功。网络

而阻塞和非阻塞重点在于请求的方法是否当即返回(或者说是否在条件不知足时被阻塞)。多线程

Unix下五种I/O模型

Unix 下共有五种 I/O 模型:并发

  • 阻塞 I/O
  • 非阻塞 I/O
  • I/O 多路复用(select和poll)
  • 信号驱动 I/O(SIGIO)
  • 异步 I/O(Posix.1的aio_系列函数)

阻塞I/O

如上文所述,阻塞I/O下请求没法当即完成则保持阻塞。阻塞I/O分为以下两个阶段。dom

  • 阶段1:等待数据就绪。网络 I/O 的状况就是等待远端数据陆续抵达;磁盘I/O的状况就是等待磁盘数据从磁盘上读取到内核态内存中。
  • 阶段2:数据拷贝。出于系统安全,用户态的程序没有权限直接读取内核态内存,所以内核负责把内核态内存中的数据拷贝一份到用户态内存中。

非阻塞I/O

非阻塞I/O请求包含以下三个阶段

  • socket设置为 NONBLOCK(非阻塞)就是告诉内核,当所请求的I/O操做没法完成时,不要将线程睡眠,而是返回一个错误码(EWOULDBLOCK) ,这样请求就不会阻塞。
  • I/O操做函数将不断的测试数据是否已经准备好,若是没有准备好,继续测试,直到数据准备好为止。整个I/O 请求的过程当中,虽然用户线程每次发起I/O请求后能够当即返回,可是为了等到数据,仍须要不断地轮询、重复请求,消耗了大量的 CPU 的资源。
  • 数据准备好了,从内核拷贝到用户空间。

通常不多直接使用这种模型,而是在其余I/O模型中使用非阻塞I/O 这一特性。这种方式对单个I/O 请求意义不大,但给I/O多路复用提供了条件。

I/O多路复用(异步阻塞 I/O)

I/O多路复用会用到select或者poll函数,这两个函数也会使线程阻塞,可是和阻塞I/O所不一样的是,这两个函数能够同时阻塞多个I/O操做。并且能够同时对多个读操做,多个写操做的I/O函数进行检测,直到有数据可读或可写时,才真正调用I/O操做函数。

从流程上来看,使用select函数进行I/O请求和同步阻塞模型没有太大的区别,甚至还多了添加监视Channel,以及调用select函数的额外操做,增长了额外工做。可是,使用 select之后最大的优点是用户能够在一个线程内同时处理多个Channel的I/O请求。用户能够注册多个Channel,而后不断地调用select读取被激活的Channel,便可达到在同一个线程内同时处理多个I/O请求的目的。而在同步阻塞模型中,必须经过多线程的方式才能达到这个目的。

调用select/poll该方法由一个用户态线程负责轮询多个Channel,直到某个阶段1的数据就绪,再通知实际的用户线程执行阶段2的拷贝。 经过一个专职的用户态线程执行非阻塞I/O轮询,模拟实现了阶段一的异步化。

信号驱动I/O(SIGIO)

首先咱们容许socket进行信号驱动I/O,并安装一个信号处理函数,线程继续运行并不阻塞。当数据准备好时,线程会收到一个SIGIO 信号,能够在信号处理函数中调用I/O操做函数处理数据。

异步I/O

调用aio_read 函数,告诉内核描述字,缓冲区指针,缓冲区大小,文件偏移以及通知的方式,而后当即返回。当内核将数据拷贝到缓冲区后,再通知应用程序。因此异步I/O模式下,阶段1和阶段2所有由内核完成,完成不须要用户线程的参与。

几种I/O模型对比

除异步I/O外,其它四种模型的阶段2基本相同,都是从内核态拷贝数据到用户态。区别在于阶段1不一样。前四种都属于同步I/O。

Java中四种I/O模型

上一章所述Unix中的五种I/O模型,除信号驱动I/O外,Java对其它四种I/O模型都有所支持。其中Java最先提供的blocking I/O便是阻塞I/O,而NIO便是非阻塞I/O,同时经过NIO实现的Reactor模式便是I/O复用模型的实现,经过AIO实现的Proactor模式便是异步I/O模型的实现。

从IO到NIO

面向流 vs. 面向缓冲

Java IO是面向流的,每次从流(InputStream/OutputStream)中读一个或多个字节,直到读取完全部字节,它们没有被缓存在任何地方。另外,它不能先后移动流中的数据,如需先后移动处理,须要先将其缓存至一个缓冲区。

Java NIO面向缓冲,数据会被读取到一个缓冲区,须要时能够在缓冲区中先后移动处理,这增长了处理过程的灵活性。但与此同时在处理缓冲区前须要检查该缓冲区中是否包含有所须要处理的数据,并须要确保更多数据读入缓冲区时,不会覆盖缓冲区内还没有处理的数据。

阻塞 vs. 非阻塞

Java IO的各类流是阻塞的。当某个线程调用read()或write()方法时,该线程被阻塞,直到有数据被读取到或者数据彻底写入。阻塞期间该线程没法处理任何其它事情。

Java NIO为非阻塞模式。读写请求并不会阻塞当前线程,在数据可读/写前当前线程能够继续作其它事情,因此一个单独的线程能够管理多个输入和输出通道。

选择器(Selector)

Java NIO的选择器容许一个单独的线程同时监视多个通道,能够注册多个通道到同一个选择器上,而后使用一个单独的线程来“选择”已经就绪的通道。这种“选择”机制为一个单独线程管理多个通道提供了可能。

零拷贝

Java NIO中提供的FileChannel拥有transferTo和transferFrom两个方法,可直接把FileChannel中的数据拷贝到另一个Channel,或者直接把另一个Channel中的数据拷贝到FileChannel。该接口常被用于高效的网络/文件者数据传输和大文件拷贝。在操做系统支持的状况下,经过该方法传输数据并不须要将源数据从内核态拷贝到用户态,再从用户态拷贝到目标通道的内核态,同时也避免了两次用户态和内核态间的上下文切换,也即便用了“零拷贝”,因此其性能通常高于Java IO中提供的方法。

使用FileChannel的零拷贝将本地文件内容传输到网络的示例代码以下所示。

public class NIOClient {

  public static void main(String[] args) throws IOException, InterruptedException {
    SocketChannel socketChannel = SocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(1234);
    socketChannel.connect(address);

    RandomAccessFile file = new RandomAccessFile(
        NIOClient.class.getClassLoader().getResource("test.txt").getFile(), "rw");
    FileChannel channel = file.getChannel();
    channel.transferTo(0, channel.size(), socketChannel);
    channel.close();
    file.close();
    socketChannel.close();
  }
}

阻塞I/O下的服务器实现

单线程逐个处理全部请求

使用阻塞I/O的服务器,通常使用循环,逐个接受链接请求并读取数据,而后处理下一个请求。

public class IOServer {

  private static final Logger LOGGER = LoggerFactory.getLogger(IOServer.class);

  public static void main(String[] args) {
    ServerSocket serverSocket = null;
    try {
      serverSocket = new ServerSocket();
      serverSocket.bind(new InetSocketAddress(2345));
    } catch (IOException ex) {
      LOGGER.error("Listen failed", ex);
      return;
    }
    try{
      while(true) {
        Socket socket = serverSocket.accept();
        InputStream inputstream = socket.getInputStream();
        LOGGER.info("Received message {}", IOUtils.toString(new InputStreamReader(inputstream)));
      }
    } catch(IOException ex) {
      try {
        serverSocket.close();
      } catch (IOException e) {
      }
      LOGGER.error("Read message failed", ex);
    }
  }
}

为每一个请求建立一个线程

上例使用单线程逐个处理全部请求,同一时间只能处理一个请求,等待I/O的过程浪费大量CPU资源,同时没法充分使用多CPU的优点。下面是使用多线程对阻塞I/O模型的改进。一个链接创建成功后,建立一个单独的线程处理其I/O操做。

public class IOServerMultiThread {

  private static final Logger LOGGER = LoggerFactory.getLogger(IOServerMultiThread.class);

  public static void main(String[] args) {
    ServerSocket serverSocket = null;
    try {
      serverSocket = new ServerSocket();
      serverSocket.bind(new InetSocketAddress(2345));
    } catch (IOException ex) {
      LOGGER.error("Listen failed", ex);
      return;
    }
    try{
      while(true) {
        Socket socket = serverSocket.accept();
        new Thread( () -> {
          try{
            InputStream inputstream = socket.getInputStream();
            LOGGER.info("Received message {}", IOUtils.toString(new InputStreamReader(inputstream)));
          } catch (IOException ex) {
            LOGGER.error("Read message failed", ex);
          }
        }).start();
      }
    } catch(IOException ex) {
      try {
        serverSocket.close();
      } catch (IOException e) {
      }
      LOGGER.error("Accept connection failed", ex);
    }
  }
}

使用线程池处理请求

为了防止链接请求过多,致使服务器建立的线程数过多,形成过多线程上下文切换的开销。能够经过线程池来限制建立的线程数,以下所示。

public class IOServerThreadPool {

  private static final Logger LOGGER = LoggerFactory.getLogger(IOServerThreadPool.class);

  public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    ServerSocket serverSocket = null;
    try {
      serverSocket = new ServerSocket();
      serverSocket.bind(new InetSocketAddress(2345));
    } catch (IOException ex) {
      LOGGER.error("Listen failed", ex);
      return;
    }
    try{
      while(true) {
        Socket socket = serverSocket.accept();
        executorService.submit(() -> {
          try{
            InputStream inputstream = socket.getInputStream();
            LOGGER.info("Received message {}", IOUtils.toString(new InputStreamReader(inputstream)));
          } catch (IOException ex) {
            LOGGER.error("Read message failed", ex);
          }
        });
      }
    } catch(IOException ex) {
      try {
        serverSocket.close();
      } catch (IOException e) {
      }
      LOGGER.error("Accept connection failed", ex);
    }
  }
}

Reactor模式

精典Reactor模式

精典的Reactor模式示意图以下所示。

在Reactor模式中,包含以下角色

  • Reactor 将I/O事件发派给对应的Handler
  • Acceptor 处理客户端链接请求
  • Handlers 执行非阻塞读/写

最简单的Reactor模式实现代码以下所示。

public class NIOServer {

  private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);

  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(1234));
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    while (selector.select() > 0) {
      Set<SelectionKey> keys = selector.selectedKeys();
      Iterator<SelectionKey> iterator = keys.iterator();
      while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        if (key.isAcceptable()) {
          ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
          SocketChannel socketChannel = acceptServerSocketChannel.accept();
          socketChannel.configureBlocking(false);
          LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
          socketChannel.register(selector, SelectionKey.OP_READ);
        } else if (key.isReadable()) {
          SocketChannel socketChannel = (SocketChannel) key.channel();
          ByteBuffer buffer = ByteBuffer.allocate(1024);
          int count = socketChannel.read(buffer);
          if (count <= 0) {
            socketChannel.close();
            key.cancel();
            LOGGER.info("Received invalide data, close the connection");
            continue;
          }
          LOGGER.info("Received message {}", new String(buffer.array()));
        }
        keys.remove(key);
      }
    }
  }
}

为了方便阅读,上示代码将Reactor模式中的全部角色放在了一个类中。

从上示代码中能够看到,多个Channel能够注册到同一个Selector对象上,实现了一个线程同时监控多个请求状态(Channel)。同时注册时须要指定它所关注的事件,例如上示代码中socketServerChannel对象只注册了OP_ACCEPT事件,而socketChannel对象只注册了OP_READ事件。

selector.select()是阻塞的,当有至少一个通道可用时该方法返回可用通道个数。同时该方法只捕获Channel注册时指定的所关注的事件。

多工做线程Reactor模式

经典Reactor模式中,尽管一个线程可同时监控多个请求(Channel),可是全部读/写请求以及对新链接请求的处理都在同一个线程中处理,无想充分利用多CPU的优点,同时读/写操做也会阻塞对新链接请求的处理。所以能够引入多线程,并行处理多个读/写操做,以下图所示。

多线程Reactor模式示例代码以下所示。

public class NIOServer {

  private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);

  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(1234));
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
      if(selector.selectNow() < 0) {
        continue;
      }
      Set<SelectionKey> keys = selector.selectedKeys();
      Iterator<SelectionKey> iterator = keys.iterator();
      while(iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        if (key.isAcceptable()) {
          ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
          SocketChannel socketChannel = acceptServerSocketChannel.accept();
          socketChannel.configureBlocking(false);
          LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
          SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
          readKey.attach(new Processor());
        } else if (key.isReadable()) {
          Processor processor = (Processor) key.attachment();
          processor.process(key);
        }
      }
    }
  }
}

从上示代码中能够看到,注册完SocketChannel的OP_READ事件后,能够对相应的SelectionKey attach一个对象(本例中attach了一个Processor对象,该对象处理读请求),而且在获取到可读事件后,能够取出该对象。

注:attach对象及取出该对象是NIO提供的一种操做,但该操做并不是Reactor模式的必要操做,本文使用它,只是为了方便演示NIO的接口。

具体的读请求处理在以下所示的Processor类中。该类中设置了一个静态的线程池处理全部请求。而process方法并不直接处理I/O请求,而是把该I/O操做提交给上述线程池去处理,这样就充分利用了多线程的优点,同时将对新链接的处理和读/写操做的处理放在了不一样的线程中,读/写操做再也不阻塞对新链接请求的处理。

public class Processor {
  private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class);
  private static final ExecutorService service = Executors.newFixedThreadPool(16);

  public void process(SelectionKey selectionKey) {
    service.submit(() -> {
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
      int count = socketChannel.read(buffer);
      if (count < 0) {
        socketChannel.close();
        selectionKey.cancel();
        LOGGER.info("{}\t Read ended", socketChannel);
        return null;
      } else if(count == 0) {
        return null;
      }
      LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array()));
      return null;
    });
  }
}

多Reactor

Netty中使用的Reactor模式,引入了多Reactor,也即一个主Reactor负责监控全部的链接请求,多个子Reactor负责监控并处理读/写请求,减轻了主Reactor的压力,下降了主Reactor压力太大而形成的延迟。
而且每一个子Reactor分别属于一个独立的线程,每一个成功链接后的Channel的全部操做由同一个线程处理。这样保证了同一请求的全部状态和上下文在同一个线程中,避免了没必要要的上下文切换,同时也方便了监控请求响应状态。

多Reactor模式示意图以下所示。

多Reactor示例代码以下所示。

public class NIOServer {

  private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);

  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(1234));
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    int coreNum = Runtime.getRuntime().availableProcessors();
    Processor[] processors = new Processor[coreNum];
    for (int i = 0; i < processors.length; i++) {
      processors[i] = new Processor();
    }

    int index = 0;
    while (selector.select() > 0) {
      Set<SelectionKey> keys = selector.selectedKeys();
      for (SelectionKey key : keys) {
        keys.remove(key);
        if (key.isAcceptable()) {
          ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
          SocketChannel socketChannel = acceptServerSocketChannel.accept();
          socketChannel.configureBlocking(false);
          LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
          Processor processor = processors[(int) ((index++) / coreNum)];
          processor.addChannel(socketChannel);
        }
      }
    }
  }
}

如上代码所示,本文设置的子Reactor个数是当前机器可用核数的两倍(与Netty默认的子Reactor个数一致)。对于每一个成功链接的SocketChannel,经过round robin的方式交给不一样的子Reactor。

子Reactor对SocketChannel的处理以下所示。

public class Processor {
  private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class);
  private static final ExecutorService service =
      Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());

  private Selector selector;

  public Processor() throws IOException {
    this.selector = SelectorProvider.provider().openSelector();
    start();
  }

  public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
    socketChannel.register(this.selector, SelectionKey.OP_READ);
  }

  public void start() {
    service.submit(() -> {
      while (true) {
        if (selector.selectNow() <= 0) {
          continue;
        }
        Set<SelectionKey> keys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
          SelectionKey key = iterator.next();
          iterator.remove();
          if (key.isReadable()) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            SocketChannel socketChannel = (SocketChannel) key.channel();
            int count = socketChannel.read(buffer);
            if (count < 0) {
              socketChannel.close();
              key.cancel();
              LOGGER.info("{}\t Read ended", socketChannel);
              continue;
            } else if (count == 0) {
              LOGGER.info("{}\t Message size is 0", socketChannel);
              continue;
            } else {
              LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array()));
            }
          }
        }
      }
    });
  }
}

在Processor中,一样建立了一个静态的线程池,且线程池的大小为机器核数的两倍。每一个Processor实例均包含一个Selector实例。同时每次获取Processor实例时均提交一个任务到该线程池,而且该任务正常状况下一直循环处理,不会中止。而提交给该Processor的SocketChannel经过在其Selector注册事件,加入到相应的任务中。由此实现了每一个子Reactor包含一个Selector对象,并由一个独立的线程处理。

Java进阶系列

相关文章
相关标签/搜索