Paracel是豆瓣开发的一个分布式计算框架,它基于参数服务器范式来解决机器学习的问题:逻辑回归、SVD、矩阵分解(BFGS,sgd,als,cg),LDA,Lasso...。html
Paracel支持数据和模型的并行,为用户提供简单易用的通讯接口,比mapreduce式的系统要更加灵活。Paracel同时支持异步的训练模式,使迭代问题收敛地更快。此外,Paracel程序的结构与串行程序十分类似,用户能够更加专一于算法自己,不需将精力过多放在分布式逻辑上。python
由于 ps-lite 没有对 SSP 进行深刻,而 Paracel 对 SSP的实现比较深刻,因此咱们本文就看看SSP如何实现。c++
解析时候会删除部分非主体代码。算法
[源码解析] 机器学习参数服务器ps-lite 之(1) ----- PostOffice缓存
[源码解析] 机器学习参数服务器ps-lite(2) ----- 通讯模块Van服务器
[源码解析] 机器学习参数服务器ps-lite 之(3) ----- 代理人Customer网络
[源码解析]机器学习参数服务器ps-lite(4) ----- 应用节点实现架构
[源码解析] 机器学习参数服务器 Paracel (1)-----整体架构框架
不一样的worker同时并行运算的时候,可能由于网络、机器配置等外界缘由,致使不一样的worker的进度是不同的,如何控制worker的同步机制是一个比较重要的课题。less
许多机器学习问题能够转化为迭代任务。对于迭代控制,通常来讲,有三个级别的异步控制协议:BSP(Bulk Synchronous Parallel),SSP(Stalness Synchronous Parallel)和ASP(Asynchronous Parallel),它们的同步限制依次放宽。为了追求更快的计算速度,算法能够选择更宽松的同步协议。
为了更好的说明以及行文完整,咱们把ps-lite之中介绍过的段落再次拿出来。
这三个协议具体以下:
ASP:task之间彻底不用相互等待,彻底不顾worker之间的顺序,每一个worker按照本身的节奏走,跑完一个迭代就update,先完成的task,继续下一轮的训练。
优势:消除了等待慢task的时间,减小了GPU的空闲时间,所以与BSP相比提升了硬件效率。计算速度快,最大限度利用了集群的计算能力,全部的worker所在的机器都不用等待
缺点:
BSP:是通常分布式计算采用的同步协议,每一轮迭代中都须要等待全部的task计算完成。每一个worker都必须在同一个迭代运行,只有一个迭代任务全部的worker都完成了,才会进行一次worker和server之间的同步和分片更新。
BSP的模式和单机串行由于仅仅是batch size的区别,因此在模型收敛性上是彻底同样的。同时,由于每一个worker在一个周期内是能够并行计算的,因此有了必定的并行能力。spark用的就是这种方式。
优势:适用范围广;每一轮迭代收敛质量高
缺点:每一轮迭代中,,BSP要求每一个worker等待或暂停来自其余worker的梯度,这样就须要等待最慢的task,从而显著下降了硬件效率,致使总体任务计算时间长。整个worker group的性能由其中最慢的worker决定;这个worker通常称为straggler。
SSP:容许必定程度的task进度不一致,但这个不一致有一个上限,称为staleness值,即最快的task最多领先最慢的task staleness轮迭代。
就是把将ASP和BSP作一下折中。既然ASP是容许不一样worker之间的迭代次数间隔任意大,而BSP则只容许为0,那我就取一个常数s。有了SSP,BSP就能够经过指定s=0而获得。而ASP一样能够经过制定s=∞来达到。
优势:必定程度减小了task之间的等待时间,计算速度较快。
缺点:每一轮迭代的收敛质量不如BSP,达到一样的收敛效果可能须要更多轮的迭代,适用性也不如BSP,部分算法不适用。
传统的方法是使用BSP来完成迭代,这意味着咱们必须在每一个迭代器的末尾进行同步。这致使了straggler问题:因为一些软硬件的缘由,节点的计算能力每每不尽相同。对于迭代问题来讲,每一轮结束时算得快的节点都需等待算得慢的节点算完,再进行下一轮迭代。这种等待在节点数增多时将变得尤其明显,从而拖慢总体的性能。
有两种方法能够解决这个问题:
Paracel使用第二种方法,放宽了同步条件,即放宽了“每一个迭代步都等待”这个约束:
假设最快的worker与最慢的worker之间的同步不超过一个有界参数,这是每次迭代的收敛性和总收敛时间之间的折衷。当在一轮迭代结束时,算得快的节点能够继续下一轮迭代,但不能比最慢的节点领先参数s个迭代步。当领先超过s个迭代步,Paracel才会强制进行等待。
这样异步的控制方式既从总体上省去了等待时间,也能间接地帮助慢的节点遇上。从优化问题的角度来看,虽然单迭代步收敛得慢了,然而每一个迭代步的时间开销变少了,整体上收敛也就变快了。
这种作法就是Staleness Synchronous Parallel (SSP),基本思想是容许各机器以不一样步调对模型进行更新,可是加一个限制,使得最快的机器的进度和最慢机器的进度之差不要太大。这样作的好处是:既减轻慢的机器拖整个系统的后腿,又能保证模型的最终收敛。
咱们首先回忆一下前文总结的架构。
ssp_switch 用来控制是否使用 ssp。
咱们以 include/ps.hpp 的 paracel_read 为例。
若是启用了 ssp,则:
(stale_cache + limit_s < clock)
,则 while 循环等待。
pull_int(paracel::str_type("server_clock")
来增长 server的时钟。回忆一下前面讲的 SSP 核心思想(容许必定程度的task进度不一致,但这个不一致有一个上限,称为staleness值,即最快的task最多领先最慢的task staleness轮迭代)。其中缓存定义:
paracel::dict_type<paracel::str_type, boost::any> cached_para;
具体代码以下:
template <class V> bool paracel_read(const paracel::str_type & key, V & val, int replica_id = -1) { if(ssp_switch) { if(clock == 0 || clock == total_iters) { // check total_iters for last // 说明是ssp启动或者时间间隔(迭代次数)到了,这时候须要从新获取对应数值,更新cache。 cached_para[key] = boost::any_cast<V>(ps_obj-> kvm[ps_obj->p_ring->get_server(key)]. pull<V>(key)); val = boost::any_cast<V>(cached_para[key]); } else if(stale_cache + limit_s > clock) { // cache hit 若是命中缓存,则直接返回 val = boost::any_cast<V>(cached_para[key]); } else { // cache miss // 若是Miss,若是当前时钟已经大于某个数值 ,则 while 循环等待 // pull from server until leading slowest less than s clocks while(stale_cache + limit_s < clock) { // 时间同步 stale_cache = ps_obj-> kvm[clock_server].pull_int(paracel::str_type("server_clock")); } // 获取key对应权重的最新数值 cached_para[key] = boost::any_cast<V>(ps_obj-> kvm[ps_obj->p_ring->get_server(key)]. pull<V>(key)); val = boost::any_cast<V>(cached_para[key]); } return true; } return ps_obj->kvm[ps_obj->p_ring->get_server(key)].pull(key, val); }
kvclt 之中有pull_int方法,就是与Clock server交互,进行时间同步:
int pull_int(const paracel::str_type & key) { if(p_ssp_sock == nullptr) { p_ssp_sock.reset(create_req_sock(ports_lst[4])); } auto scrip = paste(paracel::str_type("pull_int"), key); int val = -1; bool r = req_send_recv(*p_ssp_sock, scrip, val); if(!r) ERROR_ABORT("key: pull_int does not exist"); return val; }
在 include/server.hpp之中,thrd_exec_ssp 是专门处理ssp的线程。
其用到的ssp_tbl 在 include/kv_def.hpp 之中。
namespace paracel { paracel::kvs<paracel::str_type, int> ssp_tbl; // 这里是ssp专用KV存储 paracel::kvs<paracel::str_type, paracel::str_type> tbl_store; }
以 pull_int 这个命令为例,就是从服务器拉取 “ssp专用KV存储” 对应的数据。
thrd_exec_ssp 具体代码以下:
// thread entry for ssp void thrd_exec_ssp(zmq::socket_t & sock) { paracel::packer<> pk; paracel::ssp_tbl.set("server_clock", 0); while(1) { zmq::message_t s; sock.recv(&s); auto scrip = paracel::str_type(static_cast<const char *>(s.data()), s.size()); auto msg = paracel::str_split_by_word(scrip, paracel::seperator); auto indicator = pk.unpack(msg[0]); //std::cout << indicator << std::endl; if(indicator == "push_int") { // 推送数据 auto key = pk.unpack(msg[1]); paracel::packer<int> pk_i; auto val = pk_i.unpack(msg[2]); paracel::ssp_tbl.set(key, val); bool result = true; rep_pack_send(sock, result); } if(indicator == "incr_int") { // 更改数据 auto key = pk.unpack(msg[1]); if(paracel::startswith(key, "client_clock_")) { if(paracel::ssp_tbl.get(key)) { paracel::ssp_tbl.incr(key, 1); } else { paracel::ssp_tbl.set(key, 1); } if(paracel::ssp_tbl.get(key) >= paracel::ssp_tbl.get("worker_sz")) { paracel::ssp_tbl.incr("server_clock", 1); paracel::ssp_tbl.set(key, 0); } } paracel::packer<int> pk_i; int delta = pk_i.unpack(msg[2]); paracel::ssp_tbl.incr(key, delta); bool result = true; rep_pack_send(sock, result); } if(indicator == "pull_int") { // 拉取数据 auto key = pk.unpack(msg[1]); int result = 0; auto exist = paracel::ssp_tbl.get(key, result); // 获取对应的key if(!exist) { paracel::str_type tmp = "nokey"; rep_send(sock, tmp); } rep_pack_send(sock, result); } } // while }
逻辑以下(注意,由于篇幅所限,这里省略了上图部分变量,加入了新的变量与逻辑):
+------------------+ worker + server | paralg | | | | | | | | | parasrv *ps_obj | | | + | | +------------------+ | | | | | start_server | +------------------+ | | | | | | | | | | | v | | | +------------+-----+ +------------------+ +---------+ | | | | parasrv | |kvclt | | kvclt | | | | | | | | | | | | thrd_exec | | | | host | | | | | | | servers | | | | | | | ssp_tbl | | | | ports_lst | | | | | | | kvm +-----------> | |.....| | | | tbl_store | | | | context | | | | | | | p_ring | | | | | | | thrd_exec_ssp | | + | | conn_prefix | | | | | | | | | | | | | | | ^ | +------------------+ | p_ssp_sock | | | | | | | | | + | | | | | | | | | | | | | | | | | | | | | | | | | | | v | | | | | | | | | +------------+------+ +------------------+ +---------+ | | | | | ring | | | +------------------+ | | | | | | | | | | | srv_hashring | | | | | | | | | | srv_hashring_dct | +------------------------------------+ | | | +-------------------+ +
手机以下:
用户只需添加几行代码便可将BSP进程转换为异步进程。好比一个很是简单的示例。
主要就是使用iter_commit() 在每次迭代结束以后,把本地更新结果提交到参数服务器。
class logistic_regression: public paracel::paralg { public: logistic_regression(paracel::Comm comm, std::string hosts_dct_str, std::string _output, int _rounds, int _limit_s, bool _ssp_switch) : paracel::paralg(hosts_dct_str, comm, _output, _rounds, _limit_s, _ssp_switch) {} void training() { theta = paracel::random_double_list(data_dim); paracel_write("theta", theta); // init push for(int iter = 0; iter < rounds; ++iter) { for(int i = 0; i < data_dim; ++i) { delta[i] = 0.; } random_shuffle(idx.begin(), idx.end()); // pull theta theta = paracel_read<vector<double> >("theta"); for(auto sample_id : idx) { for(int i = 0; i < data_dim; ++i) { delta[i] += coff1 * samples[sample_id][i] - coff2 * theta[i]; } } // traverse // update theta with delta paracel_bupdate("theta", delta, "update.so", "lg_theta_update"); // commit to server at the end of each iteration iter_commit(); // 这里是添加的,在每次迭代结束以后,把本地更新结果提交到参数服务器 } // last pull theta = paracel_read<vector<double> >("theta"); } void solve() { // init training data auto parser = [](const std::vector<std::string>) { /* ... */ }; auto lines = paracel_load(input); parser(lines); paracel_sync(); // set total iterations of your training process set_total_iters(rounds); // training training(); } }; // class logistic regression
前面每一个部分咱们其实都讲解得不透彻,须要在此串联起来。
咱们假设有5个worker,limit_s 是 3,即最快的节点不能比最慢的节点领先参数 3 个迭代步。当领先超过 3 个迭代步,Paracel会强制进行等待。
在 paralg 构建函数中,会对各类数据进行初始化,这里重要的是服务器端 key "worker_sz" 对应的数值被设置为 worker_comm.get_size() ,就是worker 数值 5。
"worker_sz" 的意义是:目前应该有多少个worker一块儿训练。
paralg(paracel::str_type hosts_dct_str, paracel::Comm comm, paracel::str_type _output = "", int _rounds = 1, int _limit_s = 0, bool _ssp_switch = false) : worker_comm(comm), output(_output), nworker(comm.get_size()), rounds(_rounds), limit_s(_limit_s), ssp_switch(_ssp_switch) { ps_obj = new parasrv(hosts_dct_str); init_output(_output); clock = 0; stale_cache = 0; clock_server = 0; total_iters = rounds; if(worker_comm.get_rank() == 0) { paracel::str_type key = "worker_sz"; (ps_obj->kvm[clock_server]). push_int(key, worker_comm.get_size()); // 设置为 5 } paracel_sync(); }
在 iter_commit 之中,逻辑以下。
// put where you want to control iter with ssp void iter_commit() { paracel::str_type clock_key; if(limit_s == 0) { clock_key = "client_clock_0"; } else { clock_key = "client_clock_" + std::to_string(clock % limit_s); } ps_obj->kvm[clock_server].incr_int(paracel::str_type(clock_key), 1); // value 1 is not important clock += 1; if(clock == total_iters) { // 若是已经达到了整体迭代数值,就减小服务器 "worker_sz" 数值 ps_obj->kvm[clock_server].incr_int(paracel::str_type("worker_sz"), -1); } }
kvclt 之中有以下代码,其实就是给服务器转发请求,因此咱们能够略过:
bool incr_int(const paracel::str_type & key, int delta) { if(p_ssp_sock == nullptr) { p_ssp_sock.reset(create_req_sock(ports_lst[4])); } auto scrip = paste(paracel::str_type("incr_int"), key, delta); bool stat; auto r = req_send_recv(*p_ssp_sock, scrip, stat); return r && stat; } int pull_int(const paracel::str_type & key) { if(p_ssp_sock == nullptr) { p_ssp_sock.reset(create_req_sock(ports_lst[4])); } auto scrip = paste(paracel::str_type("pull_int"), key); int val = -1; bool r = req_send_recv(*p_ssp_sock, scrip, val); assert(val != -1); assert(r); if(!r) ERROR_ABORT("key: pull_int does not exist"); return val; }
服务器收到了kvclt 转发的请求,处理举例以下:
在 thread_exec_ssp 中,incr_int 部分代码以下:
if(indicator == "incr_int") { auto key = pk.unpack(msg[1]); if(paracel::startswith(key, "client_clock_")) { if(paracel::ssp_tbl.get(key)) { paracel::ssp_tbl.incr(key, 1); // 把对应的key增长对应的数值 } else { paracel::ssp_tbl.set(key, 1 // 添加这个数值 } if(paracel::ssp_tbl.get(key) >= paracel::ssp_tbl.get("worker_sz")) //全部worker 都完成了一轮迭代 paracel::ssp_tbl.incr("server_clock", 1); //服务器迭代增长1 paracel::ssp_tbl.set(key, 0); //重置为 0,说明须要考虑下次迭代了,由于本次迭代中,全部client都完成了,下次迭代又要从新计算 } } paracel::packer<int> pk_i; int delta = pk_i.unpack(msg[2]); paracel::ssp_tbl.incr(key, delta); bool result = true; rep_pack_send(sock, result); }
把全部逻辑串联起来,名词解释以下:
具体以下:
limit_s 是 3,即最快的节点不能比最慢的节点领先参数 3 个迭代步。当领先超过 3 个迭代步,Paracel会强制进行等待。所以,有两种迭代:
在 worker 的 paralg 构建函数中,会对各类数据进行初始化,这里重要的是服务器端 key "worker_sz" 对应的数值被设置为 worker_comm.get_size() ,就是worker 数值 5。
"worker_sz" 的意义是:目前应该有多少个worker一块儿训练。
在 worker 的 paracel_read 之中,一直用本地的 clock 与远端 "server_clock
" 作比较,若是小于 limit_s 则强制本worker等待;
在worker 的 iter_commit 之中:
递交 iter_commit 以后,在 server 之中:
咱们能够看看逻辑图:
worker 1 + Server 1 | 快 | +-----------------------------------------+ | +------------------------------------------+ | paracel_read() { | | | | | | | |auto key = pk.unpack(msg[1]); | | while(stale_cache + limit_s < clock) { | | |if(startswith(key, "client_clock_")){ | | stale_cache = get("server_clock") | | | if(ssp_tbl.get(key)) { | | } | | | incr(key, 1); | | } | | | } else { | +-----------------------------------------+ | | set(key, 1); | | | } | +---------------------------------------------+ | if(get(key) >= get("worker_sz")) { | worker 2 | | incr("server_clock", 1); | 慢 | | set(key, 0); | +-----------------------------------------+ | | } | | iter_commit() { | | |} | | | | |ssp_tbl.incr(key, delta); | | if(limit_s == 0) { | | | | | clock_key = "client_clock_0" | | +------------------------------------------+ | } else { | | | clock_key = "client_clock_" + | | | (clock % limit_s) | | | } | | | | | | incr_int(clock_key, 1); | | | | | | clock += 1; | | | | | | if(clock == total_iters) { | | | incr_int("worker_sz"), +1); | | | } | | | } | | | } | | +-----------------------------------------+ +
手机以下:
咱们也能够用图表展现下逻辑过程,其中:
首先开始启动训练,表格中从上到下顺序执行。
第一个worker开始训练,实际训练两步,增长c_c_0,c_c_1
c_c_0 | c_c_1 | c_c_2 | w_sz | s_c | 说明 | |
---|---|---|---|---|---|---|
worker1 | 1 | 1 | 5 | 第一个worker开始训练,实际训练两步,增长c_c_0,c_c_1 | ||
worker2 | ||||||
worker3 | ||||||
worker4 | ||||||
worker5 |
第二个worker开始训练,实际训练两步,增长c_c_0,c_c_1
c_c_0 | c_c_1 | c_c_2 | w_sz | s_c | 说明 | |
---|---|---|---|---|---|---|
worker1 | 1 | 1 | 5 | 第一个worker开始训练,实际训练两步,增长c_c_0,c_c_1 | ||
worker2 | 2 | 2 | 5 | 第二个worker开始训练,实际训练两步,增长c_c_0,c_c_1 | ||
worker3 | ||||||
worker4 | ||||||
worker5 |
第三个worker开始训练,实际训练两步,增长c_c_0,c_c_1
c_c_0 | c_c_1 | c_c_2 | w_sz | s_c | 说明 | |
---|---|---|---|---|---|---|
worker1 | 1 | 1 | 5 | 第一个worker开始训练,实际训练两步,增长c_c_0,c_c_1 | ||
worker2 | 2 | 2 | 5 | 第二个worker开始训练,实际训练两步,增长c_c_0,c_c_1 | ||
worker3 | 3 | 3 | 5 | 第三个worker开始训练,实际训练两步,增长c_c_0,c_c_1 | ||
worker4 | ||||||
worker5 |
第四个worker开始训练,实际训练两步,增长c_c_0,c_c_1
c_c_0 | c_c_1 | c_c_2 | w_sz | s_c | 说明 | |
---|---|---|---|---|---|---|
worker1 | 1 | 1 | 5 | 第一个worker开始训练,实际训练两步,增长c_c_0,c_c_1 | ||
worker2 | 2 | 2 | 5 | 第二个worker开始训练,实际训练两步,增长c_c_0,c_c_1 | ||
worker3 | 3 | 3 | 5 | 第三个worker开始训练,实际训练两步,增长c_c_0,c_c_1 | ||
worker4 | 4 | 4 | 5 | 第四个worker开始训练,实际训练两步,增长c_c_0,c_c_1 | ||
worker5 |
第五个worker开始训练,实际训练一步,增长c_c_0,由于已经完成了一轮实际迭代,因此server_clock增长 1。
此时,worker 5 落后了一个迭代(server_clock = 1)。
c_c_0 | c_c_1 | c_c_2 | w_sz | s_c | 说明 | |
---|---|---|---|---|---|---|
worker1 | 1 | 1 | 5 | 第一个worker开始训练,实际训练两步,增长c_c_0,c_c_1 | ||
worker2 | 2 | 2 | 5 | 第二个worker开始训练,实际训练两步,增长c_c_0,c_c_1 | ||
worker3 | 3 | 3 | 5 | 第三个worker开始训练,实际训练两步,增长c_c_0,c_c_1 | ||
worker4 | 4 | 4 | 5 | 第四个worker开始训练,实际训练两步,增长c_c_0,c_c_1 | ||
worker5 | 5 --> 0 | 5 | 1 | 第五个worker开始训练,实际训练一步,增长c_c_0,由于全部5个worker都已经完成了一轮实际迭代,因此server_clock增长 1,而后对应的 "client_clock_0" 重置为 0,则说明须要考虑下次迭代了。 |
下面看看特殊状况。
首先,4个worker都运行完3步,可是worker 5没有运行,情况以下:
c_c_0 | c_c_1 | c_c_2 | w_sz | s_c | 说明 | |
---|---|---|---|---|---|---|
worker1 | 1 | 1 | 1 | 5 | 本轮第一个worker实际训练三步,增长c_c_0,c_c_1,c_c_2 | |
worker2 | 2 | 2 | 2 | 5 | 本轮第二个worker实际训练三步,增长c_c_0,c_c_1,c_c_2 | |
worker3 | 3 | 3 | 3 | 5 | 本轮第三个worker实际训练三步,增长c_c_0,c_c_1,c_c_2 | |
worker4 | 4 | 4 | 4 | 5 | 本轮第四个worker实际训练三步,增长c_c_0,c_c_1,c_c_2 | |
worker5 |
假设worker 5 的 iter_commit 之中,若是worker 5 发现本身 (clock == total_iters),说明本 worker 5 已经达到了整体迭代数值,就减小服务器 "worker_sz" 数值。即:本worker已经跑完了训练,因此下面一块儿训练的worker数目须要减小 1;
由于 worker 5 一会儿完成 3步训练,因此 s_c 变成 3,即整体迭代次数为 3。
由于 本次虚拟迭代中,5 个worker都完成了训练,因此 c_c_1 ~ c_c_2 都先变成 5, 而后重置为 0。
c_c_0 | c_c_1 | c_c_2 | w_sz | s_c | 说明 | |
---|---|---|---|---|---|---|
worker1 | 1 | 1 | 1 | 5 | 本轮第一个worker实际训练三步,增长c_c_0,c_c_1,c_c_2 | |
worker2 | 2 | 2 | 2 | 5 | 本轮第二个worker实际训练三步,增长c_c_0,c_c_1,c_c_2 | |
worker3 | 3 | 3 | 3 | 5 | 本轮第三个worker实际训练三步,增长c_c_0,c_c_1,c_c_2 | |
worker4 | 4 | 4 | 4 | 5 | 本轮第四个worker实际训练三步,增长c_c_0,c_c_1,c_c_2 | |
worker5 | 5 --> 0 | 5 --> 0 | 5 --> 0 | 4 | 3 | 本轮第五个worker训练完成,worker 5 又发现本身 (clock == total_iters),则"worker_sz" 数值减小1,之后只要看 4 个worker便可。 |
至此,SSP相关咱们分析完毕,下文解析数据/模型加载。