从零开始山寨Caffe·捌:IO系统(二)

生产者

双缓冲组与信号量机制

在第陆章中提到了,如何模拟,以及取代根本不存的Q.full()函数。git

其本质是:除了为生产者提供一个成品缓冲队列,还提供一个零件缓冲队列。github

当咱们从外部给定了固定容量的零件以后,生产者的产能就受到了限制。数据库

由两个阻塞队列组成的QueuePair,并非Caffe的首创,它其实是生产者与消费者的编程方式之一。
编程

在大部分操做系统教材中,双缓冲区free、full一般由两个信号量empty、full实现。数组

 

信号量(Semaphore)由操做系统底层实现,而且几乎没有人会直接使用信号量去编程。数据结构

由于在逻辑上,能够由信号量可由mutex+计数器模拟获得。负载均衡

信号量的名字颇有趣,它实际上由两部分组成,信号(激活信号)、量(计数器)。机器学习

汉语的博大精深恰当地诠释的信号量的语义精神,而从Semaphore中,你读不出任何精华。函数

激活信号掩盖了mutex的功与名,信号量的第一大功能,就是mutex锁。学习

量,显然代表信号量能够计数,实际上,信号量常常会被拿来为临界资源计数。

下面的伪代码摘自个人操做系统课本,《计算机操做系统 <第四版> 汤小丹等 著》:

int in=0,out=0;
item buffer[n];
semaphore mutex=1,empty=n,full=0;
void wait(S){
    while(S<=0);
    S--;
}
void signal(S) {S++;}
void producer{
    while(1){
        produce an item in nexp;
        ...
        wait(empty);
        wait(mutex);
        buffer[in]=nexp;
        in=(in+1)%n;
        signal(mutex);
        signal(full);
    }
}

能够看到,除了mutex履行其互斥锁的职责以外,empty和full用来计数。

做为生产者,每次生产时,都要让empty减1,让full加1。

当empty小于等于零时,造成第二把锁,固然,这把锁不是为了互斥,只是为了阻塞。

为了增长效率,这第二把锁能够修改为条件阻塞,让生产者交出CPU控制权,固然这须要操做系统的支持。

 

信号量在现代编程中是多余的,事实上,也没有哪一个线程库会提供。

当"量"为1时,信号量一般是去实现互斥锁功能。

当"量"为临界资源数量时,信号量一般是去实现资源计数、而且条件阻塞的功能。

这两部分的精神内涵都在Blocking Queue中实现了,So,忘记信号量吧。

多生产者单缓冲区

做为通常的机器学习玩家,你是用不着考虑多生产者的。

若是你比较有钱,常常喜欢摆弄4-way泰坦交火,那么就须要考虑一下多生产者的模型了。

在第肆章中,介绍了多GPU的基本运行原理,给出了以下这张图:

对于每一个GPU而言,它至少须要一个对它负责的DataReader,每一个DataRedaer应当有不一样的数据来源。

Caffe中,将控制一个数据来源的类对象称为Body,默认有一个类静态成员的Body关联容器:

class DataReader
{
public:
    .....
private:
    static map<string, boost::weak_ptr<Body> > global_bodies;
};

值得注意的是,此处应该使用weak_ptr,而不是shared_ptr,由于Body自己将由一个shared_ptr控制。

将Body的shared_ptr存入map容器,将会致使指针计数器永远为1。

这样,当咱们准备将Body从map容器中清除时,没法获知它是否已经被释放。

而weak_ptr指向shared_ptr时,不会增长指针计数器计数,当计数为0时,便可将其从map里清除。

每个DataReader只能拥有一个Body,而每一个Body能够有多个成品存储缓冲区(非用于零件缓冲,下节讲)。

每一个Body控制一个数据来源,不一样的数据来源能够用关键字来hash,默认Caffe提供的关键字是:

static string source_key(const LayerParameter& param){
    return param.name() + ":" + param.data_param().source();
}

即Layer名,加上数据库路径。

多生产者主要用于多数据库同时并行训练,这是一种很是经典的模型。

一部分代码涉及到上层的DataLayer,将后续详解。

另一种模型是单生产者,以单数据库,不一样数据区域同时并行训练,该方法也能够采用。(下节讲)

Caffe的默认源码中,既没有完整实现多生产者并行模型,也没有完整实现单生产者并行模型,这点使人遗憾。

不过,从源码中仍然能够看出一点端倪,本教程只介绍大致思路,一样并不提供具体代码。

单生产者多缓冲区

在这种模型下,将只有一个DataReader,一个Body,可是有多个Pair,如图:

有趣的是,Body结构体中,提供了QueuePair数组容器:

class Body :public DragonThread{
public:
        .......
    BlockingQueue<boost::shared_ptr<QueuePair> > new_pairs;
};

可是,Caffe源码中的DataReader,默认只会使用该容器数组的第一个QueuePair,并无完整实现多缓冲区:

class DataReader
{
public:
    DataReader(const LayerParameter& param){
           ........
         ptr_body->new_pairs.push(ptr_pair);
    }
    BlockingQueue<Datum*>& free() const  { return ptr_pair->free; }
    BlockingQueue<Datum*>& full() const  { return ptr_pair->full; }
private:
    boost::shared_ptr<QueuePair> ptr_pair;
    boost::shared_ptr<Body> ptr_body;
};

能够看到,尽管咱们设置了Body,存储多个QueuePair,可是提供的外部访问接口,竟然直接使用了ptr_pair。

固然,若是你要编程使用多缓冲区,必定要修改DataReader的访问接口。

对于单个数据库的顺序数据读取,如何将顺序资源,平摊到多个缓冲区?

Caffe使用了循环读取法:

void Body::interfaceKernel(){
    boost::shared_ptr<DB> db(GetDB(param.data_param().backend()));
    db->Open(param.data_param().source(), DB::READ);
    boost::shared_ptr<Cursor> cursor(db->NewCursor());
    vector<boost::shared_ptr<QueuePair> >  container;
    try{
         ...............
         while (!must_stop()){
            for (int i = 0; i < solver_count; i++) 
                read_one(cursor.get(), container[i].get());
        }
    } catch (boost::thread_interrupted&) {}
}

能够看到,在Body的线程函数中,利用全局管理器提供的solver_count,循环均摊数据到多个QueuePair中。

当你将solver_count设置成大于1时,将可使用Body中的多个缓冲区QueuePair,这点须要注意。

单生产者单缓冲区(默认代码)

仔细思考一下,就会发现,单生产者多缓冲区方案是毫无心义的,看起来咱们彷佛模拟了多缓冲区。

可是实质只是一个线程,把资源分了一下组,多个组在DataLayer进行消费的时候,又会被合并成一个Batch:

如图,由于一个DataLayer只能有一个Prefetching Thread,因此必然是每次从各个Pair里取一次。

若是咱们先把Pair0取完,再取Pair1,再取Pair2,这样也是能够的,是一种不错的shuffle,可是须要追加代码。

从计算角度分析,多缓冲区不会加速,反而会减速,若是是为了作上述的shuffle,是情有可原的。

若是不是,只是单纯地为了负载均衡,轮流从各个Pair里取,那么本质上,就会退化成单生产者单缓冲区。

————————————————————————————————————————————————————

这多是Caffe源码的本意。在这种方案中,DataReader和DataLayer是无须改动代码的。

只要咱们加大DataParameter里的prefech数值,让CPU多缓冲几个Batch,为多个GPU准备就行了。

三种速度方案排名:

多生产者单缓冲区>单生产者单缓冲区>单生产者多缓冲区

线程嵌套线程与Socket

Caffe的源码真的颇有启发性,在DataReader的构造和析构函数中,能够发现贡献者悄悄加了mutex:

DataReader::DataReader(const LayerParameter& param){
    ......
    boost::mutex::scoped_lock lock(bodies_mutex);
        ......
}

DataReader::~DataReader(){
        ......
    boost::mutex::scoped_lock lock(bodies_mutex);
        ......
}

熟悉C++的人应该知道,在常规状况下,构造和析构函数是不会并行执行的,也就是不会被线程执行。

线程并行的仅仅是工做函数,工做以前主进程构造,工做以后,主进程析构。

若是偏要认为构造和析构可能并行的话,那么将出现一种好玩的状况:

因为DataReader自己是线程,线程并行线程,将致使线程嵌套线程。

在个人操做系统课上,个人老师这么说:

线程仅仅拥有进程的少部分资源,权限很小。

那么线程可以嵌套线程么?通过百度以后,我发现真还能够。

当今的操做系统,不管是Linux,仍是Windows,线程的资源权限都是很是大的。

————————————————————————————————————————————————————

线程嵌套线程,会不会和多GPU有关?我认为无关。

每一个GPU的监督线程,这里咱们假设使用DragonThread,在须要工做时,

只须要传入:Solver::solve函数就能够了,Solver、Net、Layer的构造和析构,显然是在主进程里执行的。

那么,线程嵌套线程,有什么意义,有什么状况是必须在线程里触发构造函数?

颇有趣,通常来说,只有Socket线程是这样的。

Socket线程无须使用DragonThread,实际上,Boost的Socket也是由boost::asio而不是boost::thread实现的。

不像多GPU,咱们没法预估,在某一时刻,实际有多少个Socket在执行,有多少个用户发出了访问请求。

所以,不能直接把Solver、Net、Layer的构造,放在主进程当中。否则你知道你要构造多少份嘛?显然你不知道。

因此,从直觉上,将这些的构造,放在每个启动的Socket线程里,用多少,构造多少,看起来不错,如图:

 

这样,假如这几个Solver使用了不一样数据来源,那么global_bodies就有被几个Solver同时修改的可能。

这是构造和析构函数里,须要加mutex的直接缘由。

————————————————————————————————————————————————————

Socket的意义何在?

①从训练角度,多个用户能够远程操控一台主机,训练不一样的Net。

这点与多GPU训练一个模型是不同的。通常而言,咱们不会认为,多个用户经过Socket,竟然想要训练同一个模型。

固然,这也是能够的。

②从测试角度,多个用户,能够利用同一个Net的参数,并行获得本身提供的数据的测试结果。

注意,这样就不要share整个Net,每一个用户的solver使用独立的Net,独立读取训练好的参数。

不然,多个用户会在一个Net上卡半天。

代码实战

创建data_reader.hpp、data_reader.cpp。

QueuePair

class QueuePair{
public:
    QueuePair(const int size);
    ~QueuePair();
    BlockingQueue<Datum*> free; // as producter queue
    BlockingQueue<Datum*> full; // as consumer queue
};

QueuePair的结构在上一章已经介绍过,每个QueuePair将做为一个缓冲区。

QueuePair只须要实现构造函数和析构函数:

QueuePair::QueuePair(const int size){
    // set the upbound for a producter
    for (int i = 0; i < size; i++) free.push(new Datum());
}

QueuePair::~QueuePair(){
    // release and clear
    Datum *datum;
    while (free.try_pop(&datum)) delete datum;
    while (full.try_pop(&datum)) delete datum;
}

在构造函数中,咱们进行"零件"的填充,注意里面的Datum全是空元素,且存入队列的应该是指针。

切记勿存入实体对象Datum,这在应用程序开发中是大忌,由于C++并不是Python,默认执行的深拷贝。

深拷贝大内存数据结构体,会严重拖慢程序执行,并且仍是没有意义的,传递指针更恰当。

在析构函数中,实际上这是惟一一处对Protocol Buffer对象的主动析构,由于Datum没有用shared_ptr。

主动析构主要利用Blocking Queue提供的try,来控制循环进度。

此处切记不要把pop写成peek,不然会形成对空指针的delete,致使程序崩溃。

LayerParameter

DataReader的上层是DataLayer,它是DataLayer的成员变量之一,须要DataLayer提供proto参数。

在你的proto脚本中,追加以下项:

message DataParameter{
    enum DB{
        LEVELDB=0;
        LMDB=1;
    }
    optional string source=1;
    optional uint32 batch_size=2;
    optional DB backend=3 [default=LMDB];
    //4-way pre-buffering is enough for normal machines
    optional uint32 prefech=4 [default=4];
}

message LayerParameter{
    optional string name=1;
    optional string type=2;
    optional DataParameter data_param=8;
}

从新编译后,覆盖你的旧头文件和源文件。

DataParameter中,包含:数据库源路径、batch大小、数据库类型,以及预缓冲区大小。

比较特别的是预缓冲大小,默认是开4个Batch的预缓冲。若是你的GPU计算速度过快,明显大于

CPU供给数据的速度,消费者(DataLayer)常常提示缺数据,你得考虑加大预缓冲区数量。

将DataParameter嵌入到LayerParameter中去。

LayerParameter是一个巨型的数据结构,将包含全部类型Layer的超参数,你能够将其视为基类。

Body

class Body :public DragonThread{
public:
    Body(const LayerParameter& param);
    virtual ~Body();
    vector<boost::shared_ptr<QueuePair>> new_pairs;
protected:
    void interfaceKernel(); 
    void read_one(Cursor *cursor, QueuePair *pair);
    LayerParameter param;
};

Body其实是一个线程,而DataReader却不是,尽管Body是DataReader成员变量。

Body的构造函数和析构函数就是启动线程和中止线程:

Body::Body(const LayerParameter& param) :param(param) { startThread();}
Body::~Body() { stopThread();}

线程工做函数比较复杂:

void Body::interfaceKernel(){
    boost::shared_ptr<DB> db(GetDB(param.data_param().backend()));
    db->Open(param.data_param().source(), DB::READ);
    boost::shared_ptr<Cursor> cursor(db->NewCursor());
    try{
        //    default solver_count=1
        int solver_count = param.phase() == TRAIN ? Dragon::get_solver_count() : 1;
        //    working period
        while (!must_stop()){
            for (int i = 0; i < solver_count; i++) 
                read_one(cursor.get(), new_pairs[i].get());
        }
        //  complex condition
    } catch (boost::thread_interrupted&) {}
}

该函数将会一直卡在循环里,直到训练结束,Body执行析构函数,将线程执行中止。

Body-DataReader构成了Caffe数据缓冲的第一级别:数据库->Datum

在DataLayer中,还会进行第二级别的缓冲:Datum->Blob->Batch,将在后续分析。

最后,还剩下一个read_one函数:

void Body::read_one(Cursor *cursor, QueuePair *pair){
    Datum *datum = pair->free.pop();
    datum->ParseFromString(cursor->value());
    pair->full.push(datum);
    cursor->Next();
    if (!cursor->valid()){
        DLOG(INFO) << "Restarting data prefeching from start.\n";
        cursor->SeekToFirst();
    }
}

read_one每次从一个双缓冲组的free队列中取出空Datum指针。

利用Protocol Buffer的反序列化函数ParseFromString,从数据库中还原Datum,再扔到full队列里。

感谢Protocol Buffer,不然这部分的代码估计不下200行。

当数据库跑完以后,须要回到开头,再次重读,为迭代过程反复提供数据。

这一步只适合训练过程,若是你要一次测试本身的数据,请忘记这个函数,重写一个不要反复读的版本。

DataReader

class DataReader
{
public:
    DataReader(const LayerParameter& param);
    BlockingQueue<Datum*>& free() const  { return ptr_pair->free; }
    BlockingQueue<Datum*>& full() const  { return ptr_pair->full; }
    ~DataReader();
    static string source_key(const LayerParameter& param){
        return param.name() + ":" + param.data_param().source();
    }
private:
    LayerParameter param;
    boost::shared_ptr<QueuePair> ptr_pair;
    boost::shared_ptr<Body> ptr_body;
    static map<string, boost::weak_ptr<Body> > global_bodies;
};

该结构上文已经全面解析过。

在cpp的实现中,首先完成类静态成员变量的外部初始化。

map<string, boost::weak_ptr<Body> > DataReader::global_bodies;

以及一个静态mutex的定义:

static boost::mutex bodies_mutex;

该mutex是Caffe挖的坑之一,虽然默认不会生效,却是给出了不错的指导。

当构建多生产者单缓冲区时,咱们将会有多个Body,即多个DataReader,即多个DragonThread。

这意味着,Body的Hash容器将成为一个互斥资源。

该Hash容器的存在不是没有必要的,因为:

每一个数据来源只能用一次,为了不重复路径,显然须要Hash。

DataReader::DataReader(const LayerParameter& param){
    ptr_pair.reset(new QueuePair(
        param.data_param().prefech()*param.data_param().batch_size()));
    boost::mutex::scoped_lock lock(bodies_mutex);
    string hash_key = source_key(param);
    boost::weak_ptr<Body> weak = global_bodies[hash_key];
    ptr_body = weak.lock();
    if (!ptr_body){
        ptr_body.reset(new Body(param));
        global_bodies[hash_key] = boost::weak_ptr<Body>(ptr_body);
    }
    ptr_body->new_pairs.push(ptr_pair);
}

DataReader的构造函数首先根据用户指定的预缓冲区大小,初始化默认的双缓冲队列组。

接下来,要在Body的Hash容器中登记,mutex锁住,修改以后解锁。

登记所使用的是weak_ptr,weak_ptr可看做shared_ptr的助手,一般视为观察者(Viewer)。

不可以使用->,只能调用lock函数得到shared_ptr。

DataReader的析构,主要任务是析构Body,以及从Hash容器中反登记。

DataReader::~DataReader(){
    string hash_key = source_key(param);
    ptr_body.reset();
    boost::mutex::scoped_lock lock(bodies_mutex);
    if (global_bodies[hash_key].expired()) global_bodies.erase(hash_key);
}

析构体系

DataReader中涉及几个比较重要的析构,这里以图描述下:

完整代码

data_reader.hpp

https://github.com/neopenx/Dragon/blob/master/Dragon/data_include/data_reader.hpp

data_reader.cpp

https://github.com/neopenx/Dragon/blob/master/Dragon/data_src/data_reader.cpp

相关文章
相关标签/搜索