Netty之旅二:口口相传的高性能Netty究竟是什么?

d0iosx.png

高清思惟导图原件(xmind/pdf/jpg)能够关注公众号:一枝花算不算浪漫 回复netty01便可。html

前言

上一篇文章讲了NIO相关的知识点,相比于传统IONIO已经作得很优雅了,为何咱们还要使用Nettyjava

上篇文章最后留了不少坑,讲了NIO使用的弊端,也是为了引出Netty而设立的,这篇文章咱们就来好好揭开Netty的神秘面纱。ios

本篇文章的目的很简单,但愿看事后你能看懂Netty的示例代码,针对于简单的网络通讯,本身也能用Netty手写一个开发应用出来!编程

一个简单的Netty示例

如下是一个简单聊天室Server端的程序,代码参考自:http://www.imooc.com/read/82/article/2166bootstrap

代码有点长,主要核心代码是在main()方法中,这里代码也但愿你们看懂,后面也会一步步剖析。设计模式

PS:我是用mac系统,直接在终端输入telnet 127.0.0.1 8007 便可启动一个聊天框,若是提示找不到telnet命令,能够经过brew进行安装,具体步骤请自行百度。api

/**
 * @Description netty简易聊天室
 *
 * @Author 一枝花算不算浪漫
 * @Date 2020/8/10 6:52 上午
 */
public final class NettyChatServer {

    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // 1. EventLoopGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 2. 服务端引导器
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 3. 设置线bootStrap信息
            serverBootstrap.group(bossGroup, workerGroup)
                    // 4. 设置ServerSocketChannel的类型
                    .channel(NioServerSocketChannel.class)
                    // 5. 设置参数
                    .option(ChannelOption.SO_BACKLOG, 100)
                    // 6. 设置ServerSocketChannel对应的Handler,只能设置一个
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // 7. 设置SocketChannel对应的Handler
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // 能够添加多个子Handler
                            p.addLast(new LoggingHandler(LogLevel.INFO));
                            p.addLast(new ChatNettyHandler());
                        }
                    });

            // 8. 绑定端口
            ChannelFuture f = serverBootstrap.bind(PORT).sync();
            // 9. 等待服务端监听端口关闭,这里会阻塞主线程
            f.channel().closeFuture().sync();
        } finally {
            // 10. 优雅地关闭两个线程池
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private static class ChatNettyHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("one conn active: " + ctx.channel());
            // channel是在ServerBootstrapAcceptor中放到EventLoopGroup中的
            ChatHolder.join((SocketChannel) ctx.channel());
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
            byte[] bytes = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(bytes);
            String content = new String(bytes, StandardCharsets.UTF_8);
            System.out.println(content);

            if (content.equals("quit\r\n")) {
                ctx.channel().close();
            } else {
                ChatHolder.propagate((SocketChannel) ctx.channel(), content);
            }
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            System.out.println("one conn inactive: " + ctx.channel());
            ChatHolder.quit((SocketChannel) ctx.channel());
        }
    }

    private static class ChatHolder {
        static final Map<SocketChannel, String> USER_MAP = new ConcurrentHashMap<>();

        /**
         * 加入群聊
         */
        static void join(SocketChannel socketChannel) {
            // 有人加入就给他分配一个id
            String userId = "用户"+ ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
            send(socketChannel, "您的id为:" + userId + "\n\r");

            for (SocketChannel channel : USER_MAP.keySet()) {
                send(channel, userId + " 加入了群聊" + "\n\r");
            }

            // 将当前用户加入到map中
            USER_MAP.put(socketChannel, userId);
        }

        /**
         * 退出群聊
         */
        static void quit(SocketChannel socketChannel) {
            String userId = USER_MAP.get(socketChannel);
            send(socketChannel, "您退出了群聊" + "\n\r");
            USER_MAP.remove(socketChannel);

            for (SocketChannel channel : USER_MAP.keySet()) {
                if (channel != socketChannel) {
                    send(channel, userId + " 退出了群聊" + "\n\r");
                }
            }
        }

        /**
         * 扩散说话的内容
         */
        public static void propagate(SocketChannel socketChannel, String content) {
            String userId = USER_MAP.get(socketChannel);
            for (SocketChannel channel : USER_MAP.keySet()) {
                if (channel != socketChannel) {
                    send(channel, userId + ": " + content);
                }
            }
        }

        /**
         * 发送消息
         */
        static void send(SocketChannel socketChannel, String msg) {
            try {
                ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
                ByteBuf writeBuffer = allocator.buffer(msg.getBytes().length);
                writeBuffer.writeCharSequence(msg, Charset.defaultCharset());
                socketChannel.writeAndFlush(writeBuffer);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

dkeb0s.png

代码有点长,执行完的效果如上图所示,下面全部内容都是围绕着如何看懂以及如何写出这样的代码来展开的,但愿你看完 也能轻松手写Netty服务端代码~。经过简单demo开发让你们体验了Netty实现相比NIO确实要简单的多,但优势不限于此,只须要知道选择Netty就对了。缓存

Netty核心组件

对应着文章开头的思惟导图,咱们知道Netty的核心组件主要有:安全

  • Bootstrap && ServerBootstrap
  • EventLoopGroup
  • EventLoop
  • ByteBuf
  • Channel
  • ChannelHandler
  • ChannelFuture
  • ChannelPipeline
  • ChannelHandlerContext

类图以下:服务器

dk8ZC9.png

Bootstrap & ServerBootstrap

一看到BootStrap你们就应该想到启动类、引导类这样的词汇,以前分析过EurekaServer项目启动类时介绍过EurekaBootstrap, 他的做用就是上下文初始化、配置初始化。

Netty中咱们也有相似的类,BootstrapServerBootstrap它们都是Netty程序的引导类,主要用于配置各类参数,并启动整个Netty服务,咱们看下文章开头的示例代码:

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)      
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 100)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline p = ch.pipeline();
                p.addLast(new LoggingHandler(LogLevel.INFO));
                p.addLast(new ChatNettyHandler());
            }
        });

BootstrapServerBootstrap是针对于ClientServer端定义的两套启动类,区别以下:

  • Bootstrap是客户端引导类,而ServerBootstrap是服务端引导类。
  • Bootstrap一般使用connect()方法链接到远程的主机和端口,做为一个TCP客户端
  • ServerBootstrap一般使用bind()方法绑定本地的端口,等待客户端来链接。
  • ServerBootstrap能够处理Accept事件,这里面childHandler是用来处理Channel请求的,咱们能够查看chaildHandler()方法的注解:

dk884H.png

  • Bootstrap客户端引导只须要一个EventLoopGroup,可是一个ServerBootstrap一般须要两个(上面的boosGroupworkerGroup)。

EventLoopGroup && EventLoop

EventLoopGroupEventLoop这两个类名称定义的很奇怪,对于初学者来讲每每没法经过名称来了解其中的含义,包括我也是这样。

EventLoopGroup 能够理解为一个线程池,对于服务端程序,咱们通常会绑定两个线程池,一个用于处理 Accept 事件,一个用于处理读写事件,看下EventLoop系列的类目录:

dU4Roj.png

经过上面的类图,咱们才恍然大悟,个人亲娘咧,这不就是一个线程池嘛?(名字气的犄角拐弯的真是难认)

EventLoopGroupEventLoop的集合,一个EventLoopGroup 包含一个或者多个EventLoop。咱们能够将EventLoop看作EventLoopGroup线程池中的一个个工做线程。

至于这里为何要用到两个线程池,具体的其实能够参考Reactor设计模式,这里暂时不作过多的讲解。

  • 一个 EventLoopGroup 包含一个或多个 EventLoop ,即 EventLoopGroup : EventLoop = 1 : n
  • 一个 EventLoop 在它的生命周期内,只能与一个 Thread 绑定,即 EventLoop : Thread = 1 : 1
  • 全部有 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理,从而保证线程安全,即 Thread : EventLoop = 1 : 1
  • 一个 Channel 在它的生命周期内只能注册到一个 EventLoop 上,即 Channel : EventLoop = n : 1
  • 一个 EventLoop 可被分配至一个或多个 Channel ,即 EventLoop : Channel = 1 : n

当一个链接到达时,Netty 就会建立一个 Channel,而后从 EventLoopGroup 中分配一个 EventLoop 来给这个 Channel 绑定上,在该 Channel 的整个生命周期中都是有这个绑定的 EventLoop 来服务的。

ByteBuf

Java NIO中咱们有 ByteBuffer缓冲池,对于它的操做咱们应该印象深入,往Buffer中写数据时咱们须要关注写入的位置,切换成读模式时咱们还要切换读写状态,否则将会出现大问题。

针对于NIO中超级难用的Buffer类, Netty 提供了ByteBuf来替代。ByteBuf声明了两个指针:一个读指针,一个写指针,使得读写操做进行分离,简化buffer的操做流程。

dkQocV.png

另外Netty提供了发几种ByteBuf的实现以供咱们选择,ByteBuf能够分为:

  • PooledUnpooled 池化和非池化
  • Heap 和 Direct,堆内存和堆外内存,NIO中建立Buffer也能够指定
  • Safe 和 Unsafe,安全和非安全

dkJ9TU.png

对于这么多种建立Buffer的方式该怎么选择呢?Netty也为咱们处理好了,咱们能够直接使用(真是暖男Ntetty):

ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
ByteBuf buffer = allocator.buffer(length);

使用这种方式,Netty将最大努力的使用池化、Unsafe、对外内存的方式为咱们建立buffer。

Channel

提起Channel并不陌生,上一篇讲NIO的三大组件提到过,最多见的就是java.nio.SocketChanneljava.nio.ServerSocketChannel,他们用于非阻塞的I/0操做。相似于NIOChannel,Netty提供了本身的Channel和其子类实现,用于异步I/0操做和其余相关的操做。

Netty 中, Channel 是一个 Socket 链接的抽象, 它为用户提供了关于底层 Socket 状态(是不是链接仍是断开) 以及对 Socket 的读写等操做。每当 Netty 创建了一个链接后, 都会有一个对应的 Channel 实例。而且,有父子channel的概念。 服务器链接监听的channel ,也叫 parent channel。 对应于每个 Socket 链接的channel,也叫 child channel

既然channel 是 Netty 抽象出来的网络 I/O 读写相关的接口,为何不使用 JDK NIO 原生的 Channel 而要另起炉灶呢,主要缘由以下:

  • JDK SocketChannelServersocketChannel 没有统一的 Channel 接口供业务开发者使用,对一于用户而言,没有统一的操做视图,使用起来并不方便。
  • JDKSocketChannel ScrversockctChannel 的主要职责就是网络 I/O 操做,因为他们是 SPI 类接口,由具体的虚拟机厂家来提供,因此经过继承 SPI 功能直接实现 ServersocketChannelSocketChannel 来扩展其工做量和从新 Channel 功类是差很少的。
  • Netty 的 ChannelPipeline Channel 须要够跟 Netty 的总体架构融合在一块儿,例如 I/O 模型、基的定制模型,以及基于元数据描述配置化的 TCP 参数等,这些 JDK SocketChannelServersocketChannel都没有提供,须要从新封装。
  • 自定义的 Channel ,功实现更加灵活。

基于上述 4 缘由,它的设计原理比较简单, Netty 从新设计了 Channel 接口,而且给予了不少不一样的实现。可是功能却比较繁杂,主要的设计理念以下:

  • Channel 接口层,相关联的其余操做封装起来,采用 Facade 模式进行统一封装,将网络 I/O 操做、网络 I/O 统一对外提供。
  • Channel 接口的定义尽可能大而全,统一的视图,由不一样子类实现不一样的功能,公共功能在抽象父类中实现,最大程度上实现接口的重用。
  • 具体实现采用聚合而非包含的方式,将相关的功类聚合在 Channel 中,由 Channel 统一负责分配和调度,功能实现更加灵活。

Channel 的实现类很是多,继承关系复杂,从学习的角度咱们抽取最重要的两个 NioServerSocketChannel NioSocketChannel

服务端 NioServerSocketChannel 的继承关系类图以下:

dUn8G4.png

客户端 NioSocketChannel 的继承关系类图以下:

dUnJz9.png

后面文章源码系列会具体分析,这里就不进一步阐述分析了。

ChannelHandler

ChannelHandlerNetty中最经常使用的组件。ChannelHandler 主要用来处理各类事件,这里的事件很普遍,好比能够是链接、数据接收、异常、数据转换等。

ChannelHandler 有两个核心子类 ChannelInboundHandlerChannelOutboundHandler,其中 ChannelInboundHandler 用于接收、处理入站( Inbound )的数据和事件,而 ChannelOutboundHandler 则相反,用于接收、处理出站( Outbound )的数据和事件。

dkJAp9.png

ChannelInboundHandler

ChannelInboundHandler处理入站数据以及各类状态变化,当Channel状态发生改变会调用ChannelInboundHandler中的一些生命周期方法.这些方法与Channel的生命密切相关。

入站数据,就是进入socket的数据。下面展现一些该接口的生命周期API

dUntMR.png

当某个 ChannelInboundHandler的实现重写 channelRead()方法时,它将负责显式地释放与池化的 ByteBuf 实例相关的内存。 Netty 为此提供了一个实用方法ReferenceCountUtil.release()

@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		ReferenceCountUtil.release(msg);
	}
}

这种方式还挺繁琐的,Netty提供了一个SimpleChannelInboundHandler,重写channelRead0()方法,就能够在调用过程当中会自动释放资源.

public class SimpleDiscardHandler
	extends SimpleChannelInboundHandler<Object> {
	@Override
	public void channelRead0(ChannelHandlerContext ctx,
									Object msg) {
			// 不用调用ReferenceCountUtil.release(msg)也会释放资源
	}
}

ChannelOutboundHandler

出站操做和数据将由 ChannelOutboundHandler 处理。它的方法将被 ChannelChannelPipeline 以及 ChannelHandlerContext 调用。
ChannelOutboundHandler 的一个强大的功能是能够按需推迟操做或者事件,这使得能够经过一些复杂的方法来处理请求。例如, 若是到远程节点的写入被暂停了, 那么你能够推迟冲刷操做并在稍后继续。

d0PxbT.png

ChannelPromiseChannelFuture: ChannelOutboundHandler中的大部分方法都须要一个ChannelPromise参数, 以便在操做完成时获得通知。 ChannelPromiseChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()setFailure(),从而使ChannelFuture不可变。

ChannelHandlerAdapter

ChannelHandlerAdapter顾名思义,就是handler的适配器。你须要知道什么是适配器模式,假设有一个A接口,咱们须要A的subclass实现功能,可是B类中正好有咱们须要的功能,不想复制粘贴B中的方法和属性了,那么能够写一个适配器类Adpter继承B实现A,这样一来Adapter是A的子类而且能直接使用B中的方法,这种模式就是适配器模式。

就好比Netty中的SslHandler类,想使用ByteToMessageDecoder中的方法进行解码,可是必须是ChannelHandler子类对象才能加入到ChannelPipeline中,经过以下签名和其实现细节(SslHandler实现细节就不贴了)就可以做为一个handler去处理消息了。

public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler

ChannelHandlerAdapter提供了一些实用方法isSharable()若是其对应的实现被标注为 Sharable, 那么这个方法将返回 true, 表示它能够被添加到多个 ChannelPipeline中 。若是想在本身的ChannelHandler中使用这些适配器类,只须要扩展他们,重写那些想要自定义的方法便可。

ChannelPipeline

每个新建立的 Channel 都将会被分配一个新的 ChannelPipeline。这项关联是永久性的; Channel 既不能附加另一个 ChannelPipeline,也不能分离其当前的。在 Netty 组件的生命周期中,这是一项固定的操做,不须要开发人员的任何干预。

Netty 的 ChannelHandler 为处理器提供了基本的抽象, 目前你能够认为每一个 ChannelHandler 的实例都相似于一种为了响应特定事件而被执行的回调。从应用程序开发人员的角度来看, 它充当了全部处理入站和出站数据的应用程序逻辑的拦截载体。ChannelPipeline提供了 ChannelHandler 链的容器,并定义了用于在该链上传播入站和出站事件流的 API。当 Channel 被建立时,它会被自动地分配到它专属的 ChannelPipeline

ChannelHandler 安装到 ChannelPipeline 中的过程以下所示:

  • 一个ChannelInitializer的实现被注册到了ServerBootstrap
  • ChannelInitializer.initChannel()方法被调用时,ChannelInitializer将在 ChannelPipeline 中安装一组自定义的 ChannelHandler
  • ChannelInitializer 将它本身从 ChannelPipeline 中移除

dkJuTO.png

如上图所示:这是一个同时具备入站和出站 ChannelHandlerChannelPipeline的布局,而且印证了咱们以前的关于 ChannelPipeline 主要由一系列的 ChannelHandler 所组成的说法。 ChannelPipeline还提供了经过 ChannelPipeline 自己传播事件的方法。若是一个入站事件被触发,它将被从 ChannelPipeline的头部开始一直被传播到 Channel Pipeline 的尾端。

你可能会说, 从事件途经 ChannelPipeline的角度来看, ChannelPipeline 的头部和尾端取决于该事件是入站的仍是出站的。然而 Netty 老是将 ChannelPipeline的入站口(图 的左侧)做为头部,而将出站口(该图的右侧)做为尾端。
当你完成了经过调用 ChannelPipeline.add*()方法将入站处理器( ChannelInboundHandler)和 出 站 处 理 器 ( ChannelOutboundHandler ) 混 合 添 加 到 ChannelPipeline之 后 , 每 一 个ChannelHandler 从头部到尾端的顺序位置正如同咱们方才所定义它们的同样。所以,若是你将图 6-3 中的处理器( ChannelHandler)从左到右进行编号,那么第一个被入站事件看到的 ChannelHandler 将是1,而第一个被出站事件看到的 ChannelHandler 将是 5。

ChannelPipeline 传播事件时,它会测试 ChannelPipeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配。若是不匹配, ChannelPipeline 将跳过该ChannelHandler 并前进到下一个,直到它找到和该事件所指望的方向相匹配的为止。 (固然, ChannelHandler 也能够同时实现ChannelInboundHandler 接口和 ChannelOutboundHandler 接口。)

修改ChannelPipeline

修改指的是添加或删除ChannelHandler,见代码示例:

ChannelPipeline pipeline = ..;
FirstHandler firstHandler = new FirstHandler();
// 先添加一个Handler到ChannelPipeline中
pipeline.addLast("handler1", firstHandler);
// 这个Handler放在了first,意味着放在了handler1以前
pipeline.addFirst("handler2", new SecondHandler());
// 这个Handler被放到了last,意味着在handler1以后
pipeline.addLast("handler3", new ThirdHandler());
...
// 经过名称删除
pipeline.remove("handler3");
// 经过对象删除
pipeline.remove(firstHandler);
// 名称"handler2"替换成名称"handler4",并切handler2的实例替换成了handler4的实例
pipeline.replace("handler2", "handler4", new ForthHandler());

ChannelPipeline的出入站API

入站API所示:

[图片上传失败...(image-6037f5-1598167949595)]

出站API所示:

dUndZ6.png

ChannelPipeline 这个组件上面所讲的大体只须要记住这三点便可:

  • ChannelPipeline 保存了与 Channel 相关联的 ChannelHandler
  • ChannelPipeline 能够根据须要,经过添加或者删除 ChannelHandler 来动态地修改
  • ChannelPipeline 有着丰富的API用以被调用,以响应入站和出站事件

ChannelHandlerContext

ChannelHandler 被添加到 ChannelPipeline 时,它将会被分配一个 ChannelHandlerContext ,它表明了 ChannelHandlerChannelPipeline 之间的绑定。ChannelHandlerContext 的主要功能是管理它所关联的ChannelHandler和在同一个 ChannelPipeline 中的其余ChannelHandler之间的交互。

若是调用ChannelChannelPipeline上的方法,会沿着整个ChannelPipeline传播,若是调用ChannelHandlerContext上的相同方法,则会从对应的当前ChannelHandler进行传播。

ChannelHandlerContext API以下表所示:

dUn0IO.png

  • ChannelHandlerContextChannelHandler之间的关联(绑定)是永远不会改变的,因此缓存对它的引用是安全的;
  • 如同在本节开头所解释的同样,相对于其余类的同名方法,ChannelHandlerContext的方法将产生更短的事件流, 应该尽量地利用这个特性来得到最大的性能。

ChannelHandlerChannelPipeline的关联使用

dUnDiD.png

ChannelHandlerContext访问channel

ChannelHandlerContext ctx = ..;
// 获取channel引用
Channel channel = ctx.channel();
// 经过channel写入缓冲区
channel.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));

ChannelHandlerContext访问ChannelPipeline

ChannelHandlerContext ctx = ..;
// 获取ChannelHandlerContext
ChannelPipeline pipeline = ctx.pipeline();
// 经过ChannelPipeline写入缓冲区
pipeline.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));

dUnrJe.png

有时候咱们不想从头传递数据,想跳过几个handler,从某个handler开始传递数据.咱们必须获取目标handler以前的handler关联的ChannelHandlerContext

ChannelHandlerContext ctx = ..;
// 直接经过ChannelHandlerContext写数据,发送到下一个handler
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

dUnyzd.png

好了,ChannelHandlerContext的基本使用应该掌握了,可是你真的理解ChannelHandlerContext,ChannelPipelineChannelhandler之间的关系了吗?不理解也不要紧,由于源码之后会帮你理解的更为深入。

核心组件之间的关系

  • 一个 Channel 对应一个 ChannelPipeline
  • 一个 ChannelPipeline 包含一条双向的 ChannelHandlerContext
  • 一个 ChannelHandlerContext 中包含一个 ChannelHandler
  • 一个 Channel 会绑定到一个EventLoop
  • 一个 NioEventLoop 维护了一个 Selector(使用的是 Java 原生的 Selector)
  • 一个 NioEventLoop 至关于一个线程

粘包拆包问题

粘包拆包问题是处于网络比较底层的问题,在数据链路层、网络层以及传输层都有可能发生。咱们平常的网络应用开发大都在传输层进行,因为UDP有消息保护边界,不会发生粘包拆包问题,而所以粘包拆包问题只发生在TCP协议中。具体讲TCP是个”流"协议,只有流的概念,没有包的概念,对于业务上层数据的具体含义和边界并不了解,它只会根据TCP缓冲区的实际状况进行包的划分。因此在业务上认为,一个完整的包可能会被TCP拆分红多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

问题举例说明

下面针对客户端分别发送了两个数据表Packet1Packet2给服务端的时候,TCP粘包和拆包会出现的状况进行列举说明:

(1)第一种状况,服务端分两次正常收到两个独立数据包,即没有发生拆包和粘包的现象;

dUncQA.png

(2)第二种状况,接收端只收到一个数据包,因为TCP是不会出现丢包的,因此这一个数据包中包含了客户端发送的两个数据包的信息,这种现象即为粘包。这种状况因为接收端不知道这两个数据包的界限,因此对于服务接收端来讲很难处理。

dUn2Lt.png

(3)第三种状况,服务端分两次读取到了两个数据包,第一次读取到了完整的Packet1Packet2包的部份内容,第二次读取到了Packet2的剩余内容,这被称为TCP拆包;

d0Pq8s.png

(4)第四种状况,服务端分两次读取到了两个数据包,第一次读取到了部分的Packet1内容,第二次读取到了Packet1剩余内容和Packet2的整包。

dUn5FS.png

若是此时服务端TCP接收滑窗很是小,而数据包Packet1Packet2比较大,颇有可能服务端须要分屡次才能将两个包接收彻底,期间发生屡次拆包。以上列举状况的背后缘由分别以下:

  1. 应用程序写入的数据大于套接字缓冲区大小,这将会发生拆包。
  2. 应用程序写入数据小于套接字缓冲区大小,网卡将应用屡次写入的数据发送到网络上,这将会发生粘包。
  3. 进行MSS(最大报文长度)大小的TCP分段,当TCP报文长度-TCP头部长度>MSS的时候将发生拆包。
  4. 接收方法不及时读取套接字缓冲区数据,这将发生粘包。

如何基于Netty处理粘包、拆包问题

因为底层的TCP没法理解上层的业务数据,因此在底层是没法保证数据包不被拆分和重组的,这个问题只能经过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,能够概括以下:

  1. 消息定长,例如每一个报文的大小为固定长度200字节,若是不够,空位补空格;
  2. 在包尾增长回车换行符进行分割,例如FTP协议;
  3. 将消息分为消息头和消息体,消息头中包含表示消息总长度的字段,一般设计思路为消息头的第一个字段使用int32来表示消息的总长度;
  4. 更复杂的应用层协议。

以前Netty示例中其实并无考虑读半包问题,这在功能测试每每没有问题,可是一旦请求数过多或者发送大报文以后,就会存在该问题。若是代码没有考虑,每每就会出现解码错位或者错误,致使程序不能正常工做,下面看看Netty是如何根据主流的解决方案进行抽象实现来帮忙解决这一问题的。

以下表所示,Netty为了找出消息的边界,采用封帧方式:

方式 解码 编码
固定长度 FixedLengthFrameDecoder 简单
分隔符 DelimiterBasedFrameDecoder 简单
专门的 length 字段 LengthFieldBasedFrameDecoder LengthFieldPrepender

注意到,Netty提供了对应的解码器来解决对应的问题,有了这些解码器,用户不须要本身对读取的报文进行人工解码,也不须要考虑TCP的粘包和半包问题。为何这么说呢?下面列举一个包尾增长分隔符的例子:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author: wuxiaofei
 * @Date: 2020/8/15 0015 19:15
 * @Version: 1.0
 * @Description:入站处理器
 */
@ChannelHandler.Sharable
public class DelimiterServerHandler extends ChannelInboundHandlerAdapter {

    private AtomicInteger counter = new AtomicInteger(0);
    private AtomicInteger completeCounter = new AtomicInteger(0);

    /*** 服务端读取到网络数据后的处理*/
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf)msg;
        String request = in.toString(CharsetUtil.UTF_8);
        System.out.println("Server Accept["+request
                +"] and the counter is:"+counter.incrementAndGet());
        String resp = "Hello,"+request+". Welcome to Netty World!"
                + DelimiterEchoServer.DELIMITER_SYMBOL;
        ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
    }

    /*** 服务端读取完成网络数据后的处理*/
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        ctx.fireChannelReadComplete();
        System.out.println("the ReadComplete count is "
                +completeCounter.incrementAndGet());
    }

    /*** 发生异常后的处理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

import java.net.InetSocketAddress;

/**
 * @Author: wuxiaofei
 * @Date: 2020/8/15 0015 19:17
 * @Version: 1.0
 * @Description:服务端
 */
public class DelimiterEchoServer {

    public static final String DELIMITER_SYMBOL = "@~";
    public static final int PORT = 9997;

    public static void main(String[] args) throws InterruptedException {
        DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer();
        System.out.println("服务器即将启动");
        delimiterEchoServer.start();
    }

    public void start() throws InterruptedException {
        final DelimiterServerHandler serverHandler = new DelimiterServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
        try {
            ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/
            b.group(group)/*将线程组传入*/
                .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
                .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/
                /*服务端每接收到一个链接请求,就会新启一个socket通讯,也就是channel,
                因此下面这段代码的做用就是为这个子channel增长handle*/
                .childHandler(new ChannelInitializerImp());
            ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
            System.out.println("服务器启动完成,等待客户端的链接和数据.....");
            f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
        } finally {
            group.shutdownGracefully().sync();/*优雅关闭线程组*/
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL
                    .getBytes());
            //服务端收到数据包后通过DelimiterBasedFrameDecoder即分隔符基础框架解码器解码为一个个带有分隔符的数据包。
            ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,
                    delimiter));
            ch.pipeline().addLast(new DelimiterServerHandler());
        }
    }

}

添加到ChannelPipelineDelimiterBasedFrameDecoder用于对使用分隔符结尾的消息进行自动解码,固然还有没有用到的FixedLengthFrameDecoder用于对固定长度的消息进行自动解码等解码器。正如上门的代码使用案例,有了Netty提供的几码器能够轻松地完成对不少消息的自动解码,并且不须要考虑TCP粘包/拆包致使的读半包问题,极大地提高了开发效率。

Netty示例代码详解

相信看完上面的铺垫,你对Netty编码有了必定的了解了,下面再来总体梳理一遍吧。

dVp7yn.png

一、设置EventLoopGroup线程组(Reactor线程组)

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

上面咱们说过Netty中使用Reactor模式,bossGroup表示服务器链接监听线程组,专门接受 Accept 新的客户端client 链接。另外一个workerGroup表示处理每一链接的数据收发的线程组,来处理消息的读写事件。

二、服务端引导器

ServerBootstrap serverBootstrap = new ServerBootstrap();

集成全部配置,用来启动Netty服务端。

三、设置ServerBootstrap信息

serverBootstrap.group(bossGroup, workerGroup);

将两个线程组设置到ServerBootstrap中。

四、设置ServerSocketChannel类型

serverBootstrap.channel(NioServerSocketChannel.class);

设置通道的IO类型,Netty不止支持Java NIO,也支持阻塞式IO,例如OIOOioServerSocketChannel.class)

五、设置参数

serverBootstrap.option(ChannelOption.SO_BACKLOG, 100);

经过option()方法能够设置不少参数,这里SO_BACKLOG标识服务端接受链接的队列长度,若是队列已满,客户端链接将被拒绝。默认值,Windows为200,其余为128,这里设置的是100。

六、设置Handler

serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));

设置 ServerSocketChannel对应的Handler,这里只能设置一个,它会在SocketChannel创建起来以前执行。

七、设置子Handler

serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();
        p.addLast(new LoggingHandler(LogLevel.INFO));
        p.addLast(new ChatNettyHandler());
    }
});

Netty中提供了一种能够设置多个Handler的途径,即便用ChannelInitializer方式。ChannelPipelineNetty处理请求的责任链,这是一个ChannelHandler的链表,而ChannelHandler就是用来处理网络请求的内容的。

每个channel,都有一个处理器流水线。装配child channel流水线,调用childHandler()方法,传递一个ChannelInitializer 的实例。

child channel 建立成功,开始通道初始化的时候,在bootstrap启动器中配置的ChannelInitializer 实例就会被调用。

这个时候,才真正的执行去执行 initChannel 初始化方法,开始通道流水线装配。

流水线装配,主要是在流水线pipeline的后面,增长负责数据读写、处理业务逻辑的handler

处理器 ChannelHandler 用来处理网络请求内容,有ChannelInboundHandlerChannelOutboundHandler两种,ChannlPipeline会从头至尾顺序调用ChannelInboundHandler处理网络请求内容,从尾到头调用ChannelOutboundHandler处理网络请求内容

八、绑定端口号

ChannelFuture f = serverBootstrap.bind(PORT).sync();

绑定端口号

九、等待服务端端口号关闭

f.channel().closeFuture().sync();

等待服务端监听端口关闭,sync()会阻塞主线程,内部调用的是 Objectwait()方法

十、关闭EventLoopGroup线程组

bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();

总结

这篇文章主要是从一个demo做为引子,而后介绍了Netty的包结构、Reactor模型、编程规范等等,目的很简单,但愿你可以读懂这段demo并写出来。

后面开始继续Netty源码解析部分,敬请期待。

参考资料

  1. 《Netty in Action》书籍
  2. 慕课Netty专栏
  3. 掘金闪电侠Netty小册
  4. 芋道源码Netty专栏
  5. Github[fork from krcys]

感谢Netty专栏做者们优秀的文章内容~

原创干货分享.png