使用环形队列进行内存管理

最近须要对采集的数据作实时处理,而且不止作一种处理,而数据采集是由一个单独的线程负责的,所以能够称做单生产者多消费者。要求是速度要尽可能的快,不能阻塞数据采集线程,而数据处理线程并不要求处理全部的数据,能够以比较低的速率或者采样率来处理实时的数据。数组

所以设计了一种数据结构,总体上是对一个定长的数组进行封装,数组中的元素是所谓的“内存单元”数据结构

struct MemoryMutexUnit{
    QReadWriteLock mutex;
    void *buffer;
};

这里是用Qt的数据结构实现,这个内存单元由一个读写锁和一个指针组成,为每一个指针加个单独的锁的缘由是为了读写分离,在大多数状况下,数据采集线程和数据读取线程使用的并非同一个内存单元,所以能够避免出现有线程等待锁释放的状况,至少在大多数的状况下是如此的。函数

接下来就是对内存单元的数组进行封装,并提供接口给外部使用。oop

static const int BUF_LEN=2048*2048;

class MemoryLoopMutex
{
    const int m_capacity;
    QAtomicInt m_index;

    MemoryMutexUnit *m_units;
public:
    MemoryLoopMutex();

    bool addBuffer(void*);
    bool copyBuffer(void*);
    void clearBuffer();
};

称之为"MemoryLoop"是由于咱们会环状的遍历内存单元数组m_units,索引m_index指向的是最新可读的内存单元,数据采集线程会从索引m_index开始顺序的查找下一个可写的内存单元,若是到了数组最后一个元素就再从第一个元素开始,所以是环状的遍历。一个内存单元可写是指其中的读写锁能够LockForWrite,若是有线程正在读这个内存单元,那么这个单元就是不可写的。ui

具体实现以下线程

MemoryLoopMutex::MemoryLoopMutex():
    m_capacity(100),m_index(-1)
{
    m_units=new MemoryMutexUnit[m_capacity];

    for(int i=0;i<m_capacity;i++){
        MemoryMutexUnit *pUnit=m_units+i;
        pUnit->buffer=malloc(BUF_LEN);
    }
}

构造函数做用就是初始化每一个内存单元。设计

bool MemoryLoopMutex::addBuffer(void *buffer){
    int index=m_index.loadAcquire();
    if(index<0){index=0;}

    int tryIndex=(index+1)%m_capacity;

    while(tryIndex!=index){
        MemoryMutexUnit *pUnit=m_units+tryIndex;

        if(pUnit->mutex.tryLockForWrite(1)){
            memcpy(pUnit->buffer,buffer,BUF_LEN);
            pUnit->mutex.unlock();

            m_index.storeRelease(tryIndex);
            return true;
        }

        tryIndex=(tryIndex+1)%m_capacity;
        qDebug()<<"another try"<<tryIndex<<m_capacity;
    }

    return false;
}

将数据采集线程获得的缓冲区内存指针memcpy到某个内存单元中,while部分就是环状的遍历全部的内存单元,直到找到一个可写的单元。指针

bool MemoryLoopMutex::copyBuffer(void *buffer){
    int index=m_index.loadAcquire();
    if(index<0||index>=m_capacity){return false;}

    MemoryMutexUnit *pUnit=m_units+index;
    if(!pUnit->mutex.tryLockForRead(1)){return false;}

    memcpy(buffer,pUnit->buffer,BUF_LEN);

    pUnit->mutex.unlock();
    return true;
}

上面的这段代码负责的是内存读取功能,就是将最新可读的内存单元中的内容memcpy出来。code

最后就是“内存清空”功能了,咱们这里没有进行内存释放,所谓的内存清空只是再也不容许数据被读取。经过设置索引为负值来实现。索引

void MemoryLoopMutex::clearBuffer(){
    m_index.storeRelease(-1);
}
相关文章
相关标签/搜索