在传统的架构中,对于客户端的每一次请求,服务器都会建立一个新的线程或者利用线程池复用去处理用户的一个请求,而后返回给用户结果,这样作在高并发的状况下会存在很是严重的性能问题:对于用户的每一次请求都建立一个新的线程是须要必定内存的,同时线程之间频繁的上下文切换也是一个很大的开销。java
p.s: 本文涉及的完整实例代码均可以在个人GitHub上面下载。python
NIO的核心就是Selector,读懂了Selector就理解了异步机制的实现原理,下面先来简单的介绍一下什么是Selector。如今对于客户端的每一次请求到来时咱们再也不当即建立一个线程进行处理,相反以epool为例子当一个事件准备就绪以后经过回调机制将描述符加入到阻塞队列中,下面只须要经过遍历阻塞队列对相应的事件进行处理就好了,经过这种回调机制整个过程都不须要对于每个请求都去建立一个线程去单独处理。上面的解释仍是有些抽象,下面我会经过具体的代码实例来解释,在这以前咱们先来了解一下NIO中两个基础概念Buffer和Channel。git
若是你们对于多路IO复用好比select/epool彻底陌生的话,建议先读一下个人这篇Linux下的五种IO模型 :-)github
以ByteBuffer为例子,咱们能够经过ByteBuffer.allocate(n)来分配n个字节的缓冲区,对于缓冲区有四个重要的属性:数组
如上图所示,Buffer实际上也是分为两种,一种用于写数据,一种用于读取数据。服务器
经过直接阅读ByteBuffer源码能够清晰看出put方法是把一个byte变量x放到缓冲区中去,同时position加1:网络
public ByteBuffer put(byte x) { hb[ix(nextPutIndex())] = x; return this; } final int nextPutIndex() { if (position >= limit) throw new BufferOverflowException(); return position++; }
get方法是从缓冲区中读取一个字节,同时position加一:架构
public byte get() { return hb[ix(nextGetIndex())]; } final int nextGetIndex() { if (position >= limit) throw new BufferUnderflowException(); return position++; }
若是咱们想将buffer从写数据的状况变成读数据的状况,能够直接使用flip方法:并发
public final Buffer flip() { limit = position; position = 0; mark = -1; return this; }
mark是记住当前的位置用的,也就是保存position的值:异步
public final Buffer mark() { mark = position; return this; }
若是咱们在对缓冲区读写以前就调用了mark方法,那么之后当position位置变化以后,想回到以前的位置能够调用reset会将mark的值从新赋给position:
public final Buffer reset() { int m = mark; if (m < 0) throw new InvalidMarkException(); position = m; return this; }
利用NIO,当咱们读取数据的时候,会先从buffer加载到channel,而写入数据的时候,会先入到channel而后经过channel转移到buffer中去。channel给咱们提供了两个方法:经过channel.read(buffer)
能够将channel中的数据写入到buffer中,而经过channel.write(buffer)
则能够将buffer中的数据写入到到channel中。
Channel的话分为四种:
由于今天咱们的重点是Selector,因此来看一下SocketChannel的用法。在下面的代码利用SocketChannel模拟了一个简单的server-client程序。
WebServer
的代码以下,和传统的sock程序并无太多的差别,只是咱们引入了buffer和channel的概念:
ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress("127.0.0.1", 5000)); SocketChannel socketChannel = ssc.accept(); ByteBuffer readBuffer = ByteBuffer.allocate(128); socketChannel.read(readBuffer); readBuffer.flip(); while (readBuffer.hasRemaining()) { System.out.println((char)readBuffer.get()); } socketChannel.close(); ssc.close();
WebClient
的代码以下:
SocketChannel socketChannel = null; socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("127.0.0.1", 5000)); ByteBuffer writeBuffer = ByteBuffer.allocate(128); writeBuffer.put("hello world".getBytes()); writeBuffer.flip(); socketChannel.write(writeBuffer); socketChannel.close();
在上面的client程序中,咱们也能够同时将多个buffer中的数据放入到一个数组后而后统一放入到channel后传递给服务器:
ByteBuffer buffer1 = ByteBuffer.allocate(128); ByteBuffer buffer2 = ByteBuffer.allocate(16); buffer1.put("hello ".getBytes()); buffer2.put("world".getBytes()); buffer1.flip(); buffer2.flip(); ByteBuffer[] bufferArray = {buffer1, buffer2}; socketChannel.write(bufferArray);
经过使用selector,咱们能够经过一个线程来同时管理多个channel,省去了建立线程以及线程之间进行上下文切换的开销。
经过调用selector类的静态方法open咱们就能够建立一个selector对象:
Selector selector = Selector.open();
为了保证selector可以监听多个channel,咱们须要将channel注册到selector当中:
channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
咱们能够监听四种事件:
对SelectorKey调用channel
方法能够获得key对应的channel:
Channel channel = key.channel();
而key自身感兴趣的监听事件也能够经过interestOps
来得到:
int interestSet = selectionKey.interestOps();
对selector调用selectedKeys()
方法咱们能够获得注册的全部key:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
服务器的代码以下:
ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress("127.0.0.1", 5000)); ssc.configureBlocking(false); Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT); ByteBuffer readBuff = ByteBuffer.allocate(128); ByteBuffer writeBuff = ByteBuffer.allocate(128); writeBuff.put("received".getBytes()); writeBuff.flip(); // make buffer ready for reading while (true) { selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isAcceptable()) { SocketChannel socketChannel = ssc.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); readBuff.clear(); // make buffer ready for writing socketChannel.read(readBuff); readBuff.flip(); // make buffer ready for reading System.out.println(new String(readBuff.array())); key.interestOps(SelectionKey.OP_WRITE); } else if (key.isWritable()) { writeBuff.rewind(); // sets the position back to 0 SocketChannel socketChannel = (SocketChannel) key.channel(); socketChannel.write(writeBuff); key.interestOps(SelectionKey.OP_READ); } } }
客户端程序的代码以下,各位读者能够同时在终端下面多开几个程序来同时模拟多个请求,而对于多个客户端的程序咱们的服务器始终只用一个线程来处理多个请求。一个很常见的应用场景就是多个用户同时往服务器上传文件,对于每个上传请求咱们不在单独去建立一个线程去处理,同时利用Executor/Future咱们也能够不用阻塞在IO操做中而是当即返回用户结果。
SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("127.0.0.1", 5000)); ByteBuffer writeBuffer = ByteBuffer.allocate(32); ByteBuffer readBuffer = ByteBuffer.allocate(32); writeBuffer.put("hello".getBytes()); writeBuffer.flip(); // make buffer ready for reading while (true) { writeBuffer.rewind(); // sets the position back to 0 socketChannel.write(writeBuffer); // hello readBuffer.clear(); // make buffer ready for writing socketChannel.read(readBuffer); // recieved }
对于Python下异步IO感兴趣的朋友,也能够拓展阅读一下个人这篇Asyncio。
GitHub: https://github.com/ziwenxie
Blog: https://www.ziwenxie.site
本文为做者原创,转载请于开头明显处声明我的博客地址 :-)