ios端采用开源库CocoaAsyncSocket,进行TCP通信。html
private let delegateQueue = DispatchQueue.global()
private lazy var socket :GCDAsyncSocket = {
let socket = GCDAsyncSocket(delegate: self, delegateQueue: delegateQueue)
return socket
}()
复制代码
创建链接node
socket.delegate = self
try socket.connect(toHost: host, onPort: port)
socket.readData(withTimeout:-1, tag: 0)
复制代码
发送数据mysql
self.socket.write(data, withTimeout:5 * 60, tag: 0)
复制代码
链接成功监听ios
func socket(_ sock: GCDAsyncSocket, didConnectToHost host: String, port: UInt16) {
print("socket \(sock) didConnectToHost \(host) port \(port)")
}
复制代码
链接失败监听git
func socketDidDisconnect(_ sock: GCDAsyncSocket, withError err: Error?) {
print("socketDidDisconnect \(sock) withError \(err)")
}
复制代码
发送数据github
self.socket.write(data, withTimeout:5 * 60, tag: 0)
复制代码
接收数据sql
func socket(_ sock: GCDAsyncSocket, didRead data: Data, withTag tag: Int) {
let msgArr = SocketDataBuilder.shared().parse(data: data)
for (seq,socketData) in msgArr {
switch (socketData){
case .request(let comom):
handle(common: comom, seq: seq);
case .ping:
handlePing(seq: seq);
case .message(let msg):
handle(message: msg, seq: seq);
case .notification(let noti):
handle(notification: noti, seq: seq);
}
}
sock.readData(withTimeout: -1, tag: 0)
}
复制代码
值得注意的是,在接收到数据时,或者读超时的时候须要从新调用readData(withTimeout:tag)
方法 否则下个数据包到来时,不会再走这个方法。因为咱们还有透传体系,须要不间断的监听,因此timeout是-1无穷大数据库
var HOST = '0.0.0.0';
var PORT = 6969;
var server = net.createServer();
server.listen(PORT, HOST);
server.on('connection', function(sock) {
logger.info('CONNECTED: ' + sock.remoteAddress +':'+ sock.remotePort);
// 接收数据
sock.on('data', function(data) {
}
// 断开链接
sock.on('close', function(data) {
logger.info('CLOSED: ' + sock.remoteAddress + ' ' + sock.remotePort);
});
}
复制代码
对于一个TCP数据包,它包含一个二进制头部,和一个包体。包体是protubuf序列化后的数据流。包头一共8个字节,从第一个字节开始依次有如下含义npm
先定义一个数据结构来处理头部信息swift
struct BaseHeader {
private let margic_num : UInt8 = 0b10000001
var seq : UInt32
var type : UInt8
var length : UInt16
}
复制代码
序列化方法
func toData()->Data{
var marg = margic_num.bigEndian
var seq = self.seq.bigEndian
var type = self.type.bigEndian
var length = self.length.bigEndian
let mp = UnsafeBufferPointer(start: &marg, count: 1)
let sp = UnsafeBufferPointer(start: &seq, count: 1)
let tp = UnsafeBufferPointer(start: &type, count: 1)
let lp = UnsafeBufferPointer(start: &length, count: 1)
var data = Data(mp)
data.append(sp)
data.append(tp)
data.append(lp)
return data
}
复制代码
代码比较简单,值得注意的是两点
若是你对位运算比较熟悉,也能够采用下面这种方式。将原始数据转化为UInt8数组再进行拼接
var buf = [UInt8]()
append(margic_num, bufArr: &buf)
append(self.seq, bufArr: &buf)
append(self.type, bufArr: &buf)
append(self.length, bufArr: &buf)
let result = Data(buf)
func append<T:FixedWidthInteger>(_ value:T, bufArr:inout [UInt8]){
let size = MemoryLayout<T>.size
for i in 1...size {
let distance = (size - i) * 8;
let sub = (value >> distance) & 0xff
let value = UInt8(sub & 0xff)
bufArr.append(value)
}
}
复制代码
对应的反序列化以下。
init?(data:Data){
if data.count < header_length {
return nil
}
var headerData = Data(data)
let tag : UInt8 = headerData[0..<1].withUnsafeBytes{ $0.pointee }
if tag != margic_num {
return nil
}
let seq : UInt32 = headerData[1..<5].withUnsafeBytes({$0.pointee })
let typeValue : UInt8 = headerData[5..<6].withUnsafeBytes({$0.pointee })
let length : UInt16 = headerData[6..<8].withUnsafeBytes({$0.pointee })
self.seq = seq.bigEndian
self.type = typeValue.bigEndian
self.length = length.bigEndian
}
复制代码
Data结构体提供了很方便的下标索引方法
public subscript(bounds: Range<Data.Index>) -> Data
复制代码
获得的新的Data与原来的数据共用一块内存,只是改变指针的偏移。也就是说,相比原始数据,表明存储结构的_backing:_DataStorage
属性指向的是同一个对象,只是 _sliceRange:Range<Data.Index>
不一样
public func withUnsafeBytes<ResultType, ContentType>(_ body: (UnsafePointer<ContentType>) throws -> ResultType) rethrows -> ResultType
复制代码
利用这个带范型的方法,能够很容易,对data里面数据进行处理,提取出所须要类型的数据
一样的你也能够在UInt8数组上作文章
var index : Int = 0
let margic : UInt8 = getValue(data: headerData, index: &index)
let seqv : UInt32 = getValue(data: headerData, index: &index)
let typev : UInt8 = getValue(data: headerData, index: &index)
let len : UInt16 = getValue(data: headerData, index: &index)
func getValue<T:FixedWidthInteger>(data:Data,index:inout Int)->T{
let size = MemoryLayout<T>.size
var value:T = 0
for i in index..<(index+size) {
let distance = size - (i - index) - 1
value += T(data[i]) << distance
}
index += size
return value
}
复制代码
下面是反序列化代码,data是tcp接收到的数据
var header = data.slice(0,8)
var margic = header.readUInt8(0)
var seq = header.readUInt32BE(1)
var type = header.readUInt8(5)
var lenth = header.readUInt16BE(6)
复制代码
序列化方法以下,body为须要发送的包体数据
var margic = 129;
var lenth = body.length;
var header = new Buffer(8);
header.writeUInt8(margic);
header.writeUInt32BE(seq,1);
header.writeUInt8(type,5);
header.writeInt16BE(lenth,6);
复制代码
node.js中,从socket中读取或写入的数据,都是Buffer。调用对应的read或write的方法,很容易从二进制读取或填充所需数据类型的数据。值得注意的是,除了UInt8以外,其他方法都有BE后缀,这也和以前所说的Big-Endian有关
采用最新的protobuf3.0的语法,去除了required、optional关键字,枚举类型统一从0开始。
根据从请求头返回的type字段,除了心跳包包体为空外,其余类型包体分别解析为响应的protobuf类型。
其中type=2,被解析为Common类型,对应的是普通数据请求。实际上这部分业务应该做为普通HTTP请求处理。这里统一纳入TCP通信自定义协议体系中。
syntax = "proto3";
import "error.proto";
enum Common_method {
common_method_user = 0;
common_method_message = 1;
common_method_friend = 2;
common_method_p2p_connect = 3;
common_method_respond = 4;
}
message Common {
Common_method method = 1;
bytes body = 2;
}
message CommonRespon {
bool isSuc = 1;
bytes respon = 2;
ErrorMsg error = 3;
}
复制代码
syntax = "proto3";
enum error_type {
comom_err = 0;
invalid_params = 2;
}
message ErrorMsg {
error_type type = 1;
string msg = 2;
}
复制代码
Comon
根据不一样的type
,他的body
又能够被解析为对应的字类型数据,如signin_request
、login_request
、User_info_request
等等
syntax = "proto3"
import "base.proto";
enum User_cmd {
User_cmd_sign_in = 0;
User_cmd_login = 2;
User_cmd_logout = 3;
User_cmd_user_info = 4;
}
message User_msg {
User_cmd cmd = 1;
bytes body = 2;
}
message signin_request {
string nick_name = 1;
string pwd = 2;
}
message login_request {
string nick_name = 1; // 用户名
string pwd = 2; // 密码
string ip = 3; // 设备当前的ip
int32 port = 4; // 设备绑定的端口
string device_name = 5; // iOS/Andoird
string device_id = 6; // 设备标识符
string version = 7; // 软件版本
}
message logout_request {
int32 uid = 1;
}
// 注册成功 必须进行登陆 统一返回uid token
message sigin_response {
uint32 uid = 1;
string token = 2;
}
message login_response {
uint32 uid = 1;
string token = 2;
}
// 查询用户资料
message User_info_request {
uint32 uid = 1; // 所要查询用户的uid
}
message User_info_response {
User_info user_info = 1;
}
复制代码
type = 3时,对应的是Base_msg类型,对应正儿八经的即时通信业务模块
type=4时,Notification_msg类型,对应推送模块,及服务器向客户端发送的通知
因为代码量还算比较大,就不贴了。你们本身看源码
将protobuf-swift库导入工程中,在Podfile中加上
pod 'ProtocolBuffers-Swift', '4.0.1'
复制代码
电脑上安装protobuf
brew install protobuf
复制代码
cd到.proto文件目录,编译出swift平台代码
protoc *.proto --swift_out="./"
复制代码
将获得的*.pb.swift文件导入到项目工程当中
以登陆请求的包体构建为例为例子
let loginReq = LoginRequest().setPwd(pwd).setNickName(user)
let bodyData = try body.build().data()
let user = try UserMsg.Builder().setCmd(.userCmdLogin).setBody(bodyData).build().data()
let comom = try Common.Builder().setMethod(.commonMethodUser).setBody(user).build()
let data = comom.data()
复制代码
4.2.1 示例代码对应的反序列化,应该是这样子的
do {
let comon = try Common.parseFrom(data:data)
switch comon.type {
case .commonMethodUser:
let user = try UserMsg.parseFrom(data:comon.body)
switch user.cmd {
case .userCmdLogin:
let login = try LoginRequest.parseFrom(data:user.body)
...
}
...
}
}catch let err {
print(err)
}
复制代码
不管序列化仍是反序列化,都要用到一个中间桥架的结构体
enum RTPMessageGenerates {
case ping
case request(Common?)
case message(Message?)
case notification(NotificationMsg?)
init?(type:UInt8,data:Data){
switch type {
case 1:
self = .ping
case 2:
let comon = try? Common.parseFrom(data:data)
self = .request(comon)
case 3:
let msg = Message(data: data)
self = .message(msg)
case 4:
let noti = try? NotificationMsg.parseFrom(data: data)
self = .notification(noti)
default:
return nil
}
}
var type : UInt8 {
switch self {
case .ping:
return 1
case .request(_):
return 2
case .message(_):
return 3
case .notification(_):
return 4
}
}
var data : Data? {
switch self {
case .ping:
return Data()
case .request(let req):
return req?.data()
case .message(let msg):
return msg?.data
case .notification(let noti):
return noti?.data()
}
}
}
复制代码
构建过程以下
func rtpData(seq:UInt32,body:RTPMessageGenerates)->Data?{
guard let bodyData = body.data else { return nil }
let header = BaseHeader(seq: seq, type: body.type, length: UInt16(bodyData.count)).toData()
let data = header + bodyData
return data
}
复制代码
解析过程略微复杂点,须要进行拆包处理
func parse(data:Data)->[(seq:UInt32,body:RTPMessageGenerates)]{
var curIndex : UInt16 = 0
var temp = [(seq:UInt32,body:RTPMessageGenerates)]()
while curIndex < data.count{
if curIndex+header_length > data.count {
break
}
let headData = data[curIndex..<curIndex+header_length]
if let header = BaseHeader(data: headData) {
let body = data[8..<8+header.length]
if let msg = RTPMessageGenerates(type: header.type,data: body){
temp.append((header.seq,msg))
}
curIndex += header.length + 8
}else{
break;
}
}
return temp
}
复制代码
环境配置,包含数据库及日志库环境
npm install log4js
npm install mysql
npm install google-protobuf
sudo npm install protobufjs
pm2 install pm2-intercom
复制代码
编译.proto文件
protoc --js_out=import_style=commonjs,binary:. *.proto
复制代码
将*_pb.js文件导入项目工程当中
须要导入对应模块文件
var builder = require("../impb/common_pb"),
Common = builder.Common;
var MethodType = builder.Common_method;
复制代码
try {
var datas = Uint8Array(body);
var common = new Common.deserializeBinary(datas);
var method = common.getMethod();
var body = common.getBody();
}catch (err){
console.log(err);
}
复制代码
须要留意如下几点:
_
都被转化为驼峰命名法;枚举类型全部字符都被转化为了大写var comon = new Common();
comon.setMethod(MethodType.COMMON_METHOD_RESPOND);
comon.setBody(respond.serializeBinary());
var resData = comon.serializeBinary();
复制代码
主要是serializeBinary()方法的使用。注意赋值的时候要用set方法。获得的是Uint8Array,若是要进行下一步操做须要转化为Buffer类型
完整数据包解析
var tempData = new Buffer(data)
while (tempData.length){
var header = data.slice(0,8)
var margic = header.readUInt8(0)
var seq = header.readUInt32BE(1)
var type = header.readUInt8(5)
var lenth = header.readUInt16BE(6)
var body = tempData.slice(8,lenth+8)
var lest = tempData.length - ( lenth + 8 )
logger.info("Receive data :" + "margic=" + margic + " seq=" + seq + " type=" + type + " legth=" + lenth )
var bodyData = new Uint8Array(body)
routeWithReiceData(type,header,bodyData)
if (lest.length > 0){
logger.info("Has one more data packetge");
tempData = data.slice(lenth+8,lest)
}else {
tempData = lest;
break
}
}
}
复制代码
数据包的构建
var margic = 129;
var lenth = body.length;
var header = new Buffer(8);
header.writeUInt8(margic);
header.writeUInt32BE(seq,1);
header.writeUInt8(type,5);
header.writeInt16BE(lenth,6);
var buf = Buffer(body);
var result = Buffer.concat([header,buf])
复制代码
因为存在NAT超时,咱们必要在长时间没有数据交互时,主动发送数据包,来维持TCP链接。根据一些博客资料,NAT的超时时间最低的在5分钟左右。关于这些,能够参考这篇文章
咱们设计的心跳间隔是3分钟。心跳由客户端控制,服务器只负责再收到心跳包以后原样返回。小心跳包的响应超时的时候,或重试三次,三次都失败证实与服务器链接中断。主动断开链接再尝试从新链接。
心跳包大小是8个字节,即一个只有包头,包体为空的tcp数据包。
客户端代码以下
extension SocketManager {
private var pingDuration : TimeInterval { return 60 * 3 }
static var reTryCount = 0;
private func sentPing(){
sentPing { (isSuc) in
if isSuc {
SocketManager.reTryCount = 0;
}else{
if SocketManager.reTryCount < 3 {
self.sentPing()
SocketManager.reTryCount += 1
}else{
// 三次失败 链接已经断开 断开再重连
self.disconnect()
self.reconect(){_ in }
}
}
}
}
private func sentPing(completion:@escaping (Bool)->()){
self.sent(msg: .ping, completion: SocketManager.SentMsgCompletion.ping(completion))
}
func stopPing(){
self.pingTimer?.invalidate()
self.pingTimer = nil;
}
func startPing(){
sentPing()
if pingTimer == nil {
pingTimer = Timer(timeInterval:pingDuration , repeats: true, block: {[weak self] (timer) in
self?.sentPing()
})
}
}
}
复制代码
服务器代码:
function routeWithReiceData(type,header,body) {
switch (type){
case 1:
// 收到心跳包原样返回 客户端控制发送频率 必要时断开重连
sock.write(data)
break;
}
}
复制代码
附上源码项目地址: