一个项目,要接收 UDP 数据包,解析并获取其中的数据,主要根据解析出来的行号和序号将数据拼接起来,而后将拼接起来的数据(最重要的数据是 R、G、B 三个通道的像素值)显示在窗口中。考虑到每秒钟要接收的数据包的数量较大,Python 的处理速度可能没有那么快,并且以前对 Qt 也比较熟悉了,因此用Qt 做为客户端接收处理数据包,用近期学习的 Python 模拟发送数据包。python
在 TCP/IP 协议中,UDP 数据包的大小是由限制的,所以用 UDP 传输数据时,还要在 UDP 层上再封装一层自定义的协议。这个自定义的协议比较简单,每一个 UDP 包的大小为 1432 个字节,分为几个部分:c++
部分 | 起始字节 | 字节长度 | 说明 |
---|---|---|---|
Start | 0 | 4 | 包头部的 Magic Number,设为 0x53746172 |
PartialCnt | 4 | 1 | 分包总数,一个字节(0-255)之内 |
PartialIdx | 5 | 1 | 分包序号 |
SampleLine | 6 | 1 | 采样率 |
RGB | 7 | 1 | rgb 通道标识符 |
LineIdx | 8 | 4 | 行号,每一行能够包含 RGB 三个通道的数据,每一个通道由多个分包组成 |
ValidDataLen | 12 | 4 | 数据部分有效字节数 |
LineBytes | 16 | 4 | 每行数据包含的字节总数 |
Reserve | 20 | 128 | 保留部分 |
Data | 148 | 1280 | 数据部分 |
end | 1428 | 4 | 包尾部的 Magic Number,设为 0x54456e64 |
上述表格描述的就是一个完整的 UDP 包。这里的一个 UDP 数据包包含的是 RGB 某个通道的某一部分的数据。换种说法:git
因此要生成/解析 UDP 包,最重要的是 PartialCnt、PartialIdx、RGB、LineIdx、Data 这几个部分。清楚了自定义协议就能够开始编写模拟包的生成和相应的接收逻辑了。github
因为本地开发的时候缺乏必要的硬件环境,为了方便开发,用 Python 编写一个简单的 UDPServer,发送模拟生成的数据包。根据上述协议,能够写出以下的 CameraData 类来表示 UDP 数据包:ubuntu
# -*- coding: utf-8 -*- DATA_START_MAGIC = bytearray(4) DATA_START_MAGIC[0] = 0x53 # S DATA_START_MAGIC[1] = 0x74 # t DATA_START_MAGIC[2] = 0x61 # a DATA_START_MAGIC[3] = 0x72 # r DATA_END_MAGIC = bytearray(4) DATA_END_MAGIC[0] = 0x54 # T DATA_END_MAGIC[1] = 0x45 # E DATA_END_MAGIC[2] = 0x6e # n DATA_END_MAGIC[3] = 0x64 # d slice_start_magic = slice(0, 4) slice_partial_cnt = 4 slice_partial_idx = 5 slice_sample_line = 6 slice_rgb_extern = 7 slice_line_idx = slice(8, 12) slice_valid_data_len = slice(12, 16) slice_line_bytes = slice(16, 20) slice_resv = slice(20, 148) slice_data = slice(148, 1428) slice_end_magic = slice(1428, 1432) import numpy as np class CameraData(object): def __init__(self): # self.new() # self.rawdata = rawdata self.dataLow = 10 self.dataHigh = 20 self.new() def genRandomByte(self, by=4): r = bytearray(by) for i in range(by): r[i] = np.random.randint(0, 255) def setPackageIdx(self, i = 0): self.rawdata[slice_partial_idx] = i def setRGB(self, c = 1): self.rawdata[slice_rgb_extern] = c def setLineIdx(self, line): start = slice_line_idx.start self.rawdata[start+3] = 0x000000ff & line self.rawdata[start+2] = (0x0000ff00 & line) >> 8 self.rawdata[start+1] = (0x00ff0000 & line) >> 16 self.rawdata[start+0] = (0xff000000 & line) >> 24 def setValidDataLen(self, len): start = slice_valid_data_len.start self.rawdata[start+3] = 0x000000ff & len self.rawdata[start+2] = (0x0000ff00 & len) >> 8 self.rawdata[start+1] = (0x00ff0000 & len) >> 16 self.rawdata[start+0] = (0xff000000 & len) >> 24 def setLineBytes(self, len): start = slice_line_bytes.start self.rawdata[start+3] = 0x000000ff & len self.rawdata[start+2] = (0x0000ff00 & len) >> 8 self.rawdata[start+1] = (0x00ff0000 & len) >> 16 self.rawdata[start+0] = (0xff000000 & len) >> 24 def randomData(self): size = slice_data.stop - slice_data.start arr = np.random.randint(self.dataLow, self.dataHigh, size, dtype=np.uint8) self.rawdata[slice_data] = bytearray(arr) def new(self): """构造新的数据对象 """ self.rawdata = bytearray(1432) self.rawdata[slice_start_magic] = DATA_START_MAGIC self.rawdata[slice_partial_cnt] = 0x02 self.rawdata[slice_partial_idx] = 0x00 self.rawdata[slice_sample_line] = 0x03 self.rawdata[slice_rgb_extern] = 0x01 self.setLineIdx(0x00) self.setValidDataLen(1280) self.setLineBytes(1432) self.randomData() self.rawdata[slice_end_magic] = DATA_END_MAGIC def hex(self): return self.rawdata.hex() def __repr__(self): return '<CameraData@{} hex len: {}>'.format(hex(id(self)), len(self.rawdata))
CameraData 中的 rawdata 是一个 bytearray 对象,它将会被 UdpServer 经过网络接口发送出去。设置 4 个字节大小的整数时(如写 LineIdx 行号),不能直接将数值赋到 rawdata 中,要将其中的 4 个字节分别赋值到对应的地址上才行。windows
CameraData 中的 randomData 方法是模拟随机数据,更好的作法不是彻底随机给每一个像素点赋值,而是有规律的变化,这样在接收数据出现问题、分析问题的时候能够直观地看到哪里有问题。数组
而后咱们须要定义一个 UdpServer,用它来将数据对象中包含的信息发送出去。缓存
import socket class UdpServer( object ): """该类功能是处理底层的 UDP 数据包发送和接收,利用队列缓存全部数据 """ def __init__(self, *args, **kwargs): self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self._sock.bind( ('', DATA_PORT+11 ) ) self._sock.settimeout( None ) # never timeout # self._sock.setblocking( 0 ) # none block def send_msg( self, msg ): """发送消息, @param msg 字典对象,发送 msg 的 rawdata 字段 """ self._sock.sendto( msg.rawdata, ('192.168.8.1', DATA_PORT))
这个 UdpServer 很是简单,由于后续会经过这个 UdpServer 不停的发包,可是每次发包必须等待发送端成功将 UDP 包发送出去,这里不要将 socket 对象设置成非阻塞的,不然程序运行时会出现错误提示(尽管能够忽略掉这个错误提示,可是不必设置成非阻塞的,阻塞模式彻底足够了)。网络
在 github 中能够找到完整的 Python 文件,里面定义了其余类,如 DataSender
、RGBSender
。DataSender
是在一个线程里面发送 RGB 三个通道的值,RGBSender
的一个对象只会发送 RGB 三个通道中的某一个的值。多线程
在本地测试的时候,为了方便在任务管理器中看到网络占用率,最初是在 VirtualBox 的 ubuntu 虚拟机上运行这个 Python 程序的,可是受到虚拟机的资源分配和电脑性能影响,调用 singleMain
函数时每秒钟最多只能产生 50MB 的数据量。可是在本地非虚拟机环境运行的时候最多能够达到 80MB 的数据量。因此尽量地使用本地环境运行该 Python 程序能够最大限度的生成数据包。
若是让 RGB 三个通道分别在三个不一样的进程中执行发送过程(注释掉 singleMain
的调用,换用 multiSend
方法),那么每秒钟的数据量可到 200MB,不过 80MB 的数据量已经足够多了(接近千兆网卡的上限了,网络利用率太高的话经过网线传输时会出现严重丢包的状况),不须要使用 multiSend
方法加大数据量。
在 singleMain 方法中,不直接执行 dataSender.serve()
,而是在新进程中执行,能够更好的利用多核优点,发送数据更快:
# singleMain() dataSender = DataSender() # dataSender.serve() p = Process(target=dataSender.serve) p.start()
实际开发过程并非这么顺利,由于一开始并不知道在大量数据发送的时候,发送端可否有效地将数据发送出去,其实是边编写 Python 的模拟发送数据程序,边编写 Qt 获取数据的程序,根据出现的问题逐步解决发送端和接收端的问题的。
Qt 这边做为客户端,只须要将接收到的数据包保存下来,获取其中的有效数据,再将 RGB 数据赋到 QImage 对应的像素上显示出来便可。GUI 部分比较简单,使用 QWidget 中的 label 控件,将 QImage 转换成 QPixmap,显示到 label 上就行了。初始化后的窗口如图:
比较麻烦的是接收数据和拼接。一样地,为了方便表示和解析每一个 UDP 包,咱们构造一些类来存储这些信息(如今想一想彷佛直接用结构体表示会更简单)。
咱们在 Qt 中定义 CameraData
类来表示数据包实体:
/** * @brief The CameraData class * 对应从下位机接收到的字节数组的类,原始数据包,须要通过处理后变成一行数据 */ class CameraData : public DataObj { Q_OBJECT public: enum RGBType { R = 1, G = 2, B = 3, UNKOWN = 0 }; static const QByteArray DATA_START_MAGIC; static const QByteArray DATA_END_MAGIC; static const int PacketSize; explicit CameraData(QObject *parent = 0); ~CameraData(); bool isPackageValid(); // 获取保留区域的数据 QByteArray getReserved(); // 设置原始数据 void setRawData(const QByteArray &value); void setRawData(const char *data); // 获取数据区域内的全部数据,默认获取有效数据 QByteArray getData(bool valid = true); int getPackageCntInLine(); int getPackageIdxInLine(); int getSampleDiffLine(); int getRGBExtern(); RGBType getRGBType(); int getLineIdx(); int getValidDataLen(); int getLineBytes(); int sliceToInt(int start, int len = 4); // DataObj interface void reset(); signals: public slots: private: inline QByteArray slice(int start, int len = -1); inline QByteArray getStartMagic(); inline QByteArray getEndMagic(); QByteArray data; int packageCntInLine = -1; int packegeIdxInLine = -1; int lineIdx = -1; int lineBytes = -1; int rgbType = -1; };
CameraData
类继承自 DataObj
类,而 DataObj
类又继承自 QObject
,这样方便进行内存管理和对象上的操做。DataObj
是为了方便复用对象而定义的基类,详细代码可参考 github 上的完整代码。
C++ 部分的 CameraData
类与 Python 中定义的 CameraData
类是对应的,不过 C++ 部分的 CameraData
类只须要调用 CameraData::setRawData
传入一个 QByteArray 对象后就能够自动将其中包含的数据解析出来,而且它只提供获取数据的接口而不提供修改数据的接口。
另外咱们还须要定义一个类 PreProcessData,来表示一行数据:
/** * @brief The PreProcessData class * 预处理数据 */ class PreProcessData: public DataObj { Q_OBJECT public: static const int PacketSize; static const int PacketPerLine; explicit PreProcessData(QObject *parent = 0, int line = -1); void put(CameraData *cd); bool isReady(); void reset(); int line() const; void setLine(int line); const QByteArrayList &getDataList() const; QByteArray repr(); private: /** * @brief cameraData * 每 2 个 CameraData 构成一行的单通道数据,有序存放 RGB 通道数据 * 0-1 存放 R,2-3 存放 G, 4-5 存放 B */ QByteArrayList dataList; int m_line; int m_readyCount = 0; int m_duplicateCount = 0; bool *dataPlaced = 0; };
目前的协议中,每 2 个数据包(对应 2 个 CameraData
对象)构成某一行的单通道数据,因此 PreProcessData
中至少会包含 6 个 CameraData
对象,处理完 CameraData
对象后,只须要存储 Data 部分便可,因此这里没有用 QList
QByteArrayList
来存储数据。当三个通道的数据都准备好后,
PreProcessData::isReady
就会返回 true,表示该行数据已经准备好,能够显示在窗口中。
咱们定义一个 Controller
类用来操做数据接收对象和子线程。用 Qt 的事件槽机制和 QObject::moveToThread
实现多线程很是方便,不重写 QThread 的 run 方法就可让对象的方法在子线程中执行。
class Controller : public QObject { Q_OBJECT public: explicit Controller(QObject *parent = 0); ~Controller(); static const int DataPort; static const int CONTROL_PORT; static const QStringList BOARD_IP; void start(); void stop(); DataProcessor *getDataProcessor() const; signals: public slots: private: CameraDataReceiver *cdr; QThread recvThread; QThread recvProcessThread; QByteArrayList rawdataList; DataProcessor *dp = 0; QTimer *statsTimer; int statsInterval; };
其中 CameraDataReceiver
对象会被实例化,在子线程中接收 UDP 数据包(由于发送和接收数据的端口是不一样的,操做和数据是分离的)。这里将 DataProcessor 经过 getDataProcessor
暴露给上层应用,以便上层应用链接信号槽接收图像。仅到接收数据,就用到了三个线程:分别是 GUI 线程,用于接收 UDP 包的 recvThread 线程和处理 UDP 的 recvProcessThread。
为何接收 UDP 包和处理 UDP 包不是放在一个线程中执行呢?由于这里的数据量实在太多,最开始实现的时候这两个逻辑代码确实是在同一个线程中执行,然而因为处理数据的代码执行起来也要消耗时间,将会致使没法接收其余的 UDP 包,这样的话就会致使比较严重的丢包。为了保证接收端不会丢包,只好将处理逻辑放在其余的线程中执行。
将接收数据和处理数据放在不一样的线程中执行,确实能够解决丢包问题了,可是会出现新的问题:接收到的包若是不可以及时处理完,而且释放掉相应的资源,那么可能会出现程序将数据缓存下来但没法处理,程序占用的内存愈来愈大,致使程序运行起来愈来愈慢。
在编写程序时误觉得是 Qt 的事件循环机制过慢致使程序处理不了那么多数据(实际上它的速度足够处理这些数据),所以将程序中使用的 QUdpSocket 对象换成了 [Windows 平台的 Socket 通讯代码][winsock demo],并将其改写成类方便调用。其实是在 QThread 子线程中无限循环地运行 recvfrom(clientSocket, recvedData.data(), recvbuflen, 0, &fromaddr, &addrLen);
这样的接收数据包函数,跳过了 Qt 事件循环机制,而后当接收到包以后再经过回调函数通知数据处理线程进行处理。
但当我写这篇博客,从新用正常的代码进行测试时,发现即使使用 QUdpSocket::readyRead
信号来接收 UDP 数据,只要数据处理进程不堆积数据,就不会出现占用内存愈来愈多的状况。换句话说,不是 Qt 没法处理实时性的数据,而是本身编写的代码里面有问题。
回想最开始写的程序,在处理 QByteArray 表示的原始数据时,会为每个接收到的数据包分配地址,并且分配的地址位于堆中。而实际上在堆 heap 中分配回收内存地址相较于在栈 stack 中是慢得多的。为每一个到来的数据用 new 构造一个新的 CameraData 对象,而后在处理完后将这个 CameraData delete 掉实际上是很慢的,若是你这样作了,而且你在 CameraData 的析构函数中加上 qDebug 语句打印 "CameraData is deleting...",你会发现,当发送方(咱们的 Python 模拟发送程序)中止发送数据包后很长一段时间内,Qt 程序在一直打印着 "CameraData is deleting"。
而我最开始就是这么作的,因此发生了 Qt 程序随着数据接收的变多,占用的内存愈来愈大的状况。固然,这不排除 qDebug 语句输出到控制台上也会占用不少时间。若是每秒钟要调用上万次 qDebug() << "CameraData is deleting"
,那么建议你使用一个计数变量控制 qDebug 的调用次数,由于这条语句的调用也会让数据处理变得缓慢。
为了让接收端不丢包,须要快速的处理接收到的 UDP 包,而且在处理的代码中不要调用耗时的函数或者 new 操做。为了不重复调用 new 和 delete 操做符,咱们须要构建一个对象池,以便复用池中的对象,减小 new 操做。池的定义比较简单,封装一个 QList
容器类就行了,为了简化和复用池的代码,我用到了 c++ 的 template 特性,可是这个 DataObjPool
中的容器只能是 DataObj 的子类:
template<class T> class DataObjPool { public: virtual ~DataObjPool() { qDeleteAll(pool); numAvailable = 0; } T *getAvailable() { if( numAvailable == 0 ) { return 0; } for(int i = 0; i < pool.size(); i++) { T *item = pool[i]; if(item->isValid()) { item->setValid(false); numAvailable -= 1; return item; } } return 0; } T *get(int id) { return pool[id]; } inline bool release(T *dobj) { dobj->reset(); numAvailable += 1; return true; } int releaseTimeout(int now, int timeout = 100) { int releaseCount = 0; for(int i = 0; i < pool.size(); i++) { T *item = pool[i]; if(now > item->getGenerateMs() + timeout) { item->reset(); numAvailable += 1; releaseCount += 1; } } return releaseCount; } void releaseAll() { for(int i = 0; i < pool.size(); i++) { T *item = pool[i]; if(item->isValid()) { continue; } item->reset(); numAvailable += 1; } } int getNumAvailable() const { return numAvailable; } template<class T2> operator DataObjPool<T2>(); protected: DataObjPool(int size = 100); private: QList<T *> pool; int numAvailable = 0; }; class RawDataObjPool: public DataObjPool<CameraData> { public: RawDataObjPool(int size = 100); }; class LineDataPool : public DataObjPool<PreProcessData> { public: LineDataPool(int size = 100); };
固然你也能够直接编写两个类 RawDataObjPool
和 LineDataPool
,把池的操做分别复制到两个类中,使用模板特化的好处是改动的时候不须要改动两个类了。前面说过,DataObj
类继承自 QObject
,就是为了简化在对象池中进行的操做。DataObjPool
会在构造时在内存中预分配必定数量的对象,以 RawDataObjPool
为例,构造时传入 size 参数,便会预先在内存中建立 size 个 CameraData,在程序运行过程当中,这些对象都会被咱们这个 Qt 程序循环利用,直到关闭程序才会释放掉这些 CameraData(若是操做系统的内存不足,过多的对象占用的内存仍是会被释放)。
对象池的主要接口有两个:getAvailable
和 release
分别用于获取可用的对象或释放掉池中的对象,注意这里的释放是让对象池对该对象进行标记,以便重复使用,而不是释放掉该对象占用的内存空间或 delete 掉。当对象池中无可用对象时,能够根据须要释放掉超时的对象或者释放掉所有对象。
使用对象池减小 new 操做符的使用后,处理数据的子线程的速度明显加快。正常状况下就能够看到以下的图片:
这里数据显示的部分还有待完善,由于发送端的发送数据大小不够凑成一行,因此图片的右侧部分是空白的。
这里说一下数据的复制,从 Socket 接口中传上来的数据,咱们用 QByteArray
对象保存了底层的数据,即使在 UDP 数据包中含有不少个 \x00
这样的数据,QByteArray 也会正确识别出字符串的结束位置。
在设置 CameraData::setRawData(const QByteArray &value)
函数中,尽可能避免手动调用 memcpy(data.data(), value, value.size());
这个底层 API,由于你不知道它会将 QByteArray 对象 CameraData.data
中的 char * data()
指针指向哪一个位置。
我在 CameraData.cpp
文件中将它注释掉了,由于在程序运行和调试时它给我带来了巨大的困惑:常常出现 invalid address specified to rtlvalidateheap
这种类型的错误。通过很长时间的排查后发现注释掉这行代码,程序就能一直稳定运行。
完整的项目代码能够在 github 中找到。