跟着源码学IM(八):万字长文,手把手教你用Netty打造IM聊天

本文做者芋艿,原题“使用 Netty 实现 IM 聊天贼简单”,本底价有修订和改动。html

1、本文引言

上篇《跟着源码学IM(七):手把手教你用WebSocket打造Web端IM聊天》中,咱们使用 WebSocket 实现了一个简单的 IM 功能,支持身份认证、私聊消息、群聊消息。前端

而后就有人发私信,但愿使用纯 Netty 实现一个相似的功能,所以就有了本文。java

注:源码请从同步连接附件中下载,http://www.52im.net/thread-34...git

学习交流:github

  • 即时通信/推送技术开发交流5群:215477170 [推荐]
  • 移动端IM开发入门文章:《新手入门一篇就够:从零开发移动端IM》
  • 开源IM框架源码:https://github.com/JackJiang2...

(本文同步发布于:http://www.52im.net/thread-34...算法

2、知识准备

可能有人不知道 Netty 是什么,这里简单介绍下:spring

Netty 是一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。数据库

也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 能够确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。编程

Netty 至关简化和流线化了网络应用的编程开发过程,例如,TCP 和 UDP 的 Socket 服务开发。后端

如下是几篇有关Netty的入门文章,值得一读:

《新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析》
《写给初学者:Java高性能NIO框架Netty的学习方法和进阶策略》
《史上最通俗Netty框架入门长文:基本介绍、环境搭建、动手实战》

若是你连Java的NIO都不知道是什么,下面的文章建议优先读一下:

《少啰嗦!一分钟带你读懂Java的NIO和经典IO的区别》
《史上最强Java NIO入门:担忧从入门到放弃的,请读这篇!》
《Java的BIO和NIO很难懂?用代码实践给你看,再不懂我转行!》

Netty源码和API的在线阅读地址:

1)Netty-4.1.x 完整源码(在线阅读版)(* 推荐)
2)Netty-4.0.x 完整源码(在线阅读版)
3)Netty-4.1.x API文档(在线版)(* 推荐)
4)Netty-4.0.x API文档(在线版)

3、本文源码

本文完整代码附件下载:请从同步连接附件中下载,http://www.52im.net/thread-34...

源码的目录结构,以下图所示:

如上图所示:

1)lab-67-netty-demo-server 项目:搭建 Netty 服务端;
2)lab-67-netty-demo-client 项目:搭建 Netty 客户端;
3)lab-67-netty-demo-common 项目:提供 Netty 的基础封装,提供消息的编解码、分发的功能。
另外,源码中也会提供 Netty 经常使用功能的示例:

1)心跳机制,实现服务端对客户端的存活检测;
2)断线重连,实现客户端对服务端的从新链接。
不哔哔,直接开干。

5、通讯协议

在上一章中,咱们实现了客户端和服务端的链接功能。而本小节,咱们要让它们两可以说上话,即进行数据的读写。

在平常项目的开发中,前端和后端之间采用 HTTP 做为通讯协议,使用文本内容进行交互,数据格式通常是 JSON。可是在 TCP 的世界里,咱们须要本身基于二进制构建,构建客户端和服务端的通讯协议。

咱们以客户端向服务端发送消息来举个例子,假设客户端要发送一个登陆请求。

对应的类以下:

public class AuthRequest {

/** 用户名 **/

private String username;

/** 密码 **/

private String password;

}

显然:咱们没法将一个 Java 对象直接丢到 TCP Socket 当中,而是须要将其转换成 byte 字节数组,才能写入到 TCP Socket 中去。即,须要将消息对象经过序列化,转换成 byte 字节数组。

同时:在服务端收到 byte 字节数组时,须要将其又转换成 Java 对象,即反序列化。否则,服务端对着一串 byte 字节处理个毛线?!

友情提示:服务端向客户端发消息,也是同样的过程哈!

序列化的工具很是多,例如说 Google 提供的 Protobuf,性能高效,且序列化出来的二进制数据较小。Netty 对 Protobuf 进行集成,提供了相应的编解码器。

以下图所示:

可是考虑到不少可能对 Protobuf 并不了解,由于它实现序列化又增长额外学习成本。所以,仔细一个捉摸,仍是采用 JSON 方式进行序列化。可能有人会疑惑,JSON 不是将对象转换成字符串吗?嘿嘿,咱们再把字符串转换成 byte 字节数组就能够啦~

下面,咱们新建 lab-67-netty-demo-common 项目,并在 codec 包下,实现咱们自定义的通讯协议。

以下图所示:

5.一、Invocation
建立 Invocation 类,通讯协议的消息体。

代码以下:

/**

  • 通讯协议的消息体

*/

public class Invocation {

/**

 * 类型

 */

private String type;

/**

 * 消息,JSON 格式

 */

private String message;



// 空构造方法

public Invocation() {

}



public Invocation(String type, String message) {

    this.type = type;

    this.message = message;

}



public Invocation(String type, Message message) {

    this.type = type;

    this.message = JSON.toJSONString(message);

}



// ... 省略 setter、getter、toString 方法

}

① type 属性,类型,用于匹配对应的消息处理器。若是类比 HTTP 协议,type 属性至关于请求地址。

② message 属性,消息内容,使用 JSON 格式。

另外,Message 是咱们定义的消息接口,代码以下:

public interface Message {

// ... 空,做为标记接口

}

5.二、粘包与拆包
在开始看 Invocation 的编解码处理器以前,咱们先了解下粘包与拆包的概念。

5.2.1 产生缘由
产生粘包和拆包问题的主要缘由是,操做系统在发送 TCP 数据的时候,底层会有一个缓冲区,例如 1024 个字节大小。

若是一次请求发送的数据量比较小,没达到缓冲区大小,TCP 则会将多个请求合并为同一个请求进行发送,这就造成了粘包问题。

例如说:在《详解 Socket 编程 --- TCP_NODELAY 选项》文章中咱们能够看到,在关闭 Nagle 算法时,请求不会等待知足缓冲区大小,而是尽快发出,下降延迟。

若是一次请求发送的数据量比较大,超过了缓冲区大小,TCP 就会将其拆分为屡次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。

以下图展现了粘包和拆包的一个示意图,演示了粘包和拆包的三种状况:

如上图所示:

1)A 和 B 两个包都恰好知足 TCP 缓冲区的大小,或者说其等待时间已经达到 TCP 等待时长,从而仍是使用两个独立的包进行发送;
2)A 和 B 两次请求间隔时间内较短,而且数据包较小,于是合并为同一个包发送给服务端;
3)B 包比较大,于是将其拆分为两个包 B_1 和 B_2 进行发送,而这里因为拆分后的 B_2 比较小,其又与 A 包合并在一块儿发送。
5.2.2 解决方案
对于粘包和拆包问题,常见的解决方案有三种。

① 客户端在发送数据包的时候,每一个包都固定长度。好比 1024 个字节大小,若是客户端发送的数据长度不足 1024 个字节,则经过补充空格的方式补全到指定长度。

这种方式,暂时没有找到采用这种方式的案例。

② 客户端在每一个包的末尾使用固定的分隔符。例如 \r\n,若是一个包被拆分了,则等待下一个包发送过来以后找到其中的 \r\n,而后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就获得了一个完整的包。具体的案例,有 HTTP、WebSocket、Redis。

③ 将消息分为头部和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息以后才算是读到了一个完整的消息。

友情提示:方案 ③ 是 ① 的升级版,动态长度。

本文将采用这种方式,在每次 Invocation 序列化成字节数组写入 TCP Socket 以前,先将字节数组的长度写到其中。

以下图所示:

5.三、InvocationEncoder
建立 InvocationEncoder 类,实现将 Invocation 序列化,并写入到 TCP Socket 中。

代码以下:

public class InvocationEncoder extends MessageToByteEncoder<Invocation> {

private Logger logger = LoggerFactory.getLogger(getClass());



@Override

protected void encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) {

    // <2.1> 将 Invocation 转换成 byte[] 数组

    byte[] content = JSON.toJSONBytes(invocation);

    // <2.2> 写入 length

    out.writeInt(content.length);

    // <2.3> 写入内容

    out.writeBytes(content);

    logger.info("[encode][链接({}) 编码了一条消息({})]", ctx.channel().id(), invocation.toString());

}

}

① MessageToByteEncoder 是 Netty 定义的编码 ChannelHandler 抽象类,将泛型 消息转换成字节数组。

② #encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) 方法,进行编码的逻辑。

<2.1> 处,调用 JSON 的 #toJSONBytes(Object object, SerializerFeature... features) 方法,将 Invocation 转换成 字节数组。

<2.2> 处,将字节数组的长度,写入到 TCP Socket 当中。这样,后续「5.4 InvocationDecoder」能够根据该长度,解析到消息,解决粘包和拆包的问题。

友情提示:MessageToByteEncoder 会最终将 ByteBuf out 写到 TCP Socket 中。

<2.3> 处,将字节数组,写入到 TCP Socket 当中。

5.四、InvocationDecoder
建立 InvocationDecoder 类,实现从 TCP Socket 读取字节数组,反序列化成 Invocation。

代码以下:

① ByteToMessageDecoder 是 Netty 定义的解码 ChannelHandler 抽象类,在 TCP Socket 读取到新数据时,触发进行解码。

② 在 <2.1>、<2.2>、<2.3> 处,从 TCP Socket 中读取长度。

③ 在 <3.1>、<3.2>、<3.3> 处,从 TCP Socket 中读取字节数组,并反序列化成 Invocation 对象。

最终,添加 List<Object> out 中,交给后续的 ChannelHandler 进行处理。稍后,咱们将在「6. 消息分发」小结中,会看到 MessageDispatcher 将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。

5.五、引入依赖
建立 pom.xml 文件,引入 Netty、FastJSON 等等依赖。

5.六、本章小结
至此,咱们已经完成通讯协议的定义、编解码的逻辑,是否是蛮有趣的?!

另外,咱们在 NettyServerHandlerInitializer 和 NettyClientHandlerInitializer 的初始化代码中,将编解码器添加到其中。

以下图所示:

6、消息分发

在 SpringMVC 中,DispatcherServlet 会根据请求地址、方法等,将请求分发到匹配的 Controller 的 Method 方法上。

在 lab-67-netty-demo-client 项目的 dispatcher 包中,咱们建立了 MessageDispatcher 类,实现和 DispatcherServlet 相似的功能,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。

下面,咱们来看看具体的代码实现。

6.一、Message
建立 Message 接口,定义消息的标记接口。

代码以下:

public interface Message {

}

下图,是咱们涉及到的 Message 实现类。

以下图所示:

6.二、MessageHandler
建立 MessageHandler 接口,消息处理器接口。

代码以下:

public interface MessageHandler<T extendsMessage> {

/**

 * 执行处理消息

 *

 * @param channel 通道

 * @param message 消息

 */

voide xecute(Channel channel, T message);



/**

 * @return 消息类型,即每一个 Message 实现类上的 TYPE 静态字段

 */

String getType();

}

如上述代码所示:

1)定义了泛型 <T> ,须要是 Message 的实现类;
2)定义的两个接口方法,本身看下注释哈。
下图,是咱们涉及到的 MessageHandler 实现类。

以下图所示:

6.三、MessageHandlerContainer
建立 MessageHandlerContainer 类,做为 MessageHandler 的容器。

代码以下:

① 实现 InitializingBean 接口,在 #afterPropertiesSet() 方法中,扫描全部 MessageHandler Bean ,添加到 MessageHandler 集合中。

② 在 #getMessageHandler(String type) 方法中,得到类型对应的 MessageHandler 对象。稍后,咱们会在 MessageDispatcher 调用该方法。

③ 在 #getMessageClass(MessageHandler handler) 方法中,经过 MessageHandler 中,经过解析其类上的泛型,得到消息类型对应的 Class 类。这是参考 rocketmq-spring 项目的 DefaultRocketMQListenerContainer#getMessageType() 方法,进行略微修改。

6.四、MessageDispatcher
建立 MessageDispatcher 类,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。

代码以下:

@ChannelHandler.Sharable

public class MessageDispatcher extends SimpleChannelInboundHandler<Invocation> {

@Autowired

private MessageHandlerContainer messageHandlerContainer;



private final ExecutorService executor =  Executors.newFixedThreadPool(200);



@Override

protected void channelRead0(ChannelHandlerContext ctx, Invocation invocation) {

    // <3.1> 得到 type 对应的 MessageHandler 处理器

    MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType());

    // 得到  MessageHandler 处理器的消息类

    Class<? extendsMessage> messageClass = MessageHandlerContainer.getMessageClass(messageHandler);

    // <3.2> 解析消息

    Message message = JSON.parseObject(invocation.getMessage(), messageClass);

    // <3.3> 执行逻辑

    executor.submit(newRunnable() {



        @Override

        public void run() {

            // noinspection unchecked

            messageHandler.execute(ctx.channel(), message);

        }

    });

}

}

① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 能够被多个 Channel 使用。

② SimpleChannelInboundHandler 是 Netty 定义的消息处理 ChannelHandler 抽象类,处理消息的类型是 泛型时。

③ #channelRead0(ChannelHandlerContext ctx, Invocation invocation) 方法,处理消息,进行分发。

<3.1> 处,调用 MessageHandlerContainer 的 #getMessageHandler(String type) 方法,得到 Invocation 的 type 对应的 MessageHandler 处理器。

而后,调用 MessageHandlerContainer 的 #getMessageClass(messageHandler) 方法,得到 MessageHandler 处理器的消息类。

<3.2> 处,调用 JSON 的 ## parseObject(String text, Class<T> clazz) 方法,将 Invocation 的 message 解析成 MessageHandler 对应的消息对象。

<3.3> 处,丢到线程池中,而后调用 MessageHandler 的 #execute(Channel channel, T message) 方法,执行业务逻辑。

注意:为何要丢到 executor 线程池中呢?咱们先来了解下 EventGroup 的线程模型。

友情提示:在咱们启动 Netty 服务端或者客户端时,都会设置其 EventGroup。

EventGroup 咱们能够先简单理解成一个线程池,而且线程池的大小仅仅是 CPU 数量 * 2。每一个 Channel 仅仅会被分配到其中的一个线程上,进行数据的读写。而且,多个 Channel 会共享一个线程,即便用同一个线程进行数据的读写。

那么试着思考下,MessageHandler 的具体逻辑视线中,每每会涉及到 IO 处理,例如说进行数据库的读取。这样,就会致使一个 Channel 在执行 MessageHandler 的过程当中,阻塞了共享当前线程的其它 Channel 的数据读取。

所以,咱们在这里建立了 executor 线程池,进行 MessageHandler 的逻辑执行,避免阻塞 Channel 的数据读取。

可能会有人说,咱们是否是可以把 EventGroup 的线程池设置大一点,例如说 200 呢?对于长链接的 Netty 服务端,每每会有 1000 ~ 100000 的 Netty 客户端链接上来,这样不管设置多大的线程池,都会出现阻塞数据读取的状况。

友情提示:executor 线程池,咱们通常称之为业务线程池或者逻辑线程池,顾名思义,就是执行业务逻辑的。这样的设计方式,目前 Dubbo 等等 RPC 框架,都采用这种方式。后续,能够认真阅读下《【NIO 系列】——之 Reactor 模型》文章,进一步理解。

6.五、NettyServerConfig
建立 NettyServerConfig 配置类,建立 MessageDispatcher 和 MessageHandlerContainer Bean。

代码以下:

@Configuration

public class NettyServerConfig {

@Bean

public MessageDispatcher messageDispatcher() {

    return new MessageDispatcher();

}



@Bean

public MessageHandlerContainer messageHandlerContainer() {

    return new MessageHandlerContainer();

}

}

6.六、NettyClientConfig
建立 NettyClientConfig 配置类,建立 MessageDispatcher 和 MessageHandlerContainer Bean。

代码以下:

@Configuration

public class NettyClientConfig {

@Bean

public MessageDispatcher messageDispatcher() {

    return new MessageDispatcher();

}

@Bean

public MessageHandlerContainer messageHandlerContainer() {

    return new MessageHandlerContainer();

}

}

6.七、本章小结
后续,咱们将在以下小节,具体演示消息分发的使用。

7、断开重连

Netty 客户端须要实现断开重连机制,解决各类状况下的断开状况。

例如说:

1)Netty 客户端启动时,Netty 服务端处于挂掉,致使没法链接上;
2)在运行过程当中,Netty 服务端挂掉,致使链接被断开;
3)任一一端网络抖动,致使链接异常断开。
具体的代码实现比较简单,只须要在两个地方增长重连机制:

1)Netty 客户端启动时,没法链接 Netty 服务端时,发起重连;
2)Netty 客户端运行时,和 Netty 断开链接时,发起重连。
考虑到重连会存在失败的状况,咱们采用定时重连的方式,避免占用过多资源。

7.一、具体代码
① 在 NettyClient 中,提供 #reconnect() 方法,实现定时重连的逻辑。

代码以下:

// NettyClient.java

public void reconnect() {

eventGroup.schedule(new Runnable() {

    @Override

    publicvoidrun() {

        logger.info("[reconnect][开始重连]");

        try{

            start();

        } catch(InterruptedException e) {

            logger.error("[reconnect][重连失败]", e);

        }

    }

}, RECONNECT_SECONDS, TimeUnit.SECONDS);

logger.info("[reconnect][{} 秒后将发起重连]", RECONNECT_SECONDS);

}

经过调用 EventLoop 提供的 #schedule(Runnable command, long delay, TimeUnit unit) 方法,实现定时逻辑。而在内部的具体逻辑,调用 NettyClient 的 #start() 方法,发起链接 Netty 服务端。

又由于 NettyClient 在 #start() 方法在链接 Netty 服务端失败时,又会调用 #reconnect() 方法,从而再次发起定时重连。如此循环反复,知道 Netty 客户端链接上 Netty 服务端。

以下图所示:

② 在 NettyClientHandler 中,实现 #channelInactive(ChannelHandlerContext ctx) 方法,在发现和 Netty 服务端断开时,调用 Netty Client 的 #reconnect() 方法,发起重连。

代码以下:

// NettyClientHandler.java

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

// 发起重连

nettyClient.reconnect();

// 继续触发事件

super.channelInactive(ctx);

}

7.二、简单测试
① 启动 Netty Client,不要启动 Netty Server,控制台打印日志以下图:

能够看到 Netty Client 在链接失败时,不断发起定时重连。

② 启动 Netty Server,控制台打印以下图:

能够看到 Netty Client 成功重连上 Netty Server。

8、心跳机制与空闲检测

咱们能够了解到 TCP 自带的空闲检测机制,默认是 2 小时。这样的检测机制,从系统资源层面上来讲是能够接受的。

可是在业务层面,若是 2 小时才发现客户端与服务端的链接实际已经断开,会致使中间很是多的消息丢失,影响客户的使用体验。

所以,咱们须要在业务层面,本身实现空闲检测,保证尽快发现客户端与服务端实际已经断开的状况。

实现逻辑以下:

1)服务端发现 180 秒未从客户端读取到消息,主动断开链接;
2)客户端发现 180 秒未从服务端读取到消息,主动断开链接。
考虑到客户端和服务端之间并非一直有消息的交互,因此咱们须要增长心跳机制。

逻辑以下:

1)客户端每 60 秒向服务端发起一次心跳消息,保证服务端能够读取到消息;
2)服务端在收到心跳消息时,回复客户端一条确认消息,保证客户端能够读取到消息。
友情提示:

为何是 180 秒?能够加大或者减少,看本身但愿多快检测到链接异常。太短的时间,会致使心跳过于频繁,占用过多资源。

为何是 60 秒?三次机会,确认是否心跳超时。

虽然听起来有点复杂,可是实现起来并不复杂哈。

8.一、服务端的空闲检测
在 NettyServerHandlerInitializer 中,咱们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。

以下图所示:

经过这样的方式,实现服务端发现 180 秒未从客户端读取到消息,主动断开链接。

8.二、客户端的空闲检测
在 NettyClientHandlerInitializer 中,咱们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。

以下图所示:

经过这样的方式,实现客户端发现 180 秒未从服务端读取到消息,主动断开链接。

8.三、心跳机制
Netty 提供了 IdleStateHandler 处理器,提供空闲检测的功能,在 Channel 的读或者写空闲时间太长时,将会触发一个 IdleStateEvent 事件。

这样,咱们只须要在 NettyClientHandler 处理器中,在接收到 IdleStateEvent 事件时,客户端向客户端发送一次心跳消息。

以下图所示:

其中,HeartbeatRequest 是心跳请求。

同时,咱们在服务端项目中,建立了一个 HeartbeatRequestHandler 消息处理器,在收到客户端的心跳请求时,回复客户端一条确认消息。

代码以下:

@Component

public class HeartbeatRequestHandler implementsMessageHandler<HeartbeatRequest> {

private Logger logger = LoggerFactory.getLogger(getClass());



@Override

public void execute(Channel channel, HeartbeatRequest message) {

    logger.info("[execute][收到链接({}) 的心跳请求]", channel.id());

    // 响应心跳

    HeartbeatResponse response = newHeartbeatResponse();

    channel.writeAndFlush(newInvocation(HeartbeatResponse.TYPE, response));

}



@Override

public String getType() {

    return HeartbeatRequest.TYPE;

}

}

其中,HeartbeatResponse 是心跳确认响应。

8.四、简单测试
启动 Netty Server 服务端,再启动 Netty Client 客户端,耐心等待 60 秒后,能够看到心跳日志以下:

9、认证逻辑

从本小节开始,咱们就具体看看业务逻辑的处理示例。

认证的过程,以下图所示:

9.一、AuthRequest
建立 AuthRequest 类,定义用户认证请求。

代码以下:

public class AuthRequest implements Message {

public static final String TYPE = "AUTH_REQUEST";

/**

* 认证 Token

 */

private String accessToken;

// ... 省略 setter、getter、toString 方法

}

这里咱们使用 accessToken 认证令牌进行认证。

由于通常状况下,咱们使用 HTTP 进行登陆系统,而后使用登陆后的身份标识(例如说 accessToken 认证令牌),将客户端和当前用户进行认证绑定。

9.二、AuthResponse
建立 AuthResponse 类,定义用户认证响应。

代码以下:

public class AuthResponse implements Message {

public static final String TYPE = "AUTH_RESPONSE";



/**

 * 响应状态码

 */

private Integer code;

/**

 * 响应提示

 */

private String message;



// ... 省略 setter、getter、toString 方法

}

9.三、AuthRequestHandler
服务端...

建立 AuthRequestHandler 类,为服务端处理客户端的认证请求。

代码以下:

代码比较简单,看看 <1>、<2>、<3>、<4> 上的注释。

9.四、AuthResponseHandler
客户端...

建立 AuthResponseHandler 类,为客户端处理服务端的认证响应。

代码以下:

@Component

public class AuthResponseHandler implements MessageHandler<AuthResponse> {

private Logger logger = LoggerFactory.getLogger(getClass());



@Override

public void execute(Channel channel, AuthResponse message) {

    logger.info("[execute][认证结果:{}]", message);

}



@Override

public String getType() {

    return AuthResponse.TYPE;

}

}

打印个认证结果,方便调试。

9.五、TestController
客户端...

建立 TestController 类,提供 /test/mock 接口,模拟客户端向服务端发送请求。

代码以下:

@RestController

@RequestMapping("/test")

public class TestController {

@Autowired

private NettyClient nettyClient;



@PostMapping("/mock")

public String mock(String type, String message) {

    // 建立 Invocation 对象

    Invocation invocation = new Invocation(type, message);

    // 发送消息

    nettyClient.send(invocation);

    return "success";

}

}

9.六、简单测试
启动 Netty Server 服务端,再启动 Netty Client 客户端,而后使用 Postman 模拟一次认证请求。

以下图所示:

同时,能够看到认证成功的日志以下:

11、群聊逻辑

群聊的过程,以下图所示:

服务端负责将客户端 A 发送的群聊消息,转发给客户端 A、B、C。

友情提示:考虑到逻辑简洁,提供的本小节的示例并非一个一个群,而是全部人在一个大的群聊中哈~

11.一、ChatSendToAllRequest
建立 ChatSendToOneRequest 类,发送给全部人的群聊消息的请求。

代码以下:

public class ChatSendToAllRequest implements Message {

public static final String TYPE = "CHAT_SEND_TO_ALL_REQUEST";

/**

 * 消息编号

 */

private String msgId;

/**

 * 内容

 */

private String content;



// ... 省略 setter、getter、toString 方法

}

PS:若是是正经的群聊,会有一个 groupId 字段,表示群编号。

11.二、ChatSendToAllHandler
服务端...

建立 ChatSendToAllHandler 类,为服务端处理客户端的群聊请求。

代码以下:

代码比较简单,看看 <1>、<2> 上的注释。

11.三、简单测试
① 启动 Netty Server 服务端。

② 启动 Netty Client 客户端 A。而后使用 Postman 模拟一次认证请求(用户为 yunai)。

以下图所示:

③ 启动 Netty Client 客户端 B。注意,须要设置 --server.port 端口为 8081,避免冲突。

④ 启动 Netty Client 客户端 C。注意,须要设置 --server.port 端口为 8082,避免冲突。

⑤ 最后使用 Postman 模拟一次发送群聊消息。

以下图所示:

同时,能够看到客户端 A 群发给全部客户端的日志以下:

最后,要想系统地学习IM开发的方方面面,请继续阅读:《新手入门一篇就够:从零开发移动端IM》

附录、系列文章

《跟着源码学IM(一):手把手教你用Netty实现心跳机制、断线重连机制》
《跟着源码学IM(二:自已开发IM很难?手把手教你撸一个Andriod版IM》
《跟着源码学IM(三)基于Netty,从零开发一个IM服务端》
《跟着源码学IM(四)拿起键盘就是干,教你徒手开发一套分布式IM系统》
《跟着源码学IM(五):正确理解IM长链接、心跳及重连机制,并动手实现》
《跟着源码学IM(六):手把手教你用Go快速搭建高性能、可扩展的IM系统》
《跟着源码学IM(七):手把手教你用WebSocket打造Web端IM聊天》
《跟着源码学IM(八):万字长文,手把手教你用Netty打造IM聊天》(* 本文)

本文已同步发布于“即时通信技术圈”公众号。

▲ 本文在公众号上的连接是:点此进入。同步发布连接是:http://www.52im.net/thread-34...

相关文章
相关标签/搜索