你说你学过操做系统这门课?写个无Bug的生产者和消费者模型试试!git
——你真的学好了操做系统这门课嘛?程序员
在第壹章,展现过这样图:github
其中,左半部分构成了新版Caffe最恼人、最庞大的IO系统。编程
也是从来最不重视的一部分。数组
第伍章又对左半部分的独立性进行了分析,我是这么描述到:网络
Datum和Blob(Batch)不是上下文相关的。数据结构
Blob包含着正向传播的shape信息,这些信息只有初始化网络在初始化时才能肯定。多线程
而Datum则只是与输入样本有关。机器学习
因此,Datum的读取工做能够在网络未初始化以前就开始,这就是DataReader采用线程设计的内涵。异步
因此,左半部分又能够分为左左半部分,和左右半部分。
第伍章讲到,在一个机器学习系统中,生产者和消费者的执行周期是不同的。
为了平衡在周期上的差别,节约计算资源,咱们显然须要对生产者作必定限制。
存储生产资源,能够用数组,也能够用STL容器。
再考虑生产者和消费者的行为:
①不存在随机访问:
显然,消费者是按照固定顺序访问缓冲区的。
咱们没有必要考虑随机访问的状况。
②不存在随机写入:
显然,生产者每次只须要将资源放置于缓冲区两端。
咱们没有必要考虑在线性表中间位置写入的状况。
因为vector底层由顺序表实现,其访问速度随着元素数量的递增而递减,
而queue底层由链式表实现,其访问速度不随元素数量的递增而递减,且没有随机写入/访问的状况。
因此,选择queue做为缓冲区是比较优异的。
为了限制生产者的行为,咱们须要在STL提供的queue基础上,改进出一种新的数据结构——Blocking Queue。
第肆章简单提到了mutex问题,这是阻塞队列除了Blocking以外,须要考虑的第二大问题。
而且已经证实了:生产者和消费者之间必然是异步的。
咱们以队列的push和pop操做为例,分析一下,为何在多线程状况下,须要加mutex。
假设线程A预备执行push操做,因此它是一个生产者;
假设线程B预备执行pop操做,因此它是一个消费者;
设有临界缓冲区队列Q,在某时刻T,线程A发出push操做,在T+1时候,线程B发出pop操做,
且push须要10个单位时间,pop只须要一个单位时间,问T+2时刻,pop出去的资源你敢用嘛?
显然,没人敢用这个执行push的半成品。
发生上述问题的症结在于,两个异步线程对于同一个资源,产生了争夺行为。
解决方案就是:在push时,锁住资源,禁止pop;在pop时,锁住资源,禁止push。
广义上,咱们能够认为,须要将push和pop函数变成原子函数,即:执行期间不可中断的函数。
———————————————————————————————————————————————————————————
另外,须要注意的是,mutex与blocking是两个概念。
在广义上,mutex会将多个线程对同一个资源的异步并行操做,拉成一个串行执行队列,串行等待执行。
而blocking则是将线程休眠,CPU会暂时放弃对其控制。
在程序员界,虽然有时候会把mutex和blocking都称为阻塞,但其原理和内涵是彻底不一样的。
———————————————————————————————————————————————————————————
boost提供不俗的mutex功能,使用前须要 #include "boost/thread/mutex.hpp"
你能够将一个boost::mutex对象嵌入到一个类当中,这样,容许每个类对象拥有一把锁。
因为对一个queue对象,主要是锁住来自该对象的push和pop操做,
因此,mutex理应当是以类对象为一个单位的,参考代码以下:
template <typename T> class BlockingQueue{ public: void push(const T& t){ boost::mutex::scoped_lock lock(mutex); Q.push(t); } T pop(){ boost::mutex::scoped_lock lock(mutex); T t = Q.front(); Q.pop(); return t; } private: boost::mutex mutex; queue<T> Q; };
boost::mutex::scoped_lock lock提供局部锁定功能。
它与boost::scoped_ptr有相似的效果,scoped_ptr在做用域结束后,就当即释放对象。
而scoped_lock在做用域结束后,会当即解锁,若是不用scoped_lock,咱们能够这么写:
void push(const T& t){ mutex.lock(); Q.push(t); mutex.unlock(); }
前面几章说了那么久的阻塞,其中大部分指的应该是blocking。
mutex大部分状况下,都只是在锁一个局部函数,阻塞周期很是短。
惟一的例外是Layer的正向传播函数forward,mutex锁住的周期很是长。
blocking和mutex的惟一不一样在于:
blocking以后,操做系统会挑拨CPU放弃对线程的处理。
这是很是危险的一个行为,由于该线程被家长赶去睡觉了,并且不能反抗家长的命令。
除非家长通知它:噢,你能够活动了。在此以前,该线程将永远处于无效状态。
上面的例子有两个重点:
①CPU放弃线程
②不可主动激活
既然如此,为了激活这个线程,模型就必须设计成“对偶模型”,而生产者和消费者,偏偏正是对偶的。
———————————————————————————————————————————————————————————
boost::condition_variable提供了简单的blocking功能,为了统一控制,能够将其与mutex捆在一块儿:
template <typename T> class BlockingQueue { public: class Sync{ public: boost::mutex mutex; boost::condition_variable condition; }; private: queue<T> Q; boost::shared_ptr<Sync> sync; };
如今考虑一下,什么时候须要注销、阻塞一个线程,大体有两种状况:
①缓冲区空,此时消费者不能消费,拒绝pop操做以后,能够交出CPU控制权。
②缓冲区满,此时生产者不能生产,拒绝push操做以后,能够交出CPU控制权。
为了激活彼此,就须要模型是对偶的:
①经历缓冲区空以后,忽然push了一个元素,此时应当由生产者激活消费者线程。
②经历缓冲区满以后,忽然pop了一个元素,此时应当由消费者激活生产者线程。
看起来,咱们能够将代码写成这样:
void BlockingQueue<T>::push(const T& t){ boost::mutex::scoped_lock lock(sync->mutex); while (Q.full()){ sync->condition.wait(lock); //suspend, spare CPU clock } Q.push(t); sync->condition.notify_one(); } template<typename T> T BlockingQueue<T>::pop(const string& log_waiting_msg){ boost::mutex::scoped_lock lock(sync->mutex); while (Q.empty()){ sync->condition.wait(lock); //suspend, spare CPU clock } T t = Q.front(); Q.pop(); sync->condition.notify_one(); return t; }
其中,sync->condition.wait(lock)表示使用当前mutex为标记,交出CPU控制权。
sync->condition.notify_one()则表示激活一个线程的CPU控制权。
能够看到,blocking和activating的代码是彻底对偶的,blocking本身,activating对方。
上节代码是不可能实现的,由于没有Q.full()这个函数。
在传统生产者、消费者程序中,一般会使用单缓冲队列。
使用单缓冲队列是没有问题的,由于在这种简单的代码结构中,咱们很容易知道缓冲队列的上界。
好比,设定缓冲队列大小为20,在编程中,能够经过检测 if(count==20)来达到。
当代码结构复杂时,好比,缓冲队列大小变量一般在很是上层上层上层的位置,而处于底层的缓冲队列,
是没法探知何谓“缓冲队列满”的含义的,这就为编程带来很大的难题。
———————————————————————————————————————————————————————————
解决方案是,使用双缓冲队列组方案,咱们设定两个阻塞队列,一个叫free,一个叫full。
二者组成一个QueuePair:
class QueuePair{ public: QueuePair(const int size); ~QueuePair(); BlockingQueue<Datum*> free; // as producter queue BlockingQueue<Datum*> full; // as consumer queue };
为了不检测缓冲队列的上界,咱们能够先放置与上界数量等量的空元素指针到free队列。
每次生产者生产时,从free队列中pop一个空Datum元素,填充,再扔进full队列。
这样,BlockingQueue的push操做就不须要检测上界了。
原理很简单,生产者想要push,以前必须pop,pop能够经过检测缓冲队列空来实现。
这样,就用检测一个缓冲队列的空,模拟且替代了检测另外一个缓冲队列的满。
对于上层代码而言,咱们仅仅须要预先填充N个元素至free队列中便可,很是方便。
这部分是DataReader的设计核心。
★数据结构
———————————————————————————————————————————————————————————
创建blocking_queue.hpp。
template <typename T> class BlockingQueue { public: BlockingQueue(); void push(const T& t); T pop(const string& log_waiting_msg=""); T peek(); size_t size(); // try_func return false when need blocking // try_func for destructor bool try_pop(T* t); bool try_peek(T* t); class Sync{ public: boost::mutex mutex; boost::condition_variable condition; }; private: queue<T> Q; boost::shared_ptr<Sync> sync; };
除了push和pop以外,追加队列第三个经常使用操做——peek。
peek目的是取出队首元素,可是不从队列里pop掉。
peek用于实验性读取Datum,为DataTransfomer初始化所用。
除了经过返回值以外获取以外,咱们还要准备try系列函数。
try除了获取元素外,同时返回一个bool值,代表成功或者失败。
主要用于对Datum的析构,这也是全部代码里,惟一一处对protobuff数值的析构。
★实现
———————————————————————————————————————————————————————————
创建blocking_queue.cpp。
总体代码没有什么好说的,细节以及在上文讲解了。
template<typename T> BlockingQueue<T>::BlockingQueue() :sync(new Sync()) {} template<typename T> void BlockingQueue<T>::push(const T& t){ // function_local mutex and unlock automaticly // cause another thread could call pop externally // when this thread is calling push pop&peer at the same time boost::mutex::scoped_lock lock(sync->mutex); Q.push(t); // must wake one opposite operation avoid deadlock // formula: wait_kind_num = notify_kind_num // referring Producter-Consumer Model and it's semaphore setup method sync->condition.notify_one(); } template<typename T> T BlockingQueue<T>::pop(const string& log_waiting_msg){ boost::mutex::scoped_lock lock(sync->mutex); while (Q.empty()){ if (!log_waiting_msg.empty()){ LOG_EVERY_N(INFO, 1000) << log_waiting_msg; } sync->condition.wait(lock); //suspend, spare CPU clock } T t = Q.front(); Q.pop(); return t; } template<typename T> T BlockingQueue<T>::peek(){ boost::mutex::scoped_lock lock(sync->mutex); while (Q.empty()) sync->condition.wait(lock); T t = Q.front(); return t; } template<typename T> bool BlockingQueue<T>::try_pop(T* t){ boost::mutex::scoped_lock lock(sync->mutex); if (Q.empty()) return false; *t = Q.front(); Q.pop(); return true; } template<typename T> bool BlockingQueue<T>::try_peek(T* t){ boost::mutex::scoped_lock lock(sync->mutex); if (Q.empty()) return false; *t = Q.front(); return true; } template<typename T> size_t BlockingQueue<T>::size(){ boost::mutex::scoped_lock lock(sync->mutex); return Q.size(); }
在第壹章,咱们提到了INSTANTIATE_CLASS(classname)宏的做用。
本段将重点解释,出如今blocking_queue.cpp最后的实例化代码。
template<typename T>能够说是整个Caffe里出现频率最高的代码了。
C++编译器有个好玩的特性,就是对于在cpp文件里出现的模板定义代码,
只检查最基本的语法错误,好比标点符号之类的。甚至你把变量名拼错了,编译仍然能经过。
因此,我在最初山寨Caffe的时候,写了一堆错误的代码,编译器都没告诉我。
后来在医院体检时,偶然转了几圈,大概猜到了编译器应该是为模板代码开了独立的编译检查空间。
为了便于理解,参考图以下:
因为C/C++是强类型检查语言,类型检查处于编译先锋位置。
而未肯定类型的模板定义代码,将不会进行大部分词法分析、语法分析、语义分析。
奇怪的是,若是你将模板定义代码写在头文件里,那么它就会被上升到普通编译空间。
原理大体以下:
编译器不会对未include的头文件进行最终编译。
这意味着,若是你要使用一个模板类型,好比A<int> a;
必然处于include下,此时必然是指定类型的,编译器就没必要将代码push到模板空间。
或者,存在一种转移,编译器将定义代码由模板空间转到普通空间,进行下一步分析。
然而,若是咱们将模板定义代码写在源文件A.cpp里,而后在B.cpp里,使用A<int> a,
此时编译器应该去哪里找模板类A的定义代码?按照编译链追溯,应该是到A.hpp里,
再由A.hpp,找到A.cpp。
这种思路在模板定义于A.cpp是不可能实现的,如图所示:
这是两种空间本质区别,因为模板空间的分析没有结束,C++不会让你由hpp找到cpp中的定义代码的。
为了能让编译A.cpp时,从模板空间迁移到普通空间,咱们必须为其提供明确的类型。
好比在blocking_queue.cpp的结尾,你应该添加如下代码:
template class BlockingQueue<Batch<float>*>; template class BlockingQueue<Batch<double>*>; template class BlockingQueue < Datum* > ; template class BlockingQueue < boost::shared_ptr<QueuePair> > ;
这四行代码枚举了BlockingQueue中可能出现的全部具体类型,此时编译器才会对A.cpp进行完整的编译。
在common.hpp中的实例化宏则要简单的多,
#define INSTANTIATE_CLASS(classname) \ template class classname<float>; \ template class classname<double>
该宏用于Blob、Layer、Net和Solver四大数据结构,由于它们的类型,除了float,就是double。
模板机制中存在模板特殊化的概念,它在功能上等效于实例化。
模板特殊化在math_functions.cpp中将会大量存在。
好比此函数:
template<> void dragon_cpu_gemm<double>(const CBLAS_TRANSPOSE transA, const CBLAS_TRANSPOSE transB, const int M, const int N, const int K, const double alpha, const double* A, const double* B, const double beta, double *C){ int lda = (transA == CblasNoTrans) ? K : M; int ldb = (transB == CblasNoTrans) ? N : K; cblas_dgemm(CblasRowMajor, transA, transB, M, N, K, alpha, A, lda, B, ldb, beta, C, N); }
注意实例化与特殊化template附近的区别,特殊化须要添加<>。
模板特殊化必需要明确给出指定类型的代码,而实例化则没必要给出。
模板实例化本质是模板特殊化的特例,条件是:全部类型,执行相同的代码。
而这份相同的代码,如下述形式给出:
template<typename T> XXX<T>::Y(){ ...... ...... }
你能够将实例化视为声明,特殊化视为定义。
二者给出其一,就能让编译器完整编译分离的模板定义代码,前提是,必须写在cpp文件中。
NVCC编译cu文件时,会无视A.cpp里的任何实例化、特殊化代码。
Caffe中给出的解决方案是,追加对cu文件中函数的特别实例化。
由如下几个宏实现:
#define INSTANTIATE_LAYER_GPU_FORWARD(classname) \ template void classname<float>::forward_gpu( \ const vector<Blob<float>*>& bottom, \ const vector<Blob<float>*>& top); \ template void classname<double>::forward_gpu( \ const vector<Blob<double>*>& bottom, \ const vector<Blob<double>*>& top); #define INSTANTIATE_LAYER_GPU_BACKWARD(classname) \ template void classname<float>::backward_gpu( \ const vector<Blob<float>*>& top, \ const vector<bool> &data_need_bp, \ const vector<Blob<float>*>& bottom); \ template void classname<double>::backward_gpu( \ const vector<Blob<double>*>& top, \ const vector<bool> &data_need_bp, \ const vector<Blob<double>*>& bottom) #define INSTANTIATE_LAYER_GPU_FUNCS(classname) \ INSTANTIATE_LAYER_GPU_FORWARD(classname); \ INSTANTIATE_LAYER_GPU_BACKWARD(classname)
见CSDN板块的讨论:http://bbs.csdn.net/topics/380250382
blocking_queue.hpp
https://github.com/neopenx/Dragon/blob/master/Dragon/include/blocking_queue.hpp
blocking_queue.cpp
https://github.com/neopenx/Dragon/blob/master/Dragon/src/blocking_queue.cpp