一个TCP长链接设备管理后台工程(一)
一个TCP长链接设备管理后台工程(二)
一个TCP长链接设备管理后台工程(三)
一个TCP长链接设备管理后台工程(四)git
Github仓库地址github
帧过滤器的做用就是,从接收到的buff中,过滤出有效的完整jtt808数据包。因为是tcp通信,那么这其中不可避免的会涉及到数据包的两个常规处理:拆包和粘包。segmentfault
拆包和粘包的简要说明:api
假设客户端分别发送了两个数据包D1和D2给服务端,因为服务端一次读取到的字节数是不肯定的,故可能存在如下4种状况。 (1)服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包; (2)服务端一次接收到了两个数据包,D1和D2粘合在一块儿,被称为TCP粘包; (3)服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部份内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包; (4)服务端分两次读取到了两个数据包,第一次读取到了D1包的部份内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。 若是此时服务端TCP接收滑窗很是小,而数据包D1和D2比较大,颇有可能会发生第五种可能,即服务端分屡次才能将D1和D2包接收彻底,期间发生屡次拆包。
拆包与粘包的说明网上资料不少,此处不作过多说明。可是咱们设计出来的过滤器,要可以正常应对拆包和粘包的状况。app
首先,咱们根据jtt808协议定义咱们的数据包结构:tcp
type MultiField struct { MsgSum uint16 MsgIndex uint16 } type Header struct { MID uint16 Attr uint16 Version uint8 PhoneNum string SeqNum uint16 MutilFlag MultiField } type Message struct { HEADER Header BODY []byte }
因为Attr实际上是由多个位域字段组成,因此咱们再定义三个函数:函数
func (h *Header) IsMulti() bool { if ((h.Attr >> 12) & 0x0001) > 0 { return true } return false } //BodyLen is a function for get body len func (h *Header) BodyLen() int { return int(h.Attr & 0x03ff) } //MakeAttr is generate attr func MakeAttr(verFlag byte, mut bool, enc byte, lens uint16) uint16 { attr := lens & 0x03FF if verFlag > 0 { attr = attr & 0x4000 } if mut { attr = attr & 0x2000 } encMask := (uint16(enc) & 0x0007) << 10 return attr + encMask }
因为要考虑拆包和粘包问题,因此咱们的过滤器须要可以同时分析多包数据,可是基本单元函数分析一帧数据,因此咱们先实现一帧数据的过滤器:filterSigleui
咱们想要的过滤器原型是以下的一个函数:设计
func filterSigle(data []byte) (Message, int, error)
该函数接收一个存有从tcp端接收的数据流切片,须要返回咱们解析出来的Message、解析事后消耗的字节数,错误信息。code
很明显,这个返回的消耗字节数就是为了应对拆包和粘包用的。
咱们知道jtt808协议是以0x7e开始和结尾的,咱们定义一个常量:
const ( ProtoHeader byte = 0x7e )
filterSigle的第一个逻辑就是要识别帧头和帧尾了:
var usedLen int = 0 startindex := bytes.IndexByte(data, ProtoHeader) if startindex >= 0 { usedLen = startindex + 1 endindex := bytes.IndexByte(data[usedLen:], ProtoHeader) if endindex >= 0 { endindex = endindex + usedLen } }
此处的帧头和帧尾索引均相对于data的起始字节而言。理想的状况下,在startindex和endindex之间的数据就是咱们须要解析的数据,因此以后的解析都是对这部分数据进行分析。咱们对这部分的逻辑单独用一个函数来处理,这个函数主要完成三个逻辑:转义、校验检查和解析
func frameParser(data []byte) (Message, error) { }
data入参从startindex到endindex,不包括startindex和endindex。
在转义以前,首先判断基本的长度。经过对帧头固定字段分析能够知道,消息头部分的长度为17或者19个字节,加上帧头帧尾校验位的话最小就是17+3=20个字节,鉴于BODY部分可能为空,因此整个帧最小长度应该为20字节:
if len(data)+2 < 17+3 { return Message{}, fmt.Errorf("header is too short") }
转义比较简单,为了代码复用,定义成一个函数:
func Escape(data, oldBytes, newBytes []byte) []byte { buff := make([]byte, 0) var startindex int = 0 for startindex < len(data) { index := bytes.Index(data[startindex:], oldBytes) if index >= 0 { buff = append(buff, data[startindex:index]...) buff = append(buff, newBytes...) startindex = index + len(oldBytes) } else { buff = append(buff, data[startindex:]...) startindex = len(data) } } return buff }
调用:
//不包含帧头帧尾 frameData := Escape(data[:len(data)], []byte{0x7d, 0x02}, []byte{0x7e}) frameData = Escape(frameData, []byte{0x7d, 0x01}, []byte{0x7d})
校验就是简单的异或校验,从消息头到消息体结束,即data[:len(data)-1]
func checkSum(data []byte) byte { var sum byte = 0 for _, itemdata := range data { sum ^= itemdata } return sum }
调用:
rawcs := checkSum(frameData[:len(frameData)-1]) if rawcs != frameData[len(frameData)-1] { return Message{}, fmt.Errorf("cs is not match:%d--%d", rawcs, frameData[len(frameData)-1]) }
而后就是对frameData中的具体数据进行解析了:
var usedLen int = 0 var msg Message msg.HEADER.MID = codec.Bytes2Word(frameData[usedLen:]) usedLen = usedLen + 2 msg.HEADER.Attr = codec.Bytes2Word(frameData[usedLen:]) usedLen = usedLen + 2 msg.HEADER.Version = frameData[usedLen] usedLen = usedLen + 1
注意usedLen要跟着实时变化。
手机号固定为10个字节,不足的话前面会填充0,因此咱们要把前面无效的0去掉,使用bytes.TrimLeftFunc:
tempPhone := bytes.TrimLeftFunc(frameData[usedLen:usedLen+10], func(r rune) bool { return r == 0x00 }) msg.HEADER.PhoneNum = string(tempPhone) usedLen = usedLen + 10 msg.HEADER.SeqNum = codec.Bytes2Word(frameData[usedLen:]) usedLen = usedLen + 2
同时还要对多帧的状况进行判断:
if msg.HEADER.IsMulti() { msg.HEADER.MutilFlag.MsgSum = codec.Bytes2Word(frameData[usedLen:]) usedLen = usedLen + 2 msg.HEADER.MutilFlag.MsgIndex = codec.Bytes2Word(frameData[usedLen:]) usedLen = usedLen + 2 }
再次对usedLen长度判断一下,避免超过界限:
if len(frameData) < usedLen { return Message{}, fmt.Errorf("flag code is too short") }
处理到上面的地方后,接着的就是BODY部分了,直接copy对应的长度,长度为:
len(frameData)-usedLen
逻辑以下
msg.BODY = make([]byte, len(frameData)-usedLen) copy(msg.BODY, frameData[usedLen:len(frameData)]) usedLen = len(frameData) return msg, nil
到此正常的流程就走完了。
调用:
msg, err := frameParser(frameData)
当返回错误时,返回的长度值应该为endindex,即不包括endindex处对应的0x7e。由于这个0x7e多是后面数据的帧头。
msg, err := frameParser(data[startindex+1 : endindex]) if err != nil { return Message{}, endindex, err } return msg, endindex + 1, nil
对于
if endindex >= 0
条件不符合的,说明没有找到帧尾,能够包帧头前面的去掉了,可是帧头和帧头后面的数据要保留,用来跟以后的数据流拼接。
return Message{}, startindex, fmt.Errorf("can't find end flag")
对于
if startindex >= 0
条件不符合的,说明没有找到帧头,那就是整个帧都是无效的:
return Message{}, len(data), fmt.Errorf("can't find start flag")
这样就实现了一个单帧的过滤器。接着咱们在单帧过滤器的基础上来实现多帧过滤器。
咱们只须要对数据流进行单帧过滤,而后返回消耗的字节数。若是消耗了必定字节数后,还有剩余的字节,咱们再对这些字节进行单帧过滤。依次循环,直到字节数消耗完或者发生错误。
全部循环结束后,咱们还须要将剩余的字节数保留,用来跟下一次的数据流进行拼接。函数实现以下:
//Filter is proto Filter api func Filter(data []byte) ([]Message, int, error) { var usedLen int = 0 msgList := make([]Message, 0) var cnt int = 0 for { cnt++ if cnt > 10 { return []Message{}, 0, fmt.Errorf("time too much") } if usedLen >= len(data) { break } msg, lens, err := filterSigle(data[usedLen:]) if err != nil { usedLen = usedLen + lens fmt.Println("err:", err) return msgList, usedLen, nil } usedLen = usedLen + lens msgList = append(msgList, msg) } return msgList, usedLen, nil }
整个过滤器完整实现:
package proto import ( "bytes" "fmt" "tsp/codec" "tsp/utils" ) const ( ProtoHeader byte = 0x7e ) type MultiField struct { MsgSum uint16 MsgIndex uint16 } type Header struct { MID uint16 Attr uint16 Version uint8 PhoneNum string SeqNum uint16 MutilFlag MultiField } func (h *Header) IsMulti() bool { if ((h.Attr >> 12) & 0x0001) > 0 { return true } return false } //BodyLen is a function for get body len func (h *Header) BodyLen() int { return int(h.Attr & 0x03ff) } //MakeAttr is generate attr func MakeAttr(verFlag byte, mut bool, enc byte, lens uint16) uint16 { attr := lens & 0x03FF if verFlag > 0 { attr = attr & 0x4000 } if mut { attr = attr & 0x2000 } encMask := (uint16(enc) & 0x0007) << 10 return attr + encMask } //Message is struct for message for jtt808 type Message struct { HEADER Header BODY []byte } func Version() string { return "1.0.0" } func Name() string { return "jtt808" } //Filter is proto Filter api func Filter(data []byte) ([]Message, int, error) { var usedLen int = 0 msgList := make([]Message, 0) var cnt int = 0 for { //添加一个计数器,防止数据异常致使死循环 cnt++ if cnt > 10 { cnt = 0 return []Message{}, 0, fmt.Errorf("time too much") } if usedLen >= len(data) { break } msg, lens, err := filterSigle(data[usedLen:]) if err != nil { usedLen = usedLen + lens fmt.Println("err:", err) return msgList, usedLen, nil } usedLen = usedLen + lens msgList = append(msgList, msg) } return msgList, usedLen, nil } func filterSigle(data []byte) (Message, int, error) { var usedLen int = 0 startindex := bytes.IndexByte(data, ProtoHeader) if startindex >= 0 { usedLen = startindex + 1 endindex := bytes.IndexByte(data[usedLen:], ProtoHeader) if endindex >= 0 { endindex = endindex + usedLen msg, err := frameParser(data[startindex+1 : endindex]) if err != nil { return Message{}, endindex, err } return msg, endindex + 1, nil } return Message{}, startindex, fmt.Errorf("can't find end flag") } return Message{}, len(data), fmt.Errorf("can't find start flag") } func Escape(data, oldBytes, newBytes []byte) []byte { buff := make([]byte, 0) var startindex int = 0 for startindex < len(data) { index := bytes.Index(data[startindex:], oldBytes) if index >= 0 { buff = append(buff, data[startindex:index]...) buff = append(buff, newBytes...) startindex = index + len(oldBytes) } else { buff = append(buff, data[startindex:]...) startindex = len(data) } } return buff } func frameParser(data []byte) (Message, error) { if len(data)+2 < 17+3 { return Message{}, fmt.Errorf("header is too short") } //不包含帧头帧尾 frameData := Escape(data[:len(data)], []byte{0x7d, 0x02}, []byte{0x7e}) frameData = Escape(frameData, []byte{0x7d, 0x01}, []byte{0x7d}) //以后的操做都是基于frameData来处理 rawcs := checkSum(frameData[:len(frameData)-1]) if rawcs != frameData[len(frameData)-1] { return Message{}, fmt.Errorf("cs is not match:%d--%d", rawcs, frameData[len(frameData)-1]) } var usedLen int = 0 var msg Message msg.HEADER.MID = codec.Bytes2Word(frameData[usedLen:]) usedLen = usedLen + 2 msg.HEADER.Attr = codec.Bytes2Word(frameData[usedLen:]) usedLen = usedLen + 2 msg.HEADER.Version = frameData[usedLen] usedLen = usedLen + 1 tempPhone := bytes.TrimLeftFunc(frameData[usedLen:usedLen+10], func(r rune) bool { return r == 0x00 }) msg.HEADER.PhoneNum = string(tempPhone) usedLen = usedLen + 10 msg.HEADER.SeqNum = codec.Bytes2Word(frameData[usedLen:]) usedLen = usedLen + 2 if msg.HEADER.IsMulti() { msg.HEADER.MutilFlag.MsgSum = codec.Bytes2Word(frameData[usedLen:]) usedLen = usedLen + 2 msg.HEADER.MutilFlag.MsgIndex = codec.Bytes2Word(frameData[usedLen:]) usedLen = usedLen + 2 } if len(frameData) < usedLen { return Message{}, fmt.Errorf("flag code is too short") } msg.BODY = make([]byte, len(frameData)-usedLen) copy(msg.BODY, frameData[usedLen:len(frameData)]) usedLen = len(frameData) return msg, nil }