NIO一般采用Reactor模式,AIO一般采用Proactor模式。AIO简化了程序的编写,stream的读取和写入都有OS来完成,不须要像NIO那样子遍历Selector。Windows基于IOCP实现AIO,Linux只有eppoll模拟实现了AIO。
Java7以前的JDK只支持NIO和BIO,从7开始支持AIO。
4种通讯方式:TCP/IP+BIO, TCP/IP+NIO, UDP/IP+BIO, UDP/IP+NIO。
TCP/IP+BIO、
Socket和ServerSocket实现,ServerSocket实现Server端端口监听,Socket用于创建网络IO链接。
不适用于处理多个请求 1.生成Socket会消耗过多的本地资源。2. Socket链接的创建通常比较慢。
BIO状况下,能支持的链接数有限,通常都采起accept获取Socket之后采用一个thread来处理,one connection one thread。不管链接是否有真正数据请求,都须要独占一个thread。
能够经过设立Socket池来必定程度上解决问题,可是使用池须要注意的问题是:1. 竞争等待比较多。 2. 须要控制好超时时间。
TCP/IP+NIO
使用Channel(SocketChannel和ServerSocketChannel)和Selector。
Server端一般由一个thread来监听connect事件,另外多个thread来监听读写事件。这样作的好处是这些链接只有在真是请求的时候才会建立thread来处理,one request one thread。这种方式在server端须要支持大量链接但这些链接同时发送请求的峰值不会不少的时候十分有效。
UDP/IP+BIO
DatagramSocket和DatagramPacket。DatagramSocket负责监听端口以及读写数据,DatagramPacket做为数据流对象进行传输。
UDP/IP是无链接的,没法进行双向通讯,除非双方都成为UDP Server。
UDP/IP+NIO
经过DatagramChannel和ByteBuffer实现。DatagramChannel负责端口监听及读写。ByteBuffer负责数据流传输。
若是要将消息发送到多台机器,若是为每一个目标机器都创建一个链接的话,会有很大的网络流量压力。这时候可使用基于UDP/IP的Multicast协议传输,Java中能够经过MulticastSocket和DatagramPacket来实现。
Multicast通常多用于多台机器的状态同步,好比JGroups。SRM, URGCP都是Multicast的实现方式。eBay就采用SRM来实现将数据从主数据库同步到各个搜索节点机器。
Java aio(异步网络IO)初探
按照《Unix网络编程》的划分,IO模型能够分为:阻塞IO、非阻塞IO、IO复用、信号驱动IO和异步IO,按照POSIX标准来划分只分为两类:同步IO和异步IO。如何区分呢?首先一个IO操做其实分红了两个步骤:发起IO请求和实际的IO操做,同步IO和异步IO的区别就在于第二个步骤是否阻塞,若是实际的IO读写阻塞请求进程,那么就是同步IO,所以阻塞IO、非阻塞IO、IO服用、信号驱动IO都是同步IO,若是不阻塞,而是操做系统帮你作完IO操做再将结果返回给你,那么就是异步IO。阻塞IO和非阻塞IO的区别在于第一步,发起IO请求是否会被阻塞,若是阻塞直到完成那么就是传统的阻塞IO,若是不阻塞,那么就是非阻塞IO。
Java nio 2.0的主要改进就是引入了异步IO(包括文件和网络),这里主要介绍下异步网络IO API的使用以及框架的设计,以TCP服务端为例。首先看下为了支持AIO引入的新的类和接口:
java.nio.channels.AsynchronousChannel
标记一个channel支持异步IO操做。
java.nio.channels.AsynchronousServerSocketChannel
ServerSocket的aio版本,建立TCP服务端,绑定地址,监听端口等。
java.nio.channels.AsynchronousSocketChannel
面向流的异步socket channel,表示一个链接。
java.nio.channels.AsynchronousChannelGroup
异步channel的分组管理,目的是为了资源共享。一个AsynchronousChannelGroup绑定一个线程池,这个线程池执行两个任务:处理IO事件和派发CompletionHandler。AsynchronousServerSocketChannel建立的时候能够传入一个 AsynchronousChannelGroup,那么经过AsynchronousServerSocketChannel建立的 AsynchronousSocketChannel将同属于一个组,共享资源。
java.nio.channels.CompletionHandler
异步IO操做结果的回调接口,用于定义在IO操做完成后所做的回调工做。AIO的API容许两种方式来处理异步操做的结果:返回的Future模式或者注册CompletionHandler,我更推荐用CompletionHandler的方式,这些handler的调用是由 AsynchronousChannelGroup的线程池派发的。显然,线程池的大小是性能的关键因素。AsynchronousChannelGroup容许绑定不一样的线程池,经过三个静态方法来建立:
1public static AsynchronousChannelGroup withFixedThreadPool(int nThreads,
2 ThreadFactory threadFactory)
3 throws IOException
4
5public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor,
6 int initialSize)
7
8public static AsynchronousChannelGroup withThreadPool(ExecutorService executor)
9 throws IOException
10
须要根据具体应用相应调整,从框架角度出发,须要暴露这样的配置选项给用户。
在介绍完了aio引入的TCP的主要接口和类以后,咱们来设想下一个aio框架应该怎么设计。参考非阻塞nio框架的设计,通常都是采用Reactor模式,Reacot负责事件的注册、select、事件的派发;相应地,异步IO有个Proactor模式,Proactor负责 CompletionHandler的派发,查看一个典型的IO写操做的流程来看二者的区别:
Reactor: send(msg) -> 消息队列是否为空,若是为空 -> 向Reactor注册OP_WRITE,而后返回 -> Reactor select -> 触发Writable,通知用户线程去处理 ->先注销Writable(不少人遇到的cpu 100%的问题就在于没有注销),处理Writeable,若是没有彻底写入,继续注册OP_WRITE。注意到,写入的工做仍是用户线程在处理。
Proactor: send(msg) -> 消息队列是否为空,若是为空,发起read异步调用,并注册CompletionHandler,而后返回。 -> 操做系统负责将你的消息写入,并返回结果(写入的字节数)给Proactor -> Proactor派发CompletionHandler。可见,写入的工做是操做系统在处理,无需用户线程参与。事实上在aio的API 中,AsynchronousChannelGroup就扮演了Proactor的角色。
CompletionHandler有三个方法,分别对应于处理成功、失败、被取消(经过返回的Future)状况下的回调处理:
1public interface CompletionHandler<V,A> {
2
3 void completed(V result, A attachment);
4
5 void failed(Throwable exc, A attachment);
6
7 void cancelled(A attachment);
8}
其中的泛型参数V表示IO调用的结果,而A是发起调用时传入的attchment。
在初步介绍完aio引入的类和接口后,咱们看看一个典型的tcp服务端是怎么启动的,怎么接受链接并处理读和写,这里引用的代码都是yanf4j 的aio分支中的代码,能够从svn checkout,svn地址: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio
第一步,建立一个AsynchronousServerSocketChannel,建立以前先建立一个 AsynchronousChannelGroup,上文提到AsynchronousServerSocketChannel能够绑定一个 AsynchronousChannelGroup,那么经过这个AsynchronousServerSocketChannel创建的链接都将同属于一个AsynchronousChannelGroup并共享资源:
1this.asynchronousChannelGroup = AsynchronousChannelGroup
2 .withCachedThreadPool(Executors.newCachedThreadPool(),
3 this.threadPoolSize);
而后初始化一个AsynchronousServerSocketChannel,经过open方法:
1this.serverSocketChannel = AsynchronousServerSocketChannel
2 .open(this.asynchronousChannelGroup);
3
经过nio 2.0引入的SocketOption类设置一些TCP选项:
1this.serverSocketChannel
2 .setOption(
3 StandardSocketOption.SO_REUSEADDR,true);
4this.serverSocketChannel
5 .setOption(
6 StandardSocketOption.SO_RCVBUF,16*1024);
绑定本地地址:
1this.serverSocketChannel
2 .bind(new InetSocketAddress("localhost",8080), 100);
其中的100用于指定等待链接的队列大小(backlog)。完了吗?尚未,最重要的监听工做还没开始,监听端口是为了等待链接上来以便accept产生一个AsynchronousSocketChannel来表示一个新创建的链接,所以须要发起一个accept调用,调用是异步的,操做系统将在链接创建后,将最后的结果——AsynchronousSocketChannel返回给你:
1public void pendingAccept() {
2 if (this.started && this.serverSocketChannel.isOpen()) {
3 this.acceptFuture = this.serverSocketChannel.accept(null,
4 new AcceptCompletionHandler());
5
6 } else {
7 throw new IllegalStateException("Controller has been closed");
8 }
9 }
10
注意,重复的accept调用将会抛出PendingAcceptException,后文提到的read和write也是如此。accept方法的第一个参数是你想传给CompletionHandler的attchment,第二个参数就是注册的用于回调的CompletionHandler,最后返回结果Future。你能够对future作处理,这里采用更推荐的方式就是注册一个CompletionHandler。那么accept的CompletionHandler中作些什么工做呢?显然一个赤裸裸的 AsynchronousSocketChannel是不够的,咱们须要将它封装成session,一个session表示一个链接(mina里就叫 IoSession了),里面带了一个缓冲的消息队列以及一些其余资源等。在链接创建后,除非你的服务器只准备接受一个链接,否则你须要在后面继续调用pendingAccept来发起另外一个accept请求:
1private final class AcceptCompletionHandler implements
2 CompletionHandler<AsynchronousSocketChannel, Object> {
3
4 @Override
5 public void cancelled(Object attachment) {
6 logger.warn("Accept operation was canceled");
7 }
8
9 @Override
10 public void completed(AsynchronousSocketChannel socketChannel,
11 Object attachment) {
12 try {
13 logger.debug("Accept connection from "
14 + socketChannel.getRemoteAddress());
15 configureChannel(socketChannel);
16 AioSessionConfig sessionConfig = buildSessionConfig(socketChannel);
17 Session session = new AioTCPSession(sessionConfig,
18 AioTCPController.this.configuration
19 .getSessionReadBufferSize(),
20 AioTCPController.this.sessionTimeout);
21 session.start();
22 registerSession(session);
23 } catch (Exception e) {
24 e.printStackTrace();
25 logger.error("Accept error", e);
26 notifyException(e);
27 } finally {
28 <strong>pendingAccept</strong>();
29 }
30 }
31
32 @Override
33 public void failed(Throwable exc, Object attachment) {
34 logger.error("Accept error", exc);
35 try {
36 notifyException(exc);
37 } finally {
38 <strong>pendingAccept</strong>();
39 }
40 }
41 }
42
注意到了吧,咱们在failed和completed方法中在最后都调用了pendingAccept来继续发起accept调用,等待新的链接上来。有的同窗可能要说了,这样搞是否是递归调用,会不会堆栈溢出?实际上不会,由于发起accept调用的线程与CompletionHandler回调的线程并不是同一个,不是一个上下文中,二者之间没有耦合关系。要注意到,CompletionHandler的回调共用的是 AsynchronousChannelGroup绑定的线程池,所以千万别在CompletionHandler回调方法中调用阻塞或者长时间的操做,例如sleep,回调方法最好能支持超时,防止线程池耗尽。
链接创建后,怎么读和写呢?回忆下在nonblocking nio框架中,链接创建后的第一件事是干什么?注册OP_READ事件等待socket可读。异步IO也一样如此,链接创建后立刻发起一个异步read调用,等待socket可读,这个是Session.start方法中所作的事情:
1public class AioTCPSession {
2 protected void start0() {
3 pendingRead();
4 }
5
6 protected final void pendingRead() {
7 if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {
8 if (!this.readBuffer.hasRemaining()) {
9 this.readBuffer = ByteBufferUtils
10 .increaseBufferCapatity(this.readBuffer);
11 }
12 this.readFuture = this.asynchronousSocketChannel.read(
13 this.readBuffer, this, this.readCompletionHandler);
14 } else {
15 throw new IllegalStateException(
16 "Session Or Channel has been closed");
17 }
18 }
19
20}
21
AsynchronousSocketChannel的read调用与AsynchronousServerSocketChannel的accept调用相似,一样是非阻塞的,返回结果也是一个Future,可是写的结果是整数,表示写入了多少字节,所以read调用返回的是 Future,方法的第一个参数是读的缓冲区,操做系统将IO读到数据拷贝到这个缓冲区,第二个参数是传递给 CompletionHandler的attchment,第三个参数就是注册的用于回调的CompletionHandler。这里保存了read的结果Future,这是为了在关闭链接的时候可以主动取消调用,accept也是如此。如今能够看看read的CompletionHandler的实现:
1public final class ReadCompletionHandler implements
2 CompletionHandler<Integer, AbstractAioSession> {
3
4 private static final Logger log = LoggerFactory
5 .getLogger(ReadCompletionHandler.class);
6 protected final AioTCPController controller;
7
8 public ReadCompletionHandler(AioTCPController controller) {
9 this.controller = controller;
10 }
11
12 @Override
13 public void cancelled(AbstractAioSession session) {
14 log.warn("Session(" + session.getRemoteSocketAddress()
15 + ") read operation was canceled");
16 }
17
18 @Override
19 public void completed(Integer result, AbstractAioSession session) {
20 if (log.isDebugEnabled())
21 log.debug("Session(" + session.getRemoteSocketAddress()
22 + ") read +" + result + " bytes");
23 if (result < 0) {
24 session.close();
25 return;
26 }
27 try {
28 if (result > 0) {
29 session.updateTimeStamp();
30 session.getReadBuffer().flip();
31 session.decode();
32 session.getReadBuffer().compact();
33 }
34 } finally {
35 try {
36 session.pendingRead();
37 } catch (IOException e) {
38 session.onException(e);
39 session.close();
40 }
41 }
42 controller.checkSessionTimeout();
43 }
44
45 @Override
46 public void failed(Throwable exc, AbstractAioSession session) {
47 log.error("Session read error", exc);
48 session.onException(exc);
49 session.close();
50 }
51
52}
53
若是IO读失败,会返回失败产生的异常,这种状况下咱们就主动关闭链接,经过session.close()方法,这个方法干了两件事情:关闭channel和取消read调用:
1if (null != this.readFuture) {
2 this.readFuture.cancel(true);
3 }
4this.asynchronousSocketChannel.close();
5
在读成功的状况下,咱们还须要判断结果result是否小于0,若是小于0就表示对端关闭了,这种状况下咱们也主动关闭链接并返回。若是读到必定字节,也就是result大于0的状况下,咱们就尝试从读缓冲区中decode出消息,并派发给业务处理器的回调方法,最终经过pendingRead继续发起read调用等待socket的下一次可读。可见,咱们并不须要本身去调用channel来进行IO读,而是操做系统帮你直接读到了缓冲区,而后给你一个结果表示读入了多少字节,你处理这个结果便可。而nonblocking IO框架中,是reactor通知用户线程socket可读了,而后用户线程本身去调用read进行实际读操做。这里还有个须要注意的地方,就是decode出来的消息的派发给业务处理器工做最好交给一个线程池来处理,避免阻塞group绑定的线程池。
IO写的操做与此相似,不过一般写的话咱们会在session中关联一个缓冲队列来处理,没有彻底写入或者等待写入的消息都存放在队列中,队列为空的状况下发起write调用:
1protected void write0(WriteMessage message) {
2 boolean needWrite = false;
3 synchronized (this.writeQueue) {
4 needWrite = this.writeQueue.isEmpty();
5 this.writeQueue.offer(message);
6 }
7 if (needWrite) {
8 pendingWrite(message);
9 }
10 }
11
12 protected final void pendingWrite(WriteMessage message) {
13 message = preprocessWriteMessage(message);
14 if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {
15 this.asynchronousSocketChannel.write(message.getWriteBuffer(),
16 this, this.writeCompletionHandler);
17 } else {
18 throw new IllegalStateException(
19 "Session Or Channel has been closed");
20 }
21 }
22
write调用返回的结果与read同样是一个Future,而write的CompletionHandler处理的核心逻辑大概是这样:
1@Override
2 public void completed(Integer result, AbstractAioSession session) {
3 if (log.isDebugEnabled())
4 log.debug("Session(" + session.getRemoteSocketAddress()
5 + ") writen " + result + " bytes");
6
7 WriteMessage writeMessage;
8 Queue<WriteMessage> writeQueue = session.getWriteQueue();
9 synchronized (writeQueue) {
10 writeMessage = writeQueue.peek();
11 if (writeMessage.getWriteBuffer() == null
12 || !writeMessage.getWriteBuffer().hasRemaining()) {
13 writeQueue.remove();
14 if (writeMessage.getWriteFuture() != null) {
15 writeMessage.getWriteFuture().setResult(Boolean.TRUE);
16 }
17 try {
18 session.getHandler().onMessageSent(session,
19 writeMessage.getMessage());
20 } catch (Exception e) {
21 session.onException(e);
22 }
23 writeMessage = writeQueue.peek();
24 }
25 }
26 if (writeMessage != null) {
27 try {
28 session.pendingWrite(writeMessage);
29 } catch (IOException e) {
30 session.onException(e);
31 session.close();
32 }
33 }
34 }
35
compete方法中的result就是实际写入的字节数,而后咱们判断消息的缓冲区是否还有剩余,若是没有就将消息从队列中移除,若是队列中还有消息,那么继续发起write调用。
重复一下,这里引用的代码都是yanf4j aio分支中的源码,感兴趣的朋友能够直接check out出来看看: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio。
在引入了aio以后,java对于网络层的支持已经很是完善,该有的都有了,java也已经成为服务器开发的首选语言之一。java的弱项在于对内存的管理上,因为这一切都交给了GC,所以在高性能的网络服务器上仍是Cpp的天下。java这种单一堆模型比之erlang的进程内堆模型仍是有差距,很难作到高效的垃圾回收和细粒度的内存管理。
这里仅仅是介绍了aio开发的核心流程,对于一个网络框架来讲,还须要考虑超时的处理、缓冲buffer的处理、业务层和网络层的切分、可扩展性、性能的可调性以及必定的通用性要求。