java艺术开发(3)-NIO

1. 前言

和大多数人同样,NIO在课本里面看到过,可是基本没有用过。毕竟应该不多有程序员本身去在java里面实现NIO,大多都基于Netty框架来实现。我以前开发过一个基于websocket的项目,websocket是基于tomcat实现的,可是根据线上的效果来看,websocket链接传输的性能并很差,因此准备着手优化。java

目前主流的websocket实现框架有tomcat、netty-socketIO和netty,而且网友一边倒的倾向于netty相关的实现。鄙视tomcat的理由是: 依赖于容器,性能较差;基于BIO,当并发量高的时候会有资源瓶颈。 可是通过个人实际测试,我当前使用的内嵌tomcat9已经默认实现NIO了。只能说tomcat实现websocket性能较差的缘由,在于它基于容器的websocket实现不够完善,NIO的实现也不如netty成熟。react

固然,我最终选择了用netty来重构当前的websocket框架。不过,我对于NIO的实现过程也产生了兴趣,连tomcat也在向它倾斜。程序员

2. BIO、NIO、AIO

Java 中的 BIO、NIO和 AIO 理解为是 Java 语言对操做系统的各类 IO 模型的封装。程序员在使用这些 API 的时候,不须要关心操做系统层面的知识,也不须要根据不一样操做系统编写不一样的代码。只须要使用Java的API就能够了。web

在讲 BIO,NIO,AIO 以前先来回顾一下这样几个概念:同步与异步,阻塞与非阻塞。spring

2.1. 同步与异步

同步就是发起一个调用后,被调用者未处理完请求以前,调用不返回。异步就是发起一个调用后,马上获得被调用者的回应表示已接收到请求,可是被调用者并无返回结果,此时咱们能够处理其余的请求,被调用者一般依靠事件,回调等机制来通知调用者其返回结果。apache

同步和异步的区别最大在于异步的话调用者不须要等待处理结果,被调用者会经过回调等机制来通知调用者其返回结果。咱们能够用打电话和发短信来很好的比喻同步与异步操做。数组

2.2. 阻塞和非阻塞

阻塞与非阻塞主要是从 CPU 的消耗上来讲的,阻塞就是 CPU 停下来等待一个慢的操做完成 CPU 才接着完成其它的事。非阻塞就是在这个慢的操做在执行时 CPU 去干其它别的事,等这个慢的操做完成时,CPU 再接着完成后续的操做。缓存

虽然表面上看非阻塞的方式能够明显的提升 CPU 的利用率,可是也带了另一种后果就是系统的线程切换增长。增长的 CPU 使用时间能不能补偿系统的切换成本须要好好评估。tomcat

那么同步阻塞同步非阻塞异步非阻塞又表明什么意思呢?springboot

我在网上看到一个很好的例子:你妈妈让你烧水,小时候你比较笨啊,在哪里傻等着水开(同步阻塞)。等你稍微再长大一点,你知道每次烧水的空隙能够去干点其余事,而后只须要时不时来看看水开了没有(同步非阻塞)。后来,大家家用上了水开了会发出声音的壶,这样你就只须要听到响声后就知道水开了,在这期间你能够随便干本身的事情,你须要去倒水了(异步非阻塞)。

2.3. BIO、NIO、AIO

  • BIO:就是传统的 java.io 包,它是基于流模型实现的,交互的方式是同步、阻塞方式,也就是说在读入输入流或者输出流时,在读写动做完成以前,线程会一直阻塞在那里,它们之间的调用时可靠的线性顺序。它的有点就是代码比较简单、直观;缺点就是 IO 的效率和扩展性很低,容易成为应用性能瓶颈。
  • NIO :是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,能够构建多路复用的、同步非阻塞 IO 程序,同时提供了更接近操做系统底层高性能的数据操做方式。
  • AIO:是 Java 1.7 以后引入的包,是 NIO 的升级版本,提供了异步非堵塞的 IO 操做方式,因此人们叫它 AIO(Asynchronous IO),异步 IO 是基于事件和回调机制实现的,也就是应用操做以后会直接返回,不会堵塞在那里,当后台处理完成,操做系统会通知相应的线程进行后续的操做。

3. 关于NIO

3.1. NIO说明

NIO是一种新的IO模型(Recator模型),新主要体如今多路复用,事件驱动上

一、多路复用,一个线程能够处理多个socket请求,经过多个socket注册在一个select上面,而后不断调用select来获取被激活的socket,即达到在一个线程中,处理多个socket请求目的,而在传统(同步阻塞)IO模型中,须要经过多线程的方式才能达到此目的,传统的IO模型因为使用多线程,就会有线程数量以及线程上下文切换等限制。

二、事件驱动(其实就是观察者模式),模型图以下

image.png

image.png

如图所示,EventHandler为IO的事件处理器(观察者),Reactor为管理EventHandler类,事件的注册,删除等(被观察者),reactor的handle_event函数会不断循环调用内核的selec()函数(同步事件多路分离器(通常是内核)的多路分离函数),只要某个文件句柄被激活(可读写),select()函数就返回,handle_event会调用相关的事件处理函数EventHandler上的handle_event()函数。

时序图如上图所示,使用reactor模型以后,用户线程注册事件以后,能够去执行其余事情(异步),等相关读写工做就绪以后,Reactor会通知用户线程进行读写。用户IO线程轮询是否读写好等工做由Reactor上的handle_events处理,Reactor会调用内核select函数检查socket的状态。当socket被激活的时候,通知用户线程(或调用户线程的回掉函数)。执行EventHandler的hand_event()函数。因为select函数是阻塞的,因此多了复用模型被叫作异步阻塞模型,注意,这里所说的阻塞并非socket上read等操做的阻塞,socket上这些操做时非阻塞的(事件模型)。

NIO有3个实体:Buffer(缓冲区),Channel(通道),Selector(多路复用器)。
image.png

Buffer是客户端存放服务端信息的一个容器,服务端若是把数据准备好了,就会经过Channel往Buffer里面传。Buffer有7个类型:ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer。

image.png

Channel是客户端与服务端之间的双工链接通道。因此在请求的过程当中,客户端与服务端中间的Channel就在不停的执行“链接、询问、断开”的过程。直到数据准备好,再经过Channel传回来。Channel主要有4个类型:FileChannel(从文件读取数据)、DatagramChannel(读写UDP网络协议数据)、SocketChannel(读写TCP网络协议数据)、ServerSocketChannel(能够监听TCP链接)

Selector是服务端选择Channel的一个复用器。Seletor有两个核心任务:监控数据是否准备好,应答Channel。具体说来,多个Channel反复轮询时,Selector就看该Channel所需的数据是否准备好了;若是准备好了,则将数据经过Channel返回给该客户端的Buffer,该客户端再进行后续其余操做;若是没准备好,则告诉Channel还须要继续轮询;多个Channel反复询问Selector,Selector为这些Channel一一解答。

3.2. NIO和BIO对比

旦有请求到来(无论是几个同时到仍是只有一个到),都会调用对应IO处理函数处理,因此:

(1)NIO适合处理链接数目特别多,可是链接比较短(轻操做)的场景,Jetty,Mina,ZooKeeper等都是基于java nio实现。

(2)BIO方式适用于链接数目比较小且固定的场景,这种方式对服务器资源要求比较高,并发局限于应用中。

4. 零拷贝

4.1. 传统IO

数据须要从磁盘拷贝到内核空间,再从内核空间拷到用户空间(JVM)。
程序可能进行数据修改等操做
再将数据拷贝到内核空间,内核空间再拷贝到网卡内存,经过网络发送出去(或拷贝到磁盘)。
即数据的读写(这里用户空间发到网络也算做写),都至少须要两次拷贝。

固然磁盘到内核空间属于DMA拷贝(DMA即直接内存存取,原理是外部设备不经过CPU而直接与系统内存交换数据)。而内核空间到用户空间则须要CPU的参与进行拷贝,既然须要CPU参与,也就涉及到了内核态和用户态的相互切换

3.gif

4.2. NIO零拷贝

改进的地方:

  • 已经将上下文切换次数从4次减小到了2次;
  • 将数据拷贝次数从4次减小到了3次(其中只有1次涉及了CPU,另外2次是DMA直接存取)。

但这尚未达到咱们零拷贝的目标。若是底层NIC(网络接口卡)支持gather操做,咱们能进一步减小内核中的数据拷贝。在Linux 2.4以及更高版本的内核中,socket缓冲区描述符已被修改用来适应这个需求。这种方式不但减小屡次的上下文切换,同时消除了须要CPU参与的重复的数据拷贝。用户这边的使用方式不变,而内部已经有了质的改变。

NIO的零拷贝由transferTo()方法实现。transferTo()方法将数据从FileChannel对象传送到可写的字节通道(如Socket Channel等)。在内部实现中,由native方法transferTo0()来实现,它依赖底层操做系统的支持。在UNIX和Linux系统中,调用这个方法将会引发sendfile()系统调用。

4.gif

4.3. NIO直接内存

首先,它的做用位置处于传统IO(BIO)与零拷贝之间,为什么这么说?

传统IO,能够把磁盘的文件通过内核空间,读到JVM空间,而后进行各类操做,最后再写到磁盘或是发送到网络,效率较慢但支持数据文件操做。

零拷贝则是直接在内核空间完成文件读取并转到磁盘(或发送到网络)。因为它没有读取文件数据到JVM这一环,所以程序没法操做该文件数据,尽管效率很高!

而直接内存则介于二者之间,效率通常且可操做文件数据。直接内存(mmap技术)将文件直接映射到内核空间的内存,返回一个操做地址(address),它解决了文件数据须要拷贝到JVM才能进行操做的窘境。而是直接在内核空间直接进行操做,省去了内核空间拷贝到用户空间这一步操做。

NIO的直接内存是由MappedByteBuffer实现的。核心便是map()方法,该方法把文件映射到内存中,得到内存地址addr,而后经过这个addr构造MappedByteBuffer类,以暴露各类文件操做API。

因为MappedByteBuffer申请的是堆外内存,所以不受Minor GC控制,只能在发生Full GC时才能被回收。而DirectByteBuffer改善了这一状况,它是MappedByteBuffer类的子类,同时它实现了DirectBuffer接口,维护一个Cleaner对象来完成内存回收。所以它既能够经过Full GC来回收内存,也能够调用clean()方法来进行回收。

另外,直接内存的大小可经过jvm参数来设置:-XX:MaxDirectMemorySize。

NIO的MappedByteBuffer还有一个兄弟叫作HeapByteBuffer。顾名思义,它用来在堆中申请内存,本质是一个数组。因为它位于堆中,所以可受GC管控,易于回收。

5. tomcat的应用

仍是那个websocket的项目,以前说过,网上不少人都认为tomcat默认是实现BIO的。但我在运行springboot项目后,无心中看到控制台的日志中有个"nio-exec-"前缀的线程。我觉得是由于引入了netty,可是当时那个接口是http的,和netty不要紧,最终查阅资料后了解到tomcat不一样版本也在作改变。

一、BIO:阻塞式I/O操做即便用的是传统 I/O操做,Tomcat7如下版本默认状况下是以BIO模式运行的,因为每一个请求都要建立一个线程来处理,线程开销较大,不能处理高并发的场景,在三种模式中性能也最低。启动tomcat后,日志中会有 http-bio-端口 的内容。

二、NIO是Java 1.4 及后续版本提供的一种新的I/O操做方式,是一个基于缓冲区、并能提供非阻塞I/O操做的Java API,它拥有比传统I/O操做(BIO)更好的并发运行性能。tomcat 8版本及以上默认就是在NIO模式下容许。启动tomcat后,日志中会有 http-nio-端口 的内容。

三、APR(Apache Portable Runtime/Apache可移植运行时),是Apache HTTP服务器的支持库。你能够简单地理解为,Tomcat将以JNI的形式调用Apache HTTP服务器的核心动态连接库来处理文件读取或网络传输操做,从而大大地提升Tomcat对静态文件的处理性能。 Tomcat apr也是在Tomcat上运行高并发应用的首选模式。启动tomcat后,日志中会有 http-apr-端口 的内容。

固然,tomcat的版本也不必定绝对匹配到,若是你想看你的tomcat是什么版本的,仍是要看日志。若是你是启动springboot运行的,内嵌tomcat的日志可能不够完整,能够经过在配置文件中加上如下的属性来开启完整日志:

logging.level.org.apache.tomcat=debug
logging.level.org.apache.catalina=debug

6. 附录代码

附上简单实现java nio的代码做为参考(也是copy来的)。
NioServer.java

/**
 * @Copyright: Shanghai Definesys Company.All rights reserved.
 * @Description:
 * ServerSocketChannel:有效事件为 OP_ACCEPT。
 * SocketChannel:有效事件为 OP_CONNECT、OP_READ、OP_WRITE
 * 他们之间是互斥的,若是 OP_READ为true,其余的就为false
 *
 * @author: kerry.wu
 * @since: 2020/4/26 8:52
 * @history: 1.2020/4/26 created by kerry.wu
 */
public class NioServer {
    private int port;
    private Selector selector;
    private ExecutorService executorService= Executors.newFixedThreadPool(5);

    public NioServer(int port){
        this.port=port;
    }

    /**
     * 服务器端注册 OP_ACCEPT
     */
    public void init(){
        ServerSocketChannel serverSocketChannel=null;
        try {
            serverSocketChannel=ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(port));
            selector=Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("NIO Server open ...");
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void accept(SelectionKey selectionKey){
        try {
            ServerSocketChannel serverSocketChannel=(ServerSocketChannel) selectionKey.channel();
            SocketChannel socketChannel=serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector,SelectionKey.OP_READ);
            System.out.println("accept a client:"+socketChannel.socket().getInetAddress().getHostName());
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void start(){
        //注册 OP_ACCEPT
        this.init();
        while (true){
            try {
                Thread.sleep(3000);
                int events=selector.select();
                if(events>0){
                    java.util.Iterator<SelectionKey> selectionKeyIterator=selector.selectedKeys().iterator();
                    while (selectionKeyIterator.hasNext()){
                        SelectionKey selectionKey=selectionKeyIterator.next();
                        selectionKeyIterator.remove();
                        //SelectionKey当前事件是 OP_ACCEPT 时,注册 SocketChannel 的 OP_READ
                        if(selectionKey.isAcceptable()){
                            accept(selectionKey);
                        }else {
                            //SelectionKey当前事件是 OP_READ 时,分配线程接受并处理消息
                            executorService.submit(new NioServerHandler(selectionKey));
                        }
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new NioServer(8081).start();
    }
}

NioServerHandler.java

public class NioServerHandler implements Runnable {
    private SelectionKey selectionKey;

    public NioServerHandler(SelectionKey selectionKey){
        this.selectionKey=selectionKey;
    }

    /**
     * ByteBuffer:
     * 1.allocate(int capacity):从堆空间中分配一个容量大小为capacity的byte数组做为缓冲区的byte数据存储器
     * 2.allocateDirect(int capacity):不使用JVM堆栈而是经过操做系统来建立内存块用做缓冲区,它与当前操做系统可以更好的耦合,所以能进一步提升I/O操做速度。可是分配直接缓冲区的系统开销很大,所以只有在缓冲区较大并长期存在,或者须要常常重用时,才使用这种缓冲区
     * 3.wrap(byte[] array):这个缓冲区的数据会存放在byte数组中,bytes数组或buff缓冲区任何一方中数据的改动都会影响另外一方。其实ByteBuffer底层原本就有一个bytes数组负责来保存buffer缓冲区中的数据,经过allocate方法系统会帮你构造一个byte数组
     * 4.flip:缓存字节数组的指针设置为数组的开始序列即数组下标0。这样就能够从buffer开头,对该buffer进行遍历(读取)了。
     */
    @Override
    public void run(){
        try {
            if(selectionKey.isReadable()){
                SocketChannel socketChannel=(SocketChannel) selectionKey.channel();
                ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
                socketChannel.read(byteBuffer);
                byteBuffer.flip();
                System.out.println("ACCEPT:"+selectionKey.isAcceptable());
                System.out.println("READ:"+selectionKey.isReadable());
                System.out.println("CONNECT:"+selectionKey.isConnectable());
                System.out.println("收到客户端"+socketChannel.socket().getInetAddress().getHostName()+"的数据:"+new String(byteBuffer.array()));

                socketChannel.write(ByteBuffer.wrap(("回复客户端"+socketChannel.socket().getInetAddress().getHostName()).getBytes()) );
                selectionKey.cancel();

            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

NioClient.java

public class NioClient {
    private static final String host = "127.0.0.1";
    private static final int port = 8081;
    private Selector selector;

    public static void main(String[] args){
        for (int i=0;i<3;i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    NioClient client = new NioClient();
                    client.connect(host, port);
                    client.listen();
                }
            }).start();
        }
    }

    /**
     * 建立链接,注册 OP_CONNECT
     * @param host
     * @param port
     */
    public void connect(String host, int port) {
        try {
            SocketChannel sc = SocketChannel.open();
            sc.configureBlocking(false);
            this.selector = Selector.open();
            //注册 OP_CONNECT
            sc.register(selector, SelectionKey.OP_CONNECT);
            sc.connect(new InetSocketAddress(host, port));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void listen() {
        while (true) {
            try {
                int events = selector.select();
                if (events > 0) {
                    Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
                    while (selectionKeys.hasNext()) {
                        SelectionKey selectionKey = selectionKeys.next();
                        selectionKeys.remove();
                        //当 OP_CONNECT 可链接事件,注册 OP_READ
                        if (selectionKey.isConnectable()) {
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                            if (socketChannel.isConnectionPending()) {
                                socketChannel.finishConnect();
                            }
                            socketChannel.configureBlocking(false);
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            socketChannel.write(ByteBuffer.wrap(("Hello this is " + Thread.currentThread().getName()).getBytes()));
                        }
                        //当 OP_READ 可读事件,
                        else if (selectionKey.isReadable()) {
                            SocketChannel sc = (SocketChannel) selectionKey.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            //从 SocketChannel 中读取数据
                            sc.read(buffer);
                            buffer.flip();
                            Thread.sleep(3000);
                            System.out.println("收到服务端的数据:"+new String(buffer.array()));
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
相关文章
相关标签/搜索