好消息:IM1.0.0版本已经上线啦,支持特性:html
github连接: https://github.com/yuanrw/IMnode
本篇将带你们从零开始搭建一个轻量级的IM服务端,IM的总体设计思路和架构在个人上篇博客中已经讲过了,没看过的同窗请点击从零开始开发IM(即时通信)服务端 。git
这篇将给你们带来更多的细节实现。我将从三个方面来阐述如何构建一个完整可靠的IM系统。github
什么是可靠性?对于一个IM系统来讲,可靠的定义至少是不丢消息、消息不重复、不乱序,知足这三点,才能说有一个好的聊天体验。数据库
咱们先从不丢消息开始讲起。数组
首先复习一下上一篇设计的服务端架构:
安全
咱们先从一个简单例子开始思考:当Alice给Bob发送一条消息时,可能要通过这样一条链路:
服务器
在这整个链路中的每一个环节都有可能出问题,虽然tcp协议是可靠的,可是它只能保证链路层的可靠,没法保证应用层的可靠。网络
例如在第一步中,connector
收到了从client
发出的消息,可是转发给transfer
失败,那么这条消息Bob就没法收到,而Alice也不会意识到消息发送失败了。session
若是Bob状态是离线,那么消息链路就是:
若是在第三步中,transfer
收到了来自connector
的消息,可是离线消息入库失败,
那么这个消息也是传递失败了。
为了保证应用层的可靠,咱们必需要有一个ack机制,使发送方可以确认对方收到了这条消息。
具体的实现,咱们模仿tcp协议作一个应用层的ack机制。
tcp的报文是以字节(byte)
为单位的,而咱们以message
单位。
发送方每次发送一个消息,就要等待对方的ack回应,在ack确认消息中应该带有收到的id以便发送方识别。
其次,发送方须要维护一个等待ack的队列。 每次发送一个消息以后,就将消息和一个计时器入队。
另外存在一个线程一直轮询队列,若是有超时未收到ack的,就取出消息重发。
超时未收到ack的消息有两种处理方式:
connector
长时间未收到client
的ack,那么能够主动断开和客户端的链接,剩下未发送的消息就做为离线消息入库,客户端断连后尝试重连服务器便可。有的时候由于网络缘由可能致使ack收到较慢,发送方就会重复发送,那么接收方必须有一个去重机制。
去重的方式是给每一个消息增长一个惟一id。这个惟一id并不必定是全局的,只须要在一个会话中惟一便可。例如某两我的的会话,或者某一个群。若是网络断连了,从新链接后,就是新的会话了,id会从新从0开始。
接收方须要在当前会话中维护收到的最后一个消息的id,叫作lastId
。
每次收到一个新消息, 就将id与lastId
做比较看是否连续,若是不连续,就放入一个暂存队列 queue中稍后处理。
例如:
当前会话的lastId
=1,接着服务器收到了消息msg(id=2)
,能够判断收到的消息是连续的,就处理消息,将lastId
修改成2。
可是若是服务器收到消息msg(id=3)
,就说明消息乱序到达了,那么就将这个消息入队,等待lastId
变为2后,(即服务器收到消息msg(id=2)
并处理完了),再取出这个消息处理。
所以,判断消息是否重复只须要判断msgId>lastId && !queue.contains(msgId)
便可。若是收到重复的消息,能够判断是ack未送达,就再发送一次ack。
接收方收到消息后完整的处理流程以下:
伪代码以下:
class ProcessMsgNode{ /** * 接收到的消息 */ private Message message; /** * 处理消息的方法 */ private Consumer<Message> consumer; } public CompletableFuture<Void> offer(Long id,Message message,Consumer<Message> consumer) { if (isRepeat(id)) { //消息重复 sendAck(id); return null; } if (!isConsist(id)) { //消息不连续 notConsistMsgMap.put(id, new ProcessMsgNode(message, consumer)); return null; } //处理消息 return process(id, message, consumer); } private CompletableFuture<Void> process(Long id, Message message, Consumer<Message> consumer) { return CompletableFuture .runAsync(() -> consumer.accept(message)) .thenAccept(v -> sendAck(id)) .thenAccept(v -> lastId.set(id)) .thenComposeAsync(v -> { Long nextId = nextId(id); if (notConsistMsgMap.containsKey(nextId)) { //队列中有下个消息 ProcessMsgNode node = notConsistMsgMap.get(nextId); return process(nextId, node.getMessage(), consumer); } else { //队列中没有下个消息 CompletableFuture<Void> future = new CompletableFuture<>(); future.complete(null); return future; } }) .exceptionally(e -> { logger.error("[process received msg] has error", e); return null; }); }
不管是聊天记录仍是离线消息,确定都会在服务端存储备份,那么消息的安全性,保护客户的隐私也相当重要。
所以全部的消息都必需要加密处理。
在存储模块里,维护用户信息和关系链有两张基础表,分别是im_user
用户表和im_relation
关系链表。
im_user
表用于存放用户常规信息,例如用户名密码等,结构比较简单。im_relation
表用于记录好友关系,结构以下:CREATE TABLE `im_relation` ( `id` bigint(20) COMMENT '关系id', `user_id1` varchar(100) COMMENT '用户1id', `user_id2` varchar(100) COMMENT '用户2id', `encrypt_key` char(33) COMMENT 'aes密钥', `gmt_create` timestamp DEFAULT CURRENT_TIMESTAMP, `gmt_update` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `USERID1_USERID2` (`user_id1`,`user_id2`) );
user_id1
和user_id2
是互为好友的用户id,为了不重复,存储时按照user_id1
<user_id2
的顺序存,而且加上联合索引。encrypt_key
是随机生成的密钥。当客户端登陆时,就会从数据库中获取该用户的全部的relation
,存在内存中,以便后续加密解密。客户端完整登陆流程以下:
relation
。那为何connector要先推送离线消息再更新session呢?咱们思考一下若是顺序倒过来会发生什么:
Alice
登陆服务器connector
更新session若是离线消息还在推送的过程当中,Bob发送了新消息给Alice,服务器获取到Alice的session,就会马上推送。这时新消息就有可能夹在一堆离线消息当中推过去了,那这时,Alice收到的消息就乱序了。
而咱们必须保证离线消息的顺序在新消息以前。
那么若是先推送离线消息,以后才更新session。在离线消息推送的过程当中,Alice的状态就是“未上线”,这时Bob新发送的消息只会入库im_offline
,im_offline
表中的数据被读完以后才会“上线”开始接受新消息。这也就避免了乱序。
当用户不在线时,离线消息必然要存储在服务端,等待用户上线再推送。理解了上一个小节后,离线消息的存储就很是容易了。增长一张离线消息表im_offline
,表结构以下:
CREATE TABLE `im_offline` ( `id` int(11) COMMENT '主键', `msg_id` bigint(20) COMMENT '消息id', `msg_type` int(2) COMMENT '消息类型', `content` varbinary(5000) COMMENT '消息内容', `to_user_id` varchar(100) COMMENT '收件人id', `has_read` tinyint(1) COMMENT '是否阅读', `gmt_create` timestamp COMMENT '建立时间', PRIMARY KEY (`id`) );
msg_type
用于区分消息类型(chat
,ack
),content
加密后的消息内容以byte数组的形式存储。
用户上线时按照条件to_user_id=用户id
拉取记录便可。
咱们思考一下多端登陆的状况,Alice有两台设备同时登录,在这种并发的状况下,咱们就须要某种机制来保证离线消息只被读取一次。
这里利用CAS机制来实现:
has_read=false
的字段。has_read
值是否为false,若是是,则改成true。这是原子操做。update im_offline set has_read = true where id = ${msg_id} and has_read = false
相信到这里,同窗们已经能够本身动手搭建一个完整可用的IM服务端了。更多问题欢迎评论区留言~~
IM1.0.0版本已上线,github连接:
https://github.com/yuanrw/IM
以为对你有帮助请点个star吧~!