NIO(1、概述)
NIO(2、Buffer)
NIO(3、Channel)
NIO(4、Selector)html
上文说了描述了Buffer的实现机制,那么这个章节就主要描述数据是如何进入缓冲区的,而且又是如何从缓冲区流出的。java
这张图只是简单归纳了Channel的类图,固然,Channel的设计远比这个更复杂:例如SelectableChannel还有SocketChannel和ServerSocketChannel的实现,NetworkChannel继承Channel并抽象了更多的方法;例如FileChannel,除了继承AbstractInterruptibleChannel以外,还实现了GatheringByteChannel和ScatteringByteChannel接口。api
Channel
咱们能够看到,Channel接口自己定义了 close() 和 isOpen() 方法,在继承Channel的接口中,又分别抽象了读通道(ReadableByteChannel)、写通道(WritableByteChannel)及可中断的异步通道(InterruptibleChannel)接口。读写通道天然没必要说,下文也会有介绍。数组
InterruptibleChannel
这里说下InterruptibleChannel,这是一个能够被中断的异步通道,继承了 close() 方法。当一个线程在I/O被阻塞时,另外一个线程执行了close()方法,那么阻塞的线程会抛出 AsynchronousCloseException异常。当一个线程在I/O被阻塞时,另外一个线程调用阻塞的线程中断(interrupt())方法,那么将会抛出ClosedByInterruptException异常。浏览器
AbstractInterruptibleChannel
AbstractInterruptibleChannel抽象类,这是全部可中断通道实现的基类,咱们能够看到,FileChannel正是直接继承自它,后文会介绍的SocketChannel和ServerSocketChannel继承自AbstractSelectableChannel抽象类,而AbstractSelectableChannel又继承自SelectableChannel抽象类,SelectableChannel上图就能够看出,一样继承AbstractInterruptibleChannel。咱们不得不想一下,为何FileChannel直接继承AbstractInterruptibleChannel抽象类,而另外一个经常使用的好比SocketChannel却须要继承自SelectableChannel抽象类?此时咱们须要只了解SelectableChannel是什么问题就迎刃而解了。安全
SelectableChannel
第一个章节在概述中提过一个名词“多路复用”,当时也做了简单描述:一个线程经过选择器处理和管理多个通道,利用一个线程的资源去处理多个链接。SelectableChannel正是扮演着其中的一个重要角色。而相似SocketChannel同样实现NetworkChannel接口的通道,这种网络数据的传输尤为在高并发的压力下,让CPU利用率真正体如今处理数据上,而不是频繁上下文切换的开销,使用这种机制能明显提高处理性能,而FileChannel这种对文件操做是绝对不会使用到这种机制的。另外,SelectableChannel在阻塞模型中,每个I/O操做都会阻塞到它完成为止,在非阻塞模型中,是不会出现阻塞的状况,一样返回的字节数可能少于要求或者干脆没有,是否阻塞咱们能经过 isBlocking() 方法查看。
SelectableChannel是经过选择器(Selector)复用的通道,为了配合与选择器一块儿使用,可以使用 register() 方法,这个方法会返回一个SelectionKey对象,表示已经在选择器里注册了,这个通道会一直保持到注销为止,同时也包括已经分配在这个选择器上的这个通道的资源。通常状况下,通道本身不能直接在通道上注销,咱们能够调用以前注册时返回的SelectionKey对象的 cancel() 方法,能够显式注销。另外,阅读代码咱们发现,几乎全部的实现方法都有同步控制,因此,在多个并发线程下使用是安全的。服务器
Channel,它既是缓冲区数据的入口,也是其数据的出口。
在上面的类图咱们能够看到,Channel分别抽象了ReadableByteChannel和WritableByteChannel接口,两个接口各自定义了 read() 和 write() 方法,读取和写入ByteBuffer对象。并在这个基础上又进一步定义了ScatteringByteChannel和GatheringByteChannel接口,一样,这两个接口一样存在继承自ReadableByteChannel和WritableByteChannel接口的read() 和 write() 方法,此外,还分别重载了 read() 和 write() 方法,增长了读取或写入ByteBuffer数组。
网络
ByteBuffer buffer1 = ByteBuffer.allocate(1024); ByteBuffer buffer2 = ByteBuffer.allocate(1024); ByteBuffer[] buffers = {buffer1, buffer2}; channel.read(buffers);
ByteBuffer buffer1 = ByteBuffer.allocate(1024); ByteBuffer buffer2 = ByteBuffer.allocate(1024); ByteBuffer[] buffers = {buffer1, buffer2}; channel.write(buffers);
FileChannel的 transferFrom() 和 transferTo() 两个方法实现了通道之间的数据传输,当有一方是FileChannel时,另外一方实现了 ReadableByteChannel和WritableByteChannel接口的通道就可以与FileChannel传输数据。目前为止,只有文件通道(FileChannel)可以双向传输。并发
FileChannel fromChannel = fromFile.getChannel(); FileChannel toChannel = toFile.getChannel(); // transferFrom toChannel.transferFrom(fromChannel, position, count); // transferTo fromChannel.transferTo(position,count,toChannel);
咱们在讲Buffer的时候就已经说了position,意指读写位置,count指的是数据大小。oracle
建立SocketChannel仅须要调用它的 open() 方法:
SocketChannel channel = SocketChannel.open();
事实上,可能在实际使用的时候,咱们也会在的句柄对象的处理方法里,使用SelectionKey的 channel() 方法来获取,或者ServerSocketChannel对象的 accept() 方法来获取。
SelectionKey key = ... //接受消息处理 SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); //或者 SocketChannel channel = (SocketChannel) key.channel();
在咱们手动 open() 打开这个socket以后,其实还未链接,这时候咱们对这个通道进行I/O操做会抛出NotYetConnectedException。咱们可使用 connect() 方法来链接通道的socket,与此同时,也可使用 isConnected() 判断是否已链接。若是这个Socket在非阻塞模型中,就须要 finishConnect() 来肯定链接已完成,也能够经过 isConnectionPending() 来测定该通道的链接是否正在进行中。
// 设置非阻塞 channel.configureBlocking(false); if(channel.isConnected()) { channel.connect(new InetSocketAddress(8080)); while (channel.finishConnect()){ //connected - do something } }
固然,关于channel的读写操做,在讲Scatter/Gather时已经讲过,这里再也不赘述。
每次使用完成以后,都会调用 close() 方法关闭。须要说明的是,这里的 close() 方法支持异步关闭,当一个线程执行 close() 方法同时,另外一个线程的在同一个通道上执行读操做时会被阻塞,而后这个读取操做不会返回任何数据,只会返回-1标识。固然,另外一个线程若是是写操做的话也一样会被阻塞,不一样的是会抛出 AsynchronousCloseException 异常。咱们在上文中描述 InterruptibleChannel 这个可中断通道接口的时候也提到这个问题。
channel.close();
建立ServerSocketChannel与SocketChannel相似,不一样的是ServerSocketChannel是监听套接字链接。因此在建立它的时候,须要将它绑定,若是没有绑定就执行 accept() 方法,那么会抛出 NotYetBoundException 异常。
ServerSocketChannel channel = ServerSocketChannel.open(); channel.socket().bind(new InetSocketAddress(8080));
监听到的链接可使用 accept() 方法来返回该链接的 SocketChannel。执行 accept() 方法会一直阻塞直到有链接到达。
SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
下面是一段简单的ServerSocketChannel和SocketChannel应用,能够看到是如何使用这两个类的。当浏览器输入localhost:8080的时候,会在控制台打印出这个链接的请求信息。
public static void main(String[] args) throws IOException { ServerSocketChannel channel = ServerSocketChannel.open(); channel.socket().bind(new InetSocketAddress(8080)); //设置非阻塞 channel.configureBlocking(false); //注册 Selector selector = Selector.open(); channel.register(selector, SelectionKey.OP_ACCEPT); //处理器 Handler handler = new Handler(1024); while (true) { //等待请求,每次等待阻塞5s,5s后线程继续向下执行,若是传入0或者不传参数将一直阻塞 if (selector.select(5000) == 0) { continue; } //获取待处理的SelectionKey Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); //当接受到请求 if (key.isAcceptable()) { handler.handleAccept(key); } try { //读数据 if (key.isReadable()) { handler.handleRead(key); } } catch (IOException e) { keyIterator.remove(); e.printStackTrace(); } keyIterator.remove(); } } } private static class Handler { private int bufferSize = 1024; private String localCharset = "UTF-8"; public Handler() { } public Handler(int bufferSize) { this.bufferSize = bufferSize; } public Handler(String localCharset) { this.localCharset = localCharset; } public Handler(int bufferSize, String localCharset) { this.bufferSize = bufferSize; this.localCharset = localCharset; } public void handleAccept(SelectionKey key) { try { SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); channel.configureBlocking(false); channel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize)); } catch (IOException e) { e.printStackTrace(); } } public void handleRead(SelectionKey key) throws IOException { //获取channel SocketChannel channel = (SocketChannel) key.channel(); //获取buffer 重置 ByteBuffer buffer = (ByteBuffer) key.attachment(); buffer.clear(); if (channel.read(buffer) == -1) { channel.close(); } else { //转为读状态 buffer.flip(); String receivedString = Charset.forName(localCharset) .newDecoder().decode(buffer).toString(); System.out.printf("接受到客户端数据" + receivedString); //返回数据给客户端 String sendString = "接受数据:" + receivedString; buffer = ByteBuffer.wrap(sendString.getBytes(localCharset)); channel.write(buffer); channel.close(); } } }