Java NIO 是由 Java 1.4 引进的异步 IO.
Java NIO 由如下几个核心部分组成:java
IO 和 NIO 的区别主要体如今三个方面:数组
传统的 IO 是面向字节流或字符流的, 而在 NIO 中, 咱们抛弃了传统的 IO 流, 而是引入了 Channel 和 Buffer 的概念. 在 NIO 中, 我只能从 Channel 中读取数据到 Buffer 中或将数据从 Buffer 中写入到 Channel.
那么什么是 基于流 呢? 在通常的 Java IO 操做中, 咱们以流式的方式顺序地从一个 Stream 中读取一个或多个字节, 所以咱们也就不能随意改变读取指针的位置.
而 基于 Buffer 就显得有点不一样了. 咱们首先须要从 Channel 中读取数据到 Buffer 中, 当 Buffer 中有数据后, 咱们就能够对这些数据进行操做了. 不像 IO 那样是顺序操做, NIO 中咱们能够随意地读取任意位置的数据.缓存
Java 提供的各类 Stream 操做都是阻塞的, 例如咱们调用一个 read 方法读取一个文件的内容, 那么调用 read 的线程会被阻塞住, 直到 read 操做完成.
而 NIO 的非阻塞模式容许咱们非阻塞地进行 IO 操做. 例如咱们须要从网络中读取数据, 在 NIO 的非阻塞模式中, 当咱们调用 read 方法时, 若是此时有数据, 则 read 读取并返回; 若是此时没有数据, 则 read 直接返回, 而不会阻塞当前线程.服务器
selector 是 NIO 中才有的概念, 它是 Java NIO 之因此能够非阻塞地进行 IO 操做的关键.
经过 Selector, 一个线程能够监听多个 Channel 的 IO 事件, 当咱们向一个 Selector 中注册了 Channel 后, Selector 内部的机制就能够自动地为咱们不断地查询(select) 这些注册的 Channel 是否有已就绪的 IO 事件(例如可读, 可写, 网络链接完成等). 经过这样的 Selector 机制, 咱们就能够很简单地使用一个线程高效地管理多个 Channel 了.网络
一般来讲, 全部的 NIO 的 I/O 操做都是从 Channel 开始的. 一个 channel 相似于一个 stream.
java Stream 和 NIO Channel 对比dom
Channel 类型有:异步
这些通道涵盖了 UDP 和 TCP网络 IO以及文件 IO.
基本的 Channel 使用例子:socket
public static void main( String[] args ) throws Exception { RandomAccessFile aFile = new RandomAccessFile("/Users/xiongyongshun/settings.xml", "rw"); FileChannel inChannel = aFile.getChannel(); ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = inChannel.read(buf); while (bytesRead != -1) { buf.flip(); while(buf.hasRemaining()){ System.out.print((char) buf.get()); } buf.clear(); bytesRead = inChannel.read(buf); } aFile.close(); }
FileChannel 是操做文件的Channel, 咱们能够经过 FileChannel 从一个文件中读取数据, 也能够将数据写入到文件中.
注意
, FileChannel 不能设置为非阻塞模式.this
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw"); FileChannel inChannel = aFile.getChannel();
ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = inChannel.read(buf);
String newData = "New String to write to file..." + System.currentTimeMillis(); ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); buf.put(newData.getBytes()); buf.flip(); while(buf.hasRemaining()) { channel.write(buf); }
当咱们对 FileChannel 的操做完成后, 必须将其关闭操作系统
channel.close();
long pos channel.position(); channel.position(pos +123);
咱们能够经过 channel.size()获取关联到这个 Channel 中的文件的大小. 注意, 这里返回的是文件的大小, 而不是 Channel 中剩余的元素个数.
channel.truncate(1024);
将文件的大小截断为1024字节.
咱们能够强制将缓存的未写入的数据写入到文件中:
channel.force(true);
SocketChannel 是一个客户端用来进行 TCP 链接的 Channel.
建立一个 SocketChannel 的方法有两种:
SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("http://example.com", 80));
socketChannel.close();
ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = socketChannel.read(buf);
若是 read()返回 -1, 那么表示链接中断了.
String newData = "New String to write to file..." + System.currentTimeMillis(); ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); buf.put(newData.getBytes()); buf.flip(); while(buf.hasRemaining()) { channel.write(buf); }
咱们能够设置 SocketChannel 为异步模式, 这样咱们的 connect, read, write 都是异步的了.
socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("http://example.com", 80)); while(! socketChannel.finishConnect() ){ //wait, or do something else... }
在异步模式中, 或许链接尚未创建, connect 方法就返回了, 所以咱们须要检查当前是不是链接到了主机, 所以经过一个 while 循环来判断.
在异步模式下, 读写的方式是同样的.
在读取时, 由于是异步的, 所以咱们必须检查 read 的返回值, 来判断当前是否读取到了数据.
ServerSocketChannel 顾名思义, 是用在服务器为端的, 能够监听客户端的 TCP 链接, 例如:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(9999)); while(true){ SocketChannel socketChannel = serverSocketChannel.accept(); //do something with socketChannel... }
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.close();
咱们可使用ServerSocketChannel.accept()方法来监听客户端的 TCP 链接请求, accept()方法会阻塞, 直到有链接到来, 当有链接时, 这个方法会返回一个 SocketChannel 对象:
while(true){ SocketChannel socketChannel = serverSocketChannel.accept(); //do something with socketChannel... }
在非阻塞模式下, accept()是非阻塞的, 所以若是此时没有链接到来, 那么 accept()方法会返回null:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(9999)); serverSocketChannel.configureBlocking(false); while(true){ SocketChannel socketChannel = serverSocketChannel.accept(); if(socketChannel != null){ //do something with socketChannel... } }
DatagramChannel 是用来处理 UDP 链接的.
DatagramChannel channel = DatagramChannel.open(); channel.socket().bind(new InetSocketAddress(9999));
ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); channel.receive(buf);
String newData = "New String to write to file..." + System.currentTimeMillis(); ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); buf.put(newData.getBytes()); buf.flip(); int bytesSent = channel.send(buf, new InetSocketAddress("example.com", 80));
由于 UDP 是非链接的, 所以这个的 connect 并非向 TCP 同样真正意义上的链接, 而是它会讲 DatagramChannel 锁住, 所以咱们仅仅能够从指定的地址中读取或写入数据.
channel.connect(new InetSocketAddress("example.com", 80));
当咱们须要与 NIO Channel 进行交互时, 咱们就须要使用到 NIO Buffer, 即数据从 Buffer读取到 Channel 中, 而且从 Channel 中写入到 Buffer 中.
实际上, 一个 Buffer 其实就是一块内存区域, 咱们能够在这个内存区域中进行数据的读写. NIO Buffer 实际上是这样的内存块的一个封装, 并提供了一些操做方法让咱们可以方便地进行数据的读写.
Buffer 类型有:
这些 Buffer 覆盖了能从 IO 中传输的全部的 Java 基本数据类型.
使用 NIO Buffer 的步骤以下:
当咱们将数据写入到 Buffer 中时, Buffer 会记录咱们已经写了多少的数据, 当咱们须要从 Buffer 中读取数据时, 必须调用 Buffer.flip()将 Buffer 切换为读模式.
一旦读取了全部的 Buffer 数据, 那么咱们必须清理 Buffer, 让其重新可写, 清理 Buffer 能够调用 Buffer.clear() 或 Buffer.compact().
例如:
/** * @author xiongyongshun * @Email yongshun1228@gmail.com * @version 1.0 * @created 16/8/1 13:13 */ public class Test { public static void main(String[] args) { IntBuffer intBuffer = IntBuffer.allocate(2); intBuffer.put(12345678); intBuffer.put(2); intBuffer.flip(); System.err.println(intBuffer.get()); System.err.println(intBuffer.get()); } }
上述中, 咱们分配两个单位大小的 IntBuffer, 所以它能够写入两个 int 值.
咱们使用 put 方法将 int 值写入, 而后使用 flip 方法将 buffer 转换为读模式, 而后连续使用 get 方法从 buffer 中获取这两个 int 值.
每当调用一次 get 方法读取数据时, buffer 的读指针都会向前移动一个单位长度(在这里是一个 int 长度)
一个 Buffer 有三个属性:
其中 position 和 limit 的含义与 Buffer 处于读模式或写模式有关, 而 capacity 的含义与 Buffer 所处的模式无关.
一个内存块会有一个固定的大小, 即容量(capacity), 咱们最多写入capacity 个单位的数据到 Buffer 中, 例如一个 DoubleBuffer, 其 Capacity 是100, 那么咱们最多能够写入100个 double 数据.
当从一个 Buffer 中写入数据时, 咱们是从 Buffer 的一个肯定的位置(position)开始写入的. 在最初的状态时, position 的值是0. 每当咱们写入了一个单位的数据后, position 就会递增一.
当咱们从 Buffer 中读取数据时, 咱们也是从某个特定的位置开始读取的. 当咱们调用了 filp()方法将 Buffer 从写模式转换到读模式时, position 的值会自动被设置为0, 每当咱们读取一个单位的数据, position 的值递增1.
position 表示了读写操做的位置指针.
limit - position 表示此时还能够写入/读取多少单位的数据.
例如在写模式, 若是此时 limit 是10, position 是2, 则表示已经写入了2个单位的数据, 还能够写入 10 - 2 = 8 个单位的数据.
public class Test { public static void main(String args[]) { IntBuffer intBuffer = IntBuffer.allocate(10); intBuffer.put(10); intBuffer.put(101); System.err.println("Write mode: "); System.err.println("\tCapacity: " + intBuffer.capacity()); System.err.println("\tPosition: " + intBuffer.position()); System.err.println("\tLimit: " + intBuffer.limit()); intBuffer.flip(); System.err.println("Read mode: "); System.err.println("\tCapacity: " + intBuffer.capacity()); System.err.println("\tPosition: " + intBuffer.position()); System.err.println("\tLimit: " + intBuffer.limit()); } }
这里咱们首先写入两个 int 值, 此时 capacity = 10, position = 2, limit = 10.
而后咱们调用 flip 转换为读模式, 此时 capacity = 10, position = 0, limit = 2;
为了获取一个 Buffer 对象, 咱们首先须要分配内存空间. 每一个类型的 Buffer 都有一个 allocate()方法, 咱们能够经过这个方法分配 Buffer:
ByteBuffer buf = ByteBuffer.allocate(48);
这里咱们分配了48 * sizeof(Byte)字节的内存空间.
CharBuffer buf = CharBuffer.allocate(1024);
这里咱们分配了大小为1024个字符的 Buffer, 即 这个 Buffer 能够存储1024 个 Char, 其大小为 1024 * 2 个字节.
Direct Buffer:
Non-Direct Buffer:
int bytesRead = inChannel.read(buf); //read into buffer. buf.put(127);
//read from buffer into channel. int bytesWritten = inChannel.write(buf); byte aByte = buf.get();
Buffer.rewind()方法能够重置 position 的值为0, 所以咱们能够从新读取/写入 Buffer 了.
若是是读模式, 则重置的是读模式的 position, 若是是写模式, 则重置的是写模式的 position.
例如:
/** * @author xiongyongshun * @Email yongshun1228@gmail.com * @version 1.0 * @created 16/8/1 13:13 */ public class Test { public static void main(String[] args) { IntBuffer intBuffer = IntBuffer.allocate(2); intBuffer.put(1); intBuffer.put(2); System.err.println("position: " + intBuffer.position()); intBuffer.rewind(); System.err.println("position: " + intBuffer.position()); intBuffer.put(1); intBuffer.put(2); System.err.println("position: " + intBuffer.position()); intBuffer.flip(); System.err.println("position: " + intBuffer.position()); intBuffer.get(); intBuffer.get(); System.err.println("position: " + intBuffer.position()); intBuffer.rewind(); System.err.println("position: " + intBuffer.position()); } }
rewind() 主要针对于读模式. 在读模式时, 读取到 limit 后, 能够调用 rewind() 方法, 将读 position 置为0.
咱们能够经过调用 Buffer.mark()将当前的 position 的值保存起来, 随后能够经过调用 Buffer.reset()方法将 position 的值回复回来.
例如:
/** * @author xiongyongshun * @Email yongshun1228@gmail.com * @version 1.0 * @created 16/8/1 13:13 */ public class Test { public static void main(String[] args) { IntBuffer intBuffer = IntBuffer.allocate(2); intBuffer.put(1); intBuffer.put(2); intBuffer.flip(); System.err.println(intBuffer.get()); System.err.println("position: " + intBuffer.position()); intBuffer.mark(); System.err.println(intBuffer.get()); System.err.println("position: " + intBuffer.position()); intBuffer.reset(); System.err.println("position: " + intBuffer.position()); System.err.println(intBuffer.get()); } }
这里咱们写入两个 int 值, 而后首先读取了一个值. 此时读 position 的值为1.
接着咱们调用 mark() 方法将当前的 position 保存起来(在读模式, 所以保存的是读的 position), 而后再次读取, 此时 position 就是2了.
接着使用 reset() 恢复原来的读 position, 所以读 position 就为1, 能够再次读取数据.
方法源码:
public final Buffer flip() { limit = position; position = 0; mark = -1; return this; }
Buffer 的读/写模式共用一个 position 和 limit 变量.
当从写模式变为读模式时, 原先的 写 position 就变成了读模式的 limit.
方法源码
public final Buffer rewind() { position = 0; mark = -1; return this; }
rewind, 即倒带, 这个方法仅仅是将 position 置为0.
方法源码:
public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; }
根据源码咱们能够知道, clear 将 positin 设置为0, 将 limit 设置为 capacity.
clear 方法使用场景:
IntBuffer intBuffer = IntBuffer.allocate(2); intBuffer.flip(); System.err.println("position: " + intBuffer.position()); System.err.println("limit: " + intBuffer.limit()); System.err.println("capacity: " + intBuffer.capacity()); // 这里不能读, 由于 limit == position == 0, 没有数据. //System.err.println(intBuffer.get()); intBuffer.clear(); System.err.println("position: " + intBuffer.position()); System.err.println("limit: " + intBuffer.limit()); System.err.println("capacity: " + intBuffer.capacity()); // 这里能够读取数据了, 由于 clear 后, limit == capacity == 2, position == 0, // 即便咱们没有写入任何的数据到 buffer 中. System.err.println(intBuffer.get()); // 读取到0 System.err.println(intBuffer.get()); // 读取到0
咱们能够经过 equals() 或 compareTo() 方法比较两个 Buffer, 当且仅当以下条件知足时, 两个 Buffer 是相等的:
经过上述条件咱们能够发现, 比较两个 Buffer 时, 并非 Buffer 中的每一个元素都进行比较, 而是比较 Buffer 中剩余的元素.
Selector 容许一个单一的线程来操做多个 Channel. 若是咱们的应用程序中使用了多个 Channel, 那么使用 Selector 很方便的实现这样的目的, 可是由于在一个线程中使用了多个 Channel, 所以也会形成了每一个 Channel 传输效率的下降.
使用 Selector 的图解以下:
为了使用 Selector, 咱们首先须要将 Channel 注册到 Selector 中, 随后调用 Selector 的 select()方法, 这个方法会阻塞, 直到注册在 Selector 中的 Channel 发送可读写事件. 当这个方法返回后, 当前的这个线程就能够处理 Channel 的事件了.
经过 Selector.open()方法, 咱们能够建立一个选择器:
Selector selector = Selector.open();
为了使用选择器管理 Channel, 咱们须要将 Channel 注册到选择器中:
channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
注意
, 若是一个 Channel 要注册到 Selector 中, 那么这个 Channel 必须是非阻塞的, 即channel.configureBlocking(false);
由于 Channel 必需要是非阻塞的, 所以 FileChannel 是不可以使用选择器的, 由于 FileChannel 都是阻塞的.
注意到, 在使用 Channel.register()方法时, 第二个参数指定了咱们对 Channel 的什么类型的事件感兴趣, 这些事件有:
一个 Channel发出一个事件也能够称为** 对于某个事件, Channel 准备好了. 所以一个 Channel 成功链接到了另外一个服务器也能够被称为 connect ready.
咱们可使用或运算|**来组合多个事件, 例如:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
注意, 一个 Channel 仅仅能够被注册到一个 Selector 一次, 若是将 Channel 注册到 Selector 屡次, 那么其实就是至关于更新 SelectionKey 的 interest set
. 例如:
channel.register(selector, SelectionKey.OP_READ); channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
上面的 channel 注册到同一个 Selector 两次了, 那么第二次的注册其实就是至关于更新这个 Channel 的 interest set 为 SelectionKey.OP_READ | SelectionKey.OP_WRITE.
如上所示, 当咱们使用 register 注册一个 Channel 时, 会返回一个 SelectionKey 对象, 这个对象包含了以下内容:
咱们能够经过以下方式获取 interest set:
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
表明了 Channel 所准备好了的操做.
咱们能够像判断 interest set 同样操做 Ready set, 可是咱们还可使用以下方法进行判断:
int readySet = selectionKey.readyOps(); selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();
咱们能够经过 SelectionKey 获取相对应的 Channel 和 Selector:
Channel channel = selectionKey.channel(); Selector selector = selectionKey.selector();
咱们能够在selectionKey中附加一个对象:
selectionKey.attach(theObject); Object attachedObj = selectionKey.attachment();
或者在注册时直接附加:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
咱们能够经过 Selector.select()方法获取对某件事件准备好了的 Channel, 即若是咱们在注册 Channel 时, 对其的可写事件感兴趣, 那么当 select()返回时, 咱们就能够获取 Channel 了.
注意
, select()方法返回的值表示有多少个 Channel 可操做.
若是 select()方法返回值表示有多个 Channel 准备好了, 那么咱们能够经过 Selected key set 访问这个 Channel:
Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); }
注意, 在每次迭代时, 咱们都调用 "keyIterator.remove()" 将这个 key 从迭代器中删除, 由于 select() 方法仅仅是简单地将就绪的 IO 操做放到 selectedKeys 集合中, 所以若是咱们从 selectedKeys 获取到一个 key, 可是没有将它删除, 那么下一次 select 时, 这个 key 所对应的 IO 事件还在 selectedKeys 中.
例如此时咱们收到 OP_ACCEPT 通知, 而后咱们进行相关处理, 可是并无将这个 Key 从 SelectedKeys 中删除, 那么下一次 select() 返回时 咱们还能够在 SelectedKeys 中获取到 OP_ACCEPT 的 key.
注意, 咱们能够动态更改 SekectedKeys 中的 key 的 interest set.
例如在 OP_ACCEPT 中, 咱们能够将 interest set 更新为 OP_READ, 这样 Selector 就会将这个 Channel 的 读 IO 就绪事件包含进来了.
咱们再来回顾一下 Java NIO 中的 Selector 的使用流程:
当调用了 Selector.close()方法时, 咱们实际上是关闭了 Selector 自己而且将全部的 SelectionKey 失效, 可是并不会关闭 Channel.
/** * @author xiongyongshun * @Email yongshun1228@gmail.com * @version 1.0 * @created 16/8/1 13:13 */ public class NioEchoServer { private static final int BUF_SIZE = 256; private static final int TIMEOUT = 3000; public static void main(String args[]) throws Exception { // 打开服务端 Socket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打开 Selector Selector selector = Selector.open(); // 服务端 Socket 监听8080端口, 并配置为非阻塞模式 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); serverSocketChannel.configureBlocking(false); // 将 channel 注册到 selector 中. // 一般咱们都是先注册一个 OP_ACCEPT 事件, 而后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ // 注册到 Selector 中. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 经过调用 select 方法, 阻塞地等待 channel I/O 可操做 if (selector.select(TIMEOUT) == 0) { System.out.print("."); continue; } // 获取 I/O 操做就绪的 SelectionKey, 经过 SelectionKey 能够知道哪些 Channel 的哪类 I/O 操做已经就绪. Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { // 当获取一个 SelectionKey 后, 就要将它删除, 表示咱们已经对这个 IO 事件进行了处理. keyIterator.remove(); SelectionKey key = keyIterator.next(); if (key.isAcceptable()) { // 当 OP_ACCEPT 事件到来时, 咱们就有从 ServerSocketChannel 中获取一个 SocketChannel, // 表明客户端的链接 // 注意, 在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel. // 而在 OP_WRITE 和 OP_READ 中, 从 key.channel() 返回的是 SocketChannel. SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); //在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中. // 注意, 这里咱们若是没有设置 OP_READ 的话, 即 interest set 仍然是 OP_CONNECT 的话, 那么 select 方法会一直直接返回. clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE)); } if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clientChannel.read(buf); if (bytesRead == -1) { clientChannel.close(); } else if (bytesRead > 0) { key.interestOps(OP_READ | SelectionKey.OP_WRITE); System.out.println("Get data length: " + bytesRead); } } if (key.isValid() && key.isWritable()) { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clientChannel = (SocketChannel) key.channel(); clientChannel.write(buf); if (!buf.hasRemaining()) { key.interestOps(OP_READ); } buf.compact(); } } } } }