在第陆章中提到了,如何模拟,以及取代根本不存的Q.full()函数。git
其本质是:除了为生产者提供一个成品缓冲队列,还提供一个零件缓冲队列。github
当咱们从外部给定了固定容量的零件以后,生产者的产能就受到了限制。数据库
由两个阻塞队列组成的QueuePair,并非Caffe的首创,它其实是生产者与消费者的编程方式之一。
编程
在大部分操做系统教材中,双缓冲区free、full一般由两个信号量empty、full实现。数组
信号量(Semaphore)由操做系统底层实现,而且几乎没有人会直接使用信号量去编程。数据结构
由于在逻辑上,能够由信号量可由mutex+计数器模拟获得。负载均衡
信号量的名字颇有趣,它实际上由两部分组成,信号(激活信号)、量(计数器)。机器学习
汉语的博大精深恰当地诠释的信号量的语义精神,而从Semaphore中,你读不出任何精华。函数
激活信号掩盖了mutex的功与名,信号量的第一大功能,就是mutex锁。学习
量,显然代表信号量能够计数,实际上,信号量常常会被拿来为临界资源计数。
下面的伪代码摘自个人操做系统课本,《计算机操做系统 <第四版> 汤小丹等 著》:
int in=0,out=0; item buffer[n]; semaphore mutex=1,empty=n,full=0; void wait(S){ while(S<=0); S--; } void signal(S) {S++;} void producer{ while(1){ produce an item in nexp; ... wait(empty); wait(mutex); buffer[in]=nexp; in=(in+1)%n; signal(mutex); signal(full); } }
能够看到,除了mutex履行其互斥锁的职责以外,empty和full用来计数。
做为生产者,每次生产时,都要让empty减1,让full加1。
当empty小于等于零时,造成第二把锁,固然,这把锁不是为了互斥,只是为了阻塞。
为了增长效率,这第二把锁能够修改为条件阻塞,让生产者交出CPU控制权,固然这须要操做系统的支持。
信号量在现代编程中是多余的,事实上,也没有哪一个线程库会提供。
当"量"为1时,信号量一般是去实现互斥锁功能。
当"量"为临界资源数量时,信号量一般是去实现资源计数、而且条件阻塞的功能。
这两部分的精神内涵都在Blocking Queue中实现了,So,忘记信号量吧。
做为通常的机器学习玩家,你是用不着考虑多生产者的。
若是你比较有钱,常常喜欢摆弄4-way泰坦交火,那么就须要考虑一下多生产者的模型了。
在第肆章中,介绍了多GPU的基本运行原理,给出了以下这张图:
对于每一个GPU而言,它至少须要一个对它负责的DataReader,每一个DataRedaer应当有不一样的数据来源。
Caffe中,将控制一个数据来源的类对象称为Body,默认有一个类静态成员的Body关联容器:
class DataReader { public: ..... private: static map<string, boost::weak_ptr<Body> > global_bodies; };
值得注意的是,此处应该使用weak_ptr,而不是shared_ptr,由于Body自己将由一个shared_ptr控制。
将Body的shared_ptr存入map容器,将会致使指针计数器永远为1。
这样,当咱们准备将Body从map容器中清除时,没法获知它是否已经被释放。
而weak_ptr指向shared_ptr时,不会增长指针计数器计数,当计数为0时,便可将其从map里清除。
每个DataReader只能拥有一个Body,而每一个Body能够有多个成品存储缓冲区(非用于零件缓冲,下节讲)。
每一个Body控制一个数据来源,不一样的数据来源能够用关键字来hash,默认Caffe提供的关键字是:
static string source_key(const LayerParameter& param){ return param.name() + ":" + param.data_param().source(); }
即Layer名,加上数据库路径。
多生产者主要用于多数据库同时并行训练,这是一种很是经典的模型。
一部分代码涉及到上层的DataLayer,将后续详解。
另一种模型是单生产者,以单数据库,不一样数据区域同时并行训练,该方法也能够采用。(下节讲)
Caffe的默认源码中,既没有完整实现多生产者并行模型,也没有完整实现单生产者并行模型,这点使人遗憾。
不过,从源码中仍然能够看出一点端倪,本教程只介绍大致思路,一样并不提供具体代码。
在这种模型下,将只有一个DataReader,一个Body,可是有多个Pair,如图:
有趣的是,Body结构体中,提供了QueuePair数组容器:
class Body :public DragonThread{ public: ....... BlockingQueue<boost::shared_ptr<QueuePair> > new_pairs; };
可是,Caffe源码中的DataReader,默认只会使用该容器数组的第一个QueuePair,并无完整实现多缓冲区:
class DataReader { public: DataReader(const LayerParameter& param){ ........ ptr_body->new_pairs.push(ptr_pair); } BlockingQueue<Datum*>& free() const { return ptr_pair->free; } BlockingQueue<Datum*>& full() const { return ptr_pair->full; } private: boost::shared_ptr<QueuePair> ptr_pair; boost::shared_ptr<Body> ptr_body; };
能够看到,尽管咱们设置了Body,存储多个QueuePair,可是提供的外部访问接口,竟然直接使用了ptr_pair。
固然,若是你要编程使用多缓冲区,必定要修改DataReader的访问接口。
对于单个数据库的顺序数据读取,如何将顺序资源,平摊到多个缓冲区?
Caffe使用了循环读取法:
void Body::interfaceKernel(){ boost::shared_ptr<DB> db(GetDB(param.data_param().backend())); db->Open(param.data_param().source(), DB::READ); boost::shared_ptr<Cursor> cursor(db->NewCursor()); vector<boost::shared_ptr<QueuePair> > container; try{ ............... while (!must_stop()){ for (int i = 0; i < solver_count; i++) read_one(cursor.get(), container[i].get()); } } catch (boost::thread_interrupted&) {} }
能够看到,在Body的线程函数中,利用全局管理器提供的solver_count,循环均摊数据到多个QueuePair中。
当你将solver_count设置成大于1时,将可使用Body中的多个缓冲区QueuePair,这点须要注意。
仔细思考一下,就会发现,单生产者多缓冲区方案是毫无心义的,看起来咱们彷佛模拟了多缓冲区。
可是实质只是一个线程,把资源分了一下组,多个组在DataLayer进行消费的时候,又会被合并成一个Batch:
如图,由于一个DataLayer只能有一个Prefetching Thread,因此必然是每次从各个Pair里取一次。
若是咱们先把Pair0取完,再取Pair1,再取Pair2,这样也是能够的,是一种不错的shuffle,可是须要追加代码。
从计算角度分析,多缓冲区不会加速,反而会减速,若是是为了作上述的shuffle,是情有可原的。
若是不是,只是单纯地为了负载均衡,轮流从各个Pair里取,那么本质上,就会退化成单生产者单缓冲区。
————————————————————————————————————————————————————
这多是Caffe源码的本意。在这种方案中,DataReader和DataLayer是无须改动代码的。
只要咱们加大DataParameter里的prefech数值,让CPU多缓冲几个Batch,为多个GPU准备就行了。
三种速度方案排名:
多生产者单缓冲区>单生产者单缓冲区>单生产者多缓冲区
Caffe的源码真的颇有启发性,在DataReader的构造和析构函数中,能够发现贡献者悄悄加了mutex:
DataReader::DataReader(const LayerParameter& param){ ...... boost::mutex::scoped_lock lock(bodies_mutex); ...... } DataReader::~DataReader(){ ...... boost::mutex::scoped_lock lock(bodies_mutex); ...... }
熟悉C++的人应该知道,在常规状况下,构造和析构函数是不会并行执行的,也就是不会被线程执行。
线程并行的仅仅是工做函数,工做以前主进程构造,工做以后,主进程析构。
若是偏要认为构造和析构可能并行的话,那么将出现一种好玩的状况:
因为DataReader自己是线程,线程并行线程,将致使线程嵌套线程。
在个人操做系统课上,个人老师这么说:
线程仅仅拥有进程的少部分资源,权限很小。
那么线程可以嵌套线程么?通过百度以后,我发现真还能够。
当今的操做系统,不管是Linux,仍是Windows,线程的资源权限都是很是大的。
————————————————————————————————————————————————————
线程嵌套线程,会不会和多GPU有关?我认为无关。
每一个GPU的监督线程,这里咱们假设使用DragonThread,在须要工做时,
只须要传入:Solver::solve函数就能够了,Solver、Net、Layer的构造和析构,显然是在主进程里执行的。
那么,线程嵌套线程,有什么意义,有什么状况是必须在线程里触发构造函数?
颇有趣,通常来说,只有Socket线程是这样的。
Socket线程无须使用DragonThread,实际上,Boost的Socket也是由boost::asio而不是boost::thread实现的。
不像多GPU,咱们没法预估,在某一时刻,实际有多少个Socket在执行,有多少个用户发出了访问请求。
所以,不能直接把Solver、Net、Layer的构造,放在主进程当中。否则你知道你要构造多少份嘛?显然你不知道。
因此,从直觉上,将这些的构造,放在每个启动的Socket线程里,用多少,构造多少,看起来不错,如图:
这样,假如这几个Solver使用了不一样数据来源,那么global_bodies就有被几个Solver同时修改的可能。
这是构造和析构函数里,须要加mutex的直接缘由。
————————————————————————————————————————————————————
Socket的意义何在?
①从训练角度,多个用户能够远程操控一台主机,训练不一样的Net。
这点与多GPU训练一个模型是不同的。通常而言,咱们不会认为,多个用户经过Socket,竟然想要训练同一个模型。
固然,这也是能够的。
②从测试角度,多个用户,能够利用同一个Net的参数,并行获得本身提供的数据的测试结果。
注意,这样就不要share整个Net,每一个用户的solver使用独立的Net,独立读取训练好的参数。
不然,多个用户会在一个Net上卡半天。
创建data_reader.hpp、data_reader.cpp。
class QueuePair{ public: QueuePair(const int size); ~QueuePair(); BlockingQueue<Datum*> free; // as producter queue BlockingQueue<Datum*> full; // as consumer queue };
QueuePair的结构在上一章已经介绍过,每个QueuePair将做为一个缓冲区。
QueuePair只须要实现构造函数和析构函数:
QueuePair::QueuePair(const int size){ // set the upbound for a producter for (int i = 0; i < size; i++) free.push(new Datum()); } QueuePair::~QueuePair(){ // release and clear Datum *datum; while (free.try_pop(&datum)) delete datum; while (full.try_pop(&datum)) delete datum; }
在构造函数中,咱们进行"零件"的填充,注意里面的Datum全是空元素,且存入队列的应该是指针。
切记勿存入实体对象Datum,这在应用程序开发中是大忌,由于C++并不是Python,默认执行的深拷贝。
深拷贝大内存数据结构体,会严重拖慢程序执行,并且仍是没有意义的,传递指针更恰当。
在析构函数中,实际上这是惟一一处对Protocol Buffer对象的主动析构,由于Datum没有用shared_ptr。
主动析构主要利用Blocking Queue提供的try,来控制循环进度。
此处切记不要把pop写成peek,不然会形成对空指针的delete,致使程序崩溃。
DataReader的上层是DataLayer,它是DataLayer的成员变量之一,须要DataLayer提供proto参数。
在你的proto脚本中,追加以下项:
message DataParameter{ enum DB{ LEVELDB=0; LMDB=1; } optional string source=1; optional uint32 batch_size=2; optional DB backend=3 [default=LMDB]; //4-way pre-buffering is enough for normal machines optional uint32 prefech=4 [default=4]; } message LayerParameter{ optional string name=1; optional string type=2; optional DataParameter data_param=8; }
从新编译后,覆盖你的旧头文件和源文件。
DataParameter中,包含:数据库源路径、batch大小、数据库类型,以及预缓冲区大小。
比较特别的是预缓冲大小,默认是开4个Batch的预缓冲。若是你的GPU计算速度过快,明显大于
CPU供给数据的速度,消费者(DataLayer)常常提示缺数据,你得考虑加大预缓冲区数量。
将DataParameter嵌入到LayerParameter中去。
LayerParameter是一个巨型的数据结构,将包含全部类型Layer的超参数,你能够将其视为基类。
class Body :public DragonThread{ public: Body(const LayerParameter& param); virtual ~Body(); vector<boost::shared_ptr<QueuePair>> new_pairs; protected: void interfaceKernel(); void read_one(Cursor *cursor, QueuePair *pair); LayerParameter param; };
Body其实是一个线程,而DataReader却不是,尽管Body是DataReader成员变量。
Body的构造函数和析构函数就是启动线程和中止线程:
Body::Body(const LayerParameter& param) :param(param) { startThread();} Body::~Body() { stopThread();}
线程工做函数比较复杂:
void Body::interfaceKernel(){ boost::shared_ptr<DB> db(GetDB(param.data_param().backend())); db->Open(param.data_param().source(), DB::READ); boost::shared_ptr<Cursor> cursor(db->NewCursor()); try{ // default solver_count=1 int solver_count = param.phase() == TRAIN ? Dragon::get_solver_count() : 1; // working period while (!must_stop()){ for (int i = 0; i < solver_count; i++) read_one(cursor.get(), new_pairs[i].get()); } // complex condition } catch (boost::thread_interrupted&) {} }
该函数将会一直卡在循环里,直到训练结束,Body执行析构函数,将线程执行中止。
Body-DataReader构成了Caffe数据缓冲的第一级别:数据库->Datum 。
在DataLayer中,还会进行第二级别的缓冲:Datum->Blob->Batch,将在后续分析。
最后,还剩下一个read_one函数:
void Body::read_one(Cursor *cursor, QueuePair *pair){ Datum *datum = pair->free.pop(); datum->ParseFromString(cursor->value()); pair->full.push(datum); cursor->Next(); if (!cursor->valid()){ DLOG(INFO) << "Restarting data prefeching from start.\n"; cursor->SeekToFirst(); } }
read_one每次从一个双缓冲组的free队列中取出空Datum指针。
利用Protocol Buffer的反序列化函数ParseFromString,从数据库中还原Datum,再扔到full队列里。
感谢Protocol Buffer,不然这部分的代码估计不下200行。
当数据库跑完以后,须要回到开头,再次重读,为迭代过程反复提供数据。
这一步只适合训练过程,若是你要一次测试本身的数据,请忘记这个函数,重写一个不要反复读的版本。
class DataReader { public: DataReader(const LayerParameter& param); BlockingQueue<Datum*>& free() const { return ptr_pair->free; } BlockingQueue<Datum*>& full() const { return ptr_pair->full; } ~DataReader(); static string source_key(const LayerParameter& param){ return param.name() + ":" + param.data_param().source(); } private: LayerParameter param; boost::shared_ptr<QueuePair> ptr_pair; boost::shared_ptr<Body> ptr_body; static map<string, boost::weak_ptr<Body> > global_bodies; };
该结构上文已经全面解析过。
在cpp的实现中,首先完成类静态成员变量的外部初始化。
map<string, boost::weak_ptr<Body> > DataReader::global_bodies;
以及一个静态mutex的定义:
static boost::mutex bodies_mutex;
该mutex是Caffe挖的坑之一,虽然默认不会生效,却是给出了不错的指导。
当构建多生产者单缓冲区时,咱们将会有多个Body,即多个DataReader,即多个DragonThread。
这意味着,Body的Hash容器将成为一个互斥资源。
该Hash容器的存在不是没有必要的,因为:
每一个数据来源只能用一次,为了不重复路径,显然须要Hash。
DataReader::DataReader(const LayerParameter& param){ ptr_pair.reset(new QueuePair( param.data_param().prefech()*param.data_param().batch_size())); boost::mutex::scoped_lock lock(bodies_mutex); string hash_key = source_key(param); boost::weak_ptr<Body> weak = global_bodies[hash_key]; ptr_body = weak.lock(); if (!ptr_body){ ptr_body.reset(new Body(param)); global_bodies[hash_key] = boost::weak_ptr<Body>(ptr_body); } ptr_body->new_pairs.push(ptr_pair); }
DataReader的构造函数首先根据用户指定的预缓冲区大小,初始化默认的双缓冲队列组。
接下来,要在Body的Hash容器中登记,mutex锁住,修改以后解锁。
登记所使用的是weak_ptr,weak_ptr可看做shared_ptr的助手,一般视为观察者(Viewer)。
不可以使用->,只能调用lock函数得到shared_ptr。
DataReader的析构,主要任务是析构Body,以及从Hash容器中反登记。
DataReader::~DataReader(){ string hash_key = source_key(param); ptr_body.reset(); boost::mutex::scoped_lock lock(bodies_mutex); if (global_bodies[hash_key].expired()) global_bodies.erase(hash_key); }
DataReader中涉及几个比较重要的析构,这里以图描述下:
data_reader.hpp
https://github.com/neopenx/Dragon/blob/master/Dragon/data_include/data_reader.hpp
data_reader.cpp
https://github.com/neopenx/Dragon/blob/master/Dragon/data_src/data_reader.cpp