从BIO和NIO到Netty实践


1、什么是BIO

传统的BIO模型使用是java.net包中API实现Socket通讯,而传统的BIO通讯中阻塞共发如今两个 地方。java

服务端ServerSocker::accept
客户端Socket数据读写react

这种IO的弊端便是没法处理并发的客户端请求,所以能够经过为每一个客户端单独分配一个线程,则客户的端的Socket数据读写不在阻塞,能够知足并发的客户端请求。linux

一样的在高并发,大量客户端链接形成大量线程,容易产生线程OOM,同时也有大量的线程上下文切换影响性能。nginx

2、什么是NIO

  1. 在JAVA中NIO是指new IO,是JDK为实现非阻塞IO实现的一套新API
  2. 在linux中NIO是指非阻塞的IO,主要与poll和epoll内核调用有关

JAVA的NIO一种重要的方法为configureBlocking,示例代码以下:web

package com.zte.sunquan.nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

public class SocketServerNIO { 
 
  
    public static void main(String[] args) throws Exception { 
 
  
        List<SocketChannel> channels = new ArrayList<>();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(9090));
        ssc.configureBlocking(false);
        //非阻塞
        while (true) { 
 
  
            Thread.sleep(1000);
            //不阻塞,但须要每次都问一下内核
            SocketChannel clientSocket = ssc.accept();
            if (clientSocket == null) { 
 
  
                System.out.println("No client....");
            } else { 
 
  
                //将clientSocket保留
                clientSocket.configureBlocking(false);
                System.out.println("client connected in:" + clientSocket.socket().getPort());
                channels.add(clientSocket);
            }

            ByteBuffer buffer = ByteBuffer.allocate(4096);
            //不阻塞,但这里每次也须要问一下内核,即便有些客户端没有事件
            for (SocketChannel channel : channels) { 
 
  
                int num = channel.read(buffer);
                if (num > 0) { 
 
  
                    buffer.flip();
                    byte[] content=new byte[buffer.limit()];
                    buffer.get(content);
                    String s = new String(content);
                    System.out.println(channel.socket().getPort()+" Read client msg:"+s);
                    buffer.clear();
                }
            }

        }
    }
}

如上代码中一个线程负责了IO读写与链接创建,固然能够优雅一点的方法,即将链接创建与IO读写分线程处理,让IO的读写不影响高并发下链接请求与创建。但上述代码仍有明显的弊端。编程

for (SocketChannel channel : channels) {
int num = channel.read(buffer);后端

在上述代码中,每次循环都要与全部创建的客户端进行一次read操做,涉及用户究竟与内核空间的切换,考虑到一个链接数特别多背景下,一次可能只会有几个链接有IO事件,如上的实现会形成大量的性能浪费。
那有没有一种可能,让内核主动告知咱们哪些链接有IO事件,这样应用精确地去指定的链接上进行IO事件的处理,而不是傻傻地每一个链接read一遍?安全

3、多路复用器

多路复用器使用,能够解决第二节最后的问题,经过selector.select,内核只会将有事件的socket返回,避免了应用循环遍历尝试。多线程

下面示例代码描述了使用JAVA中NIO的API实现的服务端代码并发

package com.zte.sdn.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;


/** * 多路复用器示例代码 **/
public class NioServer { 
 
  

    private Selector startServer() throws IOException { 
 
  
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(9090));
        ssc.configureBlocking(false);
        //打开一个多路复用器
        //poll系统调用:
        //epoll系统调用:
        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Start server at port:9090");
        return selector;
    }


    public static void main(String[] args) throws IOException { 
 
  
        NioServer nioServer = new NioServer();
        Selector selector = nioServer.startServer();
        while (true) { 
 
  
            System.out.println("ask");
            //使用select向内核询问是否有事件(因为一开始只注册了ServerSocket的OP_ACCEPT)
            //因此第一次只判断是否有链接事件
            //后面因为注册客户端OP_READ,从面判断是否有可读事件
            while (selector.select(10) > 0) { 
 
  
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) { 
 
  
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    nioServer.handler(selectionKey, selector);
                }
            }
        }
    }

    private void handler(SelectionKey selectionKey, Selector selector) throws IOException { 
 
  
        if (selectionKey.isAcceptable()) { 
 
  
            //一个链接事件
            ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
            SocketChannel client = channel.accept();
            client.configureBlocking(false);
            //再注册进去
            client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(8192));
            System.out.println("receive connect:" + client.getRemoteAddress());
        } else if (selectionKey.isReadable()) { 
 
  
            //可读事件
            SocketChannel client = (SocketChannel) selectionKey.channel();
            ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
            buffer.clear();
            int read = 0;
            while (true) { 
 
  
                read = client.read(buffer);
                if (read > 0) { 
 
  
                    buffer.flip();
                    while (buffer.hasRemaining()) { 
 
  
                        //讲到内容写回客户端
                        client.write(buffer);
                    }
                    System.out.println("receive client msg:" + new String(buffer.array()));
                    buffer.clear();
                } else if (read == 0) { 
 
  
                    break;
                } else { 
 
  
                    client.close();
                    break;
                }
            }
        }
    }
}

在上述示例中,一个线程完成了服务端接口客户端链接以及IO读写动做。思考上面编程思路的弊端是什么?
考虑到一个IO的读写如何很是耗时,必然会影响客户端创建链接并发性能,以及大量IO读写的性能。
天然地针对客户端链接与IO读写能够分不一样selector单独处理,各司其职,因此改进的实现以下:

else if (selectionKey.isReadable()) { 
 
  
            executorService.submit(()->{ 
 
  
                //可读事件
                try { 
 
  
                    SocketChannel client = (SocketChannel) selectionKey.channel();
                    ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                    buffer.clear();
                    int read = 0;
                    while (true) { 
 
  
                        read = client.read(buffer);
                        if (read > 0) { 
 
  
                            buffer.flip();
                            while (buffer.hasRemaining()) { 
 
  
                                //讲到内容写回客户端
                                client.write(buffer);
                            }
                            System.out.println("receive client msg:" + new String(buffer.array()));
                            buffer.clear();
                        } else if (read == 0) { 
 
  
                            break;
                        } else { 
 
  
                            client.close();
                            break;
                        }
                    }
                }catch (Exception e){ 
 
  
                    e.printStackTrace();
                }
            });

        }

如上所示,实现了IO的处理异步化。但上述实现虽然缓解了大量长时间IO带来的性能问题,但不能从根本上解决,那有没有办法,将客户端链接事件与IO事件彻底分离开?固然若是IO读取的数据业务处理比较耗时,则能够另起线程再进行异步处理。

4、多Selector版本

代码:

package com.zte.sdn.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class IOHandler extends Thread { 
 
  
    private Selector selector;

    public IOHandler(Selector selector, String name) { 
 
  
        this.selector = selector;
        this.setName(name);
    }

    @Override
    public void run() { 
 
  
        while (true) { 
 
  
            try { 
 
  
                handler();
            } catch (IOException e) { 
 
  
                e.printStackTrace();
            }
        }
    }


    private void handler() throws IOException { 
 
  
        while (selector.select(10) > 0) { 
 
  
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) { 
 
  
                SelectionKey selectionKey = iterator.next();
                iterator.remove();
                if (selectionKey.isReadable()) { 
 
  
                    SocketChannel client = (SocketChannel) selectionKey.channel();
                    ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                    buffer.clear();
                    int read = 0;
                    while (true) { 
 
  
                        try { 
 
  
                            read = client.read(buffer);
                            if (read > 0) { 
 
  
                                buffer.flip();
                                while (buffer.hasRemaining()) { 
 
  
                                    //讲到内容写回客户端
                                    client.write(buffer);
                                }
                                System.out.println(Thread.currentThread().getName() + " receive client msg:" + new String(buffer.array()));
                                buffer.clear();
                            } else if (read == 0) { 
 
  
                                break;
                            } else { 
 
  
                                client.close();
                                break;
                            }
                        } catch (Exception e) { 
 
  
                            try { 
 
  
                                client.close();
                            } catch (IOException ex) { 
 
  
                                ex.printStackTrace();
                            }
                            break;
                        }

                    }
                }
            }
        }
    }
}

MultiNioServer

package com.zte.sdn.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


/** * 多路复用器示例代码 **/
public class MultiNioServer { 
 
  

    private ExecutorService executorService = Executors.newFixedThreadPool(5);

    private Selector startServer() throws IOException { 
 
  
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(9090));
        ssc.configureBlocking(false);
        //打开一个多路复用器
        //poll系统调用:
        //epoll系统调用:
        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Start server at port:9090");
        return selector;
    }


    public static void main(String[] args) throws IOException { 
 
  
        MultiNioServer nioServer = new MultiNioServer();
        Selector selector = nioServer.startServer();
        Selector selector1 = Selector.open();
        Selector selector2 = Selector.open();
        Selector[] selectors = new Selector[]{ 
 
  selector1, selector2};
        new IOHandler(selector1, "A").start();
        new IOHandler(selector2, "B").start();
        int i = 0;
        while (true) { 
 
  
            while (selector.select(10) > 0) { 
 
  
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) { 
 
  
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    if (selectionKey.isAcceptable()) { 
 
  
                        //一个链接事件
                        ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
                        SocketChannel client = channel.accept();
                        client.configureBlocking(false);
                        //循环注册至另外的Selector
                        client.register(selectors[i++ % 2], SelectionKey.OP_READ, ByteBuffer.allocate(8192));
                        System.out.println("receive connect:" + client.getRemoteAddress());
                    }
                }
            }
        }
    }

}

5、IO总结

传统的NIO(不使用多路利用器),虽然解决了IO阻塞问题,但须要用户程序遍历地向内核询问全部客户端;
当使用了多路复用器(使用select/poll),每次循环须要将客户端列表传递给内核,由内核遍历将有事件的fd返回,避免用户态与内核态的频繁切换。
但使用select/poll仍然存在弊端,即每次循环都要进行fd列表的传递,如何可以避免每次循环向内核传递fd列表?
思考:如内核能提早申请一块空间存储(红黑树)事件句柄,在有客户端链接上,则记录在该空间,如此客户端程序询问是否事件时,则只需简单问一句,而不须要每次都传递fd列表。
这说的其实epoll内核调用能解决的。

咱们老师去教室收做业进行批改的场景为例类比上述实现:

IO类型 说明 弊端
BIO 老师来到教室,依次询问每一个学生写了做业没有,写了则批改,没写则需等他写好再批改,期间有同窗入学毕业则须要等老师忙完这一轮 两处阻塞
NIO 老师来到教室,依次询问每一个学生写了做业没有,写了则批改,没写则直接下一个,期间有同窗入学毕业则须要等老师忙完这一轮,相对较快 依次询问,没写做业的也要问
NIO-select/poll 老师每次来到教室,拿一个名单贴到教室黑板,并告知在名单上同窗且完成做业,报给我,后序老师直接拿到报名同窗做业,批改便可 不一样于每一个同窗依次询问,但还要每次准备名单
NIO-epoll 开班时,则在教室黑板贴上人员名单,有新同窗加入毕业,则增长删除,老师每次来到教室,不用准确名单,只须要告知在名单上同窗且完成做业,报给我,后序老师直接拿到报名同窗做业,批改便可 解决上述全部弊端

ps.select的fd有1024的数量约束,poll无此限制

6、Strace分析

7、Netty的线程模型

Netty拥有两个NIO线程池,分别是bossGroupworkerGroup,前者处理新建链接请求,而后将新创建的链接轮询交给workerGroup中的其中一个NioEventLoop来处理,后续该链接上的读写操做都是由同一个NioEventLoop来处理。注意,虽然bossGroup也能指定多个NioEventLoop(一个NioEventLoop对应一个线程),可是默认状况下只会有一个线程,由于通常状况下应用程序只会使用一个对外监听端口。

为什么不能使用多线程来监听同一个对外端口么,即多线程epoll_wait到同一个epoll实例上?

这里会引来惊群的问题和epoll设置的是LT模式

现代linux中,多个socker同时监听同一个端口也是可行的,nginx 1.9.1也支持这一行为。linux 3.9以上内核支持SO_REUSEPORT选项,容许多个socker bind/listen在同一端口上。这样,多个进程能够各自申请socker监听同一端口,当链接事件来临时,内核作负载均衡,唤醒监听的其中一个进程来处理,reuseport机制有效的解决了epoll惊群问题

单线程模型

Reactor 单线程模型,是指全部的 I/O 操做都在同一个 NIO 线程上面完成的,此时NIO线程职责包括:接收新建链接请求、读写操做等。

多线程模型

Rector 多线程模型与单线程模型最大的区别就是有一组 NIO 线程来处理链接读写操做,一个NIO线程处理Accept。一个NIO线程能够处理多个链接事件,一个链接的事件只能属于一个NIO线程

主从模型

主从 Reactor 线程模型的特色是:服务端用于接收客户端链接的再也不是一个单独的 NIO 线程,而是一个独立的 NIO 线程池。Acceptor 接收到客户端 TCP链接请求并处理完成后(可能包含接入认证等),将新建立的 SocketChannel注 册 到 I/O 线 程 池(sub reactor 线 程 池)的某个I/O线程上, 由它负责SocketChannel 的读写和编解码工做。Acceptor 线程池仅仅用于客户端的登陆、握手和安全认证,一旦链路创建成功,就将链路注册到后端 subReactor 线程池的 I/O 线程上,由 I/O 线程负责后续的 I/O 操做。