nio简介 Java I/O演进与Linux网络I/O模型 java Blocking Queue

上一篇  Java I/O演进与Linux网络I/O模型

1、传统BIO

java传统bio编程概念: http://www.cnblogs.com/carl10086/p/6034563.html#_label4html

使用bio写一个简单的TimeServer

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月14日
 */
public class TimeServer {

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        int port = 8080;
        if (args != null && args.length > 0) {

            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 采用默认值
            }

        }
        ServerSocket server = null;
        try {
            server = new ServerSocket(port);
            System.out.println("The time server is start in port : " + port);
            Socket socket = null;
            while (true) {
                socket = server.accept();
                new Thread(new TimeServerHandler(socket)).start();
            }
        } finally {
            if (server != null) {
                System.out.println("The time server close");
                server.close();
                server = null;
            }
        }
    }
}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

/**
 * @author Administrator
 * @version 1.0
 * @date 2014年2月14日
 */
public class TimeServerHandler implements Runnable {

    private Socket socket;

    public TimeServerHandler(Socket socket) {
        this.socket = socket;
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(
                    this.socket.getInputStream()));
            out = new PrintWriter(this.socket.getOutputStream(), true);
            String currentTime = null;
            String body = null;
            while (true) {
                body = in.readLine();
                if (body == null)
                    break;
                System.out.println("The time server receive order : " + body);
                currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
                        System.currentTimeMillis()).toString() : "BAD ORDER";
                out.println(currentTime);
            }

        } catch (Exception e) {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            if (out != null) {
                out.close();
                out = null;
            }
            if (this.socket != null) {
                try {
                    this.socket.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                this.socket = null;
            }
        }
    }
}

在这个TimeServer的基础上引入线程池,这里暂时称为伪异步IO

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

import com.phei.netty.bio.TimeServerHandler;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月14日
 */
public class TimeServer {

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        int port = 8080;
        if (args != null && args.length > 0) {

            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 采用默认值
            }

        }
        ServerSocket server = null;
        try {
            server = new ServerSocket(port);
            System.out.println("The time server is start in port : " + port);
            Socket socket = null;
            TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(
                    50, 10000);// 建立IO任务线程池
            while (true) {
                socket = server.accept();
                singleExecutor.execute(new TimeServerHandler(socket));
            }
        } finally {
            if (server != null) {
                System.out.println("The time server close");
                server.close();
                server = null;
            }
        }
    }
}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author Administrator
 * @version 1.0
 * @date 2014年2月15日
 */
public class TimeServerHandlerExecutePool {

    private ExecutorService executor;

    public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
        executor = new ThreadPoolExecutor(Runtime.getRuntime()
                .availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<java.lang.Runnable>(queueSize));
    }

    public void execute(java.lang.Runnable task) {
        executor.execute(task);
    }
}

这里列举一些伪异步可能阻塞的地方:前端

  •   accept()
  •   read()
  •   write()

参考jdk文档 关于Socket中InputStreamread()操做:java

Reads some number of bytes from the input stream and stores them into the buffer array b. The number of bytes actually read is returned as an integer. This method blocks until input data is available, end of file is detected, or an exception is thrown.linux

这个方法有三种状况下会阻塞:编程

  • 有数据可读
  • 可用数据已经读取完毕
  • 发生空指针或者I/O异常

说明当发生网络阻塞或者说网络问题的情形时,线程池中的线程会阻塞住,若是线程池没有可用线程会一直排队。线程池和阻塞队列的内容介绍能够参考:java Blocking Queue数组

write也同样:也会阻塞至字节所有被写完或者异常。服务器

学习过TCP/IP相关内容,当消息的接收方处理缓慢的时候,将不能及时地从TCP缓冲区读取数据,这将会致使发送方的TCP window size不断减小,直到为0,双方处于Keep-alive状态,消息发送方将不能再向TCP缓冲区写入消息,若是这里采用的是同步阻塞IO,write操做将会被无限期阻塞,直到window size>0或者发生I/O异常。markdown

所以,这种伪异步在网络不佳的状况下可能会出现各类连锁问题:网络

(1) 服务器处理缓慢多线程

(2) 假如全部的可用线程都被阻塞,队列会排队至内存没法容纳

(3) 前端只有一个Acceptor线程,这个线程不能出问题,不然消息都会被拒绝掉

(4) 容易崩溃

2、NIO编程

2.1 通道

Java NIO 的通道相似流,但又有些不一样:

  • 既能够从通道中读取数据,又能够写数据到通道。但流的读写一般是单向的。
  • 通道能够非阻塞地读写。
  • 通道中的数据老是要先读到一个 Buffer,或者老是要从一个 Buffer 中写入。

正如上面所说,从通道读取数据到缓冲区,从缓冲区写入数据到通道。以下图所示:

Channel 的实现

这些是 Java NIO 中最重要的通道的实现:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

FileChannel 从文件中读写数据。

DatagramChannel 能经过 UDP 读写网络中的数据。

SocketChannel 能经过 TCP 读写网络中的数据。

ServerSocketChannel 能够监听新进来的 TCP 链接,像 Web 服务器那样。对每个新进来的链接都会建立一个 SocketChannel。

2.2 缓冲区

Java NIO 中的 Buffer 用于和 NIO 通道进行交互。如你所知,数据是从通道读入缓冲区,从缓冲区写入到通道中的。

缓冲区本质上是一块能够写入数据,而后能够从中读取数据的内存。这块内存被包装成 NIO Buffer 对象,并提供了一组方法,用来方便的访问该块内存。

Buffer 的基本用法

使用 Buffer 读写数据通常遵循如下四个步骤:

  1. 写入数据到 Buffer
  2. 调用flip()方法
  3. 从 Buffer 中读取数据
  4. 调用clear()方法或者compact()方法

当向 buffer 写入数据时,buffer 会记录下写了多少数据。一旦要读取数据,须要经过 flip() 方法将 Buffer 从写模式切换到读模式。在读模式下,能够读取以前写入到 buffer 的全部数据。

一旦读完了全部的数据,就须要清空缓冲区,让它能够再次被写入。有两种方式能清空缓冲区:调用 clear() 或 compact() 方法。clear() 方法会清空整个缓冲区。compact() 方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。

下面是一个使用 Buffer 的例子:

RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();  
//create buffer with capacity of 48 bytes  
ByteBuffer buf = ByteBuffer.allocate(48);  
int bytesRead = inChannel.read(buf); //read into buffer.  
while (bytesRead != -1) {  
  buf.flip();  //make buffer ready for read  
  while(buf.hasRemaining()){  
      System.out.print((char) buf.get()); // read 1 byte at a time  
  }  
  buf.clear(); //make buffer ready for writing  
  bytesRead = inChannel.read(buf);  
}  
aFile.close();  

Buffer 的 capacity,position 和 limit

缓冲区本质上是一块能够写入数据,而后能够从中读取数据的内存。这块内存被包装成 NIO Buffer 对象,并提供了一组方法,用来方便的访问该块内存。

为了理解 Buffer 的工做原理,须要熟悉它的三个属性:

  • capacity
  • position
  • limit

position 和 limit 的含义取决于 Buffer 处在读模式仍是写模式。无论 Buffer 处在什么模式,capacity 的含义老是同样的。

这里有一个关于 capacity,position 和 limit 在读写模式中的说明,详细的解释在插图后面。

capacity

  做为一个内存块,Buffer 有一个固定的大小值,也叫 “capacity”。 你只能往里写 capacity 个 byte、long,char 等类型。一旦 Buffer 满了,须要将其清空(经过读数据或者清除数据)才能继续写数据往里写数据。

position

  当你写数据到 Buffer 中时,position 表示当前的位置。初始的 position 值为 0. 当一个 byte、long 等数据写到 Buffer 后, position 会向前移动到下一个可插入数据的 Buffer 单元。position 最大可为 capacity – 1。

  当读取数据时,也是从某个特定位置读。当将 Buffer 从写模式切换到读模式,position 会被重置为 0。当从 Buffer 的 position 处读取数据时,position 向前移动到下一个可读的位置。

limit

  在写模式下,Buffer 的 limit 表示你最多能往 Buffer 里写多少数据。 写模式下,limit 等于 Buffer 的 capacity。

  当切换 Buffer 到读模式时,limit 表示你最多能读到多少数据。所以,当切换 Buffer 到读模式时,limit 会被设置成写模式下的 position 值。换句话说,你能读到以前写入的全部数据(limit 被设置成已写数据的数量,这个值在写模式下就是 position)

Buffer 的类型

Java NIO 有如下 Buffer 类型

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

如你所见,这些 Buffer 类型表明了不一样的数据类型。换句话说,就是能够经过 char,short,int,long,float 或 double 类型来操做缓冲区中的字节。

MappedByteBuffer 有些特别,在涉及它的专门章节中再讲。

Buffer 的分配

要想得到一个 Buffer 对象首先要进行分配。 每个 Buffer 类都有一个 allocate 方法。下面是一个分配 48 字节 capacity 的 ByteBuffer 的例子。

ByteBuffer buf = ByteBuffer.allocate(48);

这是分配一个可存储 1024 个字符的 CharBuffer:

CharBuffer buf = CharBuffer.allocate(1024);

向 Buffer 中写数据

写数据到 Buffer 有两种方式:

  • 从 Channel 写到 Buffer。
  • 经过 Buffer 的 put() 方法写到 Buffer 里。

从 Channel 写到 Buffer 的例子

int bytesRead = inChannel.read(buf); //read into buffer.

经过 put 方法写 Buffer 的例子:

buf.put(127);

put 方法有不少版本,容许你以不一样的方式把数据写入到 Buffer 中。例如, 写到一个指定的位置,或者把一个字节数组写入到 Buffer。 更多 Buffer 实现的细节参考 JavaDoc。

flip() 方法

flip 方法将 Buffer 从写模式切换到读模式。调用 flip() 方法会将 position 设回 0,并将 limit 设置成以前 position 的值。

换句话说,position 如今用于标记读的位置,limit 表示以前写进了多少个 byte、char 等 —— 如今能读取多少个 byte、char 等。

从 Buffer 中读取数据

从 Buffer 中读取数据有两种方式:

  • 从 Buffer 读取数据到 Channel。
  • 使用 get() 方法从 Buffer 中读取数据。

从 Buffer 读取数据到 Channel 的例子:

//read from buffer into channel.  
int bytesWritten = inChannel.write(buf);

使用 get() 方法从 Buffer 中读取数据的例子

byte aByte = buf.get();

get 方法有不少版本,容许你以不一样的方式从 Buffer 中读取数据。例如,从指定 position 读取,或者从 Buffer 中读取数据到字节数组。更多 Buffer 实现的细节参考 JavaDoc。

rewind() 方法

Buffer.rewind() 将 position 设回 0,因此你能够重读 Buffer 中的全部数据。limit 保持不变,仍然表示能从 Buffer 中读取多少个元素(byte、char 等)。

clear() 与 compact() 方法

一旦读完 Buffer 中的数据,须要让 Buffer 准备好再次被写入。能够经过 clear() 或 compact() 方法来完成。

若是调用的是 clear() 方法,position 将被设回 0,limit 被设置成 capacity 的值。换句话说,Buffer 被清空了。Buffer 中的数据并未清除,只是这些标记告诉咱们能够从哪里开始往 Buffer 里写数据。

若是 Buffer 中有一些未读的数据,调用 clear() 方法,数据将 “被遗忘”,意味着再也不有任何标记会告诉你哪些数据被读过,哪些尚未。

若是 Buffer 中仍有未读的数据,且后续还须要这些数据,可是此时想要先先写些数据,那么使用 compact() 方法。

compact() 方法将全部未读的数据拷贝到 Buffer 起始处。而后将 position 设到最后一个未读元素正后面。limit 属性依然像 clear() 方法同样,设置成 capacity。如今 Buffer 准备好写数据了,可是不会覆盖未读的数据。

mark() 与 reset() 方法

经过调用 Buffer.mark() 方法,能够标记 Buffer 中的一个特定 position。以后能够经过调用 Buffer.reset() 方法恢复到这个 position。例如:

buffer.mark();
//call buffer.get() a couple of times, e.g. during parsing.
buffer.reset();  //set position back to mark.

equals() 与 compareTo() 方法

可使用 equals() 和 compareTo() 方法两个 Buffer。

equals()

当知足下列条件时,表示两个 Buffer 相等:

  1. 有相同的类型(byte、char、int 等)。
  2. Buffer 中剩余的 byte、char 等的个数相等。
  3. Buffer 中全部剩余的 byte、char 等都相同。

如你所见,equals 只是比较 Buffer 的一部分,不是每个在它里面的元素都比较。实际上,它只比较 Buffer 中的剩余元素。

compareTo() 方法

compareTo() 方法比较两个 Buffer 的剩余元素 (byte、char 等), 若是知足下列条件,则认为一个 Buffer“小于” 另外一个 Buffer:

  1. 第一个不相等的元素小于另外一个 Buffer 中对应的元素 。
  2. 全部元素都相等,但第一个 Buffer 比另外一个先耗尽 (第一个 Buffer 的元素个数比另外一个少)。

(译注:剩余元素是从 position 到 limit 之间的元素)

2.3 选择器

Selector(选择器)是 Java NIO 中可以检测一到多个 NIO 通道,并可以知晓通道是否为诸如读写事件作好准备的组件。这样,一个单独的线程能够管理多个 channel,从而管理多个网络链接。

为何使用 Selector?

仅用单个线程来处理多个 Channels 的好处是,只须要更少的线程来处理通道。事实上,能够只用一个线程处理全部的通道。对于操做系统来讲,线程之间上下文切换的开销很大,并且每一个线程都要占用系统的一些资源(如内存)。所以,使用的线程越少越好。

可是,须要记住,现代的操做系统和 CPU 在多任务方面表现的愈来愈好,因此多线程的开销随着时间的推移,变得愈来愈小了。实际上,若是一个 CPU 有多个内核,不使用多任务多是在浪费 CPU 能力。无论怎么说,关于那种设计的讨论应该放在另外一篇不一样的文章中。在这里,只要知道使用 Selector 可以处理多个通道就足够了。

下面是单线程使用一个 Selector 处理 3 个 channel 的示例图:

Selector 的建立

经过调用 Selector.open() 方法建立一个 Selector,以下:

Selector selector = Selector.open();

向 Selector 注册通道

为了将 Channel 和 Selector 配合使用,必须将 channel 注册到 selector 上。经过 SelectableChannel.register() 方法来实现,以下:

channel.configureBlocking(false);
SelectionKey key = channel.register(selector,
    Selectionkey.OP_READ)

与 Selector 一块儿使用时,Channel 必须处于非阻塞模式下。这意味着不能将 FileChannel 与 Selector 一块儿使用,由于 FileChannel 不能切换到非阻塞模式。而套接字通道均可以。

注意 register() 方法的第二个参数。这是一个 “interest 集合”,意思是在经过 Selector 监听 Channel 时对什么事件感兴趣。能够监听四种不一样类型的事件:

  1. Connect
  2. Accept
  3. Read
  4. Write

通道触发了一个事件意思是该事件已经就绪。因此,某个 channel 成功链接到另外一个服务器称为 “链接就绪”。一个 server socket channel 准备好接收新进入的链接称为 “接收就绪”。一个有数据可读的通道能够说是 “读就绪”。等待写数据的通道能够说是 “写就绪”。

这四种事件用 SelectionKey 的四个常量来表示:

  1. SelectionKey.OP_CONNECT
  2. SelectionKey.OP_ACCEPT
  3. SelectionKey.OP_READ
  4. SelectionKey.OP_WRITE

若是你对不止一种事件感兴趣,那么能够用 “位或” 操做符将常量链接起来,以下:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

在下面还会继续提到 interest 集合

SelectionKey

在上一小节中,当向 Selector 注册 Channel 时,register() 方法会返回一个 SelectionKey 对象。这个对象包含了一些你感兴趣的属性:

  • interest 集合
  • ready 集合
  • Channel
  • Selector
  • 附加的对象(可选)

下面我会描述这些属性。

interest 集合

就像向 Selector 注册通道一节中所描述的,interest 集合是你所选择的感兴趣的事件集合。能够经过 SelectionKey 读写 interest 集合,像这样:

int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept  = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

能够看到,用 “位与” 操做 interest 集合和给定的 SelectionKey 常量,能够肯定某个肯定的事件是否在 interest 集合中。

ready 集合

ready 集合是通道已经准备就绪的操做的集合。在一次选择 (Selection) 以后,你会首先访问这个 ready set。Selection 将在下一小节进行解释。能够这样访问 ready 集合:

int readySet = selectionKey.readyOps();

能够用像检测 interest 集合那样的方法,来检测 channel 中什么事件或操做已经就绪。可是,也可使用如下四个方法,它们都会返回一个布尔类型:

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

Channel + Selector

从 SelectionKey 访问 Channel 和 Selector 很简单。以下:

Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();

还能够在用 register() 方法向 Selector 注册 Channel 的时候附加对象。如:

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

Selector 选择通道

一旦向 Selector 注册了一或多个通道,就能够调用几个重载的 select() 方法。这些方法返回你所感兴趣的事件(如链接、接受、读或写)已经准备就绪的那些通道。换句话说,若是你对 “读就绪” 的通道感兴趣,select() 方法会返回读事件已经就绪的那些通道。

下面是 select() 方法:

  • int select()
  • int select(long timeout)
  • int selectNow()

select()阻塞到至少有一个通道在你注册的事件上就绪了。

select(long timeout)和 select() 同样,除了最长会阻塞 timeout 毫秒 (参数)。

selectNow()不会阻塞,无论什么通道就绪都马上返回(译者注:此方法执行非阻塞的选择操做。若是自从前一次选择操做后,没有通道变成可选择的,则此方法直接返回零。)。

select() 方法返回的 int 值表示有多少通道已经就绪。亦即,自上次调用 select() 方法后有多少通道变成就绪状态。若是调用 select() 方法,由于有一个通道变成就绪状态,返回了 1,若再次调用 select() 方法,若是另外一个通道就绪了,它会再次返回 1。若是对第一个就绪的 channel 没有作任何操做,如今就有两个就绪的通道,但在每次 select() 方法调用之间,只有一个通道就绪了。

selectedKeys()

一旦调用了 select() 方法,而且返回值代表有一个或更多个通道就绪了,而后能够经过调用 selector 的 selectedKeys() 方法,访问 “已选择键集(selected key set)” 中的就绪通道。以下所示:

Set selectedKeys = selector.selectedKeys();

当像 Selector 注册 Channel 时,Channel.register() 方法会返回一个 SelectionKey 对象。这个对象表明了注册到该 Selector 的通道。能够经过 SelectionKey 的 selectedKeySet() 方法访问这些对象。

能够遍历这个已选择的键集合来访问就绪的通道。以下:

Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
}

这个循环遍历已选择键集中的每一个键,并检测各个键所对应的通道的就绪事件。

注意每次迭代末尾的 keyIterator.remove() 调用。Selector 不会本身从已选择键集中移除 SelectionKey 实例。必须在处理完通道时本身移除。下次该通道变成就绪时,Selector 会再次将其放入已选择键集中。

SelectionKey.channel() 方法返回的通道须要转型成你要处理的类型,如 ServerSocketChannel 或 SocketChannel 等。

wakeUp()

某个线程调用 select() 方法后阻塞了,即便没有通道已经就绪,也有办法让其从 select() 方法返回。只要让其它线程在第一个线程调用 select() 方法的那个对象上调用 Selector.wakeup() 方法便可。阻塞在 select() 方法上的线程会立马返回。

若是有其它线程调用了 wakeup() 方法,但当前没有线程阻塞在 select() 方法上,下个调用 select() 方法的线程会当即 “醒来(wake up)”。

close()

用完 Selector 后调用其 close() 方法会关闭该 Selector,且使注册到该 Selector 上的全部 SelectionKey 实例无效。通道自己并不会关闭。

完整的示例

这里有一个完整的示例,打开一个 Selector,注册一个通道注册到这个 Selector 上 (通道的初始化过程略去), 而后持续监控这个 Selector 的四种事件(接受,链接,读,写)是否就绪。

Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
  int readyChannels = selector.select();
  if(readyChannels == 0) continue;
  Set selectedKeys = selector.selectedKeys();
  Iterator keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
  }j
} 

3、NIO实现TimeServer

TimeServer序列图

首先看序列图:

大体步骤以下:

1. 打开ServerSocketChannel,用于监听,它是全部客户端链接的父管道:

ServerSocketChannel servChannel = ServerSocketChannel.open();

2. 绑定监听端口,设置为非阻塞模式,代码以下:

servChannel.configureBlocking(false);
servChannel.socket().bind(new InetSocketAddress(port), 1024);

3. 建立Reactor线程,建立多路复用器并启动线程,代码以下:

Selector selector = Selector.open();
new Thread(new ReactorTask()).start();

4. 将ServerSocketChannel注册到Selector上,而且监听ACCEPT事件

servChannel.register(selector, SelectionKey.OP_ACCEPT);

5. 多路复用器在线程run方法的无限循环体内轮询准备就绪的Key

while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }

6. 假若有新的connection接入,处理新的接入请求,完成TCP三次握手,创建物理链路,而且设置为非阻塞模式,注册到Selector中监听读事件,示例代码以下:

if (key.isAcceptable()) {
                // Accept the new connection
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                sc.socket().setReuseAddress(true);
                // Add the new connection to the selector
                sc.register(selector, SelectionKey.OP_READ);
            }

7. 若是发生读事件,异步读取客户端请求消息到缓冲区,示例代码以下:

int readNumber = channel.read(receivedBuffer);

8. 对ByteBuffer进行解码,若是有半包消息指针,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑

Object message = null;
while( buffer.hasRemain() ){
    byteBuffer.mark();
    Object message = decode(byteBuffer);    
    if (message == null) {
        byteBuffer.reset();
        break;
    }
    if (!byteBuffer.hasRemain()) {
        byteBuffer.clear();
    } else {
        byteBuffer.compact();
    }
    if (messageList !=null && !messageList.isEmpty() ){
        for (Object messageE : messageList ) {
            handlerTask(messageE);
        }
    }  
}

9. 最后,将消息写出

socketChannel.write(buffer);

注意: 若是发送区TCP缓冲区满,会出现写半包,须要注册监听写操做位,循环写,直到整包消息写入TCP缓冲区。

TimeServer源码

import java.io.IOException;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月14日
 */
public class TimeServer {

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 采用默认值
            }
        }
        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
        new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @author Administrator
 * @version 1.0
 * @date 2014年2月16日
 */
public class MultiplexerTimeServer implements Runnable {

    private Selector selector;

    private ServerSocketChannel servChannel;

    private volatile boolean stop;

    /**
     * 初始化多路复用器、绑定监听端口
     *
     * @param port
     */
    public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The time server is start in port : " + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop() {
        this.stop = true;
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        while (!stop) {
            try {
                //设置休眠时间为1s。不管有读写事件发生,selector每隔1s被唤醒一次。
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }

        // 多路复用器关闭后,全部注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,因此不须要重复释放资源
        if (selector != null)
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
    }

    private void handleInput(SelectionKey key) throws IOException {

        if (key.isValid()) {
            // 处理新接入的请求消息
            if (key.isAcceptable()) {
                // Accept the new connection
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                //建立tcp 3次握手 虚链路...
                SocketChannel sc = ssc.accept();
                //设置为非阻塞模式
                sc.configureBlocking(false);
                sc.socket().setReuseAddress(true);
                // Add the new connection to the selector
                sc.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
                // Read the data
                SocketChannel sc = (SocketChannel) key.channel();
                // 不知道客户端发送的码流大小,做为例子,开辟1024的缓冲区。
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                // 返回值>0,表示有字节,对字节进行解码
                if (readBytes > 0) {
                    readBuffer.flip(); //flip以后才能够进行读操做
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("The time server receive order : "
                            + body);
                    //若是请求内容是"Query time order"就回复,不然回复"BAD ORDER"
                    String currentTime = "QUERY TIME ORDER"
                            .equalsIgnoreCase(body) ? new java.util.Date(
                            System.currentTimeMillis()).toString()
                            : "BAD ORDER";
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else
                    ; // 读到0字节,忽略
            }
        }
    }

    private void doWrite(SocketChannel channel, String response)
            throws IOException {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            //异步操做,没法保证发送完,会出现"写半包"问题,须要注册写操做,而后不断轮询Selector是否发送完毕,例子中不处理,太麻烦。
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }
}

TimeClient序列图

 

TimeClient源码分析

public class TimeClient {

    /**
     * @param args
     */
    public static void main(String[] args) {

    int port = 8080;
    if (args != null && args.length > 0) {
        try {
        port = Integer.valueOf(args[0]);
        } catch (NumberFormatException e) {
        // 采用默认值
        }
    }
    new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001")
        .start();
    }
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @author Administrator
 * @date 2014年2月16日
 * @version 1.0
 */
public class TimeClientHandle implements Runnable {

    private String host;
    private int port;

    private Selector selector;
    private SocketChannel socketChannel;

    private volatile boolean stop;

    public TimeClientHandle(String host, int port) {
    this.host = host == null ? "127.0.0.1" : host;
    this.port = port;
    try {
        selector = Selector.open();
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
    } catch (IOException e) {
        e.printStackTrace();
        System.exit(1);
    }
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
    try {
        doConnect();
    } catch (IOException e) {
        e.printStackTrace();
        System.exit(1);
    }
    while (!stop) {
        try {
        selector.select(1000);
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        Iterator<SelectionKey> it = selectedKeys.iterator();
        SelectionKey key = null;
        while (it.hasNext()) {
            key = it.next();
            it.remove();
            try {
            handleInput(key);
            } catch (Exception e) {
            if (key != null) {
                key.cancel();
                if (key.channel() != null)
                key.channel().close();
            }
            }
        }
        } catch (Exception e) {
        e.printStackTrace();
        System.exit(1);
        }
    }

    // 多路复用器关闭后,全部注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,因此不须要重复释放资源
    if (selector != null)
        try {
        selector.close();
        } catch (IOException e) {
        e.printStackTrace();
        }

    }

    private void handleInput(SelectionKey key) throws IOException {

    if (key.isValid()) {
        // 判断是否链接成功
        SocketChannel sc = (SocketChannel) key.channel();
        if (key.isConnectable()) {
        if (sc.finishConnect()) {
            sc.register(selector, SelectionKey.OP_READ);
            doWrite(sc);
        } else
            System.exit(1);// 链接失败,进程退出
        }
        if (key.isReadable()) {
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        int readBytes = sc.read(readBuffer);
        if (readBytes > 0) {
            readBuffer.flip();
            byte[] bytes = new byte[readBuffer.remaining()];
            readBuffer.get(bytes);
            String body = new String(bytes, "UTF-8");
            System.out.println("Now is : " + body);
            this.stop = true;
        } else if (readBytes < 0) {
            // 对端链路关闭
            key.cancel();
            sc.close();
        } else
            ; // 读到0字节,忽略
        }
    }

    }

    private void doConnect() throws IOException {
    // 若是直接链接成功,则注册到多路复用器上,发送请求消息,读应答
    if (socketChannel.connect(new InetSocketAddress(host, port))) {
        socketChannel.register(selector, SelectionKey.OP_READ);
        doWrite(socketChannel);
    } else
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
    }

    private void doWrite(SocketChannel sc) throws IOException {
    byte[] req = "QUERY TIME ORDER".getBytes();
    ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
    writeBuffer.put(req);
    writeBuffer.flip();
    sc.write(writeBuffer);
    if (!writeBuffer.hasRemaining())
        System.out.println("Send order 2 server succeed.");
    }

}

(1) 建立Selector和SocketChannel对象,能够设定SocketChannel的TCP参数

(2) 发送链接请求,做为例子,链接是成功的,不须要进行重连操做。 doConnect中,若是链接成功,注册READ事件,若是没有成功,服务器端没有应答信息,不表示链接失败。须要注册CONNECT事件,当服务器返回syn-ack消息后,Selector可以轮询到这个SocketChannel处于链接就绪状态。

(3) 轮询selector,当有就绪的Channel,则使用handleInput(key)方法

(4) 对SelectorKey进行判断,若是是链接事件,则注册监听READ操做,而且写入请求。

(5) doWrite中直接使用ByteBuffer进行写请求,一样是异步,一样有"半写包"问题,这里暂时略过。

(6) 若是是可读事件,一样不知道会有多少的响应,所以暂时分配1M,进行异步接收,因为是异步的,因此一样须要对结果进行判断,此处再也不描述,也不解决TCP粘包问题

(7) 最后客户端主动关闭链接...结束

总结NIO

例子中,咱们不少问题都没有处理,好比"半包读"和"半包写"等等,NIO的代码很是的复杂,若是要真正写的话,可是NIO依旧被广发使用...

缘由以下:

(1) 客户端链接操做能够no-blocking的方式,能够经过多路复用器注册OP_CONNECT事件,而后轮询SelectKey的方式等待结果。

(2) SocketChannel的读写都是no-blocking的方式,即非阻塞的方式,没有就当即返回下一个...

(3) 线程模型的优化,因为JDK的Selector在Linux等主流操做系统上使用的epoll模型。这意味着一个Selector线程能够处理成千上万个客户端链接。Windows系统上的Selector类是sun.nio.ch.WindowsSelectorImpl,而linux上的实现类是sun.nio.ch.EPollSelectorImpl,很明显是epoll模型。

4、AIO

jdk1.7升级了NIO类库,Java正式提供了异步文件I/O操做,同时提供了与UNIX网络编程事件驱动I/O对应的AIO。

NIO 2.0 引入了新的异步通道的概念,而且提供了异步文件通道和异步套接字通道的实现。异步通道有如下2种方式

  • 经过java.util.concurrent.Future类表示异步操做的结果;
  • 在执行异步操做的时候传入一个java.nio.channels。

CompletionHanler接口的实现类做为操做完成的回调。

NIO2.0的异步套接字通道是真正的异步非阻塞I/O,对应于UNIX网络中的事件驱动IO。它不须要经过多路复用器去对注册的通道进行轮询实现异步读写,从而简化了NIO的编程模型。

AIO SERVER端源码

import java.io.IOException;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月14日
 */
public class TimeServer {

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 采用默认值
            }
        }
        AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);
        new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start();
    }
}

下面是真正的Handler:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;

/**
 * @author Administrator
 * @version 1.0
 * @date 2014年2月16日
 */
public class AsyncTimeServerHandler implements Runnable {

    private int port;

    CountDownLatch latch;
    AsynchronousServerSocketChannel asynchronousServerSocketChannel;

    public AsyncTimeServerHandler(int port) {
        this.port = port;
        try {
            asynchronousServerSocketChannel = AsynchronousServerSocketChannel
                    .open();
            asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
            System.out.println("The time server is start in port : " + port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {

        latch = new CountDownLatch(1);
        doAccept();
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void doAccept() {
        asynchronousServerSocketChannel.accept(this,
                new AcceptCompletionHandler());
    }

}

(1) 首先建立了一个AsynchronousServerSocketChannel,而后调用它的bind方法绑定监听端口。

(2) 使用CountDownLatch在启动完成以后一直阻塞住,实际项目中,不须要启动独立的线程,这里是演示。

(3) doAccept()用于接口客户端的链接,因为是异步操做,能够传递一个CompletionHandler<AsynchronousSocketChannel, ? super A>的类型来处理。

实际处理类以下:

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月16日
 */
public class AcceptCompletionHandler implements
        CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {

    @Override
    public void completed(AsynchronousSocketChannel result,
                          AsyncTimeServerHandler attachment) {
        attachment.asynchronousServerSocketChannel.accept(attachment, this);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        result.read(buffer, buffer, new ReadCompletionHandler(result));
    }

    @Override
    public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
        exc.printStackTrace();
        attachment.latch.countDown();
    }

}

(1) 这里又再次调用了

attachment.asynchronousServerSocketChannel.accept(attachment, this);

方法,为何要再次调用:若是有新的客户端介入,系统将回调咱们传入channel,咱们要去接收成千上万的个请求,每调用一次,到complete方法就表示接收成功了,能够去接收下一次请求了,当咱们再次调用这个方法,能够理解为本次请求已经被成功接收,接收线程能够接收另外一个请求了。

(2) 成功接收请求后要处理请求消息,调用channel的read方法,咱们看一下该方法的参数

  ByteBuffer dst: 接收缓冲区,用于异步Channel中读取数据包;

  Attachment: 异步Channel携带的附件,通知回调时做为参数使用

  CompletionHandler<Interger,? super A> : 接收通知回调的业务Handler,在本例中为ReadCompletionHandler

代码以下:

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月16日
 */
public class ReadCompletionHandler implements
        CompletionHandler<Integer, ByteBuffer> {

    private AsynchronousSocketChannel channel;

    public ReadCompletionHandler(AsynchronousSocketChannel channel) {
        if (this.channel == null)
            this.channel = channel;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        attachment.flip();
        byte[] body = new byte[attachment.remaining()];
        attachment.get(body);
        try {
            String req = new String(body, "UTF-8");
            System.out.println("The time server receive order : " + req);
            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date(
                    System.currentTimeMillis()).toString() : "BAD ORDER";
            doWrite(currentTime);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    private void doWrite(String currentTime) {
        if (currentTime != null && currentTime.trim().length() > 0) {
            byte[] bytes = (currentTime).getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer, writeBuffer,
                    new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer buffer) {
                            // 若是没有发送完成,继续发送
                            if (buffer.hasRemaining())
                                channel.write(buffer, buffer, this);
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            try {
                                channel.close();
                            } catch (IOException e) {
                                // ingnore on close
                            }
                        }
                    });
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            this.channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

(1) 首先继续根据channel建立ReadCompletionHandler中当作成员变量来使用,主要用于半包消息和发送应答,例子没有处理半包消息。

(2) attachment是一个ByteBuffer,其中已经有写入的数据了,所以flip准备读出

(3) 调用doWrite进行写数据,和前面的read同样调用channel.write()方法,一样也有3个参数以支持异步操做,这里直接使用匿名内部类,若是没有发送完成就继续发送

AIO Client端源码说明

/**
 * @author lilinfeng
 * @date 2014年2月14日
 * @version 1.0
 */
public class TimeClient {

    /**
     * @param args
     */
    public static void main(String[] args) {
    int port = 8080;
    if (args != null && args.length > 0) {
        try {
        port = Integer.valueOf(args[0]);
        } catch (NumberFormatException e) {
        // 采用默认值
        }

    }
    new Thread(new AsyncTimeClientHandler("127.0.0.1", port),
        "AIO-AsyncTimeClientHandler-001").start();

    }
}
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

/**
 * @author Administrator
 * @version 1.0
 * @date 2014年2月16日
 */
public class AsyncTimeClientHandler implements
        CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {

    private AsynchronousSocketChannel client;
    private String host;
    private int port;
    private CountDownLatch latch;

    public AsyncTimeClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            client = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {

        latch = new CountDownLatch(1);
        client.connect(new InetSocketAddress(host, port), this, this);
        try {
            latch.await();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void completed(Void result, AsyncTimeClientHandler attachment) {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        client.write(writeBuffer, writeBuffer,
                new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result, ByteBuffer buffer) {
                        if (buffer.hasRemaining()) {
                            client.write(buffer, buffer, this);
                        } else {
                            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                            client.read(
                                    readBuffer,
                                    readBuffer,
                                    new CompletionHandler<Integer, ByteBuffer>() {
                                        @Override
                                        public void completed(Integer result,
                                                              ByteBuffer buffer) {
                                            buffer.flip();
                                            byte[] bytes = new byte[buffer
                                                    .remaining()];
                                            buffer.get(bytes);
                                            String body;
                                            try {
                                                body = new String(bytes,
                                                        "UTF-8");
                                                System.out.println("Now is : "
                                                        + body);
                                                latch.countDown();
                                            } catch (UnsupportedEncodingException e) {
                                                e.printStackTrace();
                                            }
                                        }

                                        @Override
                                        public void failed(Throwable exc,
                                                           ByteBuffer attachment) {
                                            try {
                                                client.close();
                                                latch.countDown();
                                            } catch (IOException e) {
                                                // ingnore on close
                                            }
                                        }
                                    });
                        }
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        try {
                            client.close();
                            latch.countDown();
                        } catch (IOException e) {
                            // ingnore on close
                        }
                    }
                });
    }

    @Override
    public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
        exc.printStackTrace();
        try {
            client.close();
            latch.countDown();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

(1) 这里跟Server很是相似,很少加描述了。

(2) 这里一样没有处理半包问题,所以本例只能做为功能测试,若是对代码进行压力或者性能测试,输出结果就会存在问题。

(3) 运行代码,使用jps 查看pid,再使用jstack -l pid 查看堆栈信息,发现JDK底层经过线程池来执行回调通知,异步回调类由sun.nio.ch.AsynchronousChannelGroupImpl实现。所以:异步Socket Channel是一个被动执行对象,这里不须要像NIO编程那样建立一个独立的I/O线程去处理读写操做,使用的JDK内部的线程池自动回调驱动读写操做,所以,基于新的NIO2.0编程会更加简单。

 5、四种IO的对比

不少人喜欢讲jdk1.4提供的NIO框架称为异步非阻塞I/O,可是它实际上只是非阻塞I/O,早起的JDK1.4和1.5 update10以前,JDK Selector还只能使用select/poll实现IO复用技术,不是异步的,在JDK1.5 UPDATE10和Linux core2.6之上才能使用epoll,而上层API不变,只是底层Selector由epoll实现,仍旧没有改变IO模型,仍是多路复用模型。

而NIO2.0才开始提供异步的套接字通道,是真正的异步I/O模型。

可是这里不对其叫法作纠结,1.4 1.7 都称为异步非阻塞IO也无所谓。
这里对这四种IO进行约定:

1. bio

2. 伪异步: bio+ 线程池,官方没有这种叫法..

3. NIO: jdk1.4多路复用

4. AIO: jdk1.7 aio 模型

相关文章
相关标签/搜索