NIO之Reactor模式,Netty序章

Reactor模式

反应堆模式:“反应”器名字中”反应“的由来:数组

  • “反应”即“倒置”,“控制逆转”,具体事件处理程序不调用反应器,而向反应器注册一个事件处理器,表示本身对某些事件感兴趣,有时间来了,具体事件处理程序经过事件处理器对某个指定的事件发生作出反应。

单线程Reactor模式流程:

clipboard.png

  • ①服务器端的Reactor是一个线程对象,该线程会启动事件循环,并使用Selector(选择器)来实现IO的多路复用。channel注册一个Acceptor事件处理器到Reactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样Reactor会监听客户端向服务器端发起的链接请求事件(ACCEPT事件)。
  • ②客户端向服务器端发起一个链接请求,Reactor监听到了该ACCEPT事件的发生并将该ACCEPT事件派发给相应的Acceptor处理器来进行处理。Acceptor处理器经过accept()方法获得与这个客户端对应的链接(SocketChannel),而后将该链接所关注的READ事件以及对应的READ事件处理器注册到Reactor中,这样一来Reactor就会监听该链接的READ事件了。
  • ③当Reactor监听到有读或者写事件发生时,将相关的事件派发给对应的处理器进行处理。好比,读处理器会经过SocketChannel的read()方法读取数据,此时read()操做能够直接读取到数据,而不会堵塞与等待可读的数据到来。
  • ④每当处理完全部就绪的感兴趣的I/O事件后,Reactor线程会再次执行select()阻塞等待新的事件就绪并将其分派给对应处理器进行处理。

注意,Reactor的单线程模式的单线程主要是针对于I/O操做而言,也就是全部的I/O的accept()、read()、write()以及connect()操做都在一个线程上完成的。服务器

基于单线程反应器模式手写一个NIO通讯

先简单介绍NIO中几个重要对象:
Selector并发

  • Selector的英文含义是“选择器”,也能够称为为“轮询代理器”、“事件订阅器”、“channel容器管理机”都行。
  • 事件订阅和Channel管理: 应用程序将向Selector对象注册须要它关注的Channel,以及具体的某一个Channel会对哪些IO事件感兴趣。Selector中也会维护一个“已经注册的Channel”的容器。

Channels
通道,被创建的一个应用程序和操做系统交互事件、传递内容的渠道(注意是链接到操做系统)。那么既然是和操做系统进行内容的传递,那么说明应用程序能够经过通道读取数据,也能够经过通道向操做系统写数据。socket

  • 全部被Selector(选择器)注册的通道,只能是继承了SelectableChannel类的子类。
  • ServerSocketChannel:应用服务器程序的监听通道。只有经过这个通道,应用程序才能向操做系统注册支持“多路复用IO”的端口监听。同时支持UDP协议和TCP协议。
  • ScoketChannel:TCP Socket套接字的监听通道,一个Socket套接字对应了一个客户端IP:端口 到
    服务器IP:端口的通讯链接。
  • DatagramChannel:UDP 数据报文的监听通道。

通道中的数据老是要先读到一个Buffer,或者老是要从一个Buffer中写入。ide

服务端处理器:性能

/**
 * 类说明:nio通讯服务端处理器
 */
public class NioServerHandle implements Runnable{
    private Selector selector;
    private ServerSocketChannel serverChannel;
    private volatile boolean started;
    /**
     * 构造方法
     * @param port 指定要监听的端口号
     */
    public NioServerHandle(int port) {

        try {
            selector = Selector.open();
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(new InetSocketAddress(port));
            serverChannel.register(selector,SelectionKey.OP_ACCEPT);
            started = true;
            System.out.println("服务器已启动,端口号:"+port);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    public void stop(){
        started = false;
    }
    @Override
    public void run() {
        //循环遍历selector
        while(started){
            try{
                //阻塞,只有当至少一个注册的事件发生的时候才会继续.
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key = null;
                while(it.hasNext()){
                    key = it.next();
                    it.remove();
                    try{
                        handleInput(key);
                    }catch(Exception e){
                        if(key != null){
                            key.cancel();
                            if(key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch(Throwable t){
                t.printStackTrace();
            }
        }
        //selector关闭后会自动释放里面管理的资源
        if(selector != null)
            try{
                selector.close();
            }catch (Exception e) {
                e.printStackTrace();
            }
    }
    private void handleInput(SelectionKey key) throws IOException{
        if(key.isValid()){
            //处理新接入的请求消息
            if(key.isAcceptable()){
                //得到关心当前事件的channel
                ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
                //经过ServerSocketChannel的accept建立SocketChannel实例
                //完成该操做意味着完成TCP三次握手,TCP物理链路正式创建
                SocketChannel sc = ssc.accept();
                System.out.println("======socket channel 创建链接" );
                //设置为非阻塞的
                sc.configureBlocking(false);
                //链接已经完成了,能够开始关心读事件了
                sc.register(selector,SelectionKey.OP_READ);
            }
            //读消息
            if(key.isReadable()){
                System.out.println("======socket channel 数据准备完成," +
                        "能够去读==读取=======");
                SocketChannel sc = (SocketChannel) key.channel();
                //建立ByteBuffer,并开辟一个1M的缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //读取请求码流,返回读取到的字节数
                int readBytes = sc.read(buffer);
                //读取到字节,对字节进行编解码
                if(readBytes>0){
                    //将缓冲区当前的limit设置为position=0,
                    // 用于后续对缓冲区的读取操做
                    buffer.flip();
                    //根据缓冲区可读字节数建立字节数组
                    byte[] bytes = new byte[buffer.remaining()];
                    //将缓冲区可读字节数组复制到新建的数组中
                    buffer.get(bytes);
                    String message = new String(bytes,"UTF-8");
                    System.out.println("服务器收到消息:" + message);
                    //处理数据
                    String result = response(message) ;
                    //发送应答消息
                    doWrite(sc,result);
                }
                //链路已经关闭,释放资源
                else if(readBytes<0){
                    key.cancel();
                    sc.close();
                }
            }

        }
    }
    //发送应答消息
    private void doWrite(SocketChannel channel,String response)
            throws IOException {
        //将消息编码为字节数组
        byte[] bytes = response.getBytes();
        //根据数组容量建立ByteBuffer
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        //将字节数组复制到缓冲区
        writeBuffer.put(bytes);
        //flip操做
        writeBuffer.flip();
        //发送缓冲区的字节数组
        channel.write(writeBuffer);
    }
}
public class NioServer {

    private static NioServerHandle nioServerHandle;

    public static void start(){
        if(nioServerHandle !=null)
            nioServerHandle.stop();
        nioServerHandle = new NioServerHandle(DEFAULT_PORT);
        new Thread(nioServerHandle,"Server").start();
    }
    public static void main(String[] args){
        start();
    }

}

客户端处理器大数据

public class NioClientHandle implements Runnable{
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;

    private volatile boolean started;

    public NioClientHandle(String ip, int port) {
        this.host = ip;
        this.port = port;
        try {
            //建立选择器
            selector = Selector.open();
            //打开通道
            socketChannel = SocketChannel.open();
            //若是为 true,则此通道将被置于阻塞模式;
            // 若是为 false,则此通道将被置于非阻塞模式
            socketChannel.configureBlocking(false);
            started = true;
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    public void stop(){
        started = false;
    }
    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        //循环遍历selector
        while(started){
            try {
                //阻塞,只有当至少一个注册的事件发生的时候才会继续
                selector.select();
                //获取当前有哪些事件可使用
                Set<SelectionKey> keys = selector.selectedKeys();
                //转换为迭代器
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key = null;
                while(it.hasNext()){
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (IOException e) {
                        e.printStackTrace();
                        if(key!=null){
                            key.cancel();
                            if(key.channel()!=null){
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        //selector关闭后会自动释放里面管理的资源
        if(selector!=null){
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    //具体的事件处理方法
    private void handleInput(SelectionKey key) throws IOException{
        if(key.isValid()){
            //得到关心当前事件的channel
            SocketChannel sc = (SocketChannel)key.channel();
            if(key.isConnectable()){//链接事件
                if(sc.finishConnect()){}
                else{System.exit(1);}
            }
            //有数据可读事件
            if(key.isReadable()){
                //建立ByteBuffer,并开辟一个1M的缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //读取请求码流,返回读取到的字节数
                int readBytes = sc.read(buffer);
                //读取到字节,对字节进行编解码
                if(readBytes>0){
                    //将缓冲区当前的limit设置为position,position=0,
                    // 用于后续对缓冲区的读取操做
                    buffer.flip();
                    //根据缓冲区可读字节数建立字节数组
                    byte[] bytes = new byte[buffer.remaining()];
                    //将缓冲区可读字节数组复制到新建的数组中
                    buffer.get(bytes);
                    String result = new String(bytes,"UTF-8");
                    System.out.println("accept message:"+result);
                }else if(readBytes<0){
                    key.cancel();
                    sc.close();
                }
            }
        }
    }

    //发送消息
    private void doWrite(SocketChannel channel,String request)
            throws IOException {
        //将消息编码为字节数组
        byte[] bytes = request.getBytes();
        //根据数组容量建立ByteBuffer
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        //将字节数组复制到缓冲区
        writeBuffer.put(bytes);
        //flip操做
        writeBuffer.flip();
        //发送缓冲区的字节数组
        channel.write(writeBuffer);
    }

    private void doConnect() throws IOException {
        /*若是此通道处于非阻塞模式,
        则调用此方法将启动非阻塞链接操做。
        若是当即创建链接,就像本地链接可能发生的那样,则此方法返回true。
        不然,此方法返回false,
        稍后必须经过调用finishConnect方法完成链接操做。*/
        if(socketChannel.connect(new InetSocketAddress(host,port))){}
        else{
            //链接还未完成,因此注册链接就绪事件,向selector表示关注这个事件
            socketChannel.register(selector,SelectionKey.OP_CONNECT);
        }
    }

    //写数据对外暴露的API
    public void sendMsg(String msg) throws Exception{
        socketChannel.register(selector,SelectionKey.OP_READ);
        doWrite(socketChannel,msg);
    }
}
public class NioClient {

    private static NioClientHandle nioClientHandle;

    public static void start(){
        if(nioClientHandle !=null)
            nioClientHandle.stop();
        nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT);
        new Thread(nioClientHandle,"Client").start();
    }
    //向服务器发送消息
    public static boolean sendMsg(String msg) throws Exception{
        nioClientHandle.sendMsg(msg);
        return true;
    }
    public static void main(String[] args) throws Exception {
        start();
        System.out.println("请输入请求信息:");
        Scanner scanner = new Scanner(System.in);
        while(NioClient.sendMsg(scanner.next()));

    }

}

服务端过程:this

  1. 启动服务端,完成一些初始化工做,ServerSocketChannel绑定端口而且注册接受链接事件.
  2. 循环里selector.select()阻塞,只有当至少一个注册的事件发生的时候才会继续,循环里面处理发生的注册事件
  3. 注册事件发生时交给处理器,若为接受链接则accept取出socketChannel并完成链接,而后就是关注read读取事件即注册,有数据读取了则处理器读取请求数据并返回.

客户端过程:编码

  1. 启动客户端,完成一些初始化工做.
  2. 根据服务端ip及端口发起链接.
  3. 往服务端发送数据,并注册read读取事件
  4. 循环里selector.select()阻塞,只有当至少一个注册的事件发生的时候才会继续,循环里面处理发生的注册事件.
  5. 注册事件发生时交给处理器,若为链接事件而且链接成功则跳过即不予处理等待读取事件发送.

初始化工做如打开selector,channel,设置通道模式是否阻塞.spa


单线程Reactor,工做者线程池

但在单线程Reactor模式中,不只I/O操做在该Reactor线程上,连非I/O的业务操做也在该线程上进行处理了,这可能会大大延迟I/O请求的响应。因此咱们应该将非I/O的业务逻辑操做从Reactor线程上卸载,以此来加速Reactor线程对I/O请求的响应.

clipboard.png

添加了一个工做者线程池,并将非I/O操做从Reactor线程中移出转交给工做者线程池来执行。这样可以提升Reactor线程的I/O响应,不至于由于一些耗时的业务逻辑而延迟对后面I/O请求的处理。


改进的版本中,因此的I/O操做依旧由一个Reactor来完成,包括I/O的accept()、read()、write()以及connect()操做。
对于一些小容量应用场景,可使用单线程模型。可是对于高负载、大并发或大数据量的应用场景却不合适,主要缘由以下:

  • ① 一个NIO线程同时处理成百上千的链路,性能上没法支撑,即使NIO线程的CPU负荷达到100%,也没法知足海量消息的读取和发送;
  • ②当NIO线程负载太重以后,处理速度将变慢,这会致使大量客户端链接超时,超时以后每每会进行重发,这更加剧了NIO线程的负载,最终会致使大量消息积压和处理超时,成为系统的性能瓶颈;

多Reactor线程模式

clipboard.png

Reactor线程池中的每一Reactor线程都会有本身的Selector、线程和分发的事件循环逻辑。
mainReactor能够只有一个,但subReactor通常会有多个。mainReactor线程主要负责接收客户端的链接请求,而后将接收到的SocketChannel传递给subReactor,由subReactor来完成和客户端的通讯。


流程:

  • ①注册一个Acceptor事件处理器到mainReactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样mainReactor会监听客户端向服务器端发起的链接请求事件(ACCEPT事件)。启动mainReactor的事件循环。
  • ②客户端向服务器端发起一个链接请求,mainReactor监听到了该ACCEPT事件并将该ACCEPT事件派发给Acceptor处理器来进行处理。Acceptor处理器经过accept()方法获得与这个客户端对应的链接(SocketChannel),而后将这个SocketChannel传递给subReactor线程池。
  • ③subReactor线程池分配一个subReactor线程给这个SocketChannel,即,将SocketChannel关注的READ事件以及对应的READ事件处理器注册到subReactor线程中。固然你也注册WRITE事件以及WRITE事件处理器到subReactor线程中以完成I/O写操做。Reactor线程池中的每一Reactor线程都会有本身的Selector、线程和分发的循环逻辑。
  • ④当有I/O事件就绪时,相关的subReactor就将事件派发给响应的处理器处理。注意,这里subReactor线程只负责完成I/O的read()操做,在读取到数据后将业务逻辑的处理放入到线程池中完成,若完成业务逻辑后须要返回数据给客户端,则相关的I/O的write操做仍是会被提交回subReactor线程来完成。

注意,因此的I/O操做(包括,I/O的accept()、read()、write()以及connect()操做)依旧仍是在Reactor线程(mainReactor线程 或 subReactor线程)中完成的。Thread Pool(线程池)仅用来处理非I/O操做的逻辑。
多Reactor线程模式将“接受客户端的链接请求”和“与该客户端的通讯”分在了两个Reactor线程来完成。mainReactor完成接收客户端链接请求的操做,它不负责与客户端的通讯,而是将创建好的链接转交给subReactor线程来完成与客户端的通讯,这样一来就不会由于read()数据量太大而致使后面的客户端链接请求得不到即时处理的状况。而且多Reactor线程模式在海量的客户端并发请求的状况下,还能够经过实现subReactor线程池来将海量的链接分发给多个subReactor线程,在多核的操做系统中这能大大提高应用的负载和吞吐量。

Netty服务端使用了“多Reactor线程模式”

相关文章
相关标签/搜索