TensorFlow支持使用多台机器的设备进行计算。本文基于官方教程,实践了分布式TensorFlow搭建的过程。python
TensorFlow入门教程shell
A TensorFlow "cluster" is a set of "tasks" that participate in the distributed execution of a TensorFlow graph. Each task is associated with a TensorFlow "server", which contains a "master" that can be used to create sessions, and a "worker" that executes operations in the graph.
从上面的定义能够看出,所谓的TensorFlow集群就是一组任务,每一个任务就是一个服务。服务由两个部分组成,第一部分是master,用于建立session,第二部分是worker,用于执行具体的计算。segmentfault
TensorFlow通常将任务分为两类job:一类叫参数服务器,parameter server,简称为ps,用于存储tf.Variable
;一类就是普通任务,称为worker,用于执行具体的计算。服务器
首先来理解一下参数服务器的概念。通常而言,机器学习的参数训练过程能够划分为两个类别:第一个是根据参数算算梯度,第二个是根据梯度更新参数。对于小规模训练,数据量不大,参数数量很少,一个CPU就足够了,两类任务都交给一个CPU来作。对于普通的中等规模的训练,数据量比较大,参数数量很少,计算梯度的任务负荷较重,参数更新的任务负荷较轻,因此将第一类任务交给若干个CPU或GPU去作,第二类任务交给一个CPU便可。对于超大规模的训练,数据量大、参数多,不只计算梯度的任务要部署到多个CPU或GPU上,并且更新参数的任务也要部署到多个CPU。若是计算量足够大,一台机器能搭载的CPU和GPU数量有限,就须要多台机器来进行计算能力的扩展了。参数服务器是一套分布式存储,用于保存参数,并提供参数更新的操做。session
咱们来看一下怎么建立一个TensorFlow集群。每一个任务用一个ip:port表示。TensorFlow用tf.train.ClusterSpec
表示一个集群信息,举例以下:app
import tensorflow as tf # Configuration of cluster ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
上面的语句提供了一个TensorFlow集群信息,集群有两类任务,称为job,一个job是ps,一个job是worker;ps由2个任务组成,worker由3个任务组成。dom
定义完集群信息后,使用tf.train.Server
建立每一个任务:机器学习
tf.app.flags.DEFINE_string("job_name", "worker", "One of 'ps', 'worker'") tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") FLAGS = tf.app.flags.FLAGS def main(_): server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) server.join() if __name__ == "__main__": tf.app.run()
对于本例而言,咱们须要在ip:port对应的机器上运行每一个任务,共需执行五次代码,生成五个任务。异步
python worker.py --job_name=ps --task_index=0 python worker.py --job_name=ps --task_index=1 python worker.py --job_name=worker --task_index=0 python worker.py --job_name=worker --task_index=1 python worker.py --job_name=worker --task_index=2
咱们找到集群的某一台机器,执行下面的代码:async
# -*- coding=utf-8 -*- import tensorflow as tf import numpy as np train_X = np.random.rand(100).astype(np.float32) train_Y = train_X * 0.1 + 0.3 # 选择变量存储位置和op执行位置,这里所有放在worker的第一个task上 with tf.device("/job:worker/task:0"): X = tf.placeholder(tf.float32) Y = tf.placeholder(tf.float32) w = tf.Variable(0.0, name="weight") b = tf.Variable(0.0, name="reminder") y = w * X + b loss = tf.reduce_mean(tf.square(y - Y)) init_op = tf.global_variables_initializer() train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss) # 选择建立session使用的master with tf.Session("grpc://xx.xxx.xx.xxxx:oooo") as sess: sess.run(init_op) for i in range(500): sess.run(train_op, feed_dict={X: train_Y, Y: train_Y}) if i % 50 == 0: print i, sess.run(w), sess.run(b) print sess.run(w) print sess.run(b)
执行结果以下:
0 0.00245265 0.00697793 50 0.0752466 0.213145 100 0.0991397 0.279267 150 0.107308 0.30036 200 0.110421 0.306972 250 0.111907 0.308929 300 0.112869 0.309389 350 0.113663 0.309368 400 0.114402 0.309192 450 0.115123 0.308967 0.115824 0.30873
其实ps和worker本质上是一个东西,就是名字不一样,咱们将上例中的with tf.device("/job:worker/task:0"):
改成with tf.device("/job:psr/task:0"):
,同样可以执行。之因此在建立集群时要分为两个类别的任务,是由于TensorFlow提供了一些工具函数,会根据名字不一样赋予task不一样的任务,ps的用于存储变量,worker的用于计算。
所谓 replication,指的是各个task(简单的状况下,每一个task运行在不一样的GPU device上)如何得到同一个模型,也就是说,replication的对象是模型。
在使用in-graph replication方式时,只有一个client进程(能够在参与训练的CPU或GPU上任选一个task来运行这个client,参与计算的其它tasks不运行这个client)来建立模型(即tf.Graph)及模型的参数(那些tf.Variables,好比权重W和偏置b)。因为参数(W和b)是共享的,该client指定把参数放在/job:ps,即parameter server上(好比 /job:ps/task:0/cpu:0)。模型的计算部分(前向传播,后向传播,loss和梯度计算,等等)也由该client进程定义好,而后client进程把这个计算部分分配到各个GPU device上(这个过程就至关于在各个GPU中复制模型),分配的方式相似函数调用,但每次调用都指定了设备(即 /job:worker/task:0/gpu:0,/job:worker/task:1/gpu:0,等等)。调用时,模型的参数(即W和b)被看成函数的参数输入给不一样tasks(一般运行在不一样GPU上)运行的模型,以保证这些参数确实是共享的。
若是用between-graph replication方式,则每一个task都运行本身的client进程用于建立模型和参数,并将参数pin到parameter server上(好比 /job:ps/task:0/cpu:0),而后各自独立地执行该模型。注意,每一个task建立的模型必须如出一辙,这很容易作到,由于只要每一个task里的这部分代码都同样就好了。问题是,这些task各自建立并pin到parameter server上的模型参数是一样的吗?问这个问题是由于咱们如今跑的是数据并行,而模型的参数及其更新都必须由parameter server统一处理。回答是,只要各task使用一样的parameter server设备名(好比都用 /job:ps/task:0/cpu:0)和一样的变量名(那些tf.Variable定义的变量,好比权重和偏置变量), 那么在默认的状况下,它们被分配在parameter server的相同的存储里。
import tensorflow as tf import numpy as np # Configuration of cluster ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") FLAGS = tf.app.flags.FLAGS def main(_): with tf.device(tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)): x_data = tf.placeholder(tf.float32, [100]) y_data = tf.placeholder(tf.float32, [100]) W = tf.Variable(tf.random_uniform([1], -1.0, 1.0)) b = tf.Variable(tf.zeros([1])) y = W * x_data + b loss = tf.reduce_mean(tf.square(y - y_data)) global_step = tf.Variable(0, name="global_step", trainable=False) optimizer = tf.train.GradientDescentOptimizer(0.1) train_op = optimizer.minimize(loss, global_step=global_step) tf.summary.scalar('cost', loss) summary_op = tf.summary.merge_all() init_op = tf.global_variables_initializer() # The StopAtStepHook handles stopping after running given steps. hooks = [ tf.train.StopAtStepHook(last_step=1000000)] # The MonitoredTrainingSession takes care of session initialization, # restoring from a checkpoint, saving to a checkpoint, and closing when done # or an error occurs. with tf.train.MonitoredTrainingSession(master="grpc://" + worker_hosts[FLAGS.task_index], is_chief=(FLAGS.task_index==0), # 咱们制定task_index为0的任务为主任务,用于负责变量初始化、作checkpoint、保存summary和复原 checkpoint_dir="/tmp/tf_train_logs", save_checkpoint_secs=None, hooks=hooks) as mon_sess: while not mon_sess.should_stop(): # Run a training step asynchronously. # See `tf.train.SyncReplicasOptimizer` for additional details on how to # perform *synchronous* training. # mon_sess.run handles AbortedError in case of preempted PS. train_x = np.random.rand(100).astype(np.float32) train_y = train_x * 0.1 + 0.3 _, step, loss_v, weight, biase = mon_sess.run([train_op, global_step, loss, W, b], feed_dict={x_data: train_x, y_data: train_y}) if step % 100 == 0: print "step: %d, weight: %f, biase: %f, loss: %f" %(step, weight, biase, loss_v) print "Optimization finished." if __name__ == "__main__": tf.app.run()
代码中,tf.train.replica_device_setter()
会根据job名,将with
内的Variable
op放到ps tasks,将其余计算op放到worker tasks。默认分配策略是轮询。
在属于集群的一台机器中执行上面的代码,屏幕会开始输出每轮迭代的训练参数和损失
python train.py --task_index=0
在另外一台机器上执行下面你的代码,再启动一个任务,会看到屏幕开始输出每轮迭代的训练参数和损失,注意,step再也不是从0开始,而是在启动时刻上一个启动任务的step后继续。此时观察两个任务,会发现他们同时在对同一参数进行更新。
python train.py --task_index=2
分布式TensorFlow与Spark对比:
参考资料