在我接触多线程编程以来,都是把“多线程”等同于“异步”,使用多线程基本上也都是为了避免阻塞主线程(如界面),才单独开一个线程“后台”运行。最近遇到的状况是数据分析程序的处理速度跟不上数据采集程序,所以考虑使用多个worker线程并行的处理采集到的数据。尝试使用OpenMP
,在程序中使用相似于这种代码git
#pragma omp parallel for for (int i=0;i<6;++i){run();}
可是性能仍是达不到指望,电脑有10多核,开6个线程,却只能把运行速度提升2~3倍,而且再增长线程也没有提升速度。github
You’re doing it wrong…编程
决定好好琢磨一下多线程。我这里主程序是用Qt作的,但愿尽可能使用Qt自带的库,因此多线程也是使用QThread
,仔细考虑过以后获得的思路是维护一个线程安全的队列,而后数据采集线程和多个数据处理线程分别向这个队列传入和取出数据,实现并发,也就是所谓的“单生产者、多消费者模式”。安全
线程安全的队列是这样的多线程
//dataqueue.h #include <QQueue> #include <QMutexLocker> //thread-safe queue class DataQueue { QMutex m_mutex;QQueue<void*> m_datas; public: DataQueue(); void enqueue(void*);void* dequeue(); }; //dataqueue.cpp #include "dataqueue.h" void DataQueue::enqueue(void *data){ QMutexLocker locker(&m_mutex); m_datas.enqueue(data); } void* DataQueue::dequeue(){ QMutexLocker locker(&m_mutex); return m_datas.empty()?NULL:m_datas.dequeue(); }
基本上就是封装了一下QQueue
,数据用空指针void*
表示。并发
数据采集线程就直接实例化一个DataQueue
而后调用enqueue
方法便可,数据分析线程部分须要仔细考虑,主要的问题就是当有新数据到来时(加入DataQueue
以后),分析线程是竞争性地去获取数据仍是被动地被添加数据到线程单独的数据队列中,以及当没有数据时线程(如下所说的“线程”都是数据分析线程)是wait
直到被唤醒仍是sleep
一段时间再次尝试从DataQueue
取数据。负载均衡
这里只考虑下面两种状况:异步
这种方法须要维护一个“线程池”,做为数据采集线程的成员变量,而且每一个数据分析线程都要有一个单独的DataQueue
数据队列做为成员变量。性能
当有新数据到来时,要将数据加入某个线程的数据队列,这个线程的选取有多种方法,能够遍历全部的线程,找到数据队列中数据最少的一个线程,还能够随机地抽出一个线程。总之是要让各个线程的负载比较均匀。线程
当线程的数据队列为空时,使用QWaitCondition
使线程wait
,而后当有新数据加入进来时,再wakeOne
。
这种方法的优势是不会出现太多的锁竞争,由于不会出现多个线程竞争获取数据的状况,但问题是须要主动维护各个线程的负载均衡,比较麻烦。
这种方法比较简单粗暴,也是我最终选择的方法。全局地维护一个DataQueue
数据队列,全部线程(数据采集和分析)都是独立运行的,采集线程往队列里加数据,分析线程从队列里取数据,当队列里没有数据时,分析线程会sleep
一段时间,而后再从队列里取数据。数据分析线程的实现大体是这样的
void DataProcessor::run(){ setRunningFlag(true); void *buffer=NULL; while(getRunningFlag()){ buffer=g_dataQueue.dequeue(); if(NULL==buffer){msleep(5);continue;} processData(buffer); free(buffer); } } bool DataProcessor::getRunningFlag(){ QMutexLocker locker(&m_mutex); return m_bRunning; } void DataProcessor::setRunningFlag(bool bRun){ QMutexLocker locker(&m_mutex); m_bRunning=bRun; } DataProcessor::DataProcessor(){start();} DataProcessor::~DataProcessor(){ setRunningFlag(false);m_timer.stop();wait(); }
g_dataQueue
为全部线程(数据采集和分析)均可访问的一个全局变量,也就是惟一的一个DataQueue
数据队列。
最后值得一提的是,我没有在程序里面使用“内存池”,而是在编译时使用TCMALLOC,发现性能很是好。