- 苏格团队
- 做者:Jonny
咱们知道,TCP是面向链接流传输的,其采用Nagle算法,在缓冲区对上层数据进行了处理。避免触发自动分片机制和网络上大量小数据包的同时也形成了粘包(小包合并)和半包(大包拆分)问题,致使数据没有消息保护边界,接收端接收到一次数据没法判断是不是一个完整数据包。那有什么方案能够解决这问题呢?node
很简单,既然消息没有边界,那咱们在消息往下传以前给它加一个边界识别就行了。算法
第一种方案不够灵活;第二种有风险,若是数据内恰好有该特殊字符会出问题;第三种方案虽然要增长对消息头的解析,不过相对而言仍是要安全一些。json
既然使用第三种方案,就必然涉及到封包和拆包的问题。缓存
首先确定须要定义数据包的结构,这相似Http包同样,有包头和包体。包头其实上是个大小固定的结构体,其中有个结构体成员变量表示包体的长度,其余的结构体成员可根据须要本身定义。根据包头长度固定以及包头中含有包体长度的变量就能正确的拆分出一个完整的数据包。包体则存放数据内容。安全
在发送端,须要进行封包。封包就是给一段数据加上包头,这样一来数据包就分为包头和包体两部份内容了。网络
在接受端,则须要进行拆包。主要流程以下:ui
其中对于缓冲区的设计,主要由俩种:编码
定义了消息结构以后,发送端和接收端还须要统一字节序。咱们知道,不一样机器的本机字节序不一样,绝大多数X86机器都是小端字节序,而后仍是由少数机器是大端存储的。所以在数据流进行传输时,必须先统一字节序。通常约定在传输时采用网络字节序(大端),统一用unicode编码。spa
了解以上知识以后,咱们如今以后要作什么了。发送端按定义的协议规则封包,接受端把接收到的buffer放入缓冲区,当缓冲区内有完整包时开始拆包。封包拆包过程须要注意,读写超过一个字节的数据时须要按大端字节序读取。下面看node的代码实现(只提供核心实现片断):设计
1)发送端封包:
let head = new Buffer(4);
let jsonStr = JSON.stringify(json);
let body = new Buffer(jsonStr);
//超过一字节的大端写入
head.writeInt32BE(body.byteLength, 0);
let buffer = Buffer.concat([head, body]);
复制代码
2)接收端收到buffer入缓冲区:
let dataReadStart = 0; //新数据的起始位置
let dataLength = buffer.length; // 要拷贝数据的长度
let availableLen = _bufferLength - _dataLen; // 缓冲区剩余可用空间
// buffer剩余空间不足够存储本次数据
if (availableLen < dataLength) {
let newLength = Math.ceil((_dataLen + dataLength) / _bufferLength) * _bufferLength;
let _tempBuffer = Buffer.alloc(newLength);
// 将旧数据复制到新buffer而且修正相关参数
if (_writePointer < _readPointer) { // 数据存储在旧buffer的尾部+头部的顺序
let dataTailLen = _bufferLength - _readPointer;
_buffer.copy(_tempBuffer, 0, _readPointer, _readPointer + dataTailLen);
_buffer.copy(_tempBuffer, dataTailLen, 0, _writePointer);
} else { // 数据是按照顺序进行的完整存储
_buffer.copy(_tempBuffer, 0, _readPointer, _writePointer);
}
_bufferLength = newLength;
_buffer = _tempBuffer;
_tempBuffer = null;
_readPointer = 0;
_writePointer = _dataLen;
//存储新到来的buffer
buffer.copy(_buffer, _writePointer, dataReadStart, dataReadStart + dataLength);
_dataLen += dataLength;
_writePointer += dataLength;
} else if (_writePointer + dataLength > _bufferLength) {
// 空间够用状况下,可是数据会冲破缓冲区尾部,部分存到缓冲区旧数据后,一部分存到缓冲区开始位置
// 缓冲区尾部剩余空间的长度
let bufferTailLength = _bufferLength - _writePointer;
// 数据尾部位置
let dataEndPosition = dataReadStart + bufferTailLength;
buffer.copy(_buffer, _writePointer, dataReadStart, dataEndPosition);
// data剩余未拷贝进缓存的长度
let restDataLen = dataLength - bufferTailLength;
buffer.copy(_buffer, 0, dataEndPosition, dataLength);
_dataLen = _dataLen + dataLength;
_writePointer = restDataLen
} else { // 剩余空间足够存储数据,直接拷贝数据到缓冲区
buffer.copy(_buffer, _writePointer, dataReadStart, dataReadStart + dataLength);
_dataLen = _dataLen + dataLength;
_writePointer = _writePointer + dataLength
}
复制代码
2)取出缓冲区全部完整数据包(收到的buffer入缓冲区后)
let _dataHeadLen = 4;
timer && clearInterval(timer);
timer = setInterval(()=>{
// 缓冲区数据不够解析出包头
if (_dataLen < _dataHeadLen) {
console.log('数据长度小于包头规定长度,等待数据......')
clearInterval(timer);
}
// 解析包头长度
// 尾部最后剩余可读字节长度
let restDataLen = _bufferLength - _readPointer;
let dataLen = 0;
let headBuffer = Buffer.alloc(_dataHeadLen);
// 数据包为分段存储,不能直接解析出包头,先拼接
if (restDataLen < _dataHeadLen) {
// 取出第一部分头部字节
_buffer.copy(headBuffer, 0, _readPointer, _bufferLength)
// 取出第二部分头部字节
let unReadHeadLen = _dataHeadLen - restDataLen;
_buffer.copy(headBuffer, restDataLen, 0, unReadHeadLen)
dataLen = headBuffer.readUInt32BE(0);
} else {
_buffer.copy(headBuffer, 0, _readPointer, _readPointer + _dataHeadLen);
dataLen = headBuffer.readUInt32BE(0);;
}
// 数据长度不够读取,直接返回
if (_dataLen - _dataHeadLen < dataLen) {
log.info("缓冲区已有body数据长度小于包头定义body的长度,等待数据......")
clearInterval(timer);
} else { // 数据够读,读取数据包
let package = Buffer.alloc(dataLen);
// 数据是分段存储,须要分两次读取
if (_bufferLength - _readPointer < dataLen) {
let firstPartLen = _bufferLength - _readPointer;
// 读取第一部分,直接到字符尾部的数据
_buffer.copy(package, 0, _readPointer, firstPartLen + _readPointer);
// 读取第二部分,存储在开头的数据
let secondPartLen = dataLen - firstPartLen;
_buffer.copy(package, firstPartLen, 0, secondPartLen);
_readPointer = secondPartLen; //更新可读起点
} else { // 直接读取数据
_buffer.copy(package, 0, _readPointer, _readPointer + dataLen);
_readPointer += dataLen; //更新可读起点
}
_dataLen -= readData.length; //更新数据长度
// 已经读取完全部数据
if (_readPointer === _writePointer) {
clearInterval(timer)
}
//开始解包
callback(package);
}
}, 50);
复制代码
4)拆包获得数据
let headBytes = 4;
let head = new Buffer(headBytes);
buffer.copy(head, 0, 0, headBytes);
let dataLen = head.readUInt32BE();
const body = new Buffer(dataLen);
buffer.copy(body, 0, headBytes, headBytes + dataLen)
let content = null;
try {
const str = body.toString('utf-8');
if(str === ''){
content = null;
}else{
content = JSON.parse(body);
}
} catch (e) {
log.error('head指定body长度有问题')
}
//传递给业务层
callback(content);
复制代码
从上面咱们已经了解到了封包解包的一个过程。TCP是可靠传输的,同一时间在网络上只会有一个数据包,而且丢包会重传,所以不用担忧丢包或者数据包乱序问题。UDP有消息保护边界,不须要进行拆包解包,而后其是非可靠传输,也须要解决其余一些问题,譬如丢包和数据包排序问题。
上面进行数据包结构设计时只是简单地加了一个包体长度,事实上在业务场景能够自由增长须要的字段,譬如协议版本,协议类型等等。