本节在《netty 学习笔记二》之上进行了一段大跃进,所以在本节你将会一股脑看到 netty自定义协议设计、数据载体 ByteBuf API、通讯协议编解码、pipeline 结构、ChannelHandler 生命周期 和 热插拔效果、单聊和群聊实现、心跳与空闲检测、netty IM 系统的性能优化等等内容。因为 netty 已经造了不少好用的轮子,如粘包拆包处理器、空闲检测处理器、通用编解码器等,咱们只须要配置一些构造参数,基本上就能够足够使用,不用再重复造轮子了。前端
代码已经上传到个人 github:https://github.com/christmad/code-share/tree/master/netty-group-chatjava
一种通用的通讯协议设计:
1. 魔数(magic number)
魔数做为第一个字段,一般状况下为固定的几个字节,咱们能够规定为 4个字节。值通常设定为不容易被猜到的。
魔数能够认为是一种显示的起始标志,在 java 的二进制文件中以魔数 0xcafebabe 做为开头,有殊途同归之妙。
在源源不断的网络包中,起始标志能够减小错误率,迅速找出正确的包。
在编程中,magic number 也用来描述不使用变量名而直接使用数字的编程习惯,直接使用数字一般会引发歧义。
2. 版本号
一般是预留字段, IP 协议中也有一个 version 字段用来标识 IP 协议的版本是 IPv4 或 IPv6。
3. 序列化算法
序列化算法,是指如何把对象转为二进制数据,以及把二进制数据转为对象,此处是 java 对象。
好比 java 自带的序列化算法,json,hessian 等序列化方式。
规定一个字节,能够表示 256 种算法,足够用了。
4. 指令
好比 IM即时通讯系统中客户端登陆、聊天等指令。
对于 IM系统 ,能够规定 1个字节,能够表示 256 种指令,彻底够用。
5. 数据长度
规定4个字节。
6. 数据内容
变长 N 字节,具体内容序列化后能够占不一样的长度。git
目前除了版本号外,这里设计的每个字段在 ChannelHandler 里都提现出来了。魔数对应 IMProtocolSplitter(同时完成了服务端拆包工做);序列化算法对应 UltimatePacketCodecHandler;指令对应 IMHandler;数据对应了具体类型的 Packet。序列化算法采用了 alibaba 的 FASTJSON,JSON 也是前端大量在使用的一种序列化方式。github
有了自定义协议设定,编码时只要逐字段按照协议拼装字节便可,一般咱们的 java 对象使用 FASTJSON 序列化后会塞到“数据”字段里。解码时比较关键的两个字段是“指令”和“数据”,根据“指令”类型获取对应的 Class 类型,而后获取序列化过的 byte[],最后 FASTJSON API 使用这两个参数进行反序列化。 算法
Bytebuf 数据结构以下图:数据库
API 这块最终每一个人都有本身不一样的熟悉程度,很差谈。只讲两点:编程
1. get/set 方法不会改变 读写指针,而 read/write 方法会改变读写指针。json
2. 若是遇到内存紧张的问题,必定是没有释放内存。netty 某些 decoder 会自动释放内存,但若是假设我这个项目中用的 MessageToMessageCodec 底层没有帮咱们管理内存而致使内存泄漏,咱们就应该本身在程序中手动释放内存。对应的方法是 ByteBuf#release(),它将会把 ByteBuf 引用计数减 1,减到 0 时表示能被回收。默认申请完一块 ByteBuf 默认计数为 1。对应的增长计数的方法为 retain(),在 slice()、duplicate() 场景下会用到。缓存
因为本次实现的 netty IM系统 server端 和 client端都由 java 实现,而咱们的 client端使用了控制台来实现。所以代码中的每一种 ConsoleCommand 就对应了实际项目中的 UI 控件按钮,如 createGroup、listGroupMembers 等“一级指令”就对应一个个 UI 上的按钮。构造了一个 ConsoleCommandManager 方便聚合全部的“二级指令”,这个功用和 IMHandler 有点类似,均可以简化多层 if else 代码。性能优化
链接假死的现象是:在某一端(服务端或者客户端)看来,底层的 TCP 链接已经断开了,可是应用程序并无捕获到,所以会认为这条链接仍然是存在的,从 TCP 层面来讲,只有收到四次握手数据包或者一个 RST 数据包,链接的状态才表示已断开。
1. 对于服务端来讲,由于每条链接都会耗费 cpu 和内存资源,大量假死的链接会逐渐耗光服务器的资源,最终致使性能逐渐降低,程序崩溃。
2. 对于客户端来讲,链接假死会形成发送数据超时,影响用户体验。
a. 应用程序出现线程堵塞,没法进行数据的读写
b. 客户端或者服务端网络相关的设备出现故障,好比网卡,机房故障
c. 公网丢包。公网环境相对内网而言,很是容易出现丢包,网络抖动等现象,若是在一段时间内用户接入的网络连续出现丢包现象,那么对客户端来讲数据一直发送不出去,而服务端也是一直收不到客户端来的数据,链接就一直耗着
空闲检测指的是每隔一段时间,检测这段时间内是否有数据读写,简化一下,咱们的服务端只须要检测一段时间内,是否收到过客户端发来的数据便可,Netty 自带的 IdleStateHandler 就能够实现这个功能
PS:这个问题上服务端和客户端的策略是同样的
1. 链接假死。
2. 非假死状态下确实没有发送数据
只须要排查第二种状况。使用 Netty 自带的 IdleStateHandler 就能够实现这个功能,见代码 IMIdleStateHandler.java:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/handler/IMIdleStateHandler.java
优化一般指在服务端优化,服务端单机可能会面对十几万甚至几十万链接,须要进行一些对象碎片管理、优化(缩短)调用链(netty 中叫作 缩短事件传播路径)、阻塞方法优化等。
在 ServerBootstrap 的 childHandler() 方法中,ChannelInitializer 类的 initChannel 逻辑是:每次有新链接到来的时候,都会调用 ChannelInitializer 的 initChannel() 方法,而后把咱们添加的 ChannelHandler 都 new 一次,插入到 channel pipeline 中。
仔细观察这些 handler ,它们方法中是没有成员变量的,也就是无状态的,所以能够用单例模式来优化这些实例。在单机十几万甚至几十万链接的状况下,单例使得性能获得必定程度提高,建立的小对象也大大减小了。
而后重要的一点是,在 netty 中声明一个 ChannelHandler 是共享的,须要使用注解 @ChannelHandler.Sharable 来告诉 netty 这个 handler 是能够被多个 channel 共享的。
在没有单例优化前,你的 ChannelInitializer # initChannel() 方法多是这样的:
1 serverBootstrap 2 .childHandler(new ChannelInitializer<NioSocketChannel>() { 3 protected void initChannel(NioSocketChannel ch) { 4 ch.pipeline().addLast(new Spliter()); 5 ch.pipeline().addLast(new PacketDecoder()); 6 ch.pipeline().addLast(new LoginRequestHandler()); 7 ch.pipeline().addLast(new AuthHandler()); 8 ch.pipeline().addLast(new MessageRequestHandler()); 9 ch.pipeline().addLast(new CreateGroupRequestHandler()); 10 ch.pipeline().addLast(new JoinGroupRequestHandler()); 11 ch.pipeline().addLast(new QuitGroupRequestHandler()); 12 ch.pipeline().addLast(new ListGroupMembersRequestHandler()); 13 ch.pipeline().addLast(new GroupMessageRequestHandler()); 14 ch.pipeline().addLast(new LogoutRequestHandler()); 15 ch.pipeline().addLast(new PacketEncoder()); 16 } 17 });
使用单例改造后,ChannelInitializer # initChannel() 方法是这样的:
serverBootstrap .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new Spliter()); ch.pipeline().addLast(new PacketDecoder()); ch.pipeline().addLast(LoginRequestHandler.INSTANCE); ch.pipeline().addLast(AuthHandler.INSTANCE); ch.pipeline().addLast(MessageRequestHandler.INSTANCE); ch.pipeline().addLast(CreateGroupRequestHandler.INSTANCE); ch.pipeline().addLast(JoinGroupRequestHandler.INSTANCE); ch.pipeline().addLast(QuitGroupRequestHandler.INSTANCE); ch.pipeline().addLast(ListGroupMembersRequestHandler.INSTANCE); ch.pipeline().addLast(GroupMessageRequestHandler.INSTANCE); ch.pipeline().addLast(LogoutRequestHandler.INSTANCE); ch.pipeline().addLast(new PacketEncoder()); } });
另外,须要注意的是,Splitter 不能被共享。虽然看起来咱们的 Splitter 方法内也没有引用任何成员变量,但也许是由于每一个链接都要维护本身的 ByteBuf,所以 Splitter 继承了 拆包器-LengthFieldBasedFrameDecoder 以后因为父类的有状态而致使 Splitter 也有状态了。若是你不信,能够强行试试把 Splitter 改形成单例。最后你会发现,控制台会输出一个错误。debug 后你会看到在 Splitter 某个父类中的构造器是这样的:
protected ByteToMessageDecoder() { ensureNotSharable(); }
这已经在告诉你不能把 ByteToMessageDecoder 和 它的派生子类设为共享 handler。个人 netty 版本用的是 4.1.24.final,而在这以前的一些版本中 ensureNotSharable() 方法还并非在 ChannelHandler 继承体系中的一个方法,是用了某种 Util 工具来存放这个方法。不太重点是,咱们知道运行起来效果是同样的。
Netty 内部提供了一个类,叫作 MessageToMessageCodec,使用它可让咱们的编解码操做放到一个类里面去实现。而且这个 codec 也是能够共享的。详情见代码 UltimatePacketCodecHandler.java:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/handler/UltimatePacketCodecHandler.java
1 @ChannelHandler.Sharable 2 public class UltimatePacketCodecHandler extends MessageToMessageCodec<ByteBuf, Packet> { 3 public static final UltimatePacketCodecHandler INSTANCE = new UltimatePacketCodecHandler(); 4 5 private UltimatePacketCodecHandler() {} 6 7 @Override 8 protected void encode(ChannelHandlerContext ctx, Packet packet, List<Object> out) { 9 // 使用 channel 上的 ByteBuf alloc,方便 netty 管理内存 10 ByteBuf byteBuf = ctx.channel().alloc().ioBuffer(); 11 out.add(PacketCodec.INSTANCE.encode(byteBuf, packet)); 12 } 13 14 @Override 15 protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) { 16 out.add(PacketCodec.INSTANCE.decode(buf)); 17 } 18 }
对咱们的 IM 应用来讲,每次从控制台(对应一个UI按钮)只会传一个指令到服务器,而且这个指令只会被某一个 handler 处理,所以这些指令 handler 有一个“平行”的概念。咱们能够将这些平行的 handler 压缩为一个 handler,如 IMRequestHandler 所示:
1 @ChannelHandler.Sharable 2 public class IMRequestHandler extends SimpleChannelInboundHandler<Packet> { 3 4 public static final IMRequestHandler INSTANCE = new IMRequestHandler(); 5 6 private Map<Byte, SimpleChannelInboundHandler<? extends Packet>> channelMap; 7 8 private IMRequestHandler() { 9 channelMap = new HashMap<>(); 10 // 将指令类型 和 request handler 作映射 11 channelMap.put(Command.MESSAGE_REQUEST, MessageRequestHandler.INSTANCE); 12 channelMap.put(Command.LOGIN_REQUEST, LoginRequestHandler.INSTANCE); 13 channelMap.put(Command.LOGOUT_REQUEST, LogoutRequestHandler.INSTANCE); 14 channelMap.put(Command.CREATE_GROUP_REQUEST, CreateGroupRequestHandler.INSTANCE); 15 channelMap.put(Command.JOIN_GROUP_REQUEST, JoinGroupRequestHandler.INSTANCE); 16 channelMap.put(Command.LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestHandler.INSTANCE); 17 channelMap.put(Command.QUIT_GROUP_REQUEST, QuitGroupRequestHandler.INSTANCE); 18 channelMap.put(Command.GROUP_MESSAGE_REQUEST, GroupMessageRequestHandler.INSTANCE); 19 } 20 21 @Override 22 protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception { 23 SimpleChannelInboundHandler<? extends Packet> simpleChannelInboundHandler = channelMap.get(msg.getCommand()); 24 if (simpleChannelInboundHandler != null) { 25 // 只关心能处理的类型 26 simpleChannelInboundHandler.channelRead(ctx, msg); 27 } 28 } 29 }
再看看代码中的 IMResponseHandler:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/client/handler/IMResponseHandler.java
其实客户端没有必要进行这种程度的优化,不过能够再次感觉一下 netty 给咱们带来的编码上的方便。
一般咱们的应用会涉及数据库或网络操做,好比在 LoginRequestHandler 中,实际上在 valid() 或 checkUser() 方法中作的事情是把用户名和密码拿到数据库或某个网络中间件里面去进行比较,而例子中我只是粗暴验证直接返回 true 并简单的生成一个 userId 返回了。实际场景以下:
1 protected void channelRead0(ChannelHandlerContext ctx, T packet) { 2 // 1. balabala 一些逻辑 3 // 2. 数据库或者网络等一些耗时的操做 4 // 3. writeAndFlush() 5 // 4. balabala 其余的逻辑 6 }
对于第2个过程当中的耗时操做,一般不会直接这样写。为何?先来看看 netty 一条 NIO 线程的处理逻辑抽象:
1 List<Channel> channelList = 已有数据可读的 channel 2 for (Channel channel in channelist) { 3 for (ChannelHandler handler in channel.pipeline()) { 4 handler.channelRead0(ctx, msg); 5 } 6 }
当咱们执行 NioEventLoopGroup worker = new NioEventLoopGroup(); 这行代码时,netty 默认会启动 2倍 CPU 核数的 NIO 线程,在单机大量链接(几万甚至十几万以上)状况下, 一条 NIO 线程管理着几千条甚至上万条链接。若是在某个链接上执行 channelRead0() 时发生阻塞,最终都会拖慢绑定在该 NIO 线程上的其余 channel 的执行速度。
这时咱们应该把耗时操做扔到业务线程池中去处理,处理逻辑如 LoginRequestHandler.java 中代码所示:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/server/handler/LoginRequestHandler.java,伪代码以下:
1 ThreadPool threadPool = xxx; 2 3 protected void channelRead0(ChannelHandlerContext ctx, T packet) { 4 threadPool.submit(new Runnable() { 5 // 1. balabala 一些逻辑 6 // 2. 数据库或者网络等一些耗时的操做 7 // 3. writeAndFlush() 8 // 4. balabala 其余的逻辑 9 }); 10 }
最后,其余小细节就不在本篇里长篇大论了,之后应该会收集一个系列来专门记录编程里的小技巧。不少功能也没有在这个版本里一并实现,好比消息的存储,须要加上数据库。以及“模拟打开聊天窗口”时查看最近的一些消息等。参考 QQ 最近这几年的变化,打开聊天窗口加载的消息数量变少了,若是有关注的话应该会对这个变化有印象,以前的一些版本中打开窗口就能看到以前聊过的十几行消息,后面慢慢变成几行,目前(2019-11-07)打开窗口只能看三行了。只要有时间,这些功能都是能够添加的。好比消息存储和历史消息这块,先有消息存储后,后续就能够作一个 7天内、3天内的、当天的N条 等不一样级别的历史消息缓存级别。后面有空我也会继续持续完善这个 IM 系统的功能,毕竟这是我兴趣的项目之一。
有缘下篇博客见,See ya~~