上面两篇文章中,咱们分别讲解了阻塞式同步IO、非阻塞式同步IO、多路复用IO 这三种IO模型,以及JAVA对于这三种IO模型的支持。重点说明了IO模型是由操做系统提供支持,且这三种IO模型都是同步IO,都是采用的“应用程序不询问我,我毫不会主动通知”的方式。 java
异步IO则是采用“订阅-通知”模式:即应用程序向操做系统注册IO监听,而后继续作本身的事情。当操做系统发生IO事件,而且准备好数据后,在主动通知应用程序,触发相应的函数: 程序员
和同步IO同样,异步IO也是由操做系统进行支持的。微软的windows系统提供了一种异步IO技术:IOCP(I/O Completion Port,I/O完成端口); apache
Linux下因为没有这种异步IO技术,因此使用的是epoll(上文介绍过的一种多路复用IO技术的实现)对异步IO进行模拟。 json
一样的犹如《架构设计:系统间通讯(4)——IO通讯模型和JAVA实践 中篇》中对JAVA NIO框架的实现分析,这里也没有将JAVA AIO框架全部的实现类画完,只是经过这个结构分析要告诉各位读者JAVA AIO中类设计和操做系统的相关性 windows
在文中咱们一再说明JAVA AIO框架在windows下使用windows IOCP技术,在Linux下使用epoll多路复用IO技术模拟异步IO,这个从JAVA AIO框架的部分类设计上就能够看出来。例如框架中,在Windows下负责实现套接字通道的具体类是 “sun.nio.ch.WindowsAsynchronousSocketChannelImpl”,其引用的IOCP类型文档注释如是: 缓存
/**
* Windows implementation of AsynchronousChannelGroup encapsulating an I/O
* completion port.
*/ 服务器
若是您感兴趣,固然能够去看看所有完整代码(建议从“java.nio.channels.spi.AsynchronousChannelProvider”这个类看起)。 网络
下面,咱们经过一个代码示例,来说解JAVA AIO框架的具体使用,先上代码,在针对代码编写和运行中的要点进行讲解: 架构
package testASocket; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; /** * JAVA AIO框架测试。请必定将 * 《架构设计:系统间通讯(4)——IO通讯模型和JAVA实践 中篇》看了后再看本篇测试代码。 * 这样对您理解代码的关键点很是有益。 * @author yinwenjie */ public class SocketServer { static { BasicConfigurator.configure(); } private static final Object waitObject = new Object(); /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { /* * 对于使用的线程池技术,我必定要多说几句 * 一、Executors是线程池生成工具,经过这个工具咱们能够很轻松的生成“固定大小的线程池”、“调度池”、“可伸缩线程数量的池”。具体请看API Doc * 二、固然您也能够经过ThreadPoolExecutor直接生成池。 * 三、这个线程池是用来获得操做系统的“IO事件通知”的,不是用来进行“获得IO数据后的业务处理的”。要进行后者的操做,您能够再使用一个池(最好不要混用) * 四、您也能够不使用线程池(不推荐),若是决定不使用线程池,直接AsynchronousServerSocketChannel.open()就好了。 * */ ExecutorService threadPool = Executors.newFixedThreadPool(20); AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(threadPool); final AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open(group); //设置要监听的端口“0.0.0.0”表明本机全部IP设备 serverSocket.bind(new InetSocketAddress("0.0.0.0", 83)); //为AsynchronousServerSocketChannel注册监听,注意只是为AsynchronousServerSocketChannel通道注册监听 //并不包括为 随后客户端和服务器 socketchannel通道注册的监听 serverSocket.accept(null, new ServerSocketChannelHandle(serverSocket)); //等待,以便观察现象(这个和要讲解的原理自己没有任何关系,只是为了保证守护线程不会退出) synchronized(waitObject) { waitObject.wait(); } } } /** * 这个处理器类,专门用来响应 ServerSocketChannel 的事件。 * 还记得咱们在《架构设计:系统间通讯(4)——IO通讯模型和JAVA实践 中篇》中所提到的内容吗?ServerSocketChannel只有一种事件:接受客户端的链接 * @author yinwenjie */ class ServerSocketChannelHandle implements CompletionHandler<AsynchronousSocketChannel, Void> { /** * 日志 */ private static final Log LOGGER = LogFactory.getLog(ServerSocketChannelHandle.class); private AsynchronousServerSocketChannel serverSocketChannel; /** * @param serverSocketChannel */ public ServerSocketChannelHandle(AsynchronousServerSocketChannel serverSocketChannel) { this.serverSocketChannel = serverSocketChannel; } /** * 注意,咱们分别观察 this、socketChannel、attachment三个对象的id。 * 来观察不一样客户端链接到达时,这三个对象的变化,以说明ServerSocketChannelHandle的监听模式 */ @Override public void completed(AsynchronousSocketChannel socketChannel, Void attachment) { ServerSocketChannelHandle.LOGGER.info("completed(AsynchronousSocketChannel result, ByteBuffer attachment)"); //每次都要从新注册监听(一次注册,一次响应),可是因为“文件状态标示符”是独享的,因此不须要担忧有“漏掉的”事件 this.serverSocketChannel.accept(attachment, this); //为这个新的socketChannel注册“read”事件,以便操做系统在收到数据并准备好后,主动通知应用程序 //在这里,因为咱们要将这个客户端屡次传输的数据累加起来一块儿处理,因此咱们将一个stringbuffer对象做为一个“附件”依附在这个channel上 // ByteBuffer readBuffer = ByteBuffer.allocate(50); socketChannel.read(readBuffer, new StringBuffer(), new SocketChannelReadHandle(socketChannel , readBuffer)); } /* (non-Javadoc) * @see java.nio.channels.CompletionHandler#failed(java.lang.Throwable, java.lang.Object) */ @Override public void failed(Throwable exc, Void attachment) { ServerSocketChannelHandle.LOGGER.info("failed(Throwable exc, ByteBuffer attachment)"); } } /** * 负责对每个socketChannel的数据获取事件进行监听。<p> * * 重要的说明:一个socketchannel都会有一个独立工做的SocketChannelReadHandle对象(CompletionHandler接口的实现), * 其中又都将独享一个“文件状态标示”对象FileDescriptor、 * 一个独立的由程序员定义的Buffer缓存(这里咱们使用的是ByteBuffer)、 * 因此不用担忧在服务器端会出现“窜对象”这种状况,由于JAVA AIO框架已经帮您组织好了。<p> * * 可是最重要的,用于生成channel的对象:AsynchronousChannelProvider是单例模式,不管在哪组socketchannel, * 对是一个对象引用(但这不要紧,由于您不会直接操做这个AsynchronousChannelProvider对象)。 * @author yinwenjie */ class SocketChannelReadHandle implements CompletionHandler<Integer, StringBuffer> { /** * 日志 */ private static final Log LOGGER = LogFactory.getLog(SocketChannelReadHandle.class); private AsynchronousSocketChannel socketChannel; /** * 专门用于进行这个通道数据缓存操做的ByteBuffer<br> * 固然,您也能够做为CompletionHandler的attachment形式传入。<br> * 这是,在这段示例代码中,attachment被咱们用来记录全部传送过来的Stringbuffer了。 */ private ByteBuffer byteBuffer; public SocketChannelReadHandle(AsynchronousSocketChannel socketChannel , ByteBuffer byteBuffer) { this.socketChannel = socketChannel; this.byteBuffer = byteBuffer; } /* (non-Javadoc) * @see java.nio.channels.CompletionHandler#completed(java.lang.Object, java.lang.Object) */ @Override public void completed(Integer result, StringBuffer historyContext) { //若是条件成立,说明客户端主动终止了TCP套接字,这时服务端终止就能够了 if(result == -1) { try { this.socketChannel.close(); } catch (IOException e) { SocketChannelReadHandle.LOGGER.error(e); } return; } SocketChannelReadHandle.LOGGER.info("completed(Integer result, Void attachment) : 而后咱们来取出通道中准备好的值"); /* * 实际上,因为咱们从Integer result知道了本次channel从操做系统获取数据总长度 * 因此实际上,咱们不须要切换成“读模式”的,可是为了保证编码的规范性,仍是建议进行切换。 * * 另外,不管是JAVA AIO框架仍是JAVA NIO框架,都会出现“buffer的总容量”小于“当前从操做系统获取到的总数据量”, * 但区别是,JAVA AIO框架中,咱们不须要专门考虑处理这样的状况,由于JAVA AIO框架已经帮咱们作了处理(作成了屡次通知) * */ this.byteBuffer.flip(); byte[] contexts = new byte[1024]; this.byteBuffer.get(contexts, 0, result); this.byteBuffer.clear(); try { String nowContent = new String(contexts , 0 , result , "UTF-8"); historyContext.append(nowContent); SocketChannelReadHandle.LOGGER.info("================目前的传输结果:" + historyContext); } catch (UnsupportedEncodingException e) { SocketChannelReadHandle.LOGGER.error(e); } //若是条件成立,说明尚未接收到“结束标记” if(historyContext.indexOf("over") == -1) { return; } //========================================================================= // 和上篇文章的代码相同,咱们以“over”符号做为客户端完整信息的标记 //========================================================================= SocketChannelReadHandle.LOGGER.info("=======收到完整信息,开始处理业务========="); historyContext = new StringBuffer(); //还要继续监听(一次监听一次通知) this.socketChannel.read(this.byteBuffer, historyContext, this); } /* (non-Javadoc) * @see java.nio.channels.CompletionHandler#failed(java.lang.Throwable, java.lang.Object) */ @Override public void failed(Throwable exc, StringBuffer historyContext) { SocketChannelReadHandle.LOGGER.info("=====发现客户端异常关闭,服务器将关闭TCP通道"); try { this.socketChannel.close(); } catch (IOException e) { SocketChannelReadHandle.LOGGER.error(e); } } }
注意在JAVA NIO框架中,咱们说到了一个重要概念“selector”(选择器)。它负责代替应用查询中全部已注册的通道到操做系统中进行IO事件轮询、管理当前注 册的通道集合,定位发生事件的通道等操操做;可是在JAVA AIO框架中,因为应用程序不是“轮询”方式,而是订阅-通知方式,因此再也不须要“selector”(选择器)了,改由channel通道直接到操做系统注册监听。 并发
JAVA AIO框架中,只实现了两种网络IO通道“AsynchronousServerSocketChannel”(服务器监听通道)、 “AsynchronousSocketChannel”(socket套接字通道)。可是不管哪一种通道他们都有独立的fileDescriptor(文 件标识符)、attachment(附件,附件可使任意对象,相似“通道上下文”),并被独立的SocketChannelReadHandle类实例 引用。咱们经过debug操做来看看它们的引用结构:
在测试过程当中,咱们启动了两个客户端(客户端用什么语言来写都行,用阻塞或者非阻塞方式也都行,只要是支持 TCP Socket套接字的就行。若是您非要看看客户端是怎么写的,您能够参见个人《架构设计:系统间通讯(3)——IO通讯模型和JAVA实践 上篇》这篇文章中的客户端代码示例),而后咱们观察服务器端对这两个客户端通道的处理状况:
能够看到,在服务器端分别为客户端1和客户端2建立的两个WindowsAsynchronousSocketChannelImpl对象为:
客户端1:WindowsAsynchronousSocketChannelImpl:760 | FileDescriptor:762
客户端2:WindowsAsynchronousSocketChannelImpl:792 | FileDescriptor:797
接下来,咱们让两个客户端发送信息到服务器端,并观察服务器端的处理状况。客户端1发来的消息和客户端2发来的消息,在服务器端的处理状况以下图所示:
客户端1:WindowsAsynchronousSocketChannelImpl:760 | FileDescriptor:762 | SocketChannelReadHandle:803 | HeapByteBuffer:808
客户端2:WindowsAsynchronousSocketChannelImpl:792 | FileDescriptor:797 | SocketChannelReadHandle:828 | HeapByteBuffer:833
能够明显看到,服务器端处理每个客户端通道所使用的SocketChannelReadHandle(处理器)对象都是独立的,而且所引用的SocketChannel对象都是独立的。
固然,以上代码是示例代码,目标是为了让您了解JAVA AIO框架的基本使用。因此它还有不少改造的空间,例如:
在生产环境下,咱们须要记录这个通道上“用户的登陆信息”。那么这个需求可使用JAVA AIO中的“附件”功能进行实现。
咱们在本文和上文(《架构设计:系统间通讯(4)——IO通讯模型和JAVA实践 中篇》)中,都是使用“自定义文本”格式传输内容,并检查“over”关键字。可是在正式生产环境下,您会这样用吗?
显然是不会的,由于它压缩率不高。要么咱们会使用json格式:由于它在相同的压缩率的前提下,有更好的信息结构;咱们还可使用 protobuffer:由于它兼顾传输效率和良好的信息结构;甚至还可使用TLV格式:提供很好的信息传输效率(它连一个多余的byte描述都没 有),这几种格式的讲解,您能够参考《架构设计:系统间通讯(1)——概述从“聊天”开始上篇》。
记住JAVA AIO 和 JAVA NIO 框架都是要使用线程池的(固然您也能够不用),线程池的使用原则,必定是只有业务处理部分才使用, 使用后立刻结束线程的执行(还回线程池或者消灭它)。JAVA AIO框架中还有一个线程池,是拿给“通知处理器”使用的,这是由于JAVA AIO框架是基于“订阅-通知”模型的,“订阅”操做能够由主线程完成,可是您总不能要求在应用程序中并发的“通知”操做也在主线程上完成吧^_^。
最好的改进方式,固然就是使用Netty或者Mina咯。
那么有的读者可能就会问,既然JAVA NIO / JAVA AIO已经实现了各主流操做系统的底层支持,那么为何如今主流的JAVA NIO技术会是Netty和MINA呢?答案很简单:由于更好用,这里举几个方面的例子:
虽然JAVA NIO 和 JAVA AIO框架提供了 多路复用IO/异步IO的支持,可是并无提供上层“信息格式”的良好封装。例如前二者并无提供针对 Protocol Buffer、JSON这些信息格式的封装,可是Netty框架提供了这些数据格式封装(基于责任链模式的编码和解码功能)
要编写一个可靠的、易维护的、高性能的(注意它们的排序)NIO/AIO 服务器应用。除了框架自己要兼容实现各种操做系统的实现外。更重要的是它应该还要处理不少上层特有服务,例如:客户端的权限、还有上面提到的信息格式封 装、简单的数据读取。这些Netty框架都提供了响应的支持。
JAVA NIO框架存在一个poll/epoll bug:Selector doesn’t block on Selector.select(timeout),不能block意味着CPU的使用率会变成100%(这是底层JNI的问题,上层要处理这个异常实际 上也好办)。固然这个bug只有在Linux内核上才能重现。
这个问题在JDK 1.7版本中尚未被彻底解决:http://bugs.java.com/bugdatabase/view_bug.do?bug_id=2147719。虽然Netty 4.0中也是基于JAVA NIO框架进行封装的(上文中已经给出了Netty中NioServerSocketChannel类的介绍),可是Netty已经将这个bug进行了处理。
其余缘由,用过Netty后,您就能够本身进行比较了。
经过三篇文章,咱们把操做系统的四种IO模型都进行了介绍,而且说明了JAVA对这四种IO模型的支持,也给出了代码讲解。有读者反映仍是不够深 入,例如典型的EPOLL技术的工做细节并无讲解,也没有进行各类IO模型的性能比较,等等。别慌,我计划将来的3-4个月咱们都会讨论“系统间通讯技 术”,因此就想作“负载均衡”那个系列的专栏同样,咱们会在后面的时间进行补全。固然本人的技术水平有限,写博客的目的主要也是为了分享和总结,因此欢迎 各位读者多多吐槽。
从下篇文章开始,咱们将话一到两篇文章的内容,讨论Netty框架(以Netty4.0版本做为讨论基础)。随后咱们将开始介绍JAVA 的RIM,并从RIM引导进入RPC技术的介绍。