IM web application with protobuff & TypeScript

Background

Recently I'm making a customer feedback web applciation.Since the backend and app end already using the protocal ProtoBuf(Protocol Buffer).So I just socket with'em in the same way.git

Doing some researches about ProtoBuf stuff and found that it really deserve to be the chosen one.github

So What are protocol buffers?web

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.算法

Advantages

  1. Less bytes comparing with all other common protocals
  2. The protobuff definition itself can be a interface document.
  3. Validations and Extensibility are more perfect.

Implement

Talk is cheap,let's focus on code,here I use typescript to develop.typescript

Here's the protocal we base on the protobuff.数据库

the base protocal

firstly we need to use http to get the IMserver address. After we get the address,we need to use sync command to sync the history data.Everytime we post a message,we use syncData command to notice the server.And everytime there's a new message,we will receive a notify command from the server.Every message has a unique msgId in it's header,and the later message'msgId is always bigger than the former one.

for the multi end

As we know, if user are logged in multi ends,except the current seqId,we need to know the source of the msg .So above is the strategy for the synchronization of the multi ends.

and the profobuff file im.proto
/**
* 应用层的协议
*/

syntax = "proto3";
package protoc;


/*
协议头:
五部分组成:
首字节:      是否压缩标志位。0:不压缩;1:压缩。 gzip压缩方式
第二个字节:   用于序列化方式: 0:pb      1:json
第3、四个字节:   表示请求命令:
   0x01     ConfigReq
   0x02     ConfigResp
   0x03     ConfigAck
   0x04     Sync
   0x05     SyncData
   0x06     SyncDataFin
   0x07     SyncDataFinAck
   0x08     Notify               MsgGroup group = 4;              //命令消息指向的消息的大类
   0x09     NotifyFin
   0x0A     NotifyFinAck
第五到八个字节:
   用于存放uuid信息,记录用户回话信息
其余:剩余8个字节为保留字段
*/

//消息大类
enum MsgGroup {
   IM = 0;           //即时消息类
   Comment = 1;      //评论类
   Up = 2;           //点赞类
   Follow = 3;       //关注类
   Review = 4;       //审核类
}


//配置相关
//客户端发送配置信息
message  ConfigReq {
   string uid         = 1;        //用户id
   string sid        = 2;         //用户sid、设备id 
   string appver  = 3;            //当前版本信息
   string token    = 4;           //客户端携带token信息
   string appid    = 5;           //app相关
   string did         = 6;        //设备惟一标识
   uint32 protover = 7;           //协议版本号
   string logInfo  = 8;           //用于日志 client_id=xxx`uid=yyy`
}

//增长错误码标识
//服务端返回
message  ConfigResp {
   string channelAes      =  1;          //加解密算法  不用加密:""
   uint32 errorCode       =  2;          //错误码信息   700:成功  701 token错误  702 token
   uint64 seqId               =  3 ;     //服务端当前消息位置
}


//客户端确认 无需内容
message ConfigAck {
}

//消息分类类型
enum  ChannelType {
   IMMsg = 0;               //即时消息
   CommandMsg = 1;  //命令消息
   SystemMsg  = 2;      //系统消息
}


//消息类型 Message Type
enum IMMsgType {
   Text = 0;              //文本
   Audio = 1;           //音频
   Video = 2;           //视频
   Image = 3;           //图片
}

//消息状态:
enum CommandMsgType {
   Read      = 0;     // 已读
   Received  = 1;     //已达
   Cancel    = 2;     //撤销
   Deleted   = 3;     //删除
}

//系统消息
enum SystemMsgType {
  CommentUp = 0;          //评论点赞
  CommentReply = 1;     //评论回复
  ContentComment = 2; //内容评论,feed流
  ContentUp = 3;            //内容点赞
  ContentReview = 4;     //审核被拒消息
  Followed = 5;               //关注
}


//消息头部
message MessageHeader {
   uint64 seqId = 1; //消息ID
   string from = 2; //消息发送方,   系统消息:业务名
   string to = 3; //消息的接收方
   int64 createTime = 4; //消息建立时间  
   uint64 sourceId = 5; //消息初始seq id
}

//文本消息
message TextMessage {
   string text = 2; //消息体
}

//图片消息
message ImageMessage {
   string coverUrl = 2; //图片缩略图
   string imageUrl = 3; //图片uri
   uint32  height = 4; //图片高
   uint32  width    =5;  //图片宽
}

//notice消息
message NoticeMessage {
   string data = 2;          //Notice消息进行透传
   string noticeId = 3;      //系统消息的id
}

//comment消息
message CommentMessage {
    string data = 2;          //评论消息进行透传
    string noticeId = 3;    //系统消息的id
}

//command消息
message CommandMessage {
   uint64 msgSoxurceId = 2;          //命令消息指向的消息
   uint64 msgTargetId = 3;          //多端read问题
   MsgGroup group = 4;              //命令消息指向的消息的大类
}

//---------------------------------消息协议--------------------------------------
//用户发起Sync包
message Sync {
    uint64  seqId = 2;      //sync包发起序号
}

//sync返回的数据
message SyncData {
   uint64     seqId  = 2;                 //当前的消息位置
   repeated DataEntry  data = 3;                   //最终序列化后的数据
   //数据体
   message DataEntry {
      MessageHeader header = 1;
      bool  isFcm = 14;                            
      ChannelType  channel = 4;
      oneof MessageType {
            IMMsgType             imMsgType = 5;        //即时消息类型 
            CommandMsgType       commandType = 6;    //命令类型
            SystemMsgType     systemMsgType = 7; //系统消息类型
      }
      //存放具体的消息内容
      oneof Body {
         TextMessage textMessage  = 8;
         ImageMessage imageMessage = 9;
         NoticeMessage noticeMessage = 10;
         CommandMessage commandMessage = 11;
         CommentMessage commentMessage = 12;
         AudioMessage audioMessage = 13;
      }

      //bytes body = 8; // 存放消息信息   <===>结构体
   }
}

//客户端数据接收结束
message SyncDataFin {
   repeated SyncDataResult syncDataResult = 1;
}

message SyncDataResult {
   uint64 seqId        = 1;
   int32  errorCode    = 2;     //700 succ  711 协议不支持 712其余错误
   int64  createTime   = 3;
}

//服务端确认
message SyncDataFinAck {
}

//服务端通知客户端拉取消息
message Notify {
   uint64     seqId  = 1;                 //当前的消息位置
}

//客户端收到通知
message NotifyFin {                                
}

//服务端确认
message NotifyFinAck {                              
}

message AudioMessage {
   uint32 test1 = 1;
   string test2 = 2;
}
复制代码

According the protobuff file im.proto,we need to make up the message entity.We use protobufjs to load the protobuff file im.proto.json

As the loading action in asynchronous.So we directly lookupType all the entity in the constructor in order to use it directly in the afterward codes.bash

import * as protobuf from "protobufjs"
constructor() {
    //把proto里面的对象所有取出来
    protobuf.load(require('./im.proto'), (err, root) => {
      this.ConfigReq = root.lookupType('protoc.ConfigReq');
      this.ConfigResp = root.lookupType('protoc.ConfigResp');
      this.Sync = root.lookupType('protoc.Sync');
      this.SyncData = root.lookupType('protoc.SyncData');
      this.SyncDataFin = root.lookupType('protoc.SyncDataFin');
      this.Notify = root.lookupType('protoc.Notify');
    })
}
复制代码

Init socket is the trigger of the im class,the uid here means the currentUser's id,The toAcc means other user's id we talking to. Before connecting the websocket,we pull the history message from the database.and then using Websocket to connect the server. The msg content from the server is blog,so we use FileReader to change it into Uint8Array,if it's needed,we need to unzip the body by gzip-buffer服务器

initSocket(uid: string, sid: string, toAcc: string) {
    this.initParams(uid, sid, toAcc);
    //先到数据库查询聊天历史
    request.get('im/history', { fromUid: uid, toUid: toAcc }).then((res: ImMsgInfoList) => {
      this.seqId = res.seqId || 0;
      [this.seqIdList, this.adaptMessageList] = adaptInitialMessages(res.list)
      //获取websocket连接
      return request.get('im/init', { uid, sid })
    }
    ).then((res: { data: string }) => {
      return res.data;
    }).then((url: string) => {
      //链接websocket
      this.socket = new WebSocket(`${url}/rpc/conn`);
      this.socket.onopen = (event) => {
        //完成初始化
        this.configRequest();
      }
      this.socket.onmessage = (event) => {
        //将blog转换为ArrayBuffer
        const reader = new FileReader();
        reader.readAsArrayBuffer(event.data);
        reader.onload = () => {
          const resBuffer = new Uint8Array(reader.result as ArrayBuffer);
          //从头部拿到消息类型
          const msgType = resBuffer[SCOKET_TYPE_POSITION];
          //从头部拿到压缩字段
          const bodyIsGzipped = resBuffer[COMPRESS_POSITION];
          //真正的内容body
          const contentBuffer = resBuffer.slice(SCOKET_HEADER_SIZE);
          if (bodyIsGzipped) {
            //解压body
            gzipBuffer.gunzip(contentBuffer, (data: Uint8Array) => {
              this.handleResult(msgType, data);
            });
          } else {
            this.handleResult(msgType, contentBuffer);
          }
        }
      }
      //错误重连
      this.socket.onerror = (event) => {
        console.log('websocket error,', event);
        this.caughtErr('connect error');
      }
      //关闭重连
      this.socket.onclose = (event) => {
        //若是是意外关闭
        if (!this.positiveClose) {
          console.log('socket closed accidently', event);
          console.log('try to reconnect.....');
          this.initSocket(uid, sid, toAcc);
          this.onReconnect();
        }
      }
    })
  }
复制代码
/**
   * 组装Scoket Buffer头部
   */
  private plugSocketBufferHead(buffer: Uint8Array, msgType: MSG_TYPE) {
    //消息类型
    buffer.set([msgType], SCOKET_TYPE_POSITION);
    //消息惟一id
    buffer.set(generateMsgId(), SCOKET_ID_POSITION);
    return buffer;
  }
  /**
   * 首次连接
   */
  private configRequest() {
    const payload = { uid: this.uid, sid: this.sid };
    const config = this.ConfigReq.create(payload);
    const contentBuffer: Uint8Array = this.ConfigReq.encode(config).finish();
    let realBuffer = new Uint8Array(contentBuffer.length + SCOKET_HEADER_SIZE);
    realBuffer = this.plugSocketBufferHead(realBuffer, MSG_TYPE.CONFIGREQ);
    realBuffer.set(contentBuffer, SCOKET_HEADER_SIZE);
    this.socket.send(realBuffer)
  }
复制代码

And here shows how to make a message as well as send to the server side.websocket

Fully code
//im.ts
import request from "request";
import * as protobuf from "protobufjs"
import * as moment from 'moment'
const gzipBuffer = require('gzip-buffer');
import { generateMsgId, adaptComingMessages, getNeedMarkedCommandMessage, adaptInitialMessages } from './im_utils'
import { ConfigResult, SyncData, TextMessage, ImageMessage, ChannelType, IMMsgType, Notify, AdaptMessageEntity, AdaptMessageStatus, CommandMessage } from "./msg_result"
import { ImMsgInfoList } from "../request_result"
//消息类型,根据proto定义的类型,判断返回类型以及传送头
enum MSG_TYPE {
  CONFIGREQ = 1,
  CONFIGRESP,
  CONFIGACK,
  SYNC,
  SYNCDATA,
  SYNCDATAFIN,
  SYNCDATAFINACK,
  NOTIFY,
  NOTIFYFIN,
  NOTIFYFINACK
}
//proto 头部须要16个字节
const SCOKET_HEADER_SIZE = 16;
//头部里面type的位置
const SCOKET_TYPE_POSITION = 2;
//头部里面MSGID的位置
const SCOKET_ID_POSITION = 4;
//success code
const SUCCESS_CODE = 700;
//首字节:是否压缩标志位。0:不压缩;1:压缩。 gzip压缩方式
const COMPRESS_POSITION = 0;

enum SOCKET_READY_STATE {
  CONNECTING,
  OPEN,
  CLOSING,
  CLOSED 
}


export default abstract class IM {
  //是否是主动去关闭这个websocket的,用于断开从新
  private positiveClose: boolean;
  private uid: string;
  private sid: string;
  private toAcc: string;

  //正在发送中的消息列表
  private sendingMessageList: Map<number, AdaptMessageEntity>;
  //整理好的信息列表
  private adaptMessageList: AdaptMessageEntity[];
  //目前已有的seqId list
  private seqIdList: Set<number>;
  //当前消息的offsetId
  private seqId: number;
  private socket: WebSocket;
  private initializing: boolean;
  //抓到错误会触发
  abstract caughtErr(errorMsg: string): void;
  //初始化结束
  abstract initFinish(): void;
  //消息列表变更 
  abstract onMsgListChange(msgList: AdaptMessageEntity[]): void;
  //socket意外断开,触发reconnect
  abstract onReconnect(): void;
  /**
   * protoBuff定义的一些类
   */
  private ConfigReq: protobuf.Type;
  private ConfigResp: protobuf.Type;
  private Sync: protobuf.Type;
  private SyncData: protobuf.Type;
  private SyncDataFin: protobuf.Type;
  private Notify: protobuf.Type;
  constructor() {
    //把proto里面的对象所有取出来
    protobuf.load(require('./im.proto'), (err, root) => {
      this.ConfigReq = root.lookupType('protoc.ConfigReq');
      this.ConfigResp = root.lookupType('protoc.ConfigResp');
      this.Sync = root.lookupType('protoc.Sync');
      this.SyncData = root.lookupType('protoc.SyncData');
      this.SyncDataFin = root.lookupType('protoc.SyncDataFin');
      this.Notify = root.lookupType('protoc.Notify');
    })
  }
  /**
   * 组装Scoket Buffer头部
   */
  private plugSocketBufferHead(buffer: Uint8Array, msgType: MSG_TYPE) {
    //消息类型
    buffer.set([msgType], SCOKET_TYPE_POSITION);
    //消息惟一id
    buffer.set(generateMsgId(), SCOKET_ID_POSITION);
    return buffer;
  }
  /**
   * 首次连接
   */
  private configRequest() {
    const payload = { uid: this.uid, sid: this.sid };
    const config = this.ConfigReq.create(payload);
    const contentBuffer: Uint8Array = this.ConfigReq.encode(config).finish();
    let realBuffer = new Uint8Array(contentBuffer.length + SCOKET_HEADER_SIZE);
    realBuffer = this.plugSocketBufferHead(realBuffer, MSG_TYPE.CONFIGREQ);
    realBuffer.set(contentBuffer, SCOKET_HEADER_SIZE);
    this.socket.send(realBuffer)
  }

  /**
   * ack 统一都用这个哦
   * @param type 
   */
  private ack(type: MSG_TYPE.CONFIGACK | MSG_TYPE.NOTIFYFIN | MSG_TYPE.SYNCDATAFINACK, needSync = true) {
    let realBuffer = new Uint8Array(SCOKET_HEADER_SIZE);
    realBuffer = this.plugSocketBufferHead(realBuffer, type);
    this.socket.send(realBuffer)
    if (Object.is(type, MSG_TYPE.CONFIGACK) || needSync) {
      //拉消息
      this.sync();
    }
  }
  //拉消息
  private sync() {
    const payload = { seqId: this.seqId || 0 };
    console.log('sync', payload);
    const syncPayload = this.Sync.create(payload);
    const contentBuffer: Uint8Array = this.Sync.encode(syncPayload).finish();
    let realBuffer = new Uint8Array(contentBuffer.length + SCOKET_HEADER_SIZE);
    realBuffer = this.plugSocketBufferHead(realBuffer, MSG_TYPE.SYNC);
    realBuffer.set(contentBuffer, SCOKET_HEADER_SIZE);
    this.socket.send(realBuffer)
  }

  private makeMessageHeader(from: string, to: string, seqId: number) {
    return {
      from,
      to,
      seqId,
      createTime: moment().format('X')
    }
  }
  /**
    * 消息的工厂方法,目前只支持imageMessage或者textMessage
    * 后续能够继续++++,参考proto文件
    * @param msgPayload 
    *
    */
  private makeImMessage(msgPayload: TextMessage | ImageMessage, imMsgType: IMMsgType) {
    const header = this.makeMessageHeader(this.uid, this.toAcc, this.seqId + 1);
    switch (imMsgType) {
      case IMMsgType.Image: {
        //图片类型
        return { data: [{ header, imageMessage: msgPayload as ImageMessage, channel: ChannelType.IMMsg, imMsgType: IMMsgType.Image }] }
      }
      case IMMsgType.Text: {
        //文本类型
        return { data: [{ header, textMessage: msgPayload as TextMessage, channel: ChannelType.IMMsg, imMsgType: IMMsgType.Text }] }
      }
    }
  }
  /**
   * 发消息
   * @param msgPayload
   * @param IMMsgType
   */
  sendMessage(msgPayload: TextMessage | ImageMessage, imMsgType: IMMsgType) {
    const payload = this.makeImMessage(msgPayload, imMsgType);
    this.syncData(payload);
    //插入到当前AdaptMessageList,并将状态置为sending
    const adaptMessage: any = payload.data[0];
    adaptMessage.status = AdaptMessageStatus.Sending;
    this.seqIdList.add(adaptMessage.header.seqId);
    this.adaptMessageList.push(adaptMessage);
    //放入到发送中的消息列表
    this.sendingMessageList.set(adaptMessage.header.seqId, adaptMessage);
  }

  private syncData(payload: any) {
    const syncData = this.SyncData.create(payload);
    const contentBuffer: Uint8Array = this.SyncData.encode(syncData).finish();
    //压缩body
    gzipBuffer.gzip(contentBuffer, (data: Uint8Array) => {
      let realBuffer = new Uint8Array(data.length + SCOKET_HEADER_SIZE);
      realBuffer = this.plugSocketBufferHead(realBuffer, MSG_TYPE.SYNCDATA);
      //告诉服务器要压缩
      realBuffer.set([1], COMPRESS_POSITION);
      realBuffer.set(data, SCOKET_HEADER_SIZE);
      this.socket.send(realBuffer)
    });
  }

  /**
   * 告诉服务器已读
   * @param sourceSeqIdList sourceSeqId列表
   */
  private sendCommands(commandMessage: CommandMessage) {
    if (commandMessage && commandMessage.msgSourceId) {
      const msgTargetId = this.seqId + 1;
      const header = this.makeMessageHeader(this.uid, this.toAcc, msgTargetId);
      const payload = { data: [{ header, commandMessage, channel: ChannelType.CommandMsg, commandType: AdaptMessageStatus.Read }] }
      this.syncData(payload);
    }
  }
  closeSocket() {
    //关闭websoket
    if (this.socket && (Object.is(this.socket.readyState, SOCKET_READY_STATE.OPEN)
      || Object.is(this.socket.readyState, SOCKET_READY_STATE.CONNECTING))) {
      this.positiveClose = true;
      this.socket.close();
    }
    this.socket = null;
  }
  private initParams(uid: string, sid: string, toAcc: string) {
    if (!uid || !sid || !toAcc) {
      throw Error('uid,sid,toAcc must not be null');
    }
    this.uid = uid;
    this.sid = sid;
    this.toAcc = toAcc;
    this.adaptMessageList = [] as AdaptMessageEntity[];
    this.seqIdList = new Set();
    this.seqId = 0;
    this.positiveClose = false;
    this.initializing = true;
    this.sendingMessageList = new Map();
  }
  initSocket(uid: string, sid: string, toAcc: string) {
    this.initParams(uid, sid, toAcc);
    //先到数据库查询聊天历史
    request.get('im/history', { fromUid: uid, toUid: toAcc }).then((res: ImMsgInfoList) => {
      this.seqId = res.seqId || 0;
      [this.seqIdList, this.adaptMessageList] = adaptInitialMessages(res.list)
      //获取websocket连接
      return request.get('im/init', { uid, sid })
    }
    ).then((res: { data: string }) => {
      return res.data;
    }).then((url: string) => {
      //链接websocket
      this.socket = new WebSocket(`${url}/rpc/conn`);
      this.socket.onopen = (event) => {
        //完成初始化
        this.configRequest();
      }
      this.socket.onmessage = (event) => {
        //将blog转换为ArrayBuffer
        const reader = new FileReader();
        reader.readAsArrayBuffer(event.data);
        reader.onload = () => {
          const resBuffer = new Uint8Array(reader.result as ArrayBuffer);
          //从头部拿到消息类型
          const msgType = resBuffer[SCOKET_TYPE_POSITION];
          //从头部拿到压缩字段
          const bodyIsGzipped = resBuffer[COMPRESS_POSITION];
          //真正的内容body
          const contentBuffer = resBuffer.slice(SCOKET_HEADER_SIZE);
          if (bodyIsGzipped) {
            //解压body
            gzipBuffer.gunzip(contentBuffer, (data: Uint8Array) => {
              this.handleResult(msgType, data);
            });
          } else {
            this.handleResult(msgType, contentBuffer);
          }
        }
      }
      //错误重连
      this.socket.onerror = (event) => {
        console.log('websocket error,', event);
        this.caughtErr('connect error');
      }
      //关闭重连
      this.socket.onclose = (event) => {
        //若是是意外关闭
        if (!this.positiveClose) {
          console.log('socket closed accidently', event);
          console.log('try to reconnect.....');
          this.initSocket(uid, sid, toAcc);
          this.onReconnect();
        }
      }
    })
  }
  private handleResult(msgType: MSG_TYPE, contentBuffer: Uint8Array) {
    switch (msgType) {
      case MSG_TYPE.CONFIGRESP:
        this.configResult(contentBuffer);
        break;
      case MSG_TYPE.SYNCDATA:
        this.syncResult(contentBuffer);
        break;
      case MSG_TYPE.SYNCDATAFIN:
        this.ack(MSG_TYPE.SYNCDATAFINACK)
        break;
      case MSG_TYPE.NOTIFY:
        this.notifyResult(contentBuffer);
        break;
    }
  }
  private notifyResult(buffer: Uint8Array) {
    const result = this.Notify.decode(buffer) as Notify;
    console.log('notifyResult', result);
    //有新消息就去拿,没有就不拿
    this.ack(MSG_TYPE.NOTIFYFIN, result.seqId > this.seqId);
    if (result.seqId <= this.seqId && this.initializing) {
      // 正式完成注册,调用初始化完毕回调
      this.initializing = false;
      this.initFinish();
    }
  }
  private getSyncDataFinBody(syncDataRes: SyncData) {
    return syncDataRes.data.map(data => {
      return {
        seqId: data.header.seqId,
        errorCode: SUCCESS_CODE,
        createTime: data.header.createTime
      }
    })
  }
  private syncDataFin(syncDataRes: SyncData) {
    const payload = { syncDataResult: this.getSyncDataFinBody(syncDataRes) }
    const syncDataFin = this.SyncDataFin.create(payload);
    const contentBuffer: Uint8Array = this.SyncDataFin.encode(syncDataFin).finish();
    let realBuffer = new Uint8Array(contentBuffer.length + SCOKET_HEADER_SIZE);
    realBuffer = this.plugSocketBufferHead(realBuffer, MSG_TYPE.SYNCDATAFIN);
    realBuffer.set(contentBuffer, SCOKET_HEADER_SIZE);
    this.socket.send(realBuffer)
  }
  private syncResult(buffer: Uint8Array) {
    const result = this.SyncData.decode(buffer) as SyncData;
    console.log('syncResult', result)
    this.syncDataFin(result);
    this.seqId = result.seqId;
    //若是没数据,就结束
    if (!result.data.length && this.initializing) {
      // 正式完成注册,调用初始化完毕回调
      this.initializing = false;
      this.initFinish();
      this.onMsgListChange(this.adaptMessageList);
      return;
    }
    //接受新来的message而且装配
    [this.seqIdList, this.adaptMessageList, this.sendingMessageList] = adaptComingMessages(this.uid, this.toAcc, this.seqIdList, result.data, this.adaptMessageList, this.sendingMessageList);
    if (!this.initializing) {
      //若是初始化完成了,每次syncResult都检查一下是否须要发送command
      let commandMsg = null;
      [commandMsg, this.adaptMessageList] = getNeedMarkedCommandMessage(this.toAcc, this.adaptMessageList);
      this.sendCommands(commandMsg);
      this.onMsgListChange(this.adaptMessageList);
    }
  }
  private configResult(buffer: Uint8Array) {
    const result = this.ConfigResp.decode(buffer) as ConfigResult;
    if (!Object.is(result.errorCode, SUCCESS_CODE)) {
      //错误码信息   700:成功  701 token错误  702 token
      this.caughtErr('config error');
      return;
    }
    this.ack(MSG_TYPE.CONFIGACK);
  }
}
复制代码
//im_utils.ts
import { MessageEntity, AdaptMessageEntity, ChannelType, AdaptMessageStatus, CommandMessage, IMMsgType } from "./msg_result"
import { ImMsgInfo } from "../request_result"
//用于生成每次socket 发送时候的随机msgId
export const generateMsgId = () => {
  //全部候选组成验证码的字符
  const codeChars: Array<number> = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  return [
    codeChars[Math.floor(Math.random() * codeChars.length)],
    codeChars[Math.floor(Math.random() * codeChars.length)],
    codeChars[Math.floor(Math.random() * codeChars.length)],
    codeChars[Math.floor(Math.random() * codeChars.length)]
  ]
}
/**
 * 对比消息体是否相同
 */
export const compareMsgList = (newMsgList: AdaptMessageEntity[], oldMsgList: AdaptMessageEntity[]): boolean => {
  //若是长度不同,那确定不同的了
  if (oldMsgList.length != newMsgList.length)
    return false;
  for (let i = 0; i < newMsgList.length; i++) {
    //若是某个item的状态不同
    if (!Object.is(newMsgList[i].status, oldMsgList[i].status)) {
      return false;
    }
  }
  return true;
}
/**
 * 将IM收到的信息转化为AdaptMessageEntity用来渲染
 * @param fromUid 本身的uid,用来区分commandMsg的设置
 * @param toUid 本身的toUid,用来筛选只是当前对话的用户
 * @param seqIdList 已经目前收到的全部message的seqId
 * @param comingMessageList 服务端推过来的messages
 * @param adaptMessageList 真正用于显示的messages
 * @param sendingMessageList 正在发送的messages
 */
export const adaptComingMessages = (fromUid: string, toUid: string, seqIdList: Set<number>, comingMessageList: MessageEntity[], adaptMessageList: AdaptMessageEntity[], sendingMessageList: Map<number, AdaptMessageEntity>): [Set<number>, AdaptMessageEntity[], Map<number, AdaptMessageEntity>] => {
  const newSeqIdList = new Set([...seqIdList]);
  const newAdaptMessageList = [...adaptMessageList];
  const newSendingMessageList = new Map(sendingMessageList);

  //from或者to其一要等于toUid
  const usefulComingMsgs = comingMessageList.filter((message: MessageEntity) => {
    return (Object.is(message.header.from, toUid) || Object.is(message.header.to, toUid));
  })

  for (const message of usefulComingMsgs) {
 
    /**
     *  IMMsg = 0;               //即时消息
     *  CommandMsg = 1;          //命令消息
     *  SystemMsg  = 2;          //系统消息(暂时不处理)
     */
    if (Object.is(message.channel, ChannelType.CommandMsg)) {
      /**
       * 参照多端已读的图,https://nemo.yuque.com/starhalo/rd/nm0mv9
       * 若是from是本身就用msgSourceId
       * 不然就用msgTargetId
       *  */
      const keyId = Object.is(message.header.to, fromUid) ? message.commandMessage.msgSourceId : message.commandMessage.msgTargetId;
      //若是以前根本没有这条seqId,就不用管了
      if (!newSeqIdList.has(keyId))
        continue;
      //倒着来找,由于消息都是最近的,这样快一些吧
      for (let i = newAdaptMessageList.length - 1; i >= 0; i--) {
        //找到匹配的seqId,而后修改成对应的状态
        if (Object.is(newAdaptMessageList[i].header.seqId, keyId)) {
          newAdaptMessageList[i].status = message.commandType;
          break;
        }
      }
    } else if (Object.is(message.channel, ChannelType.SystemMsg)) {
      //系统消息直接不处理
      continue;
    } else {
      //IM消息类型,textMessage或者imageMessage

      //若是是本身刚刚发送过的
      if (newSeqIdList.has(message.header.seqId)) {
        //倒着来找,由于消息都是最近的,这样快一些吧
        for (let i = newAdaptMessageList.length - 1; i >= 0; i--) {
          //找到匹配的seqId,而后修改成对应的状态
          if (Object.is(newAdaptMessageList[i].header.seqId, message.header.seqId)) {
            newAdaptMessageList[i].status = AdaptMessageStatus.Sent;
            newSendingMessageList.delete(message.header.seqId);
            break;
          }
        }
      } else {
        const { Body, MessageType, isFcm, ...adaptMessage } = message;
        (adaptMessage as AdaptMessageEntity).status = AdaptMessageStatus.Sent;
        newAdaptMessageList.push(adaptMessage);
      }
    }
    newSeqIdList.add(message.header.seqId);
  }
  return [newSeqIdList, newAdaptMessageList, newSendingMessageList];
}
/**
 * 获取须要发送已读的列表
 * @param toUid 对方的uid
 * @param msgList  目前的消息列表
 * 只须要通知最晚的一条就能够了
 */
export const getNeedMarkedCommandMessage = (toUid: string, msgList: AdaptMessageEntity[]): [CommandMessage, AdaptMessageEntity[]] => {
  let result = null;
  let newMsgList = [...msgList];
  for (let i = msgList.length - 1; i >= 0; i--) {
    //是别人发的,并且状态不是已读,而且目前还没知道过的
    if (!result && Object.is(msgList[i].header.from, toUid) && !Object.is(msgList[i].status, AdaptMessageStatus.Read)) {
      result = { msgSourceId: msgList[i].header.sourceId, msgTargetId: msgList[i].header.seqId };
    }
    if (Object.is(msgList[i].header.from, toUid)) {
      //是别人发的,将它置为已读
      newMsgList[i].status = AdaptMessageStatus.Read;
    }
  }
  return [result, newMsgList];
}
/**
 * 将数据库记录的信息转化为AdaptMessageEntity用来渲染
 * @param fromUid 
 * @param initialMessageList 
 */
export const adaptInitialMessages = (initialMessageList: ImMsgInfo[]): [Set<number>, AdaptMessageEntity[]] => {
  let seqIdList = new Set<number>();
  let adaptMessageList = [];
  for (let initialMessage of initialMessageList) {
    seqIdList.add(initialMessage.seqid);
    const header = { seqId: initialMessage.seqid, from: initialMessage.fromuid, to: initialMessage.touid, createTime: initialMessage.msg_time };
    let adaptMessage: any = { header }
    if (Object.is(initialMessage.ctype, 'text')) {
      adaptMessage[`${initialMessage.ctype}Message`] = { text: initialMessage.content };
      adaptMessage.imMsgType = IMMsgType.Text;
    } else {
      adaptMessage[`${initialMessage.ctype}Message`] = { imageUrl: initialMessage.content };
      adaptMessage.imMsgType = IMMsgType.Image;
    }
    //所有看成已读
    adaptMessage.status = AdaptMessageStatus.Read;
    adaptMessageList.push(adaptMessage);
  }
  return [seqIdList, adaptMessageList]
}
复制代码
//msg_result.ts
/**
 * 对应proto文件返回的结果
 */
import * as protobuf from "protobufjs"
//消息分类类型
export enum ChannelType {
  IMMsg = 0,       //即时消息
  CommandMsg = 1,  //命令消息
  SystemMsg = 2   //系统消息
}
export enum MessageType {
  IMMsgType = 5,       //即时消息类型 
  CommandMsgType = 6,   //命令类型
  SystemMsgType = 7 //系统消息类型
}

export enum AdaptMessageStatus {
  Read = 0,     // 已读
  Received = 1,     //已达
  Cancel = 2,     //撤销
  Deleted = 3,     //删除
  Sent = 4,         //发过去服务端了,还没知道结果
  Sending = 5,      //正在发送
}
//消息类型 Message Type
export enum IMMsgType {
  Text = 0,             //文本
  Audio = 1,           //音频
  Video = 2,           //视频
  Image = 3           //图片
}
export interface ConfigResult extends protobuf.Message {
  errorCode: number,
  channelAes: string,
  seqId: string
}

export interface TextMessage extends protobuf.Message {
  text: string
}

export interface ImageMessage extends protobuf.Message {
  coverUrl?: string,
  imageUrl: string,
  height?: number,
  width?: number
}
export interface MessageHeader extends protobuf.Message {
  seqId?: number,
  from: string,
  to: string,
  createTime: number
  sourceId?: number  //消息初始seq id
}

export interface CommandMessage {
  msgSourceId: number;          //命令消息指向的消息
  msgTargetId: number;          //多端read问题
}

export interface MessageEntity {
  header?: MessageHeader,
  channel?: ChannelType,
  imMsgType?: IMMsgType,
  textMessage?: TextMessage,
  imageMessage?: ImageMessage,
  commandMessage?: CommandMessage,
  commandType?: AdaptMessageStatus
  isFcm?: boolean,
  MessageType?: string,
  Body?: string,

}

export interface AdaptMessageEntity {
  header?: MessageHeader,
  textMessage?: TextMessage,
  imMsgType?: IMMsgType,
  imageMessage?: ImageMessage
  status?: AdaptMessageStatus
}

export interface SyncData extends protobuf.Message {
  seqId?: number,
  data: Array<MessageEntity>
}
export interface SyncDataResult extends protobuf.Message {
  seqId: number,
  errorCode: number,
  createTime: number
}

export interface Notify extends protobuf.Message {
  seqId: number
}
复制代码

It took me over 2 weeks to accomplish the application including the ui end.Also, for the UI emoji set I choose emoji-mart. This time I just provide some variants,if you have any of ideas,don't forget to leave an comment! And also star haha. Before this,I've been making a chatting system by socket.io,maybe I will also post another article including demonstration about that system next few days.

Things still need to deal with

  1. The Im class importing the gzip and unzip file is to large for the build file,need to replace with webWorker.

References

  1. 5 Reasons to Use Protocol Buffers Instead of JSON For Your Next Service
  2. Protocol Buffers
相关文章
相关标签/搜索