java中的AIO

简介

jdk7中新增了一些与文件(网络)I/O相关的一些api。这些API被称为NIO.2,或称为AIO(Asynchronous I/O)。AIO最大的一个特性就是异步能力,这种能力对socket与文件I/O都起做用。AIO实际上是一种在读写操做结束以前容许进行其余操做的I/O处理。AIO是对JDK1.4中提出的同步非阻塞I/O(NIO)的进一步加强。html

关于NIO,以前的一篇文章能够看看:java中的NIOjava

jdk7主要增长了三个新的异步通道:react

  • AsynchronousFileChannel: 用于文件异步读写;linux

  • AsynchronousSocketChannel: 客户端异步socket;git

  • AsynchronousServerSocketChannel: 服务器异步socket。github

由于AIO的实施需充分调用OS参与,IO须要操做系统支持、并发也一样须要操做系统的支持,因此性能方面不一样操做系统差别会比较明显。编程

前提概念

在具体看AIO以前,咱们须要知道一些必要的前提概念。segmentfault

Unix中的I/O模型

Unix定义了五种I/O模型设计模式

  • 阻塞I/Oapi

  • 非阻塞I/O

  • I/O复用(select、poll、linux 2.6种改进的epoll)

  • 信号驱动IO(SIGIO)

  • 异步I/O(POSIX的aio_系列函数)

一个戏谑的例子:

若是你想吃一份宫保鸡丁盖饭:

  • 同步阻塞:你到饭馆点餐,而后在那等着,还要一边喊:好了没啊!

  • 同步非阻塞:在饭馆点完餐,就去遛狗了。不过溜一下子,就回饭馆喊一声:好了没啊!

  • 异步阻塞:遛狗的时候,接到饭馆电话,说饭作好了,让您亲自去拿。

  • 异步非阻塞:饭馆打电话说,咱们知道您的位置,一会给你送过来,安心遛狗就能够了。

详情参见文章末尾的他山之石-Unix下五种IO模型。

Reactor与Proactor

  • 两种IO多路复用方案:Reactor and Proactor。

  • Reactor模式是基于同步I/O的,而Proactor模式是和异步I/O相关的。

  • reactor:能收了你跟俺说一声。proactor: 你给我收十个字节,收好了跟俺说一声。

详情参见文章末尾的他山之石-IO设计模式:Reactor和Proactor对比。

异步的处理

异步无非是通知系统作一件事情。而后忘掉它,本身作其余事情去了。不少时候系统作完某一件事情后须要一些后续的操做。怎么办?这时候就是告诉异步调用如何作后续处理。一般有两种方式:

  • 未来式: 当你但愿主线程发起异步调用,并轮询等待结果的时候使用未来式;

  • 回调式: 常说的异步回调就是它。

以文件读取为例

未来式

未来式异步读取

未来式用现有的Java.util.concurrent技术声明一个Future,用来保存异步操做的处理结果。一般用Future get()方法(带或不带超时参数)在异步IO操做完成时获取其结果。

AsynchronousFileChannel会关联线程池,它的任务是接收IO处理事件,并分发给负责处理通道中IO操做结果的结果处理器。跟通道中发起的IO操做关联的结果处理器确保是由线程池中的某个线程产生。

未来式例子

Path path = Paths.get("/data/code/github/java_practice/src/main/resources/1log4j.properties");
    AsynchronousFileChannel channel = AsynchronousFileChannel.open(path);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    Future<Integer> future = channel.read(buffer,0);
//        while (!future.isDone()){
//            System.out.println("I'm idle");
//        }
    Integer readNumber = future.get();

    buffer.flip();
    CharBuffer charBuffer = CharBuffer.allocate(1024);
    CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
    decoder.decode(buffer,charBuffer,false);
    charBuffer.flip();
    String data = new String(charBuffer.array(),0, charBuffer.limit());
    System.out.println("read number:" + readNumber);
    System.out.println(data);

回调式

回调式异步读取

回调式所采用的事件处理技术相似于Swing UI编程采用的机制。基本思想是主线程会派一个侦查员CompletionHandler到独立的线程中执行IO操做。这个侦查员将带着IO的操做的结果返回到主线程中,这个结果会触发它本身的completed或failed方法(要重写这两个方法)。在异步IO活动结束后,接口java.nio.channels.CompletionHandler会被调用,其中V是结果类型,A是提供结果的附着对象。此时必须已经有了该接口completed(V,A)和failed(V,A)方法的实现,你的程序才能知道异步IO操做成功或失败时该如何处理。

回调式例子

Path path = Paths.get("/data/code/github/java_practice/src/main/resources/1log4j.properties");
    AsynchronousFileChannel channel = AsynchronousFileChannel.open(path);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            System.out.println(Thread.currentThread().getName() + " read success!");
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            System.out.println("read error");
        }
    });

    while (true){
        System.out.println(Thread.currentThread().getName() + " sleep");
        Thread.sleep(1000);
    }

异步socket client操做

AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
    channel.connect(new InetSocketAddress("127.0.0.1",8888)).get();
    ByteBuffer buffer = ByteBuffer.wrap("中文,你好".getBytes());
    Future<Integer> future = channel.write(buffer);

    future.get();
    System.out.println("send ok");

异步socket server操做

final AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel
            .open()
            .bind(new InetSocketAddress("0.0.0.0",8888));
    channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
        @Override
        public void completed(final AsynchronousSocketChannel client, Void attachment) {
            channel.accept(null, this);

            ByteBuffer buffer = ByteBuffer.allocate(1024);
            client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result_num, ByteBuffer attachment) {
                    attachment.flip();
                    CharBuffer charBuffer = CharBuffer.allocate(1024);
                    CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
                    decoder.decode(attachment,charBuffer,false);
                    charBuffer.flip();
                    String data = new String(charBuffer.array(),0, charBuffer.limit());
                    System.out.println("read data:" + data);
                    try{
                        client.close();
                    }catch (Exception e){}
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    System.out.println("read error");
                }
            });
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            System.out.println("accept error");
        }
    });

    while (true){
        Thread.sleep(1000);
    }

他山之石

相关文章
相关标签/搜索