高清思惟导图原件(xmind/pdf/jpg
)能够关注公众号:一枝花算不算浪漫
回复netty01
便可。java
前言
上一篇文章讲了NIO
相关的知识点,相比于传统IO
,NIO
已经作得很优雅了,为何咱们还要使用Netty
?ios
上篇文章最后留了不少坑,讲了NIO
使用的弊端,也是为了引出Netty
而设立的,这篇文章咱们就来好好揭开Netty
的神秘面纱。编程
本篇文章的目的很简单,但愿看事后你能看懂Netty
的示例代码,针对于简单的网络通讯,本身也能用Netty
手写一个开发应用出来!bootstrap
一个简单的Netty示例
如下是一个简单聊天室Server端的程序,代码参考自:http://www.imooc.com/read/82/article/2166
设计模式
代码有点长,主要核心代码是在main()
方法中,这里代码也但愿你们看懂,后面也会一步步剖析。缓存
PS:我是用mac
系统,直接在终端输入telnet 127.0.0.1 8007
便可启动一个聊天框,若是提示找不到telnet
命令,能够经过brew
进行安装,具体步骤请自行百度。安全
/** * @Description netty简易聊天室 * * @Author 一枝花算不算浪漫 * @Date 2020/8/10 6:52 上午 */ public final class NettyChatServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // 1. EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 2. 服务端引导器 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 3. 设置线bootStrap信息 serverBootstrap.group(bossGroup, workerGroup) // 4. 设置ServerSocketChannel的类型 .channel(NioServerSocketChannel.class) // 5. 设置参数 .option(ChannelOption.SO_BACKLOG, 100) // 6. 设置ServerSocketChannel对应的Handler,只能设置一个 .handler(new LoggingHandler(LogLevel.INFO)) // 7. 设置SocketChannel对应的Handler .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // 能够添加多个子Handler p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new ChatNettyHandler()); } }); // 8. 绑定端口 ChannelFuture f = serverBootstrap.bind(PORT).sync(); // 9. 等待服务端监听端口关闭,这里会阻塞主线程 f.channel().closeFuture().sync(); } finally { // 10. 优雅地关闭两个线程池 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private static class ChatNettyHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("one conn active: " + ctx.channel()); // channel是在ServerBootstrapAcceptor中放到EventLoopGroup中的 ChatHolder.join((SocketChannel) ctx.channel()); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); String content = new String(bytes, StandardCharsets.UTF_8); System.out.println(content); if (content.equals("quit\r\n")) { ctx.channel().close(); } else { ChatHolder.propagate((SocketChannel) ctx.channel(), content); } } @Override public void channelInactive(ChannelHandlerContext ctx) { System.out.println("one conn inactive: " + ctx.channel()); ChatHolder.quit((SocketChannel) ctx.channel()); } } private static class ChatHolder { static final Map<SocketChannel, String> USER_MAP = new ConcurrentHashMap<>(); /** * 加入群聊 */ static void join(SocketChannel socketChannel) { // 有人加入就给他分配一个id String userId = "用户"+ ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE); send(socketChannel, "您的id为:" + userId + "\n\r"); for (SocketChannel channel : USER_MAP.keySet()) { send(channel, userId + " 加入了群聊" + "\n\r"); } // 将当前用户加入到map中 USER_MAP.put(socketChannel, userId); } /** * 退出群聊 */ static void quit(SocketChannel socketChannel) { String userId = USER_MAP.get(socketChannel); send(socketChannel, "您退出了群聊" + "\n\r"); USER_MAP.remove(socketChannel); for (SocketChannel channel : USER_MAP.keySet()) { if (channel != socketChannel) { send(channel, userId + " 退出了群聊" + "\n\r"); } } } /** * 扩散说话的内容 */ public static void propagate(SocketChannel socketChannel, String content) { String userId = USER_MAP.get(socketChannel); for (SocketChannel channel : USER_MAP.keySet()) { if (channel != socketChannel) { send(channel, userId + ": " + content); } } } /** * 发送消息 */ static void send(SocketChannel socketChannel, String msg) { try { ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; ByteBuf writeBuffer = allocator.buffer(msg.getBytes().length); writeBuffer.writeCharSequence(msg, Charset.defaultCharset()); socketChannel.writeAndFlush(writeBuffer); } catch (Exception e) { e.printStackTrace(); } } } }
代码有点长,执行完的效果如上图所示,下面全部内容都是围绕着如何看懂
以及如何写出
这样的代码来展开的,但愿你看完 也能轻松手写Netty
服务端代码~。经过简单demo开发让你们体验了Netty
实现相比NIO
确实要简单的多,但优势不限于此,只须要知道选择Netty就对了。服务器
Netty核心组件
对应着文章开头的思惟导图,咱们知道Netty
的核心组件主要有:网络
- Bootstrap && ServerBootstrap
- EventLoopGroup
- EventLoop
- ByteBuf
- Channel
- ChannelHandler
- ChannelFuture
- ChannelPipeline
- ChannelHandlerContext
类图以下:架构
Bootstrap & ServerBootstrap
一看到BootStrap
你们就应该想到启动类、引导类这样的词汇,以前分析过EurekaServer项目启动类时介绍过EurekaBootstrap
, 他的做用就是上下文初始化、配置初始化。
在Netty
中咱们也有相似的类,Bootstrap
和ServerBootstrap
它们都是Netty
程序的引导类,主要用于配置各类参数,并启动整个Netty
服务,咱们看下文章开头的示例代码:
ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new ChatNettyHandler()); } });
Bootstrap
和ServerBootstrap
是针对于Client
和Server
端定义的两套启动类,区别以下:
Bootstrap
是客户端引导类,而ServerBootstrap
是服务端引导类。Bootstrap
一般使用connect()
方法链接到远程的主机和端口,做为一个TCP客户端
。ServerBootstrap
一般使用bind()
方法绑定本地的端口,等待客户端来链接。ServerBootstrap
能够处理Accept
事件,这里面childHandler
是用来处理Channel
请求的,咱们能够查看chaildHandler()
方法的注解:
Bootstrap
客户端引导只须要一个EventLoopGroup
,可是一个ServerBootstrap
一般须要两个(上面的boosGroup
和workerGroup
)。
EventLoopGroup && EventLoop
EventLoopGroup
及EventLoop
这两个类名称定义的很奇怪,对于初学者来讲每每没法经过名称来了解其中的含义,包括我也是这样。
EventLoopGroup
能够理解为一个线程池,对于服务端程序,咱们通常会绑定两个线程池,一个用于处理 Accept
事件,一个用于处理读写事件,看下EventLoop
系列的类目录:
经过上面的类图,咱们才恍然大悟,个人亲娘咧,这不就是一个线程池嘛?(名字气的犄角拐弯的真是难认)
EventLoopGroup
是EventLoop
的集合,一个EventLoopGroup
包含一个或者多个EventLoop
。咱们能够将EventLoop
看作EventLoopGroup
线程池中的一个个工做线程。
至于这里为何要用到两个线程池,具体的其实能够参考Reactor
设计模式,这里暂时不作过多的讲解。
- 一个 EventLoopGroup 包含一个或多个 EventLoop ,即 EventLoopGroup : EventLoop = 1 : n
- 一个 EventLoop 在它的生命周期内,只能与一个 Thread 绑定,即 EventLoop : Thread = 1 : 1
- 全部有 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理,从而保证线程安全,即 Thread : EventLoop = 1 : 1
- 一个 Channel 在它的生命周期内只能注册到一个 EventLoop 上,即 Channel : EventLoop = n : 1
- 一个 EventLoop 可被分配至一个或多个 Channel ,即 EventLoop : Channel = 1 : n
当一个链接到达时,Netty
就会建立一个 Channel
,而后从 EventLoopGroup
中分配一个 EventLoop
来给这个 Channel
绑定上,在该 Channel
的整个生命周期中都是有这个绑定的 EventLoop
来服务的。
ByteBuf
在Java NIO
中咱们有 ByteBuffer
缓冲池,对于它的操做咱们应该印象深入,往Buffer
中写数据时咱们须要关注写入的位置,切换成读模式时咱们还要切换读写状态,否则将会出现大问题。
针对于NIO
中超级难用的Buffer
类, Netty
提供了ByteBuf
来替代。ByteBuf
声明了两个指针:一个读指针,一个写指针,使得读写操做进行分离,简化buffer
的操做流程。
另外Netty
提供了发几种ByteBuf
的实现以供咱们选择,ByteBuf
能够分为:
Pooled
和Unpooled
池化和非池化- Heap 和 Direct,堆内存和堆外内存,NIO中建立Buffer也能够指定
- Safe 和 Unsafe,安全和非安全
对于这么多种建立Buffer
的方式该怎么选择呢?Netty
也为咱们处理好了,咱们能够直接使用(真是暖男Ntetty
):
ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; ByteBuf buffer = allocator.buffer(length);
使用这种方式,Netty将最大努力的使用池化、Unsafe、对外内存的方式为咱们建立buffer。
Channel
提起Channel
并不陌生,上一篇讲NIO
的三大组件提到过,最多见的就是java.nio.SocketChannel
和java.nio.ServerSocketChannel
,他们用于非阻塞的I/0操做。相似于NIO
的Channel
,Netty提供了本身的Channel
和其子类实现,用于异步I/0操做和其余相关的操做。
在 Netty
中, Channel
是一个 Socket
链接的抽象, 它为用户提供了关于底层 Socket
状态(是不是链接仍是断开) 以及对 Socket
的读写等操做。每当 Netty
创建了一个链接后, 都会有一个对应的 Channel
实例。而且,有父子channel
的概念。 服务器链接监听的channel
,也叫 parent channel
。 对应于每个 Socket
链接的channel
,也叫 child channel
。
既然channel
是 Netty 抽象出来的网络 I/O 读写相关的接口,为何不使用 JDK NIO
原生的 Channel
而要另起炉灶呢,主要缘由以下:
JDK
的SocketChannel
和ServersocketChannel
没有统一的Channel
接口供业务开发者使用,对一于用户而言,没有统一的操做视图,使用起来并不方便。JDK
的SocketChannel
和ScrversockctChannel
的主要职责就是网络 I/O 操做,因为他们是SPI
类接口,由具体的虚拟机厂家来提供,因此经过继承 SPI 功能直接实现ServersocketChannel
和SocketChannel
来扩展其工做量和从新Channel
功类是差很少的。- Netty 的
ChannelPipeline Channel
须要够跟 Netty 的总体架构融合在一块儿,例如 I/O 模型、基的定制模型,以及基于元数据描述配置化的 TCP 参数等,这些JDK SocketChannel
和ServersocketChannel
都没有提供,须要从新封装。 - 自定义的
Channel
,功实现更加灵活。
基于上述 4 缘由,它的设计原理比较简单, Netty 从新设计了 Channel
接口,而且给予了不少不一样的实现。可是功能却比较繁杂,主要的设计理念以下:
- 在
Channel
接口层,相关联的其余操做封装起来,采用Facade
模式进行统一封装,将网络 I/O 操做、网络 I/O 统一对外提供。 Channel
接口的定义尽可能大而全,统一的视图,由不一样子类实现不一样的功能,公共功能在抽象父类中实现,最大程度上实现接口的重用。- 具体实现采用聚合而非包含的方式,将相关的功类聚合在
Channel
中,由Channel
统一负责分配和调度,功能实现更加灵活。
Channel
的实现类很是多,继承关系复杂,从学习的角度咱们抽取最重要的两个 NioServerSocketChannel
和 NioSocketChannel
。
服务端 NioServerSocketChannel
的继承关系类图以下:
客户端 NioSocketChannel
的继承关系类图以下:
后面文章源码系列会具体分析,这里就不进一步阐述分析了。
ChannelHandler
ChannelHandler
是Netty
中最经常使用的组件。ChannelHandler
主要用来处理各类事件,这里的事件很普遍,好比能够是链接、数据接收、异常、数据转换等。
ChannelHandler
有两个核心子类 ChannelInboundHandler
和 ChannelOutboundHandler
,其中 ChannelInboundHandler
用于接收、处理入站( Inbound
)的数据和事件,而 ChannelOutboundHandler
则相反,用于接收、处理出站( Outbound
)的数据和事件。
ChannelInboundHandler
ChannelInboundHandler
处理入站数据以及各类状态变化,当Channel
状态发生改变会调用ChannelInboundHandler
中的一些生命周期方法.这些方法与Channel
的生命密切相关。
入站数据,就是进入socket
的数据。下面展现一些该接口的生命周期API
:
当某个 ChannelInboundHandler
的实现重写 channelRead()
方法时,它将负责显式地释放与池化的 ByteBuf
实例相关的内存。 Netty 为此提供了一个实用方法ReferenceCountUtil.release()
。
@Sharable public class DiscardHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ReferenceCountUtil.release(msg); } }
这种方式还挺繁琐的,Netty提供了一个SimpleChannelInboundHandler
,重写channelRead0()
方法,就能够在调用过程当中会自动释放资源.
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> { @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) { // 不用调用ReferenceCountUtil.release(msg)也会释放资源 } }
ChannelOutboundHandler
出站操做和数据将由 ChannelOutboundHandler
处理。它的方法将被 Channel
、 ChannelPipeline
以及 ChannelHandlerContext
调用。 ChannelOutboundHandler
的一个强大的功能是能够按需推迟操做或者事件,这使得能够经过一些复杂的方法来处理请求。例如, 若是到远程节点的写入被暂停了, 那么你能够推迟冲刷操做并在稍后继续。
ChannelPromise
与ChannelFuture
: ChannelOutboundHandler
中的大部分方法都须要一个ChannelPromise
参数, 以便在操做完成时获得通知。 ChannelPromise
是ChannelFuture
的一个子类,其定义了一些可写的方法,如setSuccess()
和setFailure()
,从而使ChannelFuture
不可变。
ChannelHandlerAdapter
ChannelHandlerAdapter
顾名思义,就是handler
的适配器。你须要知道什么是适配器模式,假设有一个A接口,咱们须要A的subclass
实现功能,可是B类中正好有咱们须要的功能,不想复制粘贴B中的方法和属性了,那么能够写一个适配器类Adpter
继承B实现A,这样一来Adapter
是A的子类而且能直接使用B中的方法,这种模式就是适配器模式。
就好比Netty中的SslHandler
类,想使用ByteToMessageDecoder
中的方法进行解码,可是必须是ChannelHandler
子类对象才能加入到ChannelPipeline
中,经过以下签名和其实现细节(SslHandler
实现细节就不贴了)就可以做为一个handler
去处理消息了。
public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler
ChannelHandlerAdapter
提供了一些实用方法isSharable()
若是其对应的实现被标注为 Sharable
, 那么这个方法将返回 true
, 表示它能够被添加到多个 ChannelPipeline
中 。若是想在本身的ChannelHandler
中使用这些适配器类,只须要扩展他们,重写那些想要自定义的方法便可。
ChannelPipeline
每个新建立的 Channel
都将会被分配一个新的 ChannelPipeline
。这项关联是永久性的; Channel
既不能附加另一个 ChannelPipeline
,也不能分离其当前的。在 Netty 组件的生命周期中,这是一项固定的操做,不须要开发人员的任何干预。
Netty 的 ChannelHandler
为处理器提供了基本的抽象, 目前你能够认为每一个 ChannelHandler
的实例都相似于一种为了响应特定事件而被执行的回调。从应用程序开发人员的角度来看, 它充当了全部处理入站和出站数据的应用程序逻辑的拦截载体。ChannelPipeline
提供了 ChannelHandler
链的容器,并定义了用于在该链上传播入站和出站事件流的 API
。当 Channel
被建立时,它会被自动地分配到它专属的 ChannelPipeline
。
ChannelHandler
安装到 ChannelPipeline
中的过程以下所示:
- 一个
ChannelInitializer
的实现被注册到了ServerBootstrap
中 - 当
ChannelInitializer.initChannel()
方法被调用时,ChannelInitializer
将在ChannelPipeline
中安装一组自定义的ChannelHandler
ChannelInitializer
将它本身从ChannelPipeline
中移除
如上图所示:这是一个同时具备入站和出站 ChannelHandler
的 ChannelPipeline
的布局,而且印证了咱们以前的关于 ChannelPipeline
主要由一系列的 ChannelHandler
所组成的说法。 ChannelPipeline
还提供了经过 ChannelPipeline
自己传播事件的方法。若是一个入站事件被触发,它将被从 ChannelPipeline
的头部开始一直被传播到 Channel Pipeline 的尾端。
你可能会说, 从事件途经 ChannelPipeline
的角度来看, ChannelPipeline
的头部和尾端取决于该事件是入站的仍是出站的。然而 Netty 老是将 ChannelPipeline
的入站口(图 的左侧)做为头部,而将出站口(该图的右侧)做为尾端。 当你完成了经过调用 ChannelPipeline.add*()
方法将入站处理器( ChannelInboundHandler
)和 出 站 处 理 器 ( ChannelOutboundHandler
) 混 合 添 加 到 ChannelPipeline
之 后 , 每 一 个ChannelHandler
从头部到尾端的顺序位置正如同咱们方才所定义它们的同样。所以,若是你将图 6-3 中的处理器( ChannelHandler
)从左到右进行编号,那么第一个被入站事件看到的 ChannelHandler
将是1,而第一个被出站事件看到的 ChannelHandler
将是 5。
在 ChannelPipeline
传播事件时,它会测试 ChannelPipeline
中的下一个 Channel Handler 的类型是否和事件的运动方向相匹配。若是不匹配, ChannelPipeline
将跳过该ChannelHandler
并前进到下一个,直到它找到和该事件所指望的方向相匹配的为止。 (固然, ChannelHandler
也能够同时实现ChannelInboundHandler
接口和 ChannelOutboundHandler
接口。)
修改ChannelPipeline
修改指的是添加或删除ChannelHandler
,见代码示例:
ChannelPipeline pipeline = ..; FirstHandler firstHandler = new FirstHandler(); // 先添加一个Handler到ChannelPipeline中 pipeline.addLast("handler1", firstHandler); // 这个Handler放在了first,意味着放在了handler1以前 pipeline.addFirst("handler2", new SecondHandler()); // 这个Handler被放到了last,意味着在handler1以后 pipeline.addLast("handler3", new ThirdHandler()); ... // 经过名称删除 pipeline.remove("handler3"); // 经过对象删除 pipeline.remove(firstHandler); // 名称"handler2"替换成名称"handler4",并切handler2的实例替换成了handler4的实例 pipeline.replace("handler2", "handler4", new ForthHandler());
ChannelPipeline
的出入站API
入站API
所示:
[图片上传失败...(image-6037f5-1598167949595)]
出站API
所示:
ChannelPipeline
这个组件上面所讲的大体只须要记住这三点便可:
ChannelPipeline
保存了与Channel
相关联的ChannelHandler
ChannelPipeline
能够根据须要,经过添加或者删除ChannelHandler
来动态地修改ChannelPipeline
有着丰富的API
用以被调用,以响应入站和出站事件
ChannelHandlerContext
当 ChannelHandler
被添加到 ChannelPipeline
时,它将会被分配一个 ChannelHandlerContext
,它表明了 ChannelHandler
和 ChannelPipeline
之间的绑定。ChannelHandlerContext
的主要功能是管理它所关联的ChannelHandler
和在同一个 ChannelPipeline
中的其余ChannelHandler
之间的交互。
若是调用Channel
或ChannelPipeline
上的方法,会沿着整个ChannelPipeline
传播,若是调用ChannelHandlerContext
上的相同方法,则会从对应的当前ChannelHandler
进行传播。
ChannelHandlerContext API
以下表所示:
ChannelHandlerContext
和ChannelHandler
之间的关联(绑定)是永远不会改变的,因此缓存对它的引用是安全的;- 如同在本节开头所解释的同样,相对于其余类的同名方法,
ChannelHandlerContext
的方法将产生更短的事件流, 应该尽量地利用这个特性来得到最大的性能。
与ChannelHandler
、ChannelPipeline
的关联使用
从ChannelHandlerContext
访问channel
ChannelHandlerContext ctx = ..; // 获取channel引用 Channel channel = ctx.channel(); // 经过channel写入缓冲区 channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
从ChannelHandlerContext
访问ChannelPipeline
ChannelHandlerContext ctx = ..; // 获取ChannelHandlerContext ChannelPipeline pipeline = ctx.pipeline(); // 经过ChannelPipeline写入缓冲区 pipeline.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
有时候咱们不想从头传递数据,想跳过几个handler
,从某个handler
开始传递数据.咱们必须获取目标handler
以前的handler
关联的ChannelHandlerContext
。
ChannelHandlerContext ctx = ..; // 直接经过ChannelHandlerContext写数据,发送到下一个handler ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
好了,ChannelHandlerContext
的基本使用应该掌握了,可是你真的理解ChannelHandlerContext
,ChannelPipeline
和Channelhandler
之间的关系了吗?不理解也不要紧,由于源码之后会帮你理解的更为深入。
核心组件之间的关系
- 一个
Channel
对应一个ChannelPipeline
- 一个
ChannelPipeline
包含一条双向的ChannelHandlerContext
链 - 一个
ChannelHandlerContext
中包含一个ChannelHandler
- 一个
Channel
会绑定到一个EventLoop
上 - 一个
NioEventLoop
维护了一个Selector(
使用的是 Java 原生的 Selector) - 一个
NioEventLoop
至关于一个线程
粘包拆包问题
粘包拆包问题是处于网络比较底层的问题,在数据链路层、网络层以及传输层都有可能发生。咱们平常的网络应用开发大都在传输层进行,因为UDP
有消息保护边界,不会发生粘包拆包问题,而所以粘包拆包问题只发生在TCP
协议中。具体讲TCP
是个”流"协议,只有流的概念,没有包的概念,对于业务上层数据的具体含义和边界并不了解,它只会根据TCP
缓冲区的实际状况进行包的划分。因此在业务上认为,一个完整的包可能会被TCP
拆分红多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP
粘包和拆包问题。
问题举例说明
下面针对客户端分别发送了两个数据表Packet1
和Packet2
给服务端的时候,TCP
粘包和拆包会出现的状况进行列举说明:
(1)第一种状况,服务端分两次正常收到两个独立数据包,即没有发生拆包和粘包的现象;
(2)第二种状况,接收端只收到一个数据包,因为TCP
是不会出现丢包的,因此这一个数据包中包含了客户端发送的两个数据包的信息,这种现象即为粘包。这种状况因为接收端不知道这两个数据包的界限,因此对于服务接收端来讲很难处理。
(3)第三种状况,服务端分两次读取到了两个数据包,第一次读取到了完整的Packet1
和Packet2
包的部份内容,第二次读取到了Packet2
的剩余内容,这被称为TCP拆包;
(4)第四种状况,服务端分两次读取到了两个数据包,第一次读取到了部分的Packet1
内容,第二次读取到了Packet1
剩余内容和Packet2
的整包。
若是此时服务端TCP接收滑窗很是小,而数据包Packet1
和Packet2
比较大,颇有可能服务端须要分屡次才能将两个包接收彻底,期间发生屡次拆包。以上列举状况的背后缘由分别以下:
- 应用程序写入的数据大于套接字缓冲区大小,这将会发生拆包。
- 应用程序写入数据小于套接字缓冲区大小,网卡将应用屡次写入的数据发送到网络上,这将会发生粘包。
- 进行
MSS
(最大报文长度)大小的TCP
分段,当TCP
报文长度-TCP
头部长度>MSS
的时候将发生拆包。 - 接收方法不及时读取套接字缓冲区数据,这将发生粘包。
如何基于Netty处理粘包、拆包问题
因为底层的TCP
没法理解上层的业务数据,因此在底层是没法保证数据包不被拆分和重组的,这个问题只能经过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,能够概括以下:
- 消息定长,例如每一个报文的大小为固定长度200字节,若是不够,空位补空格;
- 在包尾增长回车换行符进行分割,例如
FTP
协议; - 将消息分为消息头和消息体,消息头中包含表示消息总长度的字段,一般设计思路为消息头的第一个字段使用
int32
来表示消息的总长度; - 更复杂的应用层协议。
以前Netty示例中其实并无考虑读半包问题,这在功能测试每每没有问题,可是一旦请求数过多或者发送大报文以后,就会存在该问题。若是代码没有考虑,每每就会出现解码错位或者错误,致使程序不能正常工做,下面看看Netty是如何根据主流的解决方案进行抽象实现来帮忙解决这一问题的。
以下表所示,Netty为了找出消息的边界,采用封帧方式:
方式 | 解码 | 编码 |
---|---|---|
固定长度 | FixedLengthFrameDecoder |
简单 |
分隔符 | DelimiterBasedFrameDecoder |
简单 |
专门的 length 字段 | LengthFieldBasedFrameDecoder |
LengthFieldPrepender |
注意到,Netty提供了对应的解码器来解决对应的问题,有了这些解码器,用户不须要本身对读取的报文进行人工解码,也不须要考虑TCP的粘包和半包问题。为何这么说呢?下面列举一个包尾增长分隔符的例子:
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import java.util.concurrent.atomic.AtomicInteger; /** * @Author: wuxiaofei * @Date: 2020/8/15 0015 19:15 * @Version: 1.0 * @Description:入站处理器 */ @ChannelHandler.Sharable public class DelimiterServerHandler extends ChannelInboundHandlerAdapter { private AtomicInteger counter = new AtomicInteger(0); private AtomicInteger completeCounter = new AtomicInteger(0); /*** 服务端读取到网络数据后的处理*/ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf)msg; String request = in.toString(CharsetUtil.UTF_8); System.out.println("Server Accept["+request +"] and the counter is:"+counter.incrementAndGet()); String resp = "Hello,"+request+". Welcome to Netty World!" + DelimiterEchoServer.DELIMITER_SYMBOL; ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); } /*** 服务端读取完成网络数据后的处理*/ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); System.out.println("the ReadComplete count is " +completeCounter.incrementAndGet()); } /*** 发生异常后的处理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import java.net.InetSocketAddress; /** * @Author: wuxiaofei * @Date: 2020/8/15 0015 19:17 * @Version: 1.0 * @Description:服务端 */ public class DelimiterEchoServer { public static final String DELIMITER_SYMBOL = "@~"; public static final int PORT = 9997; public static void main(String[] args) throws InterruptedException { DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer(); System.out.println("服务器即将启动"); delimiterEchoServer.start(); } public void start() throws InterruptedException { final DelimiterServerHandler serverHandler = new DelimiterServerHandler(); EventLoopGroup group = new NioEventLoopGroup();/*线程组*/ try { ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/ b.group(group)/*将线程组传入*/ .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/ .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/ /*服务端每接收到一个链接请求,就会新启一个socket通讯,也就是channel, 因此下面这段代码的做用就是为这个子channel增长handle*/ .childHandler(new ChannelInitializerImp()); ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/ System.out.println("服务器启动完成,等待客户端的链接和数据....."); f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/ } finally { group.shutdownGracefully().sync();/*优雅关闭线程组*/ } } private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL .getBytes()); //服务端收到数据包后通过DelimiterBasedFrameDecoder即分隔符基础框架解码器解码为一个个带有分隔符的数据包。 ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new DelimiterServerHandler()); } } }
添加到ChannelPipeline
的DelimiterBasedFrameDecoder
用于对使用分隔符结尾的消息进行自动解码,固然还有没有用到的FixedLengthFrameDecoder
用于对固定长度的消息进行自动解码等解码器。正如上门的代码使用案例,有了Netty提供的几码器能够轻松地完成对不少消息的自动解码,并且不须要考虑TCP粘包/拆包致使的读半包问题,极大地提高了开发效率。
Netty示例代码详解
相信看完上面的铺垫,你对Netty编码有了必定的了解了,下面再来总体梳理一遍吧。
一、设置EventLoopGroup
线程组(Reactor
线程组)
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
上面咱们说过Netty
中使用Reactor
模式,bossGroup
表示服务器链接监听线程组,专门接受 Accept
新的客户端client
链接。另外一个workerGroup
表示处理每一链接的数据收发的线程组,来处理消息的读写事件。
二、服务端引导器
ServerBootstrap serverBootstrap = new ServerBootstrap();
集成全部配置,用来启动Netty
服务端。
三、设置ServerBootstrap
信息
serverBootstrap.group(bossGroup, workerGroup);
将两个线程组设置到ServerBootstrap
中。
四、设置ServerSocketChannel
类型
serverBootstrap.channel(NioServerSocketChannel.class);
设置通道的IO
类型,Netty
不止支持Java NIO
,也支持阻塞式IO
,例如OIO
OioServerSocketChannel.class)
五、设置参数
serverBootstrap.option(ChannelOption.SO_BACKLOG, 100);
经过option()
方法能够设置不少参数,这里SO_BACKLOG
标识服务端接受链接的队列长度,若是队列已满,客户端链接将被拒绝。默认值,Windows
为200,其余为128,这里设置的是100。
六、设置Handler
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
设置 ServerSocketChannel
对应的Handler
,这里只能设置一个,它会在SocketChannel
创建起来以前执行。
七、设置子Handler
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new ChatNettyHandler()); } });
Netty
中提供了一种能够设置多个Handler
的途径,即便用ChannelInitializer
方式。ChannelPipeline
是Netty
处理请求的责任链,这是一个ChannelHandler
的链表,而ChannelHandler
就是用来处理网络请求的内容的。
每个channel
,都有一个处理器流水线。装配child channel
流水线,调用childHandler()
方法,传递一个ChannelInitializer
的实例。
在 child channel
建立成功,开始通道初始化的时候,在bootstrap启动器中配置的ChannelInitializer
实例就会被调用。
这个时候,才真正的执行去执行 initChannel
初始化方法,开始通道流水线装配。
流水线装配,主要是在流水线pipeline
的后面,增长负责数据读写、处理业务逻辑的handler
。
处理器 ChannelHandler
用来处理网络请求内容,有ChannelInboundHandler
和ChannelOutboundHandler
两种,ChannlPipeline
会从头至尾顺序调用ChannelInboundHandler
处理网络请求内容,从尾到头调用ChannelOutboundHandler
处理网络请求内容
八、绑定端口号
ChannelFuture f = serverBootstrap.bind(PORT).sync();
绑定端口号
九、等待服务端端口号关闭
f.channel().closeFuture().sync();
等待服务端监听端口关闭,sync()
会阻塞主线程,内部调用的是 Object
的 wait()
方法
十、关闭EventLoopGroup线程组
bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
总结
这篇文章主要是从一个demo
做为引子,而后介绍了Netty
的包结构、Reactor
模型、编程规范等等,目的很简单,但愿你可以读懂这段demo
并写出来。
后面开始继续Netty
源码解析部分,敬请期待。
参考资料
- 《Netty in Action》书籍
- 慕课Netty专栏
- 掘金闪电侠Netty小册
- 芋道源码Netty专栏
- Github[fork from krcys]
感谢Netty专栏做者们优秀的文章内容~