一个TCP长链接设备管理后台工程(一)
一个TCP长链接设备管理后台工程(二)
一个TCP长链接设备管理后台工程(三)
一个TCP长链接设备管理后台工程(四)
一个TCP长链接设备管理后台工程(五)git
Github仓库地址github
上面介绍了过滤器,过滤器实际就是一个可以处理粘包和拆包的解析器,和封包器的做用正好相反。可是封包器会很简单,由于封包没有粘包和拆包的处理。数据库
代码以下:segmentfault
//Packer is proto Packer api func Packer(msg Message) []byte { data := make([]byte, 0) tempbytes := codec.Word2Bytes(msg.HEADER.MID) data = append(data, tempbytes...) datalen := uint16(len(msg.BODY)) & 0x03FF datalen = datalen | 0x4000 tempbytes = utils.Word2Bytes(datalen) data = append(data, tempbytes...) data = append(data, msg.HEADER.Version) if len(msg.HEADER.PhoneNum) < 10 { data = append(data, make([]byte, 10-len(msg.HEADER.PhoneNum))...) data = append(data, msg.HEADER.PhoneNum...) } else { data = append(data, msg.HEADER.PhoneNum[:10]...) } tempbytes = utils.Word2Bytes(msg.HEADER.SeqNum) data = append(data, tempbytes...) if msg.HEADER.IsMulti() { data = append(data, utils.Word2Bytes(msg.HEADER.MutilFlag.MsgSum)...) data = append(data, utils.Word2Bytes(msg.HEADER.MutilFlag.MsgIndex)...) } data = append(data, msg.BODY...) csdata := byte(checkSum(data[:])) data = append(data, csdata) //添加头尾 var tmpdata []byte = []byte{0x7e} for _, item := range data { if item == 0x7d { tmpdata = append(tmpdata, 0x7d, 0x01) } else if item == 0x7e { tmpdata = append(tmpdata, 0x7d, 0x02) } else { tmpdata = append(tmpdata, item) } } tmpdata = append(tmpdata, 0x7e) return tmpdata }
处理器用来处理接收到的有效TCP数据包,它应该是比过滤器更上层的一个模块。由于咱们是用来管理TCP链接的,一个tcp链接表明着一个终端设备,这个终端设备有各类属性和操做逻辑,这些东西都是依附于TCP的长链接。咱们单独定义一个包来组织这部份内容:后端
package term
而咱们的处理器就存在于这个包中。因为这个模块是tcp数据的实际处理模块,因此会牵扯到许多相关连的包,好比前面的codec、proto等,还有数据库的操做。api
这一部分咱们主要只介绍处理器的逻辑。前面咱们说了,咱们要处理的包有:app
经过proto的filter咱们获得了各个Message,而且获取了其中的帧头信息,BODY部分尚未处理。而咱们的codec正是用来处理BODY部分的编/解码器。tcp
因此处理器的基本流程就是根据Message中Header信息,分别处理其Body数据,而后返回处理的结果。这个处理的结果每每就是须要响应的数据流。因此咱们的处理器函数的样子大概就是这样的:函数
func (t *Terminal) Handler(msg proto.Message) []byte{ }
传入一个Message,入后输出须要响应的数据,若是返回nil则代表没有数据须要响应。ui
其中Terminal这个结构体咱们在后端模型这个装接中有说起到:
type Terminal struct { authkey string imei string iccid string vin string tboxver string loginTime time.Time seqNum uint16 phoneNum []byte Conn net.Conn Engine *xorm.Engine Ch chan int }
同时为了使用codec的序列化和反序列化,咱们还须要定义以下结构体:
type TermAckBody struct { AckSeqNum uint16 AckID uint16 AckResult uint8 } type PlatAckBody struct { AckSeqNum uint16 AckID uint16 AckResult uint8 } type RegisterBody struct { ProID uint16 CityID uint16 ManufID []byte `len:"11"` TermType []byte `len:"30"` TermID []byte `len:"30"` LicPlateColor uint8 LicPlate string } type RegisterAckBody struct { AckSeqNum uint16 AckResult uint8 AuthKey string } type AuthBody struct { AuthKeyLen uint8 AuthKey string Imei []byte `len:"15"` Version []byte `len:"20"` } type GPSInfoBody struct { WarnFlag uint32 State uint32 Lat uint32 Lng uint32 Alt uint16 Speed uint16 Dir uint16 Time []byte `len:"6"` } type CtrlBody struct { Cmd uint8 Param string }
下面就来正式讲解Handler的实现。
首先获取保存Header中的电话号和流水号到Terminal中:
if t.phoneNum == nil { t.phoneNum = make([]byte, 10) } copy(t.phoneNum, []byte(msg.HEADER.PhoneNum)) t.seqNum = msg.HEADER.SeqNum
而后经过switch来匹配消息id,并对其body部分作相关处理:
switch msg.HEADER.MID { case proto.TermAck: // case proto.Register: // case proto.Login: // case proto.Heartbeat: // case proto.Gpsinfo: // } return nil
咱们先说注册,咱们使用帧头中的手机号,在数据库中查找对应的鉴权码。而后从msg中获取body部分,经过codec反序列话获得RegisterBody实例。为了简单,咱们此处不作其余数据验证,直接作出数据响应便可。生成须要响应的RegisterAckBody实例,而后序列化为body切片,而后生成响应的Message,再经过封包器封包为数据流返回:
devinfo := new(DevInfo) devinfo.PhoneNum = strings.TrimLeft(utils.HexBuffToString(t.phoneNum), "0") is, _ := t.Engine.Get(devinfo) if !is { return []byte{} } var reg RegisterBody _, err := codec.Unmarshal(msg.BODY, ®) if err != nil { fmt.Println("err:", err) } var body []byte body, err = codec.Marshal(&RegisterAckBody{ AckSeqNum: msg.HEADER.SeqNum, AckResult: 0, AuthKey: devinfo.Authkey, }) if err != nil { fmt.Println("err:", err) } msgAck := proto.Message{ HEADER: proto.Header{ MID: proto.RegisterAck, Attr: proto.MakeAttr(1, false, 0, uint16(len(body))), Version: 1, PhoneNum: string(t.phoneNum), SeqNum: t.seqNum, }, BODY: body, } return proto.Packer(msgAck)
上面有涉及到数据库的查询操做,这部分使用了xorm,具体的参考xorm官方文档:xorm官方文档
上面涉及一个utils.HexBuffToString函数,这个函数会将字符串转换为16进制格式的字符串,自己是基于strconv.FormatUint(uint64(value), 16)完成的,可是这个函数会没有办法指定转换后的填充值,好比0x0A会直接转换成"A"而不是"0A",因此须要作一点特殊处理:
func HexBuffToString(hex []byte) string { var ret string for _, value := range hex { str := strconv.FormatUint(uint64(value), 16) if len([]rune(str)) == 1 { ret = ret + "0" + str } else { ret = ret + str } } return ret }
Handler其余部分的流程大致差很少,就不作过多讲解了,完整代码:
//Handler is proto Handler api func (t *Terminal) Handler(msg proto.Message) []byte { if t.phoneNum == nil { t.phoneNum = make([]byte, 10) } copy(t.phoneNum, []byte(msg.HEADER.PhoneNum)) t.seqNum = msg.HEADER.SeqNum switch msg.HEADER.MID { case proto.TermAck: reqID := codec.Bytes2Word(msg.BODY[2:4]) if reqID == proto.UpdateReq { //ch <- 1 //升级命令 } case proto.Register: devinfo := new(DevInfo) devinfo.PhoneNum = strings.TrimLeft(utils.HexBuffToString(t.phoneNum), "0") is, _ := t.Engine.Get(devinfo) if !is { return []byte{} } var reg RegisterBody _, err := codec.Unmarshal(msg.BODY, ®) if err != nil { fmt.Println("err:", err) } var body []byte body, err = codec.Marshal(&RegisterAckBody{ AckSeqNum: msg.HEADER.SeqNum, AckResult: 0, AuthKey: devinfo.Authkey, }) if err != nil { fmt.Println("err:", err) } msgAck := proto.Message{ HEADER: proto.Header{ MID: proto.RegisterAck, Attr: proto.MakeAttr(1, false, 0, uint16(len(body))), Version: 1, PhoneNum: string(t.phoneNum), SeqNum: t.seqNum, }, BODY: body, } return proto.Packer(msgAck) case proto.Login: var auth AuthBody _, err := codec.Unmarshal(msg.BODY, &auth) if err != nil { fmt.Println("err:", err) } t.authkey = auth.AuthKey t.imei = string(auth.Imei) t.tboxver = string(auth.Version) var body []byte body, err = codec.Marshal(&PlatAckBody{ AckSeqNum: msg.HEADER.SeqNum, AckID: msg.HEADER.MID, AckResult: 0, }) if err != nil { fmt.Println("err:", err) } msgAck := proto.Message{ HEADER: proto.Header{ MID: proto.PlatAck, Attr: proto.MakeAttr(1, false, 0, uint16(len(body))), Version: 1, PhoneNum: string(t.phoneNum), SeqNum: t.seqNum, }, BODY: body, } return proto.Packer(msgAck) case proto.Heartbeat: var err error var body []byte body, err = codec.Marshal(&PlatAckBody{ AckSeqNum: msg.HEADER.SeqNum, AckID: msg.HEADER.MID, AckResult: 0, }) if err != nil { fmt.Println("err:", err) } msgAck := proto.Message{ HEADER: proto.Header{ MID: proto.PlatAck, Attr: proto.MakeAttr(1, false, 0, uint16(len(body))), Version: 1, PhoneNum: string(t.phoneNum), SeqNum: t.seqNum, }, BODY: body, } return proto.Packer(msgAck) case proto.Gpsinfo: var gpsInfo GPSInfoBody _, err := codec.Unmarshal(msg.BODY, &gpsInfo) if err != nil { fmt.Println("err:", err) } gpsdata := new(GPSData) gpsdata.Imei = t.imei gpsdata.Stamp = time.Now() gpsdata.WarnFlag = gpsInfo.WarnFlag gpsdata.State = gpsInfo.State gpsdata.Latitude = gpsInfo.Lat gpsdata.Longitude = gpsInfo.Lng gpsdata.Altitude = gpsInfo.Alt gpsdata.Speed = gpsInfo.Speed gpsdata.Direction = gpsInfo.Dir if (gpsdata.State & 0x00000001) > 0 { gpsdata.AccState = 1 } else { gpsdata.AccState = 0 } if (gpsdata.State & 0x00000002) > 0 { gpsdata.GpsState = 1 } else { gpsdata.GpsState = 0 } _, err = t.Engine.Insert(gpsdata) if err != nil { fmt.Println("insert gps err:", err) } var body []byte body, err = codec.Marshal(&PlatAckBody{ AckSeqNum: msg.HEADER.SeqNum, AckID: msg.HEADER.MID, AckResult: 0, }) if err != nil { fmt.Println("err:", err) } msgAck := proto.Message{ HEADER: proto.Header{ MID: proto.PlatAck, Attr: proto.MakeAttr(1, false, 0, uint16(len(body))), Version: 1, PhoneNum: string(t.phoneNum), SeqNum: t.seqNum, }, BODY: body, } return proto.Packer(msgAck) } return nil }