https://colobu.com/2014/11/13/java-aio-introduction/
Java NIO (JSR 51)定义了Java new I/O API,提案2000年提出,2002年正式发布。 JDK 1.4起包含了相应的API实现。
JAVA NIO2 (JSR 203)定义了更多的 New I/O APIs, 提案2003提出,直到2011年才发布, 最终在JDK 7中才实现。
JSR 203除了提供更多的文件系统操作API(包括可插拔的自定义的文件系统), 还提供了对socket和文件的异步 I/O操作。 同时实现了JSR-51提案中的socket channel全部功能,包括对绑定, option配置的支持以及多播multicast的实现。
当前很多的项目还停留在JAVA NIO的实现上, 对JAVA AIO(asynchronous I/O)着墨不多。 本文整理了一些关于JAVA AIO的介绍,以及netty对AIO的支持。
以下内容只针对socket的I/O操作, 不涉及对文件的处理。
首先介绍以下I/O模型。
Unix定义了五种I/O模型, 下图是五种I/O模型的比较。
POSIX把I/O操作划分成两类:
传统的Java BIO (blocking I/O)是Unix I/O模型中的第一种。
Java NIO中如果不使用select模式,而只把channel配置成nonblocking则是第二种模型。
Java NIO select实现的是一种多路复用I/O。 底层使用epoll或者相应的poll系统调用, 参看我以前整理的一篇文章: java 和netty epoll实现
第四种模型JDK应该是没有实现。
Java NIO2增加了对第五种模型的支持,也就是AIO。
OpenJDK在不同平台上的AIO实现
在不同的操作系统上,AIO由不同的技术实现。
通用实现可以查看这里。
Windows上是使用完成接口(IOCP)实现,可以参看WindowsAsynchronousServerSocketChannelImpl,
其它平台上使用aio调用UnixAsynchronousServerSocketChannelImpl, UnixAsynchronousSocketChannelImpl, SolarisAsynchronousChannelProvider
常用类
AsynchronousServerSocketChannel
还实现了Asynchronous accept
AsynchronousDatagramChannel
异步channel API提供了两种方式监控/控制异步操作(connect,accept, read,write等)。第一种方式是返回java.util.concurrent.Future对象, 检查Future的状态可以得到操作是否完成还是失败,还是进行中, future.get阻塞当前进程。
第二种方式为操作提供一个回调参数java.nio.channels.CompletionHandler,这个回调类包含completed
,failed
两个方法。
channel的每个I/O操作都为这两种方式提供了相应的方法, 你可以根据自己的需要选择合适的方式编程。
下面以一个最简单的Time服务的例子演示如何使用异步I/O。 客户端连接到服务器后服务器就发送一个当前的时间字符串给客户端。 客户端毋须发送请求。 逻辑很简单。
Server实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import java.util.Date; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class Server { private static Charset charset = Charset.forName("US-ASCII"); private static CharsetEncoder encoder = charset.newEncoder(); public static void main(String[] args) throws Exception { AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4)); AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress("0.0.0.0", 8013)); server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() { @Override public void completed(AsynchronousSocketChannel result, Void attachment) { server.accept(null, this); // 接受下一个连接 try { String now = new Date().toString(); ByteBuffer buffer = encoder.encode(CharBuffer.wrap(now + "\r\n")); //result.write(buffer, null, new CompletionHandler<Integer,Void>(){...}); //callback or Future<Integer> f = result.write(buffer); f.get(); System.out.println("sent to client: " + now); result.close(); } catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, Void attachment) { exc.printStackTrace(); } }); group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); } } |
这个例子使用了两种方式。 accept
使用了回调的方式, 而发送数据使用了future
的方式。
Client实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
public class Client { public static void main(String[] args) throws Exception { AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); Future<Void> future = client.connect(new InetSocketAddress("127.0.0.1", 8013)); future.get(); ByteBuffer buffer = ByteBuffer.allocate(100); client.read(buffer, null, new CompletionHandler<Integer, Void>() { @Override public void completed(Integer result, Void attachment) { System.out.println("client received: " + new String(buffer.array())); } @Override public void failed(Throwable exc, Void attachment) { exc.printStackTrace(); try { client.close(); } catch (IOException e) { e.printStackTrace(); } } }); Thread.sleep(10000); } } |
客户端也使用了两种方式, connect
使用了future方式,而接收数据使用了回调的方式。
Netty也支持AIO并提供了相应的类: AioEventLoopGroup
,AioCompletionHandler
, AioServerSocketChannel
, AioSocketChannel
, AioSocketChannelConfig
。
其它使用方法和NIO类似。
官方文档Using as a generic library描述了两者的区别,主要还是友好性,扩展和性能的考虑。
http://zizihaier.iteye.com/blog/1767409也提到:
ByteBuffer主要有两个继承的类分别是:HeapByteBuffer和MappedByteBuffer。他们的不同之处在于HeapByteBuffer会在JVM的堆上分配内存资源,而MappedByteBuffer的资源则会由JVM之外的操作系统内核来分配。DirectByteBuffer继承了MappedByteBuffer,采用了直接内存映射的方式,将文件直接映射到虚拟内存,同时减少在内核缓冲区和用户缓冲区之间的调用,尤其在处理大文件方面有很大的性能优势。但是在使用内存映射的时候会造成文件句柄一直被占用而无法删除的情况,网上也有很多介绍。
Netty中使用ChannelBuffer来处理读写,之所以废弃ByteBuffer,官方说法是ChannelBuffer简单易用并且有性能方面的优势。在ChannelBuffer中使用ByteBuffer或者byte[]来存储数据。同样的,ChannelBuffer也提供了几个标记来控制读写并以此取代ByteBuffer的position和limit,分别是:
0 <= readerIndex <= writerIndex <= capacity,同时也有类似于mark的markedReaderIndex和markedWriterIndex。当写入buffer时,writerIndex增加,从buffer中读取数据时readerIndex增加,而不能超过writerIndex。有了这两个变量后,就不用每次写入buffer后调用flip()方法,方便了很多。
Netty的“零拷贝”主要体现在如下三个方面:
1) Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入Socket中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。 2) Netty提供了组合Buffer对象,可以聚合多个ByteBuffer对象,用户可以像操作一个Buffer那样方便的对组合Buffer进行操作,避免了传统通过内存拷贝的方式将几个小Buffer合并成一个大的Buffer。 3) Netty的文件传输采用了transferTo方法,它可以直接将文件缓冲区的数据发送到目标Channel,避免了传统通过循环write方式导致的内存拷贝问题。