在《芋道 Spring Boot WebSocket 入门》文章中,咱们使用 WebSocket 实现了一个简单的 IM 功能,支持身份认证、私聊消息、群聊消息。前端
而后就有胖友私信艿艿,但愿使用纯 Netty 实现一个相似的功能。良心的艿艿,固然不会给她发红人卡,所以就有了本文。可能有胖友不知道 Netty 是什么,这里简单介绍下:java
Netty 是一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。git
也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 能够确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。github
Netty 至关简化和流线化了网络应用的编程开发过程,例如,TCP 和 UDP 的 Socket 服务开发。web
下面,咱们来新建三个项目,以下图所示:算法
lab-67-netty-demo-server
项目:搭建 Netty 服务端。lab-67-netty-demo-client
项目:搭建 Netty 客户端。lab-67-netty-demo-common
项目:提供 Netty 的基础封装,提供消息的编解码、分发的功能。另外,咱们也会提供 Netty 经常使用功能的示例:spring
不哔哔,直接开干。数据库
友情提示:可能会胖友担忧,没有 Netty 基础是否是没法阅读本文?!艿艿的想法,看!就硬看,按照代码先本身能搭建一下哈~文末,艿艿会提供一波 Netty 基础入门的文章。apache
本文在提供完整代码示例,可见 https://github.com/YunaiV/Spr... 的 lab-67 目录。原创不易,给点个 Star 嘿,一块儿冲鸭!编程
本小节,咱们先来使用 Netty 构建服务端与客户端的核心代码,让胖友对项目的代码有个初始的认知。
建立 lab-67-netty-demo-server
项目,搭建 Netty 服务端。以下图所示:
下面,咱们只会暂时看看 server
包下的代码,避免信息量过大,击穿胖友的秃头。
建立 NettyServer 类,Netty 服务端。代码以下:
@Component public class NettyServer { private Logger logger = LoggerFactory.getLogger(getClass()); @Value("${netty.port}") private Integer port; @Autowired private NettyServerHandlerInitializer nettyServerHandlerInitializer; /** * boss 线程组,用于服务端接受客户端的链接 */ private EventLoopGroup bossGroup = new NioEventLoopGroup(); /** * worker 线程组,用于服务端接受客户端的数据读写 */ private EventLoopGroup workerGroup = new NioEventLoopGroup(); /** * Netty Server Channel */ private Channel channel; /** * 启动 Netty Server */ @PostConstruct public void start() throws InterruptedException { // <2.1> 建立 ServerBootstrap 对象,用于 Netty Server 启动 ServerBootstrap bootstrap = new ServerBootstrap(); // <2.2> 设置 ServerBootstrap 的各类属性 bootstrap.group(bossGroup, workerGroup) // <2.2.1> 设置两个 EventLoopGroup 对象 .channel(NioServerSocketChannel.class) // <2.2.2> 指定 Channel 为服务端 NioServerSocketChannel .localAddress(new InetSocketAddress(port)) // <2.2.3> 设置 Netty Server 的端口 .option(ChannelOption.SO_BACKLOG, 1024) // <2.2.4> 服务端 accept 队列的大小 .childOption(ChannelOption.SO_KEEPALIVE, true) // <2.2.5> TCP Keepalive 机制,实现 TCP 层级的心跳保活功能 .childOption(ChannelOption.TCP_NODELAY, true) // <2.2.6> 容许较小的数据包的发送,下降延迟 .childHandler(nettyServerHandlerInitializer); // <2> 绑定端口,并同步等待成功,即启动服务端 ChannelFuture future = bootstrap.bind().sync(); if (future.isSuccess()) { channel = future.channel(); logger.info("[start][Netty Server 启动在 {} 端口]", port); } } /** * 关闭 Netty Server */ @PreDestroy public void shutdown() { // <3.1> 关闭 Netty Server if (channel != null) { channel.close(); } // <3.2> 优雅关闭两个 EventLoopGroup 对象 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
🔥 ① 在类上,添加 @Component
注解,把 NettyServer 的建立交给 Spring 管理。
port
属性,读取 application.yml
配置文件的 netty.port
配置项。#start()
方法,添加 @PostConstruct
注解,启动 Netty 服务器。#shutdown()
方法,添加 @PreDestroy
注解,关闭 Netty 服务器。🔥 ② 咱们来详细看看 #start()
方法的代码,如何实现 Netty Server 的启动。
<2.1>
处,建立 ServerBootstrap 类,Netty 提供的服务器的启动类,方便咱们初始化 Server。
<2.2>
处,设置 ServerBootstrap 的各类属性。
友情提示:这里涉及较多 Netty 组件的知识,艿艿先以简单的语言描述,后续胖友在文末的 Netty 基础入门的文章,补充学噢。
<2.2.1>
处,调用 #group(EventLoopGroup parentGroup, EventLoopGroup childGroup)
方法,设置使用 bossGroup
和 workerGroup
。其中:
bossGroup
属性:Boss 线程组,用于服务端接受客户端的链接。workerGroup
属性:Worker 线程组,用于服务端接受客户端的数据读写。Netty 采用的是多 Reactor 多线程的模型,服务端能够接受更多客户端的数据读写的能力。缘由是:
- 建立专门用于接受客户端链接的
bossGroup
线程组,避免由于已链接的客户端的数据读写频繁,影响新的客户端的链接。- 建立专门用于接收客户端读写的
workerGroup
线程组,多个线程进行客户端的数据读写,能够支持更多客户端。课后习题:感兴趣的胖友,后续能够看看《【NIO 系列】——之 Reactor 模型》文章。
<2.2.2>
处,调用 #channel(Class<? extends C> channelClass)
方法,设置使用 NioServerSocketChannel 类,它是 Netty 定义的 NIO 服务端 TCP Socket 实现类。
<2.2.3>
处,调用 #localAddress(SocketAddress localAddress)
方法,设置服务端的端口。
<2.2.4>
处,调用 option#(ChannelOption<T> option, T value)
方法,设置服务端接受客户端的链接队列大小。由于 TCP 创建链接是三次握手,因此第一次握手完成后,会添加到服务端的链接队列中。
课后习题:更多相关内容,后续能够看看 《浅谈 TCP Socket 的 backlog 参数》文章。
<2.2.5>
处,调用 #childOption(ChannelOption<T> childOption, T value)
方法,TCP Keepalive 机制,实现 TCP 层级的心跳保活功能。
课后习题:更多相关内容,后续能够看看 《TCP Keepalive 机制刨根问底》文章。
<2.2.6>
处,调用 #childOption(ChannelOption<T> childOption, T value)
方法,容许较小的数据包的发送,下降延迟。
课后习题:更多相关内容,后续能够看看 《详解 Socket 编程 --- TCP_NODELAY 选项》文章。
<2.2.7>
处,调用 #childHandler(ChannelHandler childHandler)
方法,设置客户端链接上来的 Channel 的处理器为 NettyServerHandlerInitializer。稍后咱们在「2.1.2 NettyServerHandlerInitializer」小节来看看。
<2.3>
处,调用 #bind()
+ #sync()
方法,绑定端口,并同步等待成功,即启动服务端。
🔥 ③ 咱们来详细看看 #shutdown()
方法的代码,如何实现 Netty Server 的关闭。
<3.1>
处,调用 Channel 的 #close()
方法,关闭 Netty Server,这样客户端就再也不能链接了。
<3.2>
处,调用 EventLoopGroup 的 #shutdownGracefully()
方法,优雅关闭 EventLoopGroup。例如说,它们里面的线程池。
在看 NettyServerHandlerInitializer 的代码以前,咱们须要先了解下 Netty 的 ChannelHandler 组件,用来处理 Channel 的各类事件。这里的事件很普遍,好比能够是链接、数据读写、异常、数据转换等等。
ChannelHandler 有很是多的子类,其中有个很是特殊的 ChannelInitializer,它用于 Channel 建立时,实现自定义的初始化逻辑。这里咱们建立的 NettyServerHandlerInitializer 类,就继承了 ChannelInitializer 抽象类,代码以下:
@Component public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> { /** * 心跳超时时间 */ private static final Integer READ_TIMEOUT_SECONDS = 3 * 60; @Autowired private MessageDispatcher messageDispatcher; @Autowired private NettyServerHandler nettyServerHandler; @Override protected void initChannel(Channel ch) { // <1> 得到 Channel 对应的 ChannelPipeline ChannelPipeline channelPipeline = ch.pipeline(); // <2> 添加一堆 NettyServerHandler 到 ChannelPipeline 中 channelPipeline // 空闲检测 .addLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS)) // 编码器 .addLast(new InvocationEncoder()) // 解码器 .addLast(new InvocationDecoder()) // 消息分发器 .addLast(messageDispatcher) // 服务端处理器 .addLast(nettyServerHandler) ; } }
在每个客户端与服务端创建完成链接时,服务端会建立一个 Channel 与之对应。此时,NettyServerHandlerInitializer 会进行执行 #initChannel(Channel c)
方法,进行自定义的初始化。
友情提示:建立的客户端的 Channel,不要和 「2.1.1 NettyServer」小节的 NioServerSocketChannel 混淆,不是同一个哈。在
#initChannel(Channel ch)
方法的ch
参数,就是此时建立的客户端 Channel。
① <1>
处,调用 Channel 的 #pipeline()
方法,得到客户端 Channel 对应的 ChannelPipeline。ChannelPipeline 由一系列的 ChannelHandler 组成,又或者说是 ChannelHandler 链。这样, Channel 全部上全部的事件都会通过 ChannelPipeline,被其上的 ChannelHandler 所处理。
② <2>
处,添加五个 ChannelHandler 到 ChannelPipeline 中,每个的做用看其上的注释。具体的,咱们会在后续的小节详细解释。
建立 NettyServerHandler 类,继承 ChannelInboundHandlerAdapter 类,实现客户端 Channel 创建链接、断开链接、异常时的处理。代码以下:
@Component @ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private NettyChannelManager channelManager; @Override public void channelActive(ChannelHandlerContext ctx) { // 从管理器中添加 channelManager.add(ctx.channel()); } @Override public void channelUnregistered(ChannelHandlerContext ctx) { // 从管理器中移除 channelManager.remove(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.error("[exceptionCaught][链接({}) 发生异常]", ctx.channel().id(), cause); // 断开链接 ctx.channel().close(); } }
① 在类上添加 @ChannelHandler.Sharable
注解,标记这个 ChannelHandler 能够被多个 Channel 使用。
② channelManager
属性,是咱们实现的客户端 Channel 的管理器。
#channelActive(ChannelHandlerContext ctx)
方法,在客户端和服务端创建链接完成时,调用 NettyChannelManager 的 #add(Channel channel)
方法,添加到其中。#channelUnregistered(ChannelHandlerContext ctx)
方法,在客户端和服务端断开链接时,调用 NettyChannelManager 的 #add(Channel channel)
方法,从其中移除。具体的 NettyChannelManager 的源码,咱们在「2.1.4 NettyChannelManager」 小节中来瞅瞅~
③ #exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
方法,在处理 Channel 的事件发生异常时,调用 Channel 的 #close()
方法,断开和客户端的链接。
建立 NettyChannelManager 类,提供两种功能。
🔥 ① 客户端 Channel 的管理。代码以下:
@Component public class NettyChannelManager { /** * {@link Channel#attr(AttributeKey)} 属性中,表示 Channel 对应的用户 */ private static final AttributeKey<String> CHANNEL_ATTR_KEY_USER = AttributeKey.newInstance("user"); private Logger logger = LoggerFactory.getLogger(getClass()); /** * Channel 映射 */ private ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<>(); /** * 用户与 Channel 的映射。 * * 经过它,能够获取用户对应的 Channel。这样,咱们能够向指定用户发送消息。 */ private ConcurrentMap<String, Channel> userChannels = new ConcurrentHashMap<>(); /** * 添加 Channel 到 {@link #channels} 中 * * @param channel Channel */ public void add(Channel channel) { channels.put(channel.id(), channel); logger.info("[add][一个链接({})加入]", channel.id()); } /** * 添加指定用户到 {@link #userChannels} 中 * * @param channel Channel * @param user 用户 */ public void addUser(Channel channel, String user) { Channel existChannel = channels.get(channel.id()); if (existChannel == null) { logger.error("[addUser][链接({}) 不存在]", channel.id()); return; } // 设置属性 channel.attr(CHANNEL_ATTR_KEY_USER).set(user); // 添加到 userChannels userChannels.put(user, channel); } /** * 将 Channel 从 {@link #channels} 和 {@link #userChannels} 中移除 * * @param channel Channel */ public void remove(Channel channel) { // 移除 channels channels.remove(channel.id()); // 移除 userChannels if (channel.hasAttr(CHANNEL_ATTR_KEY_USER)) { userChannels.remove(channel.attr(CHANNEL_ATTR_KEY_USER).get()); } logger.info("[remove][一个链接({})离开]", channel.id()); } }
🔥 ② 向客户端 Channel 发送消息。代码以下:
@Component public class NettyChannelManager { /** * 向指定用户发送消息 * * @param user 用户 * @param invocation 消息体 */ public void send(String user, Invocation invocation) { // 得到用户对应的 Channel Channel channel = userChannels.get(user); if (channel == null) { logger.error("[send][链接不存在]"); return; } if (!channel.isActive()) { logger.error("[send][链接({})未激活]", channel.id()); return; } // 发送消息 channel.writeAndFlush(invocation); } /** * 向全部用户发送消息 * * @param invocation 消息体 */ public void sendAll(Invocation invocation) { for (Channel channel : channels.values()) { if (!channel.isActive()) { logger.error("[send][链接({})未激活]", channel.id()); return; } // 发送消息 channel.writeAndFlush(invocation); } } }
建立 pom.xml
文件,引入 Netty 依赖。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>lab-67-netty-demo</artifactId> <groupId>cn.iocoder.springboot.labs</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>lab-67-netty-demo-server</artifactId> <properties> <!-- 依赖相关配置 --> <spring.boot.version>2.2.4.RELEASE</spring.boot.version> <!-- 插件相关配置 --> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.source>1.8</maven.compiler.source> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <!-- Spring Boot 基础依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- Netty 依赖 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.50.Final</version> </dependency> <!-- 引入 netty-demo-common 封装 --> <dependency> <groupId>cn.iocoder.springboot.labs</groupId> <artifactId>lab-67-netty-demo-common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> </project>
建立 NettyServerApplication 类,Netty Server 启动类。代码以下:
@SpringBootApplication public class NettyServerApplication { public static void main(String[] args) { SpringApplication.run(NettyServerApplication.class, args); } }
执行 NettyServerApplication 类,启动 Netty Server 服务器。日志以下:
... // 省略其余日志 2020-06-21 00:16:38.801 INFO 41948 --- [ main] c.i.s.l.n.server.NettyServer : [start][Netty Server 启动在 8888 端口] 2020-06-21 00:16:38.893 INFO 41948 --- [ main] c.i.s.l.n.NettyServerApplication : Started NettyServerApplication in 0.96 seconds (JVM running for 1.4)
Netty Server 启动在 8888 端口。
建立 lab-67-netty-demo-client
项目,搭建 Netty 客户端。以下图所示:
下面,咱们只会暂时看看 client
包下的代码,避免信息量过大,击穿胖友的秃头。
建立 NettyClient 类,Netty 客户端。代码以下:
@Component public class NettyClient { /** * 重连频率,单位:秒 */ private static final Integer RECONNECT_SECONDS = 20; private Logger logger = LoggerFactory.getLogger(getClass()); @Value("${netty.server.host}") private String serverHost; @Value("${netty.server.port}") private Integer serverPort; @Autowired private NettyClientHandlerInitializer nettyClientHandlerInitializer; /** * 线程组,用于客户端对服务端的链接、数据读写 */ private EventLoopGroup eventGroup = new NioEventLoopGroup(); /** * Netty Client Channel */ private volatile Channel channel; /** * 启动 Netty Server */ @PostConstruct public void start() throws InterruptedException { // <2.1> 建立 Bootstrap 对象,用于 Netty Client 启动 Bootstrap bootstrap = new Bootstrap(); // <2.2> bootstrap.group(eventGroup) // <2.2.1> 设置一个 EventLoopGroup 对象 .channel(NioSocketChannel.class) // <2.2.2> 指定 Channel 为客户端 NioSocketChannel .remoteAddress(serverHost, serverPort) // <2.2.3> 指定链接服务器的地址 .option(ChannelOption.SO_KEEPALIVE, true) // <2.2.4> TCP Keepalive 机制,实现 TCP 层级的心跳保活功能 .option(ChannelOption.TCP_NODELAY, true) //<2.2.5> 容许较小的数据包的发送,下降延迟 .handler(nettyClientHandlerInitializer); // <2.3> 链接服务器,并异步等待成功,即启动客户端 bootstrap.connect().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // 链接失败 if (!future.isSuccess()) { logger.error("[start][Netty Client 链接服务器({}:{}) 失败]", serverHost, serverPort); reconnect(); return; } // 链接成功 channel = future.channel(); logger.info("[start][Netty Client 链接服务器({}:{}) 成功]", serverHost, serverPort); } }); } public void reconnect() { // ... 暂时省略代码。 } /** * 关闭 Netty Server */ @PreDestroy public void shutdown() { // <3.1> 关闭 Netty Client if (channel != null) { channel.close(); } // <3.2> 优雅关闭一个 EventLoopGroup 对象 eventGroup.shutdownGracefully(); } /** * 发送消息 * * @param invocation 消息体 */ public void send(Invocation invocation) { if (channel == null) { logger.error("[send][链接不存在]"); return; } if (!channel.isActive()) { logger.error("[send][链接({})未激活]", channel.id()); return; } // 发送消息 channel.writeAndFlush(invocation); } }
友情提示:总体代码,是和 「2.1.1 NettyServer」对等,且基本是一致的。
🔥 ① 在类上,添加 @Component
注解,把 NettyClient 的建立交给 Spring 管理。
serverHost
和 serverPort
属性,读取 application.yml
配置文件的 netty.server.host
和 netty.server.port
配置项。#start()
方法,添加 @PostConstruct
注解,启动 Netty 客户端。#shutdown()
方法,添加 @PreDestroy
注解,关闭 Netty 客户端。🔥 ② 咱们来详细看看 #start()
方法的代码,如何实现 Netty Client 的启动,创建和服务器的链接。
<2.1>
处,建立 Bootstrap 类,Netty 提供的客户端的启动类,方便咱们初始化 Client。
<2.2>
处,设置 Bootstrap 的各类属性。
<2.2.1>
处,调用 #group(EventLoopGroup group)
方法,设置使用 eventGroup
线程组,实现客户端对服务端的链接、数据读写。
<2.2.2>
处,调用 #channel(Class<? extends C> channelClass)
方法,设置使用 NioSocketChannel 类,它是 Netty 定义的 NIO 服务端 TCP Client 实现类。
<2.2.3>
处,调用 #remoteAddress(SocketAddress localAddress)
方法,设置链接服务端的地址。
<2.2.4>
处,调用 #option(ChannelOption<T> childOption, T value)
方法,TCP Keepalive 机制,实现 TCP 层级的心跳保活功能。
<2.2.5>
处,调用 #childOption(ChannelOption<T> childOption, T value)
方法,容许较小的数据包的发送,下降延迟。
<2.2.7>
处,调用 #handler(ChannelHandler childHandler)
方法,设置本身 Channel 的处理器为 NettyClientHandlerInitializer。稍后咱们在「2.2.2 NettyClientHandlerInitializer」小节来看看。
<2.3>
处,调用 #connect()
方法,链接服务器,并异步等待成功,即启动客户端。同时,添加回调监听器 ChannelFutureListener,在链接服务端失败的时候,调用 #reconnect()
方法,实现定时重连。😈 具体 #reconnect()
方法的代码,咱们稍后在瞅瞅哈。
③ 咱们来详细看看 #shutdown()
方法的代码,如何实现 Netty Client 的关闭。
<3.1>
处,调用 Channel 的 #close()
方法,关闭 Netty Client,这样客户端就断开和服务端的链接。
<3.2>
处,调用 EventLoopGroup 的 #shutdownGracefully()
方法,优雅关闭 EventLoopGroup。例如说,它们里面的线程池。
④ #send(Invocation invocation)
方法,实现向服务端发送消息。
由于 NettyClient 是客户端,因此无需像 NettyServer 同样使用「2.1.4 NettyChannelManager」维护 Channel 的集合。
建立的 NettyClientHandlerInitializer 类,就继承了 ChannelInitializer 抽象类,实现和服务端创建链接后,添加相应的 ChannelHandler 处理器。代码以下:
@Component public class NettyClientHandlerInitializer extends ChannelInitializer<Channel> { /** * 心跳超时时间 */ private static final Integer READ_TIMEOUT_SECONDS = 60; @Autowired private MessageDispatcher messageDispatcher; @Autowired private NettyClientHandler nettyClientHandler; @Override protected void initChannel(Channel ch) { ch.pipeline() // 空闲检测 .addLast(new IdleStateHandler(READ_TIMEOUT_SECONDS, 0, 0)) .addLast(new ReadTimeoutHandler(3 * READ_TIMEOUT_SECONDS)) // 编码器 .addLast(new InvocationEncoder()) // 解码器 .addLast(new InvocationDecoder()) // 消息分发器 .addLast(messageDispatcher) // 客户端处理器 .addLast(nettyClientHandler) ; } }
和「2.1.2 NettyServerHandlerInitializer」的代码基本同样,差异在于空闲检测额外增长 IdleStateHandler,客户端处理器换成了 NettyClientHandler。
建立 NettyClientHandler 类,实现客户端 Channel 断开链接、异常时的处理。代码以下:
@Component @ChannelHandler.Sharable public class NettyClientHandler extends ChannelInboundHandlerAdapter { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private NettyClient nettyClient; @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 发起重连 nettyClient.reconnect(); // 继续触发事件 super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.error("[exceptionCaught][链接({}) 发生异常]", ctx.channel().id(), cause); // 断开链接 ctx.channel().close(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception { // 空闲时,向服务端发起一次心跳 if (event instanceof IdleStateEvent) { logger.info("[userEventTriggered][发起一次心跳]"); HeartbeatRequest heartbeatRequest = new HeartbeatRequest(); ctx.writeAndFlush(new Invocation(HeartbeatRequest.TYPE, heartbeatRequest)) .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { super.userEventTriggered(ctx, event); } } }
① 在类上添加 @ChannelHandler.Sharable
注解,标记这个 ChannelHandler 能够被多个 Channel 使用。
② #channelInactive(ChannelHandlerContext ctx)
方法,实如今和服务端断开链接时,调用 NettyClient 的 #reconnect()
方法,实现客户端定时和服务端重连。
③ #exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
方法,在处理 Channel 的事件发生异常时,调用 Channel 的 #close()
方法,断开和客户端的链接。
④ #userEventTriggered(ChannelHandlerContext ctx, Object event)
方法,在客户端在空闲时,向服务端发送一次心跳,即心跳机制。这块的内容,咱们稍后详细讲讲。
建立 pom.xml
文件,引入 Netty 依赖。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>lab-67-netty-demo</artifactId> <groupId>cn.iocoder.springboot.labs</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>lab-67-netty-demo-client</artifactId> <properties> <!-- 依赖相关配置 --> <spring.boot.version>2.2.4.RELEASE</spring.boot.version> <!-- 插件相关配置 --> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.source>1.8</maven.compiler.source> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <!-- 实现对 Spring MVC 的自动化配置 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Netty 依赖 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.50.Final</version> </dependency> <!-- 引入 netty-demo-common 封装 --> <dependency> <groupId>cn.iocoder.springboot.labs</groupId> <artifactId>lab-67-netty-demo-common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> </project>
建立 NettyClientApplication 类,Netty Client 启动类。代码以下:
@SpringBootApplication public class NettyClientApplication { public static void main(String[] args) { SpringApplication.run(NettyClientApplication.class, args); } }
执行 NettyClientApplication 类,启动 Netty Client 客户端。日志以下:
... // 省略其余日志 2020-06-21 09:06:12.205 INFO 44029 --- [ntLoopGroup-2-1] c.i.s.l.n.client.NettyClient : [start][Netty Client 链接服务器(127.0.0.1:8888) 成功]
同时 Netty Server 服务端发现有一个客户端接入,打印以下日志:
2020-06-21 09:06:12.268 INFO 41948 --- [ntLoopGroup-3-1] c.i.s.l.n.server.NettyChannelManager : [add][一个链接(db652822)加入]
至此,咱们已经构建 Netty 服务端和客户端完成。由于 Netty 提供的 API 很是便利,因此咱们不会像直接使用 NIO 时,须要处理大量底层且细节的代码。
不过,如上的内容仅仅是本文的开胃菜,正片即将开始!美滋滋,继续往下看,奥利给!
在「2. 构建 Netty 服务端与客户端」小节中,咱们实现了客户端和服务端的链接功能。而本小节,咱们要让它们两可以说上话,即进行数据的读写。
在平常项目的开发中,前端和后端之间采用 HTTP 做为通讯协议,使用文本内容进行交互,数据格式通常是 JSON。可是在 TCP 的世界里,咱们须要本身基于二进制构建,构建客户端和服务端的通讯协议。
咱们以客户端向服务端发送消息来举个例子,假设客户端要发送一个登陆请求,对应的类以下:
public class AuthRequest { /** 用户名 **/ private String username; /** 密码 **/ private String password; }
友情提示:服务端向客户端发消息,也是同样的过程哈!
序列化的工具很是多,例如说 Google 提供的 Protobuf,性能高效,且序列化出来的二进制数据较小。Netty 对 Protobuf 进行集成,提供了相应的编解码器。以下图所示:
可是考虑到不少胖友对 Protobuf 并不了解,由于它实现序列化又增长胖友的额外学习成本。所以,艿艿仔细一个捉摸,仍是采用 JSON 方式进行序列化。可能胖友会疑惑,JSON 不是将对象转换成字符串吗?嘿嘿,咱们再把字符串转换成 byte 字节数组就能够啦~
下面,咱们新建 lab-67-netty-demo-common
项目,并在 codec
包下,实现咱们自定义的通讯协议。以下图所示:
建立 Invocation 类,通讯协议的消息体。代码以下:
/** * 通讯协议的消息体 */ public class Invocation { /** * 类型 */ private String type; /** * 消息,JSON 格式 */ private String message; // 空构造方法 public Invocation() { } public Invocation(String type, String message) { this.type = type; this.message = message; } public Invocation(String type, Message message) { this.type = type; this.message = JSON.toJSONString(message); } // ... 省略 setter、getter、toString 方法 }
① type
属性,类型,用于匹配对应的消息处理器。若是类比 HTTP 协议,type
属性至关于请求地址。
② message
属性,消息内容,使用 JSON 格式。
另外,Message 是咱们定义的消息接口。代码以下:
public interface Message { // ... 空,做为标记接口 }
在开始看 Invocation 的编解码处理器以前,咱们先了解下粘包与拆包的概念。
若是的内容,引用 《Netty 解决粘包和拆包问题的四种方案》文章的内容,进行二次编辑。
产生粘包和拆包问题的主要缘由是,操做系统在发送 TCP 数据的时候,底层会有一个缓冲区,例如 1024 个字节大小。
若是一次请求发送的数据量比较小,没达到缓冲区大小,TCP 则会将多个请求合并为同一个请求进行发送,这就造成了粘包问题。
例如说,在 《详解 Socket 编程 --- TCP_NODELAY 选项》文章中咱们能够看到,在关闭 Nagle 算法时,请求不会等待知足缓冲区大小,而是尽快发出,下降延迟。
以下图展现了粘包和拆包的一个示意图,演示了粘包和拆包的三种状况:
对于粘包和拆包问题,常见的解决方案有三种:
🔥 ① 客户端在发送数据包的时候,每一个包都固定长度。好比 1024 个字节大小,若是客户端发送的数据长度不足 1024 个字节,则经过补充空格的方式补全到指定长度。
这种方式,艿艿暂时没有找到采用这种方式的案例。
🔥 ② 客户端在每一个包的末尾使用固定的分隔符。例如 \r\n
,若是一个包被拆分了,则等待下一个包发送过来以后找到其中的 \r\n
,而后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就获得了一个完整的包。
具体的案例,有 HTTP、WebSocket、Redis。
🔥 ③ 将消息分为头部和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息以后才算是读到了一个完整的消息。
友情提示:方案 ③ 是 ① 的升级版, 动态长度。
本文,艿艿将采用这种方式,在每次 Invocation 序列化成字节数组写入 TCP Socket 以前,先将字节数组的长度写到其中。以下图所示:
建立 InvocationEncoder 类,实现将 Invocation 序列化,并写入到 TCP Socket 中。代码以下:
public class InvocationEncoder extends MessageToByteEncoder<Invocation> { private Logger logger = LoggerFactory.getLogger(getClass()); @Override protected void encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) { // <2.1> 将 Invocation 转换成 byte[] 数组 byte[] content = JSON.toJSONBytes(invocation); // <2.2> 写入 length out.writeInt(content.length); // <2.3> 写入内容 out.writeBytes(content); logger.info("[encode][链接({}) 编码了一条消息({})]", ctx.channel().id(), invocation.toString()); } }
① MessageToByteEncoder 是 Netty 定义的编码 ChannelHandler 抽象类,将泛型 <I>
消息转换成字节数组。
② #encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out)
方法,进行编码的逻辑。
<2.1>
处,调用 JSON 的 #toJSONBytes(Object object, SerializerFeature... features)
方法,将 Invocation 转换成 字节数组。
<2.2>
处,将字节数组的长度,写入到 TCP Socket 当中。这样,后续「3.4 InvocationDecoder」能够根据该长度,解析到消息,解决粘包和拆包的问题。
友情提示:MessageToByteEncoder 会最终将
ByteBuf out
写到 TCP Socket 中。
<2.3>
处,将字节数组,写入到 TCP Socket 当中。
建立 InvocationDecoder 类,实现从 TCP Socket 读取字节数组,反序列化成 Invocation。代码以下:
public class InvocationDecoder extends ByteToMessageDecoder { private Logger logger = LoggerFactory.getLogger(getClass()); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // <2.1> 标记当前读取位置 in.markReaderIndex(); // <2.2> 判断是否可以读取 length 长度 if (in.readableBytes() <= 4) { return; } // <2.3> 读取长度 int length = in.readInt(); if (length < 0) { throw new CorruptedFrameException("negative length: " + length); } // <3.1> 若是 message 不够可读,则退回到原读取位置 if (in.readableBytes() < length) { in.resetReaderIndex(); return; } // <3.2> 读取内容 byte[] content = new byte[length]; in.readBytes(content); // <3.3> 解析成 Invocation Invocation invocation = JSON.parseObject(content, Invocation.class); out.add(invocation); logger.info("[decode][链接({}) 解析到一条消息({})]", ctx.channel().id(), invocation.toString()); } }
① ByteToMessageDecoder 是 Netty 定义的解码 ChannelHandler 抽象类,在 TCP Socket 读取到新数据时,触发进行解码。
② 在 <2.1>
、<2.2>
、<2.3>
处,从 TCP Socket 中读取长度。
③ 在 <3.1>
、<3.2>
、<3.3>
处,从 TCP Socket 中读取字节数组,并反序列化成 Invocation 对象。
最终,添加 List<Object> out
中,交给后续的 ChannelHandler 进行处理。稍后,咱们将在「4. 消息分发」小结中,会看到 MessageDispatcher 将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。
建立 pom.xml
文件,引入 Netty、FastJSON 等等依赖。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>lab-67-netty-demo</artifactId> <groupId>cn.iocoder.springboot.labs</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>lab-67-netty-demo-common</artifactId> <properties> <!-- 插件相关配置 --> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.source>1.8</maven.compiler.source> </properties> <dependencies> <!-- Netty 依赖 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.50.Final</version> </dependency> <!-- FastJSON 依赖 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.71</version> </dependency> <!-- 引入 Spring 相关依赖 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>5.2.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.2.5.RELEASE</version> </dependency> <!-- 引入 SLF4J 依赖 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.30</version> </dependency> </dependencies> </project>
至此,咱们已经完成通讯协议的定义、编解码的逻辑,是否是蛮有趣的?!
另外,咱们在 NettyServerHandlerInitializer 和 NettyClientHandlerInitializer 的初始化代码中,将编解码器添加到其中。以下图所示:
在 SpringMVC 中,DispatcherServlet 会根据请求地址、方法等,将请求分发到匹配的 Controller 的 Method 方法上。
在 lab-67-netty-demo-client
项目的 dispatcher
包中,咱们建立了 MessageDispatcher 类,实现和 DispatcherServlet 相似的功能,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。
下面,咱们来看看具体的代码实现。
建立 Message 接口,定义消息的标记接口。代码以下:
public interface Message { }
下图,是咱们涉及到的 Message 实现类。以下图所示:
建立 MessageHandler 接口,消息处理器接口。代码以下:
public interface MessageHandler<T extends Message> { /** * 执行处理消息 * * @param channel 通道 * @param message 消息 */ void execute(Channel channel, T message); /** * @return 消息类型,即每一个 Message 实现类上的 TYPE 静态字段 */ String getType(); }
<T>
,须要是 Message 的实现类。下图,是咱们涉及到的 MessageHandler 实现类。以下图所示:
建立 MessageHandlerContainer 类,做为 MessageHandler 的容器。代码以下:
public class MessageHandlerContainer implements InitializingBean { private Logger logger = LoggerFactory.getLogger(getClass()); /** * 消息类型与 MessageHandler 的映射 */ private final Map<String, MessageHandler> handlers = new HashMap<>(); @Autowired private ApplicationContext applicationContext; @Override public void afterPropertiesSet() throws Exception { // 经过 ApplicationContext 得到全部 MessageHandler Bean applicationContext.getBeansOfType(MessageHandler.class).values() // 得到全部 MessageHandler Bean .forEach(messageHandler -> handlers.put(messageHandler.getType(), messageHandler)); // 添加到 handlers 中 logger.info("[afterPropertiesSet][消息处理器数量:{}]", handlers.size()); } /** * 得到类型对应的 MessageHandler * * @param type 类型 * @return MessageHandler */ MessageHandler getMessageHandler(String type) { MessageHandler handler = handlers.get(type); if (handler == null) { throw new IllegalArgumentException(String.format("类型(%s) 找不到匹配的 MessageHandler 处理器", type)); } return handler; } /** * 得到 MessageHandler 处理的消息类 * * @param handler 处理器 * @return 消息类 */ static Class<? extends Message> getMessageClass(MessageHandler handler) { // 得到 Bean 对应的 Class 类名。由于有可能被 AOP 代理过。 Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler); // 得到接口的 Type 数组 Type[] interfaces = targetClass.getGenericInterfaces(); Class<?> superclass = targetClass.getSuperclass(); while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) { // 此处,是以父类的接口为准 interfaces = superclass.getGenericInterfaces(); superclass = targetClass.getSuperclass(); } if (Objects.nonNull(interfaces)) { // 遍历 interfaces 数组 for (Type type : interfaces) { // 要求 type 是泛型参数 if (type instanceof ParameterizedType) { ParameterizedType parameterizedType = (ParameterizedType) type; // 要求是 MessageHandler 接口 if (Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) { Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); // 取首个元素 if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { return (Class<Message>) actualTypeArguments[0]; } else { throw new IllegalStateException(String.format("类型(%s) 得到不到消息类型", handler)); } } } } } throw new IllegalStateException(String.format("类型(%s) 得到不到消息类型", handler)); } }
① 实现 InitializingBean 接口,在 #afterPropertiesSet()
方法中,扫描全部 MessageHandler Bean ,添加到 MessageHandler 集合中。
② 在 #getMessageHandler(String type)
方法中,得到类型对应的 MessageHandler 对象。稍后,咱们会在 MessageDispatcher 调用该方法。
③ 在 #getMessageClass(MessageHandler handler)
方法中,经过 MessageHandler 中,经过解析其类上的泛型,得到消息类型对应的 Class 类。这是参考 rocketmq-spring
项目的 DefaultRocketMQListenerContainer#getMessageType()
方法,进行略微修改。
友情提示:若是胖友对 Java 的泛型机制没有作过一点了解,可能略微有点硬核。能够先暂时跳过,知道意图便可。
建立 MessageDispatcher 类,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。代码以下:
@ChannelHandler.Sharable public class MessageDispatcher extends SimpleChannelInboundHandler<Invocation> { @Autowired private MessageHandlerContainer messageHandlerContainer; private final ExecutorService executor = Executors.newFixedThreadPool(200); @Override protected void channelRead0(ChannelHandlerContext ctx, Invocation invocation) { // <3.1> 得到 type 对应的 MessageHandler 处理器 MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType()); // 得到 MessageHandler 处理器的消息类 Class<? extends Message> messageClass = MessageHandlerContainer.getMessageClass(messageHandler); // <3.2> 解析消息 Message message = JSON.parseObject(invocation.getMessage(), messageClass); // <3.3> 执行逻辑 executor.submit(new Runnable() { @Override public void run() { // noinspection unchecked messageHandler.execute(ctx.channel(), message); } }); } }
① 在类上添加 @ChannelHandler.Sharable
注解,标记这个 ChannelHandler 能够被多个 Channel 使用。
② SimpleChannelInboundHandler 是 Netty 定义的消息处理 ChannelHandler 抽象类,处理消息的类型是 <I>
泛型时。
③ #channelRead0(ChannelHandlerContext ctx, Invocation invocation)
方法,处理消息,进行分发。
<3.1>
处,调用 MessageHandlerContainer 的 #getMessageHandler(String type)
方法,得到 Invocation 的 type
对应的 MessageHandler 处理器。
而后,调用 MessageHandlerContainer 的 #getMessageClass(messageHandler)
方法,得到 MessageHandler 处理器的消息类。
<3.2>
处,调用 JSON 的 # parseObject(String text, Class<T> clazz)
方法,将 Invocation 的 message
解析成 MessageHandler 对应的消息对象。
<3.3>
处,丢到线程池中,而后调用 MessageHandler 的 #execute(Channel channel, T message)
方法,执行业务逻辑。
注意,为何要丢到 executor
线程池中呢?咱们先来了解下 EventGroup 的线程模型。
友情提示:在咱们启动 Netty 服务端或者客户端时,都会设置其 EventGroup。
EventGroup 咱们能够先简单理解成一个线程池,而且线程池的大小仅仅是 CPU 数量 * 2。每一个 Channel 仅仅会被分配到其中的一个线程上,进行数据的读写。而且,多个 Channel 会共享一个线程,即便用同一个线程进行数据的读写。
那么胖友试着思考下,MessageHandler 的具体逻辑视线中,每每会涉及到 IO 处理,例如说进行数据库的读取。这样,就会致使一个 Channel 在执行 MessageHandler 的过程当中,阻塞了共享当前线程的其它 Channel 的数据读取。
所以,咱们在这里建立了 executor
线程池,进行 MessageHandler 的逻辑执行,避免阻塞 Channel 的数据读取。
可能会有胖友说,咱们是否是可以把 EventGroup 的线程池设置大一点,例如说 200 呢?对于长链接的 Netty 服务端,每每会有 1000 ~ 100000 的 Netty 客户端链接上来,这样不管设置多大的线程池,都会出现阻塞数据读取的状况。
友情提示:executor
线程池,咱们通常称之为业务线程池或者逻辑线程池,顾名思义,就是执行业务逻辑的。这样的设计方式,目前 Dubbo 等等 RPC 框架,都采用这种方式。
后续,胖友能够认真阅读下《【NIO 系列】——之 Reactor 模型》文章,进一步理解。
建立 NettyServerConfig 配置类,建立 MessageDispatcher 和 MessageHandlerContainer Bean。代码以下:
@Configuration public class NettyServerConfig { @Bean public MessageDispatcher messageDispatcher() { return new MessageDispatcher(); } @Bean public MessageHandlerContainer messageHandlerContainer() { return new MessageHandlerContainer(); } }
友情提示:和 「4.5 NettyServerConfig」小结一致。
建立 NettyClientConfig 配置类,建立 MessageDispatcher 和 MessageHandlerContainer Bean。代码以下:
@Configuration public class NettyClientConfig { @Bean public MessageDispatcher messageDispatcher() { return new MessageDispatcher(); } @Bean public MessageHandlerContainer messageHandlerContainer() { return new MessageHandlerContainer(); } }
后续,咱们将在以下小节,具体演示消息分发的使用:
Netty 客户端须要实现断开重连机制,解决各类状况下的断开状况。例如说:
具体的代码实现比较简单,只须要在两个地方增长重连机制。
考虑到重连会存在失败的状况,咱们采用定时重连的方式,避免占用过多资源。
① 在 NettyClient 中,提供 #reconnect()
方法,实现定时重连的逻辑。代码以下:
// NettyClient.java public void reconnect() { eventGroup.schedule(new Runnable() { @Override public void run() { logger.info("[reconnect][开始重连]"); try { start(); } catch (InterruptedException e) { logger.error("[reconnect][重连失败]", e); } } }, RECONNECT_SECONDS, TimeUnit.SECONDS); logger.info("[reconnect][{} 秒后将发起重连]", RECONNECT_SECONDS); }
经过调用 EventLoop 提供的 #schedule(Runnable command, long delay, TimeUnit unit)
方法,实现定时逻辑。而在内部的具体逻辑,调用 NettyClient 的 #start()
方法,发起链接 Netty 服务端。
又由于 NettyClient 在 #start()
方法在链接 Netty 服务端失败时,又会调用 #reconnect()
方法,从而再次发起定时重连。如此循环反复,知道 Netty 客户端链接上 Netty 服务端。以下图所示:
② 在 NettyClientHandler 中,实现 #channelInactive(ChannelHandlerContext ctx)
方法,在发现和 Netty 服务端断开时,调用 Netty Client 的 #reconnect()
方法,发起重连。代码以下:
// NettyClientHandler.java @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 发起重连 nettyClient.reconnect(); // 继续触发事件 super.channelInactive(ctx); }
① 启动 Netty Client,不要启动 Netty Server,控制台打印日志以下图:
能够看到 Netty Client 在链接失败时,不断发起定时重连。
② 启动 Netty Server,控制台打印以下图:
能够看到 Netty Client 成功重连上 Netty Server。
在上文中,艿艿推荐胖友阅读《TCP Keepalive 机制刨根问底》文章,咱们能够了解到 TCP 自带的空闲检测机制,默认是 2 小时。这样的检测机制,从系统资源层面上来讲是能够接受的。
可是在业务层面,若是 2 小时才发现客户端与服务端的链接实际已经断开,会致使中间很是多的消息丢失,影响客户的使用体验。
所以,咱们须要在业务层面,本身实现空闲检测,保证尽快发现客户端与服务端实际已经断开的状况。实现逻辑以下:
考虑到客户端和服务端之间并非一直有消息的交互,因此咱们须要增长心跳机制:
友情提示:
- 为何是 180 秒?能够加大或者减少,看本身但愿多快检测到链接异常。太短的时间,会致使心跳过于频繁,占用过多资源。
- 为何是 60 秒?三次机会,确认是否心跳超时。
虽然听起来有点复杂,可是实现起来并不复杂哈。
在 NettyServerHandlerInitializer 中,咱们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。以下图所示:
经过这样的方式,实现服务端发现 180 秒未从客户端读取到消息,主动断开链接。
友情提示:和 「6.1 服务端的空闲检测」一致。
在 NettyClientHandlerInitializer 中,咱们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。以下图所示:
经过这样的方式,实现客户端发现 180 秒未从服务端读取到消息,主动断开链接。
Netty 提供了 IdleStateHandler 处理器,提供空闲检测的功能,在 Channel 的读或者写空闲时间太长时,将会触发一个 IdleStateEvent 事件。
这样,咱们只须要在 NettyClientHandler 处理器中,在接收到 IdleStateEvent 事件时,客户端向客户端发送一次心跳消息。以下图所示:
同时,咱们在服务端项目中,建立了一个 HeartbeatRequestHandler 消息处理器,在收到客户端的心跳请求时,回复客户端一条确认消息。代码以下:
@Component public class HeartbeatRequestHandler implements MessageHandler<HeartbeatRequest> { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void execute(Channel channel, HeartbeatRequest message) { logger.info("[execute][收到链接({}) 的心跳请求]", channel.id()); // 响应心跳 HeartbeatResponse response = new HeartbeatResponse(); channel.writeAndFlush(new Invocation(HeartbeatResponse.TYPE, response)); } @Override public String getType() { return HeartbeatRequest.TYPE; } }
启动 Netty Server 服务端,再启动 Netty Client 客户端,耐心等待 60 秒后,能够看到心跳日志以下:
// ... 客户端 2020-06-22 08:24:47.275 INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.c.handler.NettyClientHandler : [userEventTriggered][发起一次心跳] 2020-06-22 08:24:47.335 INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [encode][链接(44223e18) 编码了一条消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})] 2020-06-22 08:24:47.408 INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][链接(44223e18) 解析到一条消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})] 2020-06-22 08:24:47.409 INFO 57005 --- [pool-1-thread-1] c.i.s.l.n.m.h.HeartbeatResponseHandler : [execute][收到链接(44223e18) 的心跳响应] // ... 服务端 2020-06-22 08:24:47.388 INFO 56998 --- [ntLoopGroup-3-1] c.i.s.l.n.codec.InvocationDecoder : [decode][链接(34778465) 解析到一条消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})] 2020-06-22 08:24:47.390 INFO 56998 --- [pool-1-thread-1] c.i.s.l.n.m.h.HeartbeatRequestHandler : [execute][收到链接(34778465) 的心跳请求] 2020-06-22 08:24:47.399 INFO 56998 --- [ntLoopGroup-3-1] c.i.s.l.n.codec.InvocationEncoder : [encode][链接(34778465) 编码了一条消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})]
友情提示:从本小节开始,咱们就具体看看业务逻辑的处理示例。
认证的过程,以下图所示:
建立 AuthRequest 类,定义用户认证请求。代码以下:
public class AuthRequest implements Message { public static final String TYPE = "AUTH_REQUEST"; /** * 认证 Token */ private String accessToken; // ... 省略 setter、getter、toString 方法 }
这里咱们使用 accessToken
认证令牌进行认证。
由于通常状况下,咱们使用 HTTP 进行登陆系统,而后使用登陆后的身份标识(例如说 accessToken
认证令牌),将客户端和当前用户进行认证绑定。
建立 AuthResponse 类,定义用户认证响应。代码以下:
public class AuthResponse implements Message { public static final String TYPE = "AUTH_RESPONSE"; /** * 响应状态码 */ private Integer code; /** * 响应提示 */ private String message; // ... 省略 setter、getter、toString 方法 }
服务端...
建立 AuthRequestHandler 类,为服务端处理客户端的认证请求。代码以下:
@Component public class AuthRequestHandler implements MessageHandler<AuthRequest> { @Autowired private NettyChannelManager nettyChannelManager; @Override public void execute(Channel channel, AuthRequest authRequest) { // <1> 若是未传递 accessToken if (StringUtils.isEmpty(authRequest.getAccessToken())) { AuthResponse authResponse = new AuthResponse().setCode(1).setMessage("认证 accessToken 未传入"); channel.writeAndFlush(new Invocation(AuthResponse.TYPE, authResponse)); return; } // <2> ... 此处应有一段 // <3> 将用户和 Channel 绑定 // 考虑到代码简化,咱们先直接使用 accessToken 做为 User nettyChannelManager.addUser(channel, authRequest.getAccessToken()); // <4> 响应认证成功 AuthResponse authResponse = new AuthResponse().setCode(0); channel.writeAndFlush(new Invocation(AuthResponse.TYPE, authResponse)); } @Override public String getType() { return AuthRequest.TYPE; } }
代码比较简单,胖友看看 <1>
、<2>
、<3>
、<4>
上的注释。
客户端...
建立 AuthResponseHandler 类,为客户端处理服务端的认证响应。代码以下:
@Component public class AuthResponseHandler implements MessageHandler<AuthResponse> { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void execute(Channel channel, AuthResponse message) { logger.info("[execute][认证结果:{}]", message); } @Override public String getType() { return AuthResponse.TYPE; } }
打印个认证结果,方便调试。
客户端...
建立 TestController 类,提供 /test/mock
接口,模拟客户端向服务端发送请求。代码以下:
@RestController @RequestMapping("/test") public class TestController { @Autowired private NettyClient nettyClient; @PostMapping("/mock") public String mock(String type, String message) { // 建立 Invocation 对象 Invocation invocation = new Invocation(type, message); // 发送消息 nettyClient.send(invocation); return "success"; } }
启动 Netty Server 服务端,再启动 Netty Client 客户端,而后使用 Postman 模拟一次认证请求。以下图所示:
同时,能够看到认证成功的日志以下:
// 客户端... 2020-06-22 08:41:12.364 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [encode][链接(9e086597) 编码了一条消息(Invocation{type='AUTH_REQUEST', message='{"accessToken": "yunai"}'})] 2020-06-22 08:41:12.390 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][链接(9e086597) 解析到一条消息(Invocation{type='AUTH_RESPONSE', message='{"code":0}'})] 2020-06-22 08:41:12.392 INFO 57583 --- [pool-1-thread-1] c.i.s.l.n.m.auth.AuthResponseHandler : [execute][认证结果:AuthResponse{code=0, message='null'}] // 服务端... 2020-06-22 08:41:12.374 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder : [decode][链接(791f122b) 解析到一条消息(Invocation{type='AUTH_REQUEST', message='{"accessToken": "yunai"}'})] 2020-06-22 08:41:12.379 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder : [encode][链接(791f122b) 编码了一条消息(Invocation{type='AUTH_RESPONSE', message='{"code":0}'})]
私聊的过程,以下图所示:
服务端负责将客户端 A 发送的私聊消息,转发给客户端 B。
建立 ChatSendToOneRequest 类,发送给指定人的私聊消息的请求。代码以下:
public class ChatSendToOneRequest implements Message { public static final String TYPE = "CHAT_SEND_TO_ONE_REQUEST"; /** * 发送给的用户 */ private String toUser; /** * 消息编号 */ private String msgId; /** * 内容 */ private String content; // ... 省略 setter、getter、toString 方法 }
建立 ChatSendResponse 类,聊天发送消息结果的响应。代码以下:
public class ChatSendResponse implements Message { public static final String TYPE = "CHAT_SEND_RESPONSE"; /** * 消息编号 */ private String msgId; /** * 响应状态码 */ private Integer code; /** * 响应提示 */ private String message; // ... 省略 setter、getter、toString 方法 }
建立 ChatRedirectToUserRequest 类, 转发消息给一个用户的请求。代码以下:
public class ChatRedirectToUserRequest implements Message { public static final String TYPE = "CHAT_REDIRECT_TO_USER_REQUEST"; /** * 消息编号 */ private String msgId; /** * 内容 */ private String content; // ... 省略 setter、getter、toString 方法 }
友情提示:写完以后,艿艿忽然发现少了一个
fromUser
字段,表示来自谁的消息。
服务端...
建立 ChatSendToOneHandler 类,为服务端处理客户端的私聊请求。代码以下:
@Component public class ChatSendToOneHandler implements MessageHandler<ChatSendToOneRequest> { @Autowired private NettyChannelManager nettyChannelManager; @Override public void execute(Channel channel, ChatSendToOneRequest message) { // <1> 这里,伪装直接成功 ChatSendResponse sendResponse = new ChatSendResponse().setMsgId(message.getMsgId()).setCode(0); channel.writeAndFlush(new Invocation(ChatSendResponse.TYPE, sendResponse)); // <2> 建立转发的消息,发送给指定用户 ChatRedirectToUserRequest sendToUserRequest = new ChatRedirectToUserRequest().setMsgId(message.getMsgId()) .setContent(message.getContent()); nettyChannelManager.send(message.getToUser(), new Invocation(ChatRedirectToUserRequest.TYPE, sendToUserRequest)); } @Override public String getType() { return ChatSendToOneRequest.TYPE; } }
代码比较简单,胖友看看 <1>
、<2>
上的注释。
客户端...
建立 ChatSendResponseHandler 类,为客户端处理服务端的聊天响应。代码以下:
@Component public class ChatSendResponseHandler implements MessageHandler<ChatSendResponse> { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void execute(Channel channel, ChatSendResponse message) { logger.info("[execute][发送结果:{}]", message); } @Override public String getType() { return ChatSendResponse.TYPE; } }
打印个聊天发送结果,方便调试。
客户端
建立 ChatRedirectToUserRequestHandler 类,为客户端处理服务端的转发消息的请求。代码以下:
@Component public class ChatRedirectToUserRequestHandler implements MessageHandler<ChatRedirectToUserRequest> { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void execute(Channel channel, ChatRedirectToUserRequest message) { logger.info("[execute][收到消息:{}]", message); } @Override public String getType() { return ChatRedirectToUserRequest.TYPE; } }
打印个聊天接收消息,方便调试。
① 启动 Netty Server 服务端。
② 启动 Netty Client 客户端 A。而后使用 Postman 模拟一次认证请求(用户为 yunai
)。以下图所示:
③ 启动 Netty Client 客户端 B。注意,须要设置 --server.port
端口为 8081,避免冲突。以下图所示:
而后使用 Postman 模拟一次认证请求(用户为 tutou
)。以下图所示:
④ 最后使用 Postman 模拟一次 yunai
芋艿给 tutou
土豆发送一次私聊消息。以下图所示:
同时,能够看到客户端 A 向客户端 B 发送私聊消息的日志以下:
// 客户端 A...(芋艿) 2020-06-22 08:48:09.505 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [decode][链接(9e086597) 编码了一条消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tudou", msgId: "1", content: "你猜"}'})] 2020-06-22 08:48:09.510 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][链接(9e086597) 解析到一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})] 2020-06-22 08:48:09.511 INFO 57583 --- [ool-1-thread-69] c.i.s.l.n.m.c.ChatSendResponseHandler : [execute][发送结果:ChatSendResponse{msgId='1', code=0, message='null'}] 2020-06-22 08:48:35.148 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [decode][链接(9e086597) 编码了一条消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tutou", msgId: "1", content: "你猜"}'})] 2020-06-22 08:48:35.150 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][链接(9e086597) 解析到一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})] 2020-06-22 08:48:35.150 INFO 57583 --- [ool-1-thread-70] c.i.s.l.n.m.c.ChatSendResponseHandler : [execute][发送结果:ChatSendResponse{msgId='1', code=0, message='null'}] // 服务端 ... 2020-06-22 08:48:35.149 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder : [decode][链接(791f122b) 解析到一条消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tutou", msgId: "1", content: "你猜"}'})] 2020-06-22 08:48:35.149 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder : [decode][链接(791f122b) 编码了一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})] 2020-06-22 08:48:35.149 INFO 56998 --- [ntLoopGroup-3-3] c.i.s.l.n.codec.InvocationEncoder : [decode][链接(79cb3a1e) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"你猜","msgId":"1"}'})] // 客户端 B...(秃头) 2020-06-22 08:48:18.277 INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.c.handler.NettyClientHandler : [userEventTriggered][发起一次心跳] 2020-06-22 08:48:18.278 INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [encode][链接(24fbc3e8) 编码了一条消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})] 2020-06-22 08:48:18.280 INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][链接(24fbc3e8) 解析到一条消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})] 2020-06-22 08:48:18.281 INFO 59613 --- [pool-1-thread-4] c.i.s.l.n.m.h.HeartbeatResponseHandler : [execute][收到链接(24fbc3e8) 的心跳响应] 2020-06-22 08:48:35.150 INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][链接(24fbc3e8) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"你猜","msgId":"1"}'})] 2020-06-22 08:48:35.151 INFO 59613 --- [pool-1-thread-5] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='1', content='你猜'}]
群聊的过程,以下图所示:
服务端负责将客户端 A 发送的群聊消息,转发给客户端 A、B、C。
友情提示:考虑到逻辑简洁,艿艿提供的本小节的示例,并非一个一个群,而是全部人在一个大的群聊中哈~
建立 ChatSendToOneRequest 类,发送给全部人的群聊消息的请求。代码以下:
public class ChatSendToAllRequest implements Message { public static final String TYPE = "CHAT_SEND_TO_ALL_REQUEST"; /** * 消息编号 */ private String msgId; /** * 内容 */ private String content; // ... 省略 setter、getter、toString 方法 }
友情提示:若是是正经的群聊,会有一个
groupId
字段,表示群编号。
和「8.2 ChatSendResponse」小节一致。
和「8.3 ChatRedirectToUserRequest」小节一致。
服务端...
建立 ChatSendToAllHandler 类,为服务端处理客户端的群聊请求。代码以下:
@Component public class ChatSendToAllHandler implements MessageHandler<ChatSendToAllRequest> { @Autowired private NettyChannelManager nettyChannelManager; @Override public void execute(Channel channel, ChatSendToAllRequest message) { // <1> 这里,伪装直接成功 ChatSendResponse sendResponse = new ChatSendResponse().setMsgId(message.getMsgId()).setCode(0); channel.writeAndFlush(new Invocation(ChatSendResponse.TYPE, sendResponse)); // <2> 建立转发的消息,并广播发送 ChatRedirectToUserRequest sendToUserRequest = new ChatRedirectToUserRequest().setMsgId(message.getMsgId()) .setContent(message.getContent()); nettyChannelManager.sendAll(new Invocation(ChatRedirectToUserRequest.TYPE, sendToUserRequest)); } @Override public String getType() { return ChatSendToAllRequest.TYPE; } }
代码比较简单,胖友看看 <1>
、<2>
上的注释。
和「8.5 ChatSendResponseHandler」小节一致。
和「8.6 ChatRedirectToUserRequestHandler」小节一致。
① 启动 Netty Server 服务端。
② 启动 Netty Client 客户端 A。而后使用 Postman 模拟一次认证请求(用户为 yunai
)。以下图所示:
③ 启动 Netty Client 客户端 B。注意,须要设置 --server.port
端口为 8081,避免冲突。
④ 启动 Netty Client 客户端 C。注意,须要设置 --server.port
端口为 8082,避免冲突。
⑤ 最后使用 Postman 模拟一次发送群聊消息。以下图所示:
同时,能够看到客户端 A 群发给全部客户端的日志以下:
// 客户端 A... 2020-06-22 08:55:44.898 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [decode][链接(9e086597) 编码了一条消息(Invocation{type='CHAT_SEND_TO_ALL_REQUEST', message='{msgId: "2", content: "广播消息"}'})] 2020-06-22 08:55:44.901 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][链接(9e086597) 解析到一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"2"}'})] 2020-06-22 08:55:44.901 INFO 57583 --- [ol-1-thread-148] c.i.s.l.n.m.c.ChatSendResponseHandler : [execute][发送结果:ChatSendResponse{msgId='2', code=0, message='null'}] 2020-06-22 08:55:44.901 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][链接(9e086597) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})] 2020-06-22 08:55:44.903 INFO 57583 --- [ol-1-thread-149] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='广播消息'}] // 服务端... 2020-06-22 08:55:44.898 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder : [decode][链接(791f122b) 解析到一条消息(Invocation{type='CHAT_SEND_TO_ALL_REQUEST', message='{msgId: "2", content: "广播消息"}'})] 2020-06-22 08:55:44.901 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder : [decode][链接(791f122b) 编码了一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"2"}'})] 2020-06-22 08:55:44.901 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder : [decode][链接(791f122b) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})] 2020-06-22 08:55:44.901 INFO 56998 --- [ntLoopGroup-3-3] c.i.s.l.n.codec.InvocationEncoder : [decode][链接(79cb3a1e) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})] 2020-06-22 08:55:44.901 INFO 56998 --- [ntLoopGroup-3-4] c.i.s.l.n.codec.InvocationEncoder : [decode][链接(9dc03826) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})] // 客户端 B... 2020-06-22 08:55:44.902 INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][链接(24fbc3e8) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})] 2020-06-22 08:55:44.902 INFO 59613 --- [ool-1-thread-83] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='广播消息'}] // 客户端 C... 2020-06-22 08:55:44.901 INFO 61597 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][链接(9128c71c) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})] 2020-06-22 08:55:44.903 INFO 61597 --- [ool-1-thread-16] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='广播消息'}]
至此,咱们已经经过 Netty 实现了一个简单的 IM 功能,是否是收获蛮大的,嘿嘿。
下面,良心的艿艿,再来推荐一波文章,嘿嘿。
等后续,艿艿会在 https://github.com/YunaiV/one... 开源项目中,实现一个相对完整的客服功能,哈哈哈~