最近须要对采集的数据作实时处理,而且不止作一种处理,而数据采集是由一个单独的线程负责的,所以能够称做单生产者多消费者。要求是速度要尽可能的快,不能阻塞数据采集线程,而数据处理线程并不要求处理全部的数据,能够以比较低的速率或者采样率来处理实时的数据。数组
所以设计了一种数据结构,总体上是对一个定长的数组进行封装,数组中的元素是所谓的“内存单元”数据结构
struct MemoryMutexUnit{ QReadWriteLock mutex; void *buffer; };
这里是用Qt的数据结构实现,这个内存单元由一个读写锁和一个指针组成,为每一个指针加个单独的锁的缘由是为了读写分离,在大多数状况下,数据采集线程和数据读取线程使用的并非同一个内存单元,所以能够避免出现有线程等待锁释放的状况,至少在大多数的状况下是如此的。函数
接下来就是对内存单元的数组进行封装,并提供接口给外部使用。oop
static const int BUF_LEN=2048*2048; class MemoryLoopMutex { const int m_capacity; QAtomicInt m_index; MemoryMutexUnit *m_units; public: MemoryLoopMutex(); bool addBuffer(void*); bool copyBuffer(void*); void clearBuffer(); };
称之为"MemoryLoop"是由于咱们会环状的遍历内存单元数组m_units
,索引m_index
指向的是最新可读的内存单元,数据采集线程会从索引m_index
开始顺序的查找下一个可写的内存单元,若是到了数组最后一个元素就再从第一个元素开始,所以是环状的遍历。一个内存单元可写是指其中的读写锁能够LockForWrite
,若是有线程正在读这个内存单元,那么这个单元就是不可写的。ui
具体实现以下线程
MemoryLoopMutex::MemoryLoopMutex(): m_capacity(100),m_index(-1) { m_units=new MemoryMutexUnit[m_capacity]; for(int i=0;i<m_capacity;i++){ MemoryMutexUnit *pUnit=m_units+i; pUnit->buffer=malloc(BUF_LEN); } }
构造函数做用就是初始化每一个内存单元。设计
bool MemoryLoopMutex::addBuffer(void *buffer){ int index=m_index.loadAcquire(); if(index<0){index=0;} int tryIndex=(index+1)%m_capacity; while(tryIndex!=index){ MemoryMutexUnit *pUnit=m_units+tryIndex; if(pUnit->mutex.tryLockForWrite(1)){ memcpy(pUnit->buffer,buffer,BUF_LEN); pUnit->mutex.unlock(); m_index.storeRelease(tryIndex); return true; } tryIndex=(tryIndex+1)%m_capacity; qDebug()<<"another try"<<tryIndex<<m_capacity; } return false; }
将数据采集线程获得的缓冲区内存指针memcpy
到某个内存单元中,while
部分就是环状的遍历全部的内存单元,直到找到一个可写的单元。指针
bool MemoryLoopMutex::copyBuffer(void *buffer){ int index=m_index.loadAcquire(); if(index<0||index>=m_capacity){return false;} MemoryMutexUnit *pUnit=m_units+index; if(!pUnit->mutex.tryLockForRead(1)){return false;} memcpy(buffer,pUnit->buffer,BUF_LEN); pUnit->mutex.unlock(); return true; }
上面的这段代码负责的是内存读取功能,就是将最新可读的内存单元中的内容memcpy
出来。code
最后就是“内存清空”功能了,咱们这里没有进行内存释放,所谓的内存清空只是再也不容许数据被读取。经过设置索引为负值来实现。索引
void MemoryLoopMutex::clearBuffer(){ m_index.storeRelease(-1); }