nio的好伙伴——netty

NIO的那些事

咱们在前段时间学习了IONIO的一些概念性的东西,而且写了一些简单的例子进行实践,虽然简单,但基本上覆盖了NIO的一些最基本的概念了。
若是还没看过的,若是翻一下以前的文章了解一下,或者看一下网上的其余文章。html

JAVANIO的那些痛

既然咱们学过NIO,那咱们以JAVANIO来举个例子,说明一下咱们使用NIO的一些基本流程:java

  1. 打开ServerSocketChannel(Server端)或SocketChannel(Client端),监听对应的端口或链接对应的端口
  2. 设置configureBlocking(false)为非阻塞
  3. 经过register注册要监听的描述符
  4. 经过Selector.open打开Selector
  5. 调用Selector.select获得已就绪的SelectionKey
  6. 遍历SelectionKey进行相应的处理

这里我把以前的某些步骤合并了,可能跟以前有前面的文章有点不一致,但整体步骤是同样的。程序员

其实,上面的步骤咱们大能够了解到,咱们真正须要关注的步骤只是第6步,或者说是咱们真正要处理IO事件的一些逻辑,其余的都是一些通用流程而已。spring

既然如此,咱们真的有必要把时间花费在这些通用的地方吗?bootstrap

偷懒的程序员确定不想这样作,因此有人开发了minanetty一类的NIO框架,旨在把程序员从这些烦杂的通用流程中释放出来,而是只关注真正的业务逻辑,把这些交由框架去作处理。服务器

minanetty的做者都是同一我的(Trustin Lee,牛人老是各类牛)。
006ARE9vgy1ftcrabdjzwj303b040jrk.jpgmybatis

但鉴于netty基本上已是事实上的NIO标准框架了,而且社区一直比较活跃,而mina已经归档好久了,都已经没更新不少年了。为了不精力太过度散(实际上是我没学习过mina,不懂-_- ),咱们这里不讨论mina,直接学习netty,里面有不少值得咱们学习的东西。多线程

前置知识

线程模型

在开始介绍netty相关的知识前,咱们来了解一下线程模型相关的一些知识,这里参考了不少网上的一些文章,加以本身整理了一下,但愿可以给一些看其余文章不清楚的朋友一些不同的理解。架构

单线程模型

单线程.png

图片来自: https://www.jianshu.com/p/738...

这里的单线程指的是分派线程和工做线程都在同一个线程,能够看回咱们的JAVANIO示例代码,这里为了方便,咱们也贴在下面:框架

public class MyServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress("localhost", 8001));

        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        String str = "";
        while(!Thread.currentThread().isInterrupted()) {
            //这里是一直阻塞,直到有描述符就绪
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while(keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                //链接创建
                if (key.isAcceptable()) {
                    try {
                        SocketChannel clientChannel = serverSocketChannel.accept();
                        clientChannel.configureBlocking(false);
                        clientChannel.register(selector, SelectionKey.OP_READ);
                    } catch (ClosedChannelException e) {
                        e.printStackTrace();
                    }
                }
                //链接可读,这时能够直接读
                else if (key.isReadable()) {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);

                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    try {
                        int num = socketChannel.read(readBuffer);
                        str = new String(readBuffer.array(), 0, num);
                        System.out.println("received message:" + str);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

}

咱们能够看到在咱们nio的例子中,咱们没有明确使用多线程,这里就是使用了单线程来处理的。
它有什么好处呢?

实现简单。这是固然的,全部不涉及到多线程的代码都是相对比较简单的,注意,是 相对

有优势的同时确定有缺点,那么这种单线程有什么缺点呢:

性能相对比较差。只有一个线程进行请求的处理,也就是只有一个线程处理 CPU的描述符,假设同一时间有不少信号都就绪了,而且咱们读到 IO数据后的真正处理逻辑可能比较复杂,那么全部的请求都须要等待当前的请求处理完成后才能处理其余的。这也就致使了它的性能相对(这里的相对是对比其余多线程的处理方式)比较弱。

多线程模型

多线程.png

图片来自: https://www.jianshu.com/p/738...

这里的多线程指的是处理逻辑的多线程,对应到咱们的NIO代码逻辑里面就是对SelectionKey的处理是多线程的,咱们直接看代码会直观点:

public class MyServerMultipleThread {

    @SuppressWarnings("Duplicates")
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress("localhost", 8001));

        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while(!Thread.currentThread().isInterrupted()) {
            //这里是一直阻塞,直到有描述符就绪
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while(keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                //链接创建
                if (key.isAcceptable()) {
                    try {
                        SocketChannel clientChannel = serverSocketChannel.accept();
                        clientChannel.configureBlocking(false);
                        clientChannel.register(selector, SelectionKey.OP_READ);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                //链接可读,这时能够直接读
                else if (key.isReadable()) {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    int num = socketChannel.read(readBuffer);
                    new Thread(() -> {
                        String str = new String(readBuffer.array(), 0, num);
                        System.out.println("received message:" + str);
                    }).start();
                }
            }
        }
    }

}

这里咱们能够看到,在进行SelectionKey遍历读完数据后真正处理的时候,咱们新起了一个新的线程进行NIO的相关处理。

固然,这里的只是一个示例,真正写代码的时候不该该这样无限制的新起线程,而是应该使用线程池,更合理的使用线程,避免线程数量太多,致使 CPU切换太频繁,这样反而起不到优化性能的做用。

主从多线程模型

主从多线程.png

图片来自: https://www.jianshu.com/p/738...

一看到这图,估计不少人头都大了,这都什么鬼,这么复杂啊。
实际能够简单一点理解:

  • 原来的接收请求是单线程,如今变成了多线程(线程池)
  • 原来的处理逻辑是单线程,如今是使用多线程(线程池)
注意,这里的多线程不包括 accept请求, accept仍是由单个线程进行分发。

咱们直接看一下代码会比较容易理解

public class MyServerMultipleThread2 {

    @SuppressWarnings("Duplicates")
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress("localhost", 8001));
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while(!Thread.currentThread().isInterrupted()) {
            selector.select();
            //这里是一直阻塞,直到有描述符就绪
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while(keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                //链接创建
                if (key.isAcceptable()) {
                    try {
                        SocketChannel clientChannel = serverSocketChannel.accept();
                        clientChannel.configureBlocking(false);
                        clientChannel.register(selector, SelectionKey.OP_READ);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                //链接可读,这时能够直接读
                else if (key.isReadable()) {
                    new Thread(() -> {
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        int[] num = new int[]{0};
                        try {
                            num[0] = socketChannel.read(readBuffer);
                            new Thread(() -> {
                                String str = new String(readBuffer.array(), 0, num[0]);
                                System.out.println("received message:" + str);
                            }).start();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }).start();
                    key.channel().register(key.selector(), SelectionKey.OP_WRITE);
                }
            }
        }
    }
}

这里我没有参考一些网上比较复杂的作法,可能实现起来不大一致,但相对容易理解一点。

  1. 咱们在接受到accept请求时,仍是由单前线程单独处理
  2. READWRITE等请求时,咱们新起一个线程去作处理,而且在真正的处理逻辑时,仍是跟上面的多线程逻辑同样,是新起一个线程去作处理。
  3. 咱们在处理READWRITE时,注意,须要从新注册相应的WRITEREAD事件——由于新起线程后,当前SelectionKey的信号仍是READ,若是咱们不作修改,会致使当前的线程会重复屡次处理。具体你们能够下来试试,把后面的register去掉,看一下会出现什么状况。

咱们看到,上面的线程模型,都以性能提高为目的,一步步去进行优化,但同时咱们也看到了,代码是愈来愈复杂,使得咱们在维护咱们真正的逻辑时,有点像是大海捞针,真正的代码逻辑就那么一点,而不少都是一些模板代码。

为了解决这些问题,就须要引出咱们的框架了,框架正是为了帮咱们去约定好一些通用的逻辑而出现的,好比 spring,帮我作好了 IOCAOP等的一些逻辑,这些不须要咱们去额外关注;而 mybatis帮咱们作好了 ORM相关的一些处理, DB映射等,这些流程化的东西都已经固化了;而咱们这里要说的 netty,它帮咱们把 NIO这些线程模型相关的东西帮咱们作了不少的优化和抽取,咱们再也不须要管这些流程化的东西,只须要写咱们本身的逻辑。

netty出场

netty做为一个高性能的NIO框架,基本上已是事实上的NIO标准了,包括dubbozookeeper等内部都比较大量地使用了netty。或者说具体点,这些框架可以有这么好的性能,大部分功劳要归结到netty身上。

netty基础知识

看例子前咱们先来补充一些基础知识。
netty有几个重要概念:

  • ChannelHandler
channel的事件处理器,里面封装了针对当前 channel的生命周期的方法
  • ChannelInBoundHandler
channelREAD请求处理器,里面封装了当前 channel的对于接收请求相关的生命周期方法
  • ChannelOutBoundHandler
channelWRITE请求处理器,里面封装了当前 channel的对象发出请求的生命周期方法。
  • ChannelPipeline
此类是 netty架构中比较重要的一个类,它使用了 责任链模式,把请求从 ChannelHandler中一个个的日后传递,最终到达咱们的业务 Handler。关于 Pipeline的详细描述,咱们后面再详细看看。
  • ByteBuf
netty封装了本身的 ByteBuf,与 JDK自带的 ByteBuffer的最主要的区别是它有两个指针,一个供读 readerIndex,一个供写 writerIndex。而至于该类的一些详细信息,你们能够看一下它的 JavaDoc,写得很是详细。

关于OutBound和上面的InBound的区别,你们能够简单地区分一下,In就是请求进入,对应的就是READOut就是请求发出,对应的就是WRITE

基本的概念了解清楚了,那咱们来看一下简单的例子。
其实netty最好的文档是它的官网文档。咱们就仍是以相似官方源码里面的一个example来学习一下,实现的功能很简单:

Client链接成功后传一句话给 ServerServer回复收到。

实战

server

ServerHandler
public class MyNettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("from client:" + msg);
        ctx.writeAndFlush("I received your message:" + msg + System.lineSeparator());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}
咱们的代码比较简单,打印收到的文本,而且再发回一条语句。咱们能够看到咱们输出的时候加多了一个 换行符—— System.lineSeparator(),这是为何呢?
这里涉及到另一个 TCP/IP一个比较重要的问题, 拆包和粘包,这里咱们先不细说,后面我会有专门的文章来讲一下 拆包和粘包还有一系列 TCP/IP相关的知识,这是很是大的一块了。咱们如今就先简单的知道,加这个 换行符是为了让 Handler知道咱们的消息从哪里结束。
Server
public class MyNettyServer {

    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LineBasedFrameDecoder(4096));
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new MyNettyServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        ChannelFuture channelFuture = null;
        try {
            channelFuture = serverBootstrap.bind("127.0.0.1", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

这里涉及到比较多的知识点,总体结构咱们先无论它,咱们重要先关注一下:

  • 线程池
这里定义了两个线程组进行处理, BossGroupWorkerGroup,对应咱们上面的 多线程模型,缘由是 netty并不使用 主从多线程模型——这个咱们之后的文章有机会再细说。
  • ServerBootStrap
netty工具类,有助于编写服务器的相关代码,而 Client端对应的就是 Bootstrap了。
  • pipeline的添加
ch.pipeline().addLast(new LineBasedFrameDecoder(4096));                 ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new MyNettyServerHandler());

这里把4个Handler添加到Pipeline的末尾,至于为何是末尾,相应看到后面的pipeline的解析的时候你们就会知道了。
我这里大概描述一下几个Handler的做用:

  • LineBasedFrameDecoder
根据换行符 \n\r\n进行内容的分割——即 拆包
  • StringDecoder
把接收到的内容解析为 String字符串
  • StringEncoder
把发出的内容解析为 String字符串
  • MyNettyServerHandler
咱们的真正逻辑处理类,这个应该是在前面的几个处理完成后再进行。咱们在后面的 pipeline执行顺序中能够看到为何这样添加。
后面的 Client中的 Handler也能够参考上面的。

Client

ClientHandler
public class MyNettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush("helloworld" + System.lineSeparator());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("from server:" + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}

这里咱们的代码也比较简单,就是链接成功的时候发条helloworld过去服务端,而后再从服务端读到返回的内容。咱们就不细说了。

Client
public class MyNettyClient {

    public static void main(String[] args) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LineBasedFrameDecoder(4096));
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new MyNettyClientHandler());
                    }
                })
                .option(ChannelOption.SO_KEEPALIVE, true);

        try {
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

}

对比上面的Server代码,这里的区别,最大的就是咱们只有一个EventLoopGroup,由于Client端并不须要接收请求,因此并不须要所谓的BossGroup

一切就绪后,咱们能够跑一下看看运行状况:
先运行server,再运行client
server能够看到
Jietu20200130-192501@2x.jpg
client能够看到
Jietu20200130-192447@2x.jpg
这表示咱们已经使用netty写了一个基本能够用的NIO程序了。
Jietu20200130-191258@2x.jpg

ChannelPipeline详解

ChannelPipeline做为netty的一个底层重要组成部分,ChannelHandler都须要依靠它进行调度,重要性不言而喻。那咱们如今就一块儿来看看ChannelPipeline到底是怎么调度的。
查看ChannelPipelineJavaDoc咱们能够看到这样一串描述(牛人写描述都是特别认真的)。
Jietu20200130-191826@2x.jpg
大概的意思就是这样的:

  1. 请求进来时,按照InBoundHandler的添加顺序,从前日后执行。
  2. 请求出去时,按照OutBoundHandler的添加顺序,从后往前执行。

另外,文档中又举了一个例子:
Jietu20200130-192431@2x.jpg
咱们套用一下咱们的Server例子来分析一下:
LineBasedFrameDecoder,StringDecoder,StringEncoder,MyNettyServerHandler

当咱们收到消息时,须要执行的 Handler的顺序为: LineBasedFrameDecoder, StringDecoder, MyNettyServerHandler
当咱们发出消息时,须要执行的 OutboundHandler的顺序为: StringEncoder.

基于上面的分析,咱们就能够分析为何咱们前面的例子能够获得那样的结果。

总结

这篇文章,咱们从一开始的线程模型到后面的netty的示例,这些种种都是为了性能的提升去作的一些优化。在当前大数据的趋势下,更多须要咱们把性能去作到极致。
后面,咱们会再根据netty中的一些最佳实践来分析它是怎么解析粘包和拆分的。

参考文章

https://www.jianshu.com/p/738095702b75
https://netty.io/wiki/user-guide-for-4.x.html

相关文章
相关标签/搜索