上章提到过Java的NIO采起的是多路IO复用模式,其衍生出来的模型就是Reactor模型。多路IO复用有两种方式,一种是select/poll,另外一种是epoll。在windows系统上使用的是select/poll方式,在linux上使用的是epoll方式,主要是因为DefaultSelectorProvider具体选择的selector决定。epoll是在linux2.6以后才支持的,select的方式时间复杂度为O(N),最大fd限制是1024。epoll没有数量限制,时间复杂度是O(1)。html
再温习一遍多路IO复用的基本思路,阻塞发生在select上面,select管理全部注册在其上的socket请求,socket准备彻底就会交由用户程序处理。下面结合java的nio例子,来更细致的讲解一下这种模式,强化理解一下。要写出java的nio不难但要彻底正确毫不容易,相关概念不清楚就会产生难以理解的bug,这里有一些相关的陷阱。java
另外说明一下,这个例子不必定彻底正确,用于演示足够了。对于Java的NIO而言,有几个概念比较重要,这里先提两个channel和buffer。不论是客户端发送服务端接收,仍是服务端发送客户端接收,基本的流程都是:发送方发送数据->buffer->发送方channel->接收方channel->buffer->接收方接收数据。linux
对于服务端而言首先须要的就是肯定监听的端口,其次是与之对应的channel,然后就是selector,最后还须要一个线程池。为何会须要线程池呢?道理很简单,select模式获取了全部channel的change,对于服务端而言,change的可能有很是多的客户端channel,而用户程序只有一个线程,若是这么多个channel一个个顺序执行,若是有耗时严重的操做,那么后果是很是糟糕的,全部客户端都会延时处理,这也是多路IO复用的一个糟糕点。线程池就是为每一个客户端分配一个线程去处理,减缓这种状况的后果。Server的基本四个内容就出来了:windows
private int port; private Selector selector; private ServerSocketChannel serverSocketChannel; private ExecutorService executorService;
接下来就是初始化服务端。初始化的步骤也是通常化:1.初始化链接池;2.初始化Selector;3.初始化绑定端口;4.将socket注册到select上。大体步骤就是这些,可是还有些额外的细节。具体代码以下:服务器
1. executorService = Executors.newCachedThreadPool(); 2. selector = Selector.open(); 3. serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); 4. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
这里的一个细节就是socket必须是非阻塞模式。初始化完成以后就是正式的逻辑了,再来回忆一下多路IO复用的逻辑,管理多个IO的change事件,阻塞在select上,若是有change事件,select就能继续执行下去,选出change了的IO,只对这部分IO进行操做。这段描述就下面这段简单的代码了:多线程
int event = selector.select(); if(event != 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while(it.hasNext()) { SelectionKey key = it.next(); it.remove(); } }
这里就是调用selector.select()方法进行阻塞,若是change事件不为0(这个判断应该去掉好点),获取当前全部change事件。遍历处理,移除该事件。不移除,下次该事件依旧存在,至关于认为是没处理,会出现屡次触发错误。异步
下面详细介绍一下事件的类型,Java定义了4种类型:socket
1.针对服务端的ACCEPT事件,接收到客户端的链接请求;ide
2.针对客户端的CONNECT事件,发起对服务端的链接请求;性能
3.针对获取对端发送的数据的READ事件;
4.针对请求发送数据给对端时准备好了缓冲区的WRITE事件。
其中WRITE事件通常不进行使用,由于大部分状况缓冲区都是空闲的,会马上触发该事件,这个浪费CPU的性能,还会形成bug。下面代码就是server端处理的一个基本逻辑,也是有些要注意的点。
if(key.isValid() && key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ); } else if(key.isValid() && key.isReadable()) { key.interestOps(0); executorService.execute(new Task(key)); }
服务端的事件就3个,write事件不用管,因此只须要关注accept和read事件。有请求进来,就接收这个请求,设置成非阻塞式,再注册到selector中,监听该请求的读事件。读事件到来,先将监听的时间改为无,这里是由于异步执行,可能没有读完数据,再次触发了该channel的读事件,重复读取,形成问题。Task就是一个runnabel任务,处理读取,发送应答,这里还须要从新将监听的事件改为读事件,即处理完了本次内容,等待下次内容。
Task的具体内容以下:
SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int size = -1; try { while((size = channel.read(buffer)) > 0) { buffer.flip(); baos.write(buffer.array(), 0, size); buffer.clear(); } if(baos.size() == 0) { key.cancel(); } else { // 协议解析 String msg = new String(baos.toByteArray(), "UTF-8"); // 返回该数据 String reply = "get client msg: " + msg; ByteBuffer re = ByteBuffer.wrap(reply.getBytes()); while(re.hasRemaining()) { channel.write(re); } // 处理完毕后后设置成读状态,继续获取相关数据 key.interestOps(SelectionKey.OP_READ); key.selector().wakeup(); } } catch (Exception e) { key.cancel(); // 异常链接中断 }
这里的逻辑就是使用buffer将数据取出来了。取出为0,或者抛出异常,意味着客户端断开了链接,直接取消掉该channel的管理。回写了一个数据。以后就是将事件监听设置回监听读取事件,最后一步须要wakeup一下。wakeup是为了唤醒一下select,缘由以下:这个是因为前面先将监听的事件改为了0,后面才改回了read事件。不论是怎么修改,都不是马上生效的,须要下次select事件触发才能生效,问题也只会出在多线程中。试想一下下面这个过程:
1.A通道有数据了,A先置为0了,开始读取数据,由于是异步的,因此又走到了select阻塞了;
2.B链接进来,触发的select方法,这时A的0才正式生效,这也是咱们想要的,由于A以前的数据还在处理,并非新的数据到来,不须要再次触发读操做。这里先置为0的动做是正确的。
3.此时主线程又走到了select方法阻塞了,注意,此时A生效的是0,A结束这次读操做,等待下次读事件。问题就出在这里,若是不触发一下select方法,此时A即便有新的读事件,其也不会触发,由于重置为read并无生效,要等select触发才能生效。这就至关于A没接到消息了,若是B有读事件,触发了select方法,则A才能接到消息。wakeup在这里必须添加的目的就是强制触发一下select,使A更新回read事件,而不是不关系任何事件。
实际上触发没有这么麻烦,在客户端还会说到这个问题,有更简单的触发方法。
上面的代码也能够看出nio都是基于buffer操做的。buffer也有不少陷阱,使用正确不容易。下面给出一个个人完整例子,能够运行试试,不保证没bug。了解了上面的知识,测出bug调试应该也不难。
import java.io.ByteArrayOutputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class NioServer { private int port; private Selector selector; private ServerSocketChannel serverSocketChannel; private ExecutorService executorService; public NioServer(int port) { this.port = port; } public void open() { this.executorService = Executors.newCachedThreadPool(); try { this.selector = Selector.open(); this.serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("server端启动..."); for(;;) { System.out.println("======>select的keys数量:" + selector.keys().size()); int event = selector.select(); System.out.println("======>select的keys数量:" + selector.keys().size() + ", change事件数量:" + event); if(event != 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); // System.out.println("======>真实未处理的change事件数量:" + keys.size()); while(it.hasNext()) { SelectionKey key = it.next(); it.remove(); // 移除这个key if(key.isValid() && key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ); System.out.println("===>获取client链接,准备读取数据:"+ client.socket().getRemoteSocketAddress()); } else if(key.isValid() && key.isReadable()) { // 先置为0,防止异步线程未处理完该事件又被select // key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); key.interestOps(0); executorService.execute(new Task(key)); } else { System.out.println("其它事件:" + key.interestOps()); } } } } } catch (Exception e) { e.printStackTrace(); } } private class Task implements Runnable { private SelectionKey key; public Task(SelectionKey key) { this.key = key; } @Override public void run() { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int size = -1; try { // System.out.println("===>开始读取数据"); while((size = channel.read(buffer)) > 0) { buffer.flip(); baos.write(buffer.array(), 0, size); buffer.clear(); } if(baos.size() == 0) { key.cancel(); System.out.println("======<client断开链接:"+ channel.socket().getRemoteSocketAddress()); } else { // 协议解析 String msg = new String(baos.toByteArray(), "UTF-8"); System.out.println("===>获取client数据: " + msg); // 返回该数据 String reply = "get client msg: " + msg; ByteBuffer re = ByteBuffer.wrap(reply.getBytes()); while(re.hasRemaining()) { channel.write(re); } // 处理完毕后后设置成读状态,继续获取相关数据 // key.interestOps((key.interestOps() | SelectionKey.OP_READ)); key.interestOps(SelectionKey.OP_READ); key.selector().wakeup(); System.out.println("===<返回server的获取结果"); } } catch (Exception e) { key.cancel(); // 异常链接中断 System.out.println("======<异常client断开链接:"+ channel.socket().getRemoteSocketAddress()); } } } public static void main(String[] args) { NioServer nioServer = new NioServer(7777); nioServer.open(); } }
第一节说过,在单个链接的时候,多路IO复用方式甚至没有阻塞式IO性能好,多路IO复用是针对了多个IO操做。这里仍是给出客户端的NIO写法。一样的,客户端须要上面的内容,不包括线程池,咱们只处理一个客户端链接。须要增长的一个字段就是服务端地址,因此总共也是4个内容:服务端地址、端口、链接通道、select。
private String host; private int port; private SocketChannel socketChannel; private Selector selector;
初始化也是基本操做:1.获取select;2.创建链接;3.注册到select
1. selector = Selector.open(); 2. socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.socket().setTcpNoDelay(true); socketChannel.connect(new InetSocketAddress(host, port)); 3. socketChannel.register(selector, SelectionKey.OP_CONNECT);
这里要注意的也就是要以非阻塞式的方式进行。后面的步骤也同样,进行select,获取change事件,根据不一样的事件处理不一样。write事件不使用,客户端关注的就connect和read事件了。
int event = selector.select(); if(event != 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while(it.hasNext()) { SelectionKey sk = it.next(); it.remove(); if(sk.isValid() && sk.isConnectable()) { if(socketChannel.isConnectionPending()) { if(socketChannel.finishConnect()) { sk.interestOps(SelectionKey.OP_READ); } else { sk.cancel(); } } } else if(sk.isValid() && sk.isReadable()) { SocketChannel sc = (SocketChannel) sk.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int size = -1; while((size = sc.read(buffer)) > 0) { buffer.flip(); baos.write(buffer.array(), 0, size); buffer.clear(); } } } }
这里要注意的是connect并无真正连上,要触发了connect事件,执行finishConnect才会链接成功。链接成功后更新成read事件。这里会有一个疑惑,server端的时候intersetOps设置成0或者read不是直接生效,要select执行后才能生效,为何这边connect设置成read事件就能直接改过来???...这是一个思惟陷阱:不是要执行后才能改变状态,而是select认准的状态是select操做以前一瞬间的状态。server端的例子,哪怕不须要两个线程,单个线程也能触发,只要是异步操做。主线程先接收到A的读取操做,设置A成0,而后又进行select了,此一瞬间A的状态是0,后面A处理完后,再来一条消息就没用了,由于此时select阻塞时检测的状态是0,后续改过来也没用,因此才须要wakeup一下,让其认识到其状态应该修改后的read。而上述例子为何不须要,就是由于这是一个同步的过程,这次connect事件,下次再select的时候必定变成了read。
其余的也没有什么值得一提的了,下面是客户端的完整代码。
import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Scanner; import java.util.Set; public class NioClient { private String host; private int port; private SocketChannel socketChannel; private Selector selector; public NioClient(String host, int port) { this.host = host; this.port = port; } public void open() { try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.socket().setTcpNoDelay(true); socketChannel.connect(new InetSocketAddress(host, port)); socketChannel.register(selector, SelectionKey.OP_CONNECT); System.out.println("client端启动..."); for(;;) { System.out.println("======>select的keys数量:" + selector.keys().size()); int event = selector.select(); System.out.println("======>select的keys数量:" + selector.keys().size() + ", change事件数量:" + event); if(event != 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while(it.hasNext()) { SelectionKey sk = it.next(); it.remove(); if(sk.isValid() && sk.isConnectable()) { if(socketChannel.isConnectionPending()) { if(socketChannel.finishConnect()) { sk.interestOps(SelectionKey.OP_READ); System.out.println("链接上远程服务器:" + socketChannel.getRemoteAddress()); } else { sk.cancel(); System.out.println("链接未创建..."); } } } else if(sk.isValid() && sk.isReadable()) { SocketChannel sc = (SocketChannel) sk.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int size = -1; while((size = sc.read(buffer)) > 0) { buffer.flip(); baos.write(buffer.array(), 0, size); buffer.clear(); } System.out.println("接收服务器消息:" + new String(baos.toByteArray(), "UTF-8")); } else { System.out.println("其它事件:" + sk.interestOps()); } } } } } catch (IOException e) { e.printStackTrace(); } } public void close() { try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } public void send(String msg) { byte[] b = msg.getBytes(); ByteBuffer buffer = ByteBuffer.wrap(b); try { while (buffer.hasRemaining()) { socketChannel.write(buffer); } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { NioClient client = new NioClient("127.0.0.1", 7777); Thread thread = new Thread(new Runnable() { @Override public void run() { client.open(); } }); thread.setDaemon(true); thread.start(); Scanner scanner = new Scanner(System.in); while(scanner.hasNext()) { String msg = scanner.nextLine(); if("close".equals(msg)) { client.close(); System.out.println("退出成功"); break; } else { client.send(msg); } } } }
此章结合java nio的实际demo增强一下对多路IO复用的理解,理解Java的nio基本流程,对于理解后面的netty设计的结构有很大的帮助。