最近的任务是写一个多线程的东西,就得接触多线程队列了,我反正是没学过度布式的,代码全凭感受写出来的,不过运气好,代码可以work= =多线程
话很少说,直接给代码吧,一个多消费者,多生产者的模式。假设个人任务是求队列的中位数是啥,每消费10000次的时候,我要知道中位数是什么。分布式
至于加不加锁,这个看你了,我反正是加了,代码里面没写……我反正是把写的代码单独封装了一个函数,而后加了个锁函数
欢迎交流,这个代码已经在实际任务上面上线了,但愿不会有bug。this
用的是boost::lockfree::queue,官方文档:http://www.boost.org/doc/libs/1_55_0/boost/lockfree/queue.hppspa
/* 关于锁的代码: 伟大的Boost库给咱们提供了 shared_mutex 类,结合 unique_lock 与 shared_lock 的使用,能够实现读写锁。 一般读写锁须要完成如下功能: 1.当 data 被线程A读取时,其余线程仍能够进行读取却不能写入 2.当 data 被线程A写入时,其余线程既不能读取也不能写入 对应于功能1,2咱们能够这样来描述: 1.当线程A得到共享锁时,其余线程仍能够得到共享锁但不能得到独占锁 2.当线程A得到独占锁时,其余线程既不能得到共享锁也不能得到独占锁 typedef boost::shared_lock<boost::shared_mutex> read_lock; typedef boost::unique_lock<boost::shared_mutex> write_lock; boost::shared_mutex read_write_mutex; int32_t data = 1; //线程A,读data { read_lock rlock(read_write_mutex); std::cout << data << std:; endl; } //线程B,读data { read_lock rlock(read_write_mutex); std::cout << data << std:; endl; } //线程C,写data { write_lock rlock(read_write_mutex); data = 2; } */ #ifndef DYNAMIC_QUEUE_H_ #define DYNAMIC_QUEUE_H_ #include "boost/lockfree/queue.hpp" #include "boost/thread/thread.hpp" #include "boost/thread/mutex.hpp" #include "abtest_parameters.h" namespace un { class DynamicController { public: boost::lockfree::queue<size_t,boost::lockfree::capacity<40000> > lockfree_queue; // boost::lockfree::queue boost里面的无锁队列,惟一比较蛋疼的就是空间最大65536以及无法输出size,其余的就将就用吧。 // 队列长度能够自定义,也能够不定义,会自增加的。 size_t num = 0; void StartDaemonUpdater(){ boost::function0<void> f = boost::bind(&DynamicController::UpdaterWorker, this); boost::thread thrd(f); thrd.detach(); } // 启动消费者队列 void Producer(size_t number){ bool succ = lockfree_queue.bounded_push(number); // 若是用push的话,没空间的话,会等待消费完。 // bounded_push的话,若是每空间会返回false,而后弃掉这个数。成功返回true } // 生产者 size_t GetNumber( return num; } // get代码 void UpdaterWorker(void){ std::vector<size_t> V; while(1){//稳妥起见,这个while里面能够写个sleep以致于不须要一直在消费。 size_t tmp_value; while(lockfree_queue.pop(tmp_value)){ V.push_back(tmp_value); // 更新条件,10000个数 // 用p99更新 if(V.size()>10000){ std::sort(V.begin(),V.end()); num = V[size_t(V.size()*0.5)]; V.clear(); } } } } // 消费者 }; } #endif