和大多数人同样,NIO在课本里面看到过,可是基本没有用过。毕竟应该不多有程序员本身去在java里面实现NIO,大多都基于Netty框架来实现。我以前开发过一个基于websocket的项目,websocket是基于tomcat实现的,可是根据线上的效果来看,websocket链接传输的性能并很差,因此准备着手优化。java
目前主流的websocket实现框架有tomcat、netty-socketIO和netty,而且网友一边倒的倾向于netty相关的实现。鄙视tomcat的理由是: 依赖于容器,性能较差;基于BIO,当并发量高的时候会有资源瓶颈。
可是通过个人实际测试,我当前使用的内嵌tomcat9已经默认实现NIO了。只能说tomcat实现websocket性能较差的缘由,在于它基于容器的websocket实现不够完善,NIO的实现也不如netty成熟。react
固然,我最终选择了用netty来重构当前的websocket框架。不过,我对于NIO的实现过程也产生了兴趣,连tomcat也在向它倾斜。程序员
Java 中的 BIO、NIO和 AIO 理解为是 Java 语言对操做系统的各类 IO 模型的封装。程序员在使用这些 API 的时候,不须要关心操做系统层面的知识,也不须要根据不一样操做系统编写不一样的代码。只须要使用Java的API就能够了。web
在讲 BIO,NIO,AIO 以前先来回顾一下这样几个概念:同步与异步,阻塞与非阻塞。spring
同步就是发起一个调用后,被调用者未处理完请求以前,调用不返回。异步就是发起一个调用后,马上获得被调用者的回应表示已接收到请求,可是被调用者并无返回结果,此时咱们能够处理其余的请求,被调用者一般依靠事件,回调等机制来通知调用者其返回结果。apache
同步和异步的区别最大在于异步的话调用者不须要等待处理结果,被调用者会经过回调等机制来通知调用者其返回结果。咱们能够用打电话和发短信来很好的比喻同步与异步操做。数组
阻塞与非阻塞主要是从 CPU 的消耗上来讲的,阻塞就是 CPU 停下来等待一个慢的操做完成 CPU 才接着完成其它的事。非阻塞就是在这个慢的操做在执行时 CPU 去干其它别的事,等这个慢的操做完成时,CPU 再接着完成后续的操做。缓存
虽然表面上看非阻塞的方式能够明显的提升 CPU 的利用率,可是也带了另一种后果就是系统的线程切换增长。增长的 CPU 使用时间能不能补偿系统的切换成本须要好好评估。tomcat
那么同步阻塞、同步非阻塞和异步非阻塞又表明什么意思呢?springboot
我在网上看到一个很好的例子:你妈妈让你烧水,小时候你比较笨啊,在哪里傻等着水开(同步阻塞)。等你稍微再长大一点,你知道每次烧水的空隙能够去干点其余事,而后只须要时不时来看看水开了没有(同步非阻塞)。后来,大家家用上了水开了会发出声音的壶,这样你就只须要听到响声后就知道水开了,在这期间你能够随便干本身的事情,你须要去倒水了(异步非阻塞)。
NIO是一种新的IO模型(Recator模型),新主要体如今多路复用,事件驱动上
一、多路复用,一个线程能够处理多个socket请求,经过多个socket注册在一个select上面,而后不断调用select来获取被激活的socket,即达到在一个线程中,处理多个socket请求目的,而在传统(同步阻塞)IO模型中,须要经过多线程的方式才能达到此目的,传统的IO模型因为使用多线程,就会有线程数量以及线程上下文切换等限制。
二、事件驱动(其实就是观察者模式),模型图以下
如图所示,EventHandler为IO的事件处理器(观察者),Reactor为管理EventHandler类,事件的注册,删除等(被观察者),reactor的handle_event函数会不断循环调用内核的selec()函数(同步事件多路分离器(通常是内核)的多路分离函数),只要某个文件句柄被激活(可读写),select()函数就返回,handle_event会调用相关的事件处理函数EventHandler上的handle_event()函数。
时序图如上图所示,使用reactor模型以后,用户线程注册事件以后,能够去执行其余事情(异步),等相关读写工做就绪以后,Reactor会通知用户线程进行读写。用户IO线程轮询是否读写好等工做由Reactor上的handle_events处理,Reactor会调用内核select函数检查socket的状态。当socket被激活的时候,通知用户线程(或调用户线程的回掉函数)。执行EventHandler的hand_event()函数。因为select函数是阻塞的,因此多了复用模型被叫作异步阻塞模型,注意,这里所说的阻塞并非socket上read等操做的阻塞,socket上这些操做时非阻塞的(事件模型)。
NIO有3个实体:Buffer(缓冲区),Channel(通道),Selector(多路复用器)。
Buffer是客户端存放服务端信息的一个容器,服务端若是把数据准备好了,就会经过Channel往Buffer里面传。Buffer有7个类型:ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer。
Channel是客户端与服务端之间的双工链接通道。因此在请求的过程当中,客户端与服务端中间的Channel就在不停的执行“链接、询问、断开”的过程。直到数据准备好,再经过Channel传回来。Channel主要有4个类型:FileChannel(从文件读取数据)、DatagramChannel(读写UDP网络协议数据)、SocketChannel(读写TCP网络协议数据)、ServerSocketChannel(能够监听TCP链接)
Selector是服务端选择Channel的一个复用器。Seletor有两个核心任务:监控数据是否准备好,应答Channel。具体说来,多个Channel反复轮询时,Selector就看该Channel所需的数据是否准备好了;若是准备好了,则将数据经过Channel返回给该客户端的Buffer,该客户端再进行后续其余操做;若是没准备好,则告诉Channel还须要继续轮询;多个Channel反复询问Selector,Selector为这些Channel一一解答。
旦有请求到来(无论是几个同时到仍是只有一个到),都会调用对应IO处理函数处理,因此:
(1)NIO适合处理链接数目特别多,可是链接比较短(轻操做)的场景,Jetty,Mina,ZooKeeper等都是基于java nio实现。
(2)BIO方式适用于链接数目比较小且固定的场景,这种方式对服务器资源要求比较高,并发局限于应用中。
数据须要从磁盘拷贝到内核空间,再从内核空间拷到用户空间(JVM)。
程序可能进行数据修改等操做
再将数据拷贝到内核空间,内核空间再拷贝到网卡内存,经过网络发送出去(或拷贝到磁盘)。
即数据的读写(这里用户空间发到网络也算做写),都至少须要两次拷贝。
固然磁盘到内核空间属于DMA拷贝(DMA即直接内存存取,原理是外部设备不经过CPU而直接与系统内存交换数据)。而内核空间到用户空间则须要CPU的参与进行拷贝,既然须要CPU参与,也就涉及到了内核态和用户态的相互切换
改进的地方:
但这尚未达到咱们零拷贝的目标。若是底层NIC(网络接口卡)支持gather操做,咱们能进一步减小内核中的数据拷贝。在Linux 2.4以及更高版本的内核中,socket缓冲区描述符已被修改用来适应这个需求。这种方式不但减小屡次的上下文切换,同时消除了须要CPU参与的重复的数据拷贝。用户这边的使用方式不变,而内部已经有了质的改变。
NIO的零拷贝由transferTo()方法实现。transferTo()方法将数据从FileChannel对象传送到可写的字节通道(如Socket Channel等)。在内部实现中,由native方法transferTo0()来实现,它依赖底层操做系统的支持。在UNIX和Linux系统中,调用这个方法将会引发sendfile()系统调用。
首先,它的做用位置处于传统IO(BIO)与零拷贝之间,为什么这么说?
传统IO,能够把磁盘的文件通过内核空间,读到JVM空间,而后进行各类操做,最后再写到磁盘或是发送到网络,效率较慢但支持数据文件操做。
零拷贝则是直接在内核空间完成文件读取并转到磁盘(或发送到网络)。因为它没有读取文件数据到JVM这一环,所以程序没法操做该文件数据,尽管效率很高!
而直接内存则介于二者之间,效率通常且可操做文件数据。直接内存(mmap技术)将文件直接映射到内核空间的内存,返回一个操做地址(address),它解决了文件数据须要拷贝到JVM才能进行操做的窘境。而是直接在内核空间直接进行操做,省去了内核空间拷贝到用户空间这一步操做。
NIO的直接内存是由MappedByteBuffer实现的。核心便是map()方法,该方法把文件映射到内存中,得到内存地址addr,而后经过这个addr构造MappedByteBuffer类,以暴露各类文件操做API。
因为MappedByteBuffer申请的是堆外内存,所以不受Minor GC控制,只能在发生Full GC时才能被回收。而DirectByteBuffer改善了这一状况,它是MappedByteBuffer类的子类,同时它实现了DirectBuffer接口,维护一个Cleaner对象来完成内存回收。所以它既能够经过Full GC来回收内存,也能够调用clean()方法来进行回收。
另外,直接内存的大小可经过jvm参数来设置:-XX:MaxDirectMemorySize。
NIO的MappedByteBuffer还有一个兄弟叫作HeapByteBuffer。顾名思义,它用来在堆中申请内存,本质是一个数组。因为它位于堆中,所以可受GC管控,易于回收。
仍是那个websocket的项目,以前说过,网上不少人都认为tomcat默认是实现BIO的。但我在运行springboot项目后,无心中看到控制台的日志中有个"nio-exec-"前缀的线程。我觉得是由于引入了netty,可是当时那个接口是http的,和netty不要紧,最终查阅资料后了解到tomcat不一样版本也在作改变。
一、BIO:阻塞式I/O操做即便用的是传统 I/O操做,Tomcat7如下版本默认状况下是以BIO模式运行的,因为每一个请求都要建立一个线程来处理,线程开销较大,不能处理高并发的场景,在三种模式中性能也最低。启动tomcat后,日志中会有 http-bio-端口
的内容。
二、NIO是Java 1.4 及后续版本提供的一种新的I/O操做方式,是一个基于缓冲区、并能提供非阻塞I/O操做的Java API,它拥有比传统I/O操做(BIO)更好的并发运行性能。tomcat 8版本及以上默认就是在NIO模式下容许。启动tomcat后,日志中会有 http-nio-端口
的内容。
三、APR(Apache Portable Runtime/Apache可移植运行时),是Apache HTTP服务器的支持库。你能够简单地理解为,Tomcat将以JNI的形式调用Apache HTTP服务器的核心动态连接库来处理文件读取或网络传输操做,从而大大地提升Tomcat对静态文件的处理性能。 Tomcat apr也是在Tomcat上运行高并发应用的首选模式。启动tomcat后,日志中会有 http-apr-端口
的内容。
固然,tomcat的版本也不必定绝对匹配到,若是你想看你的tomcat是什么版本的,仍是要看日志。若是你是启动springboot运行的,内嵌tomcat的日志可能不够完整,能够经过在配置文件中加上如下的属性来开启完整日志:
logging.level.org.apache.tomcat=debug logging.level.org.apache.catalina=debug
附上简单实现java nio的代码做为参考(也是copy来的)。
NioServer.java
/** * @Copyright: Shanghai Definesys Company.All rights reserved. * @Description: * ServerSocketChannel:有效事件为 OP_ACCEPT。 * SocketChannel:有效事件为 OP_CONNECT、OP_READ、OP_WRITE * 他们之间是互斥的,若是 OP_READ为true,其余的就为false * * @author: kerry.wu * @since: 2020/4/26 8:52 * @history: 1.2020/4/26 created by kerry.wu */ public class NioServer { private int port; private Selector selector; private ExecutorService executorService= Executors.newFixedThreadPool(5); public NioServer(int port){ this.port=port; } /** * 服务器端注册 OP_ACCEPT */ public void init(){ ServerSocketChannel serverSocketChannel=null; try { serverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(port)); selector=Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("NIO Server open ..."); }catch (Exception e){ e.printStackTrace(); } } public void accept(SelectionKey selectionKey){ try { ServerSocketChannel serverSocketChannel=(ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel=serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector,SelectionKey.OP_READ); System.out.println("accept a client:"+socketChannel.socket().getInetAddress().getHostName()); }catch (Exception e){ e.printStackTrace(); } } public void start(){ //注册 OP_ACCEPT this.init(); while (true){ try { Thread.sleep(3000); int events=selector.select(); if(events>0){ java.util.Iterator<SelectionKey> selectionKeyIterator=selector.selectedKeys().iterator(); while (selectionKeyIterator.hasNext()){ SelectionKey selectionKey=selectionKeyIterator.next(); selectionKeyIterator.remove(); //SelectionKey当前事件是 OP_ACCEPT 时,注册 SocketChannel 的 OP_READ if(selectionKey.isAcceptable()){ accept(selectionKey); }else { //SelectionKey当前事件是 OP_READ 时,分配线程接受并处理消息 executorService.submit(new NioServerHandler(selectionKey)); } } } }catch (Exception e){ e.printStackTrace(); } } } public static void main(String[] args) { new NioServer(8081).start(); } }
NioServerHandler.java
public class NioServerHandler implements Runnable { private SelectionKey selectionKey; public NioServerHandler(SelectionKey selectionKey){ this.selectionKey=selectionKey; } /** * ByteBuffer: * 1.allocate(int capacity):从堆空间中分配一个容量大小为capacity的byte数组做为缓冲区的byte数据存储器 * 2.allocateDirect(int capacity):不使用JVM堆栈而是经过操做系统来建立内存块用做缓冲区,它与当前操做系统可以更好的耦合,所以能进一步提升I/O操做速度。可是分配直接缓冲区的系统开销很大,所以只有在缓冲区较大并长期存在,或者须要常常重用时,才使用这种缓冲区 * 3.wrap(byte[] array):这个缓冲区的数据会存放在byte数组中,bytes数组或buff缓冲区任何一方中数据的改动都会影响另外一方。其实ByteBuffer底层原本就有一个bytes数组负责来保存buffer缓冲区中的数据,经过allocate方法系统会帮你构造一个byte数组 * 4.flip:缓存字节数组的指针设置为数组的开始序列即数组下标0。这样就能够从buffer开头,对该buffer进行遍历(读取)了。 */ @Override public void run(){ try { if(selectionKey.isReadable()){ SocketChannel socketChannel=(SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer=ByteBuffer.allocate(1024); socketChannel.read(byteBuffer); byteBuffer.flip(); System.out.println("ACCEPT:"+selectionKey.isAcceptable()); System.out.println("READ:"+selectionKey.isReadable()); System.out.println("CONNECT:"+selectionKey.isConnectable()); System.out.println("收到客户端"+socketChannel.socket().getInetAddress().getHostName()+"的数据:"+new String(byteBuffer.array())); socketChannel.write(ByteBuffer.wrap(("回复客户端"+socketChannel.socket().getInetAddress().getHostName()).getBytes()) ); selectionKey.cancel(); } }catch (Exception e){ e.printStackTrace(); } } }
NioClient.java
public class NioClient { private static final String host = "127.0.0.1"; private static final int port = 8081; private Selector selector; public static void main(String[] args){ for (int i=0;i<3;i++) { new Thread(new Runnable() { @Override public void run() { NioClient client = new NioClient(); client.connect(host, port); client.listen(); } }).start(); } } /** * 建立链接,注册 OP_CONNECT * @param host * @param port */ public void connect(String host, int port) { try { SocketChannel sc = SocketChannel.open(); sc.configureBlocking(false); this.selector = Selector.open(); //注册 OP_CONNECT sc.register(selector, SelectionKey.OP_CONNECT); sc.connect(new InetSocketAddress(host, port)); } catch (IOException e) { e.printStackTrace(); } } public void listen() { while (true) { try { int events = selector.select(); if (events > 0) { Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator(); while (selectionKeys.hasNext()) { SelectionKey selectionKey = selectionKeys.next(); selectionKeys.remove(); //当 OP_CONNECT 可链接事件,注册 OP_READ if (selectionKey.isConnectable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); if (socketChannel.isConnectionPending()) { socketChannel.finishConnect(); } socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); socketChannel.write(ByteBuffer.wrap(("Hello this is " + Thread.currentThread().getName()).getBytes())); } //当 OP_READ 可读事件, else if (selectionKey.isReadable()) { SocketChannel sc = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); //从 SocketChannel 中读取数据 sc.read(buffer); buffer.flip(); Thread.sleep(3000); System.out.println("收到服务端的数据:"+new String(buffer.array())); } } } } catch (Exception e) { e.printStackTrace(); } } } }