深刻了解NIO底层原理

Redis 为什么能支持高并发?

Redis底层采用NIO中的多路IO复用的机制,对多个不一样的链接(TCP)实现IO复用,很好地支持高并发,而且能实现线程安全java

Redis官方没有windows版本,只有Linux版本。nginx

NIO在不一样的操做系统上实现的方式有所不一样,在Windows操做系统使用select实现轮训,并且还存在空轮训的状况,效率很是低。时间复杂度是为O(n)。其次默认对轮训的数据有必定限制,因此难于支持上万的TCP链接。
在Linux操做系统采用epoll实现事件驱动回调,不会存在空轮训的状况,只对活跃的socket链接实现主动回调,这样在性能上有大大的提高,时间复杂度是为O(1)web

Windows 操做系统是没有epoll,只有Linux系统才有epoll。redis

这就是为何nginx、redis都可以很是好的支持高并发,最终都是Linux中的IO多路复用机制epoll。windows

阻塞和非阻塞

阻塞和非阻塞一般形容多线程间的相互影响。好比一个线程占用了临界区资源,那么其它全部须要这个资源的线程就必须在这个临界区中进行等待,等待会致使线程挂起。这种状况就是阻塞。此时,若是占用资源的线程一直不肯意释放资源,那么其它全部阻塞在这个临界区上的线程都不能工做。而非阻塞容许多个线程同时进入临界区。安全

阻塞调用是指调用结果返回以前,当前线程会被挂起。调用线程只有在获得结果以后才会返回。
非阻塞调用指在不能马上获得结果以前,该调用不会阻塞当前线程。多线程

BIO NIO AIO 概念

BIO(blocking IO):就是传统的 java.io 包,它是基于流模型实现的,交互的方式是同步、阻塞方式,也就是说在读入输入流或者输出流时,在读写动做完成以前,线程会一直阻塞在那里,它们之间的调用是可靠的线性顺序。优势是代码比较简单、直观;缺点是 IO 的效率和扩展性很低,容易成为应用性能瓶颈。
NIO(non-blocking IO) :Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,能够构建多路复用的、同步非阻塞 IO 程序,同时提供了更接近操做系统底层高性能的数据操做方式。
AIO(Asynchronous IO) :是 Java 1.7 以后引入的包,是 NIO 的升级版本,提供了异步非堵塞的 IO 操做方式,因此人们叫它 AIO(Asynchronous IO),异步 IO 是基于事件和回调机制实现的,也就是应用操做以后会直接返回,不会堵塞在那里,当后台处理完成,操做系统会通知相应的线程进行后续的操做。并发

NIO 讲解

咱们知道,BIO是阻塞式IO,是面向于流传输也便是根据每一个字节实现传输,效率比较低;而NIO是同步非阻塞式的,式面向于缓冲区的,它的亮点是IO多路复用
咱们能够这样理解IO多路复用,多路能够指有多个不一样的TCP链接,复用是一个线程来维护多个不一样的IO操做。因此它的好处是占用CPU资源很是小,并且线程安全。异步

NIO核心组件

管道channel:数据传输都是通过管道的。channel都是统一注册到Selector上的。
选择器Selector:也可称为多路复用器。能够在单线程的状况下维护多个Channel,也能够维护多个链接。socket

在这里插入图片描述

BIO 和 NIO 代码演示

传统的BIO阻塞式Socket过程:

先启动一个Socket服务端,此时控制台会输出开始等待接收数据中...,并等待客户端链接。

package com.nobody;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** * @author Mr.nobody * @Description * @date 2020/7/4 */
public class SocketTcpBioServer { 
 
   

    private static byte[] bytes = new byte[1024];

    public static void main(String[] args) { 
 
   

        try { 
 
   
            // 建立ServerSocket
            final ServerSocket serverSocket = new ServerSocket();
            // 绑定监听端口号
            serverSocket.bind(new InetSocketAddress(8080));

            while (true) { 
 
   
                System.out.println("开始等待接收数据中...");
                Socket accept = serverSocket.accept();
                int read = 0;
                read = accept.getInputStream().read(bytes);
                String result = new String(bytes);
                System.out.println("接收到数据:" + result);
            }

        } catch (IOException e) { 
 
   
            e.printStackTrace();
        }

    }
}

在这里插入图片描述

再启动一个Socket客户端,先不进行输入。

package com.nobody;

import java.io.IOException;
import java.net.*;
import java.util.Scanner;

/** * @author Mr.nobody * @Description * @date 2020/7/4 */
public class ClientTcpSocket { 
 
   

    public static void main(String[] args) { 
 
   
        Socket socket = new Socket();
        try { 
 
   
            // 与服务端创建链接
            SocketAddress socketAddress = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
            socket.connect(socketAddress);
            while (true) { 
 
   
                Scanner scanner = new Scanner(System.in);
                socket.getOutputStream().write(scanner.next().getBytes());
            }
        } catch (UnknownHostException e) { 
 
   
            e.printStackTrace();
        } catch (IOException e) { 
 
   
            e.printStackTrace();
        }
    }

}

再启动另一个Socket客户端02,输入client02

package com.nobody;

import java.io.IOException;
import java.net.*;
import java.util.Scanner;

/** * @author Mr.nobody * @Description * @date 2020/7/4 */
public class ClientTcpSocket02 { 
 
   

    public static void main(String[] args) { 
 
   
        Socket socket = new Socket();
        try { 
 
   
            // 与服务端创建链接
            SocketAddress socketAddress = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
            socket.connect(socketAddress);
            while (true) { 
 
   
                Scanner scanner = new Scanner(System.in);
                socket.getOutputStream().write(scanner.next().getBytes());
            }
        } catch (UnknownHostException e) { 
 
   
            e.printStackTrace();
        } catch (IOException e) { 
 
   
            e.printStackTrace();
        }
    }

}

在这里插入图片描述
此时能够看到服务端没有接收到数据,由于Socket客户端01先链接,可是还未输入数据,因此服务端一直等待客户端01的输入,致使客户端02阻塞。

若是咱们这时在客户端01输入client01,服务端控制台显示以下,先输出客户端01的数据,完成后才能输出客户端02的数据。
在这里插入图片描述
固然,若是不想后链接的客户端不阻塞,可使用多线程实现伪异步IO,只需将服务端代码修改成以下:

public static void main(String[] args) { 
 
   

    try { 
 
   
        // 建立ServerSocket
        final ServerSocket serverSocket = new ServerSocket();
        // 绑定监听端口号
        serverSocket.bind(new InetSocketAddress(8080));

        while (true) { 
 
   
            System.out.println("开始等待接收数据中...");
            Socket accept = serverSocket.accept();
            new Thread(new Runnable() { 
 
   
                @Override
                public void run() { 
 
   
                    int read = 0;
                    try { 
 
   
                        read = accept.getInputStream().read(bytes);
                    } catch (IOException e) { 
 
   
                        e.printStackTrace();
                    }
                    String result = new String(bytes);
                    System.out.println("接收到数据:" + result);
                }
            }).start();
        }

    } catch (IOException e) { 
 
   
        e.printStackTrace();
    }
}

固然上面代码有个缺点是建立的线程会频繁建立和销毁,频繁进行CPU调度,而且也消耗内存资源,可以使用线程池机制优化。

NIO非阻塞式Socket过程:
前面两个客户端代码不变,服务端代码以下:

package com.nobody.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** * @author Mr.nobody * @Description * @date 2020/7/4 */
public class NioServer { 
 
   

    private Selector selector;

    public void iniServer() { 
 
   
        try { 
 
   
            // 建立管道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            // 设置管道为非阻塞
            serverSocketChannel.configureBlocking(false);
            // 将管道绑定到8080端口
            serverSocketChannel.bind(new InetSocketAddress(8080));
            // 建立一个选择器
            this.selector = Selector.open();
            // 将管道注册到选择器上,注册为SelectionKey.OP_ACCEPT事件,
            // 当事件到达后,selector.select()会返回,不然改方法会一直阻塞。
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) { 
 
   
            e.printStackTrace();
        }
    }

    public void listen() throws IOException { 
 
   
        System.out.println("服务端启动成功...");
        // 轮询访问Selector
        while (true) { 
 
   
            // 当事件到达后,selector.select()会返回,不然改方法会一直阻塞。
            int select = selector.select(10);
            // 没有发送消息,跳过
            if (0 == select) { 
 
   
                continue;
            }

            // selector中选中的注册事件
            Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
            while (iterator.hasNext()) { 
 
   
                SelectionKey key = iterator.next();
                // 删除已选中的key,避免重复处理
                iterator.remove();
                if (key.isAcceptable()) { 
 
    // 客户端链接事件
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    // 得到与客户端链接的管道
                    SocketChannel socketChannel = server.accept();
                    // 设置管道为非阻塞
                    socketChannel.configureBlocking(false);
                    // 与客户端链接后,为了能接收到客户端的消息,为管道设置可读权限
                    socketChannel.register(this.selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) { 
 
    // 可读事件
                    // 建立读取数据的缓冲区
                    ByteBuffer byteBuffer = ByteBuffer.allocate(512);
                    SocketChannel channel = (SocketChannel) key.channel();
                    channel.read(byteBuffer);
                    byte[] bytes = byteBuffer.array();
                    String msg = new String(bytes).trim();
                    System.out.println("服务端收到消息:" + msg);
                    ByteBuffer outByteBuffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
                    // 回应消息给客户端
                    channel.write(outByteBuffer);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException { 
 
   
        NioServer nioServer = new NioServer();
        nioServer.iniServer();
        nioServer.listen();
    }
}

启动服务端,而后再启动两个客户端,两个客户端都不会阻塞。
在这里插入图片描述
在这里插入图片描述

本文同步分享在 博客“Μr.ηobοdy”(CSDN)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索