传统的BIO模型使用是java.net包中API实现Socket通讯,而传统的BIO通讯中阻塞共发如今两个 地方。java
服务端ServerSocker::accept
客户端Socket数据读写react
这种IO的弊端便是没法处理并发的客户端请求,所以能够经过为每一个客户端单独分配一个线程,则客户的端的Socket数据读写不在阻塞,能够知足并发的客户端请求。linux
一样的在高并发,大量客户端链接形成大量线程,容易产生线程OOM,同时也有大量的线程上下文切换影响性能。nginx
JAVA的NIO一种重要的方法为configureBlocking,示例代码以下:web
package com.zte.sunquan.nio; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; public class SocketServerNIO { public static void main(String[] args) throws Exception { List<SocketChannel> channels = new ArrayList<>(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.bind(new InetSocketAddress(9090)); ssc.configureBlocking(false); //非阻塞 while (true) { Thread.sleep(1000); //不阻塞,但须要每次都问一下内核 SocketChannel clientSocket = ssc.accept(); if (clientSocket == null) { System.out.println("No client...."); } else { //将clientSocket保留 clientSocket.configureBlocking(false); System.out.println("client connected in:" + clientSocket.socket().getPort()); channels.add(clientSocket); } ByteBuffer buffer = ByteBuffer.allocate(4096); //不阻塞,但这里每次也须要问一下内核,即便有些客户端没有事件 for (SocketChannel channel : channels) { int num = channel.read(buffer); if (num > 0) { buffer.flip(); byte[] content=new byte[buffer.limit()]; buffer.get(content); String s = new String(content); System.out.println(channel.socket().getPort()+" Read client msg:"+s); buffer.clear(); } } } } }
如上代码中一个线程负责了IO读写与链接创建,固然能够优雅一点的方法,即将链接创建与IO读写分线程处理,让IO的读写不影响高并发下链接请求与创建。但上述代码仍有明显的弊端。编程
for (SocketChannel channel : channels) {
int num = channel.read(buffer);后端
在上述代码中,每次循环都要与全部创建的客户端进行一次read操做,涉及用户究竟与内核空间的切换,考虑到一个链接数特别多背景下,一次可能只会有几个链接有IO事件,如上的实现会形成大量的性能浪费。
那有没有一种可能,让内核主动告知咱们哪些链接有IO事件,这样应用精确地去指定的链接上进行IO事件的处理,而不是傻傻地每一个链接read一遍?安全
多路复用器使用,能够解决第二节最后的问题,经过selector.select,内核只会将有事件的socket返回,避免了应用循环遍历尝试。多线程
下面示例代码描述了使用JAVA中NIO的API实现的服务端代码并发
package com.zte.sdn.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * 多路复用器示例代码 **/ public class NioServer { private Selector startServer() throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.bind(new InetSocketAddress(9090)); ssc.configureBlocking(false); //打开一个多路复用器 //poll系统调用: //epoll系统调用: Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT); System.out.println("Start server at port:9090"); return selector; } public static void main(String[] args) throws IOException { NioServer nioServer = new NioServer(); Selector selector = nioServer.startServer(); while (true) { System.out.println("ask"); //使用select向内核询问是否有事件(因为一开始只注册了ServerSocket的OP_ACCEPT) //因此第一次只判断是否有链接事件 //后面因为注册客户端OP_READ,从面判断是否有可读事件 while (selector.select(10) > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); nioServer.handler(selectionKey, selector); } } } } private void handler(SelectionKey selectionKey, Selector selector) throws IOException { if (selectionKey.isAcceptable()) { //一个链接事件 ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel(); SocketChannel client = channel.accept(); client.configureBlocking(false); //再注册进去 client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(8192)); System.out.println("receive connect:" + client.getRemoteAddress()); } else if (selectionKey.isReadable()) { //可读事件 SocketChannel client = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = (ByteBuffer) selectionKey.attachment(); buffer.clear(); int read = 0; while (true) { read = client.read(buffer); if (read > 0) { buffer.flip(); while (buffer.hasRemaining()) { //讲到内容写回客户端 client.write(buffer); } System.out.println("receive client msg:" + new String(buffer.array())); buffer.clear(); } else if (read == 0) { break; } else { client.close(); break; } } } } }
在上述示例中,一个线程完成了服务端接口客户端链接以及IO读写动做。思考上面编程思路的弊端是什么?
考虑到一个IO的读写如何很是耗时,必然会影响客户端创建链接并发性能,以及大量IO读写的性能。
天然地针对客户端链接与IO读写能够分不一样selector单独处理,各司其职,因此改进的实现以下:
else if (selectionKey.isReadable()) { executorService.submit(()->{ //可读事件 try { SocketChannel client = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = (ByteBuffer) selectionKey.attachment(); buffer.clear(); int read = 0; while (true) { read = client.read(buffer); if (read > 0) { buffer.flip(); while (buffer.hasRemaining()) { //讲到内容写回客户端 client.write(buffer); } System.out.println("receive client msg:" + new String(buffer.array())); buffer.clear(); } else if (read == 0) { break; } else { client.close(); break; } } }catch (Exception e){ e.printStackTrace(); } }); }
如上所示,实现了IO的处理异步化。但上述实现虽然缓解了大量长时间IO带来的性能问题,但不能从根本上解决,那有没有办法,将客户端链接事件与IO事件彻底分离开?固然若是IO读取的数据业务处理比较耗时,则能够另起线程再进行异步处理。
代码:
package com.zte.sdn.nio; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class IOHandler extends Thread { private Selector selector; public IOHandler(Selector selector, String name) { this.selector = selector; this.setName(name); } @Override public void run() { while (true) { try { handler(); } catch (IOException e) { e.printStackTrace(); } } } private void handler() throws IOException { while (selector.select(10) > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); if (selectionKey.isReadable()) { SocketChannel client = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = (ByteBuffer) selectionKey.attachment(); buffer.clear(); int read = 0; while (true) { try { read = client.read(buffer); if (read > 0) { buffer.flip(); while (buffer.hasRemaining()) { //讲到内容写回客户端 client.write(buffer); } System.out.println(Thread.currentThread().getName() + " receive client msg:" + new String(buffer.array())); buffer.clear(); } else if (read == 0) { break; } else { client.close(); break; } } catch (Exception e) { try { client.close(); } catch (IOException ex) { ex.printStackTrace(); } break; } } } } } } }
MultiNioServer
package com.zte.sdn.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 多路复用器示例代码 **/ public class MultiNioServer { private ExecutorService executorService = Executors.newFixedThreadPool(5); private Selector startServer() throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.bind(new InetSocketAddress(9090)); ssc.configureBlocking(false); //打开一个多路复用器 //poll系统调用: //epoll系统调用: Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT); System.out.println("Start server at port:9090"); return selector; } public static void main(String[] args) throws IOException { MultiNioServer nioServer = new MultiNioServer(); Selector selector = nioServer.startServer(); Selector selector1 = Selector.open(); Selector selector2 = Selector.open(); Selector[] selectors = new Selector[]{ selector1, selector2}; new IOHandler(selector1, "A").start(); new IOHandler(selector2, "B").start(); int i = 0; while (true) { while (selector.select(10) > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); if (selectionKey.isAcceptable()) { //一个链接事件 ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel(); SocketChannel client = channel.accept(); client.configureBlocking(false); //循环注册至另外的Selector client.register(selectors[i++ % 2], SelectionKey.OP_READ, ByteBuffer.allocate(8192)); System.out.println("receive connect:" + client.getRemoteAddress()); } } } } } }
传统的NIO(不使用多路利用器),虽然解决了IO阻塞问题,但须要用户程序遍历地向内核询问全部客户端;
当使用了多路复用器(使用select/poll),每次循环须要将客户端列表传递给内核,由内核遍历将有事件的fd返回,避免用户态与内核态的频繁切换。
但使用select/poll仍然存在弊端,即每次循环都要进行fd列表的传递,如何可以避免每次循环向内核传递fd列表?
思考:如内核能提早申请一块空间存储(红黑树)事件句柄,在有客户端链接上,则记录在该空间,如此客户端程序询问是否事件时,则只需简单问一句,而不须要每次都传递fd列表。
这说的其实epoll内核调用能解决的。
咱们老师去教室收做业进行批改的场景为例类比上述实现:
IO类型 | 说明 | 弊端 |
---|---|---|
BIO | 老师来到教室,依次询问每一个学生写了做业没有,写了则批改,没写则需等他写好再批改,期间有同窗入学毕业则须要等老师忙完这一轮 | 两处阻塞 |
NIO | 老师来到教室,依次询问每一个学生写了做业没有,写了则批改,没写则直接下一个,期间有同窗入学毕业则须要等老师忙完这一轮,相对较快 | 依次询问,没写做业的也要问 |
NIO-select/poll | 老师每次来到教室,拿一个名单贴到教室黑板,并告知在名单上同窗且完成做业,报给我,后序老师直接拿到报名同窗做业,批改便可 | 不一样于每一个同窗依次询问,但还要每次准备名单 |
NIO-epoll | 开班时,则在教室黑板贴上人员名单,有新同窗加入毕业,则增长删除,老师每次来到教室,不用准确名单,只须要告知在名单上同窗且完成做业,报给我,后序老师直接拿到报名同窗做业,批改便可 | 解决上述全部弊端 |
ps.select的fd有1024的数量约束,poll无此限制
Netty拥有两个NIO线程池,分别是bossGroup和workerGroup,前者处理新建链接请求,而后将新创建的链接轮询交给workerGroup中的其中一个NioEventLoop来处理,后续该链接上的读写操做都是由同一个NioEventLoop来处理。注意,虽然bossGroup也能指定多个NioEventLoop(一个NioEventLoop对应一个线程),可是默认状况下只会有一个线程,由于通常状况下应用程序只会使用一个对外监听端口。
为什么不能使用多线程来监听同一个对外端口么,即多线程epoll_wait到同一个epoll实例上?
这里会引来惊群的问题和epoll设置的是LT模式
现代linux中,多个socker同时监听同一个端口也是可行的,nginx 1.9.1也支持这一行为。linux 3.9以上内核支持SO_REUSEPORT选项,容许多个socker bind/listen在同一端口上。这样,多个进程能够各自申请socker监听同一端口,当链接事件来临时,内核作负载均衡,唤醒监听的其中一个进程来处理,reuseport机制有效的解决了epoll惊群问题
Reactor 单线程模型,是指全部的 I/O 操做都在同一个 NIO 线程上面完成的,此时NIO线程职责包括:接收新建链接请求、读写操做等。
Rector 多线程模型与单线程模型最大的区别就是有一组 NIO 线程来处理链接读写操做,一个NIO线程处理Accept。一个NIO线程能够处理多个链接事件,一个链接的事件只能属于一个NIO线程
主从 Reactor 线程模型的特色是:服务端用于接收客户端链接的再也不是一个单独的 NIO 线程,而是一个独立的 NIO 线程池。Acceptor 接收到客户端 TCP链接请求并处理完成后(可能包含接入认证等),将新建立的 SocketChannel注 册 到 I/O 线 程 池(sub reactor 线 程 池)的某个I/O线程上, 由它负责SocketChannel 的读写和编解码工做。Acceptor 线程池仅仅用于客户端的登陆、握手和安全认证,一旦链路创建成功,就将链路注册到后端 subReactor 线程池的 I/O 线程上,由 I/O 线程负责后续的 I/O 操做。