是时候放弃tensorflow集群投入horovod的怀抱

当数据较多或者模型较大时,为提升机器学习模型训练效率,通常采用多GPU的分布式训练。python

按照并行方式,分布式训练通常分为数据并行和模型并行两种, 模型并行:分布式系统中的不一样GPU负责网络模型的不一样部分。例如,神经网络模型的不一样网络层被分配到不一样的GPU,或者同一层内部的不一样参数被分配到不一样GPU;算法

数据并行:不一样的GPU有同一个模型的多个副本,每一个GPU分配到不一样的数据,而后将全部GPU的计算结果按照某种方式合并。bash

注意,上述中的不用GPU能够是同一台机上的多个GPU,也能够是不用机上的GPU。服务器

注:图中的Machine其实就是GPU,固然也能够指CPU,但深度学习不多采用CPU训练

固然也有数据并行和模型并行的混合模式。网络

注:图中的Machine其实就是GPU,固然也能够包含CPU,但深度学习不多采用CPU训练

由于模型并行各个部分存在必定的依赖,规模伸缩性差(意思是不能随意增长GPU的数量),在实际训练中用的很少。而数据并行,则各部分独立,规模伸缩性好,实际训练中更为经常使用,提速效果也更好。session

数据并行会涉及到各个GPU之间同步模型参数,通常分为同步更新和异步更新。同步更新要等到全部GPU的梯度计算完成,再统一计算新权值,而后全部GPU同步新值后,才进行下一轮计算。异步更新,每一个GPU梯度计算完后,无需等待其余GPU的梯度计算(有时能够设置须要等待的梯度个数),可当即更新总体权值,而后同步此权值,便可进行下一轮计算。同步更新有等待,异步更新基本没有等待,但异步更新涉及到梯度过期等更复杂问题。架构

在实际应用中,单机多卡的同步式数据并行是最经常使用的,在论文中最多见的训练方式是单机八卡。数据再多时,通常就须要多机多卡了。机器学习

不管是单机多卡,仍是多机多卡,均是分布式训练,在horovod出现以前,使用tensorflow,通常只有官方推荐的集群训练方式。异步

但是tensorflow的集群训练,用起来并不轻松。分布式

tensorflow集群的缺点

  1. 概念多,学习曲线陡峭

tensorflow的集群采用的是parameter server架构,所以引入了比较多复杂概念,罗列以下

server
client
master
cluster
parameter server
worker
job
task
replica_device_setter
master service
worker service
clone
复制代码

涉及到的函数

tf.train.Server
tf.train.Supervisor
tf.train.SessionManager
tf.train.ClusterSpec
tf.train.replica_device_setter
tf.train.MonitoredTrainingSession
tf.train.MonitoredSession
tf.train.SingularMonitoredSession
tf.train.Scaffold
tf.train.SessionCreator
tf.train.ChiefSessionCreator
tf.train.WorkerSessionCreator
复制代码

我反复研究过屡次,仍是没有完全弄清楚,server,client,master,master service,worker service,clone,session之间的关系。 大体是,在client中建立server实例,session与server一一对应,server内含master service和worker service两个服务,master service负责与外界通信,好比sess.run通常都是告诉server的master service要开始工做了,server的master service通知同一个server的worker service去干活,worker service调动GPU运算,完成后,返回结果给master service,作权值更新,若是是多机多卡的分布式,parameter server与master service之间作梯度传递和权值同步。 stackoverflow.com/questions/3…

  1. 修改的代码量大

若是想把单机单卡的模型,移植到多机多卡,涉及的代码量是以天记的,慢的话甚至须要一周。

  1. 须要多台机子跑不一样的脚本

tensorflow集群是采用parameter server架构的,要想跑多机多卡的集群,每一个机子都要启动一个client,即跑一个脚本,来启动训练,100个机子,人就要崩溃了。

  1. ps和worker的比例很差选取

tensorflow集群要将服务器分为ps和worker两种job类型,ps设置多少性能最近并无肯定的计算公式。

  1. 性能损失较大

tensorflow的集群性能并很差,当超过必定规模时,性能甚至会掉到理想性能的一半如下。

Horovod

因为tensorflow集群太不友好,业内也一直在尝试新的集群方案。 2017年Facebook发布了《Accurate, large minibatch SGD: Training ImageNet in 1 hour 》验证了大数据并行的高效性,同年百度发表了《Bringing HPC techniques to deep learning 》,验证了全新的梯度同步和权值更新算法的可行性。受这两篇论文的启发,Uber开发了Horovod集群方案。

约定以下: 网络带宽记为:B(单位Mb/s), 模型总参数数据量记为:D(单位Mb), 总服务器数量记为:n, 参数服务器数量记为:n_p(其中有n= n_p+ n_w), worker服务器数量记为:n_w(其中有n= n_p+ n_w) 单服务器计算一次耗时记为:T_0

梯度同步和权值更新算法

1) parameter server架构

tensorflow的集群架构是parameter server架构,数据的传导模型以下图。

则能够计算出,parameter server架构的集群方案,总耗时:

能够看出T与总节点数n基本成线性关系,但不一样的参数服务器和woker服务器分配方案,总性能也将不一样。 假设,e表示worker服务器占比,即e=n_w/n,则能够计算出最优的e值为:

能够看出,最优worker服务器占比与模型大小、网络带宽、单机运行市场都有关系,并非一个一眼能最优值得超参数。

2)horovod的ring-allreduce算法

百度2017年发表的《Bringing HPC techniques to deep learning 》中,采用了全新的梯度同步和权值同步算法,叫作ring-allreduce。此种算法各个节点之间只与相邻的两个节点通讯,并不须要参数服务器。所以,全部节点都参与计算也参与存储。 一次权重更新,主要包含两个过程, 1)累计梯度 将全部梯度分为n个片断,每次只与相邻节点传递1个片断的梯度,n-1次后,每一片断的梯度都完成了全部节点这一片断梯度的累计,但不用片断的累计值分布在不一样节点上。以下图的第二、第3步; 2)将累计后的梯度分发到全部节点 将第一步累计的梯度再次经过n-1次的相互交换后,全部节点的梯度完成同步。以下图的第四、第5步。再平均后,更新权重,就完成了全部节点权重的更新。

能够计算出ring-allreduce算法的总耗时为:

能够看出,总耗时基本与总节点数n成线性关系(n较大时,1/n基本为0)

Horovod的梯度同步和权值同步就采用了ring-allreduce算法。

概念

horovod的数据传递是基于MPI,所以其涉及的概念也是MPI中的概念。以4个服务器,每一个服务器4个GPU为例,

  • size 进程数量,也即全部GPU数量,为16
  • rank 进程的惟一ID,0-15
  • local rank 每个server中的进程的本地惟一ID,0-3
  • allreduce 累加全部数据,并同步到全部节点的操做,以下图

  • allgather 收集全部数据,并同步到全部节点的操做,完成后每一个节点都包含全部节点的数据,而且这些数据单独存在。以下图。

  • broadcast 将数据(须要由根节点确认)从一个节点传播到其余全部节点的操做

大概就这么多概念,简单清晰。

将单机单卡改成多机多卡

将一个只支持单机单卡的训练脚本修改成支持多机多卡的训练脚本,以tensorflow为例,只须要作以下改动:

import tensorflow as tf
import horovod.tensorflow as hvd


# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Make training operation
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
  while not mon_sess.should_stop():
    # Perform synchronous training.
    mon_sess.run(train_op)
复制代码

能够看出,改动不大,只需添加10行左右的代码,主要分为6步:

1)初始化horovod
hvd.init()
复制代码
2)一个GPU与一个进程绑定
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())
复制代码
3)根据总GPU数量放大学习率
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())
复制代码

由于BatchSize会根据GPU数量放大,因此学习率也应该放大

4)使用hvd.DistributedOptimizer封装原有的optimizer
opt = hvd.DistributedOptimizer(opt)
复制代码

分布式训练涉及到梯度同步,每个GPU的梯度计算仍然由原有的optimizer 计算,只是梯度同步由hvd.DistributedOptimizer负责。

5)广播初始变量值到全部进程
hooks = [hvd.BroadcastGlobalVariablesHook(0)]
复制代码

主要为了确保全部进程变量初始值相同

6)只在worker 0上保存checkpoint
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None
复制代码

防止checkpoint保存错乱

horovod只是须要改动必要改动的,不涉及parameter server架构的device设置等,繁琐的操做。

起训练

在单机4卡的机上起训练,只需执行如下命令:

horovodrun -np 4 -H localhost:4 python train.py
复制代码

在4机,每机4卡的机子上起训练,只需在一个机子上执行如下命令便可:

horovodrun -np 16 -H server1:4,server2:4,server3:4,server4:4 python train.py
复制代码

注意不管是单机多卡,仍是多机多卡,都只需在一个机子上执行一次命令便可,其余机horovod会用MPI启动进程和传递数据。

性能对比

horovod随着规模增大,性能损失远小于tensorflow,基本是线性增长的。

结论

经过tensorflow集群的人,会深入体会到horovod有多好用,感谢百度、Facebook和Uber让深度学习更美好。

不过,也要注意到,horovod的分布式貌似只支持同步更新式的数据并行,模型并行和异步更新式的数据并行,我没有尝试过,根据ring-allreduce算法可知,应该是不支持的。

相关文章
相关标签/搜索