TensorFlow学习笔记(9):分布式TensorFlow

简介

TensorFlow支持使用多台机器的设备进行计算。本文基于官方教程,实践了分布式TensorFlow搭建的过程。python

TensorFlow入门教程shell

基本概念

TensorFlow集群

A TensorFlow "cluster" is a set of "tasks" that participate in the distributed execution of a TensorFlow graph. Each task is associated with a TensorFlow "server", which contains a "master" that can be used to create sessions, and a "worker" that executes operations in the graph.

从上面的定义能够看出,所谓的TensorFlow集群就是一组任务,每一个任务就是一个服务。服务由两个部分组成,第一部分是master,用于建立session,第二部分是worker,用于执行具体的计算。segmentfault

TensorFlow通常将任务分为两类job:一类叫参数服务器,parameter server,简称为ps,用于存储tf.Variable;一类就是普通任务,称为worker,用于执行具体的计算。服务器

首先来理解一下参数服务器的概念。通常而言,机器学习的参数训练过程能够划分为两个类别:第一个是根据参数算算梯度,第二个是根据梯度更新参数。对于小规模训练,数据量不大,参数数量很少,一个CPU就足够了,两类任务都交给一个CPU来作。对于普通的中等规模的训练,数据量比较大,参数数量很少,计算梯度的任务负荷较重,参数更新的任务负荷较轻,因此将第一类任务交给若干个CPU或GPU去作,第二类任务交给一个CPU便可。对于超大规模的训练,数据量大、参数多,不只计算梯度的任务要部署到多个CPU或GPU上,并且更新参数的任务也要部署到多个CPU。若是计算量足够大,一台机器能搭载的CPU和GPU数量有限,就须要多台机器来进行计算能力的扩展了。参数服务器是一套分布式存储,用于保存参数,并提供参数更新的操做。session

咱们来看一下怎么建立一个TensorFlow集群。每一个任务用一个ip:port表示。TensorFlow用tf.train.ClusterSpec表示一个集群信息,举例以下:app

import tensorflow as tf

# Configuration of cluster 
ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

上面的语句提供了一个TensorFlow集群信息,集群有两类任务,称为job,一个job是ps,一个job是worker;ps由2个任务组成,worker由3个任务组成。dom

定义完集群信息后,使用tf.train.Server建立每一个任务:机器学习

tf.app.flags.DEFINE_string("job_name", "worker", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")

FLAGS = tf.app.flags.FLAGS

def main(_):
    server = tf.train.Server(cluster,
                             job_name=FLAGS.job_name,
                             task_index=FLAGS.task_index)
    server.join()

if __name__ == "__main__":
    tf.app.run()

对于本例而言,咱们须要在ip:port对应的机器上运行每一个任务,共需执行五次代码,生成五个任务。异步

python worker.py --job_name=ps --task_index=0
python worker.py --job_name=ps --task_index=1
python worker.py --job_name=worker --task_index=0
python worker.py --job_name=worker --task_index=1
python worker.py --job_name=worker --task_index=2

咱们找到集群的某一台机器,执行下面的代码:async

# -*- coding=utf-8 -*-

import tensorflow as tf
import numpy as np

train_X = np.random.rand(100).astype(np.float32)
train_Y = train_X * 0.1 + 0.3

# 选择变量存储位置和op执行位置,这里所有放在worker的第一个task上
with tf.device("/job:worker/task:0"):
    X = tf.placeholder(tf.float32)
    Y = tf.placeholder(tf.float32)
    w = tf.Variable(0.0, name="weight")
    b = tf.Variable(0.0, name="reminder")
    y = w * X + b
    loss = tf.reduce_mean(tf.square(y - Y))

    init_op = tf.global_variables_initializer()
    train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)

# 选择建立session使用的master
with tf.Session("grpc://xx.xxx.xx.xxxx:oooo") as sess:
    sess.run(init_op)
    for i in range(500):
        sess.run(train_op, feed_dict={X: train_Y, Y: train_Y})
        if i % 50 == 0:
            print i, sess.run(w), sess.run(b)

    print sess.run(w)
    print sess.run(b)

执行结果以下:

0 0.00245265 0.00697793
50 0.0752466 0.213145
100 0.0991397 0.279267
150 0.107308 0.30036
200 0.110421 0.306972
250 0.111907 0.308929
300 0.112869 0.309389
350 0.113663 0.309368
400 0.114402 0.309192
450 0.115123 0.308967
0.115824
0.30873

其实ps和worker本质上是一个东西,就是名字不一样,咱们将上例中的with tf.device("/job:worker/task:0"):改成with tf.device("/job:psr/task:0"):,同样可以执行。之因此在建立集群时要分为两个类别的任务,是由于TensorFlow提供了一些工具函数,会根据名字不一样赋予task不一样的任务,ps的用于存储变量,worker的用于计算。

同步与异步更新

  • 同步更新:将数据拆分红多份,每份基于参数计算出各自部分的梯度;当每一份的部分梯度计算完成后,收集到一块儿算出总梯度,再用总梯度去更新参数。
  • 异步更新:同步更新模式下,每次都要等各个部分的梯度计算完后才能进行参数更新操做,处理速度取决于计算梯度最慢的那个部分,其余部分存在大量的等待时间浪费;异步更新模式下,全部的部分只须要算本身的梯度,根据本身的梯度更新参数,不一样部分之间不存在通讯和等待。

between-graph replication vs in-graph replication

所谓 replication,指的是各个task(简单的状况下,每一个task运行在不一样的GPU device上)如何得到同一个模型,也就是说,replication的对象是模型。

在使用in-graph replication方式时,只有一个client进程(能够在参与训练的CPU或GPU上任选一个task来运行这个client,参与计算的其它tasks不运行这个client)来建立模型(即tf.Graph)及模型的参数(那些tf.Variables,好比权重W和偏置b)。因为参数(W和b)是共享的,该client指定把参数放在/job:ps,即parameter server上(好比 /job:ps/task:0/cpu:0)。模型的计算部分(前向传播,后向传播,loss和梯度计算,等等)也由该client进程定义好,而后client进程把这个计算部分分配到各个GPU device上(这个过程就至关于在各个GPU中复制模型),分配的方式相似函数调用,但每次调用都指定了设备(即 /job:worker/task:0/gpu:0,/job:worker/task:1/gpu:0,等等)。调用时,模型的参数(即W和b)被看成函数的参数输入给不一样tasks(一般运行在不一样GPU上)运行的模型,以保证这些参数确实是共享的。

clipboard.png

若是用between-graph replication方式,则每一个task都运行本身的client进程用于建立模型和参数,并将参数pin到parameter server上(好比 /job:ps/task:0/cpu:0),而后各自独立地执行该模型。注意,每一个task建立的模型必须如出一辙,这很容易作到,由于只要每一个task里的这部分代码都同样就好了。问题是,这些task各自建立并pin到parameter server上的模型参数是一样的吗?问这个问题是由于咱们如今跑的是数据并行,而模型的参数及其更新都必须由parameter server统一处理。回答是,只要各task使用一样的parameter server设备名(好比都用 /job:ps/task:0/cpu:0)和一样的变量名(那些tf.Variable定义的变量,好比权重和偏置变量), 那么在默认的状况下,它们被分配在parameter server的相同的存储里。

clipboard.png

分布式训练案例

import tensorflow as tf
import numpy as np

# Configuration of cluster 
ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS

def main(_):
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
        
        x_data = tf.placeholder(tf.float32, [100])
        y_data = tf.placeholder(tf.float32, [100])

        W = tf.Variable(tf.random_uniform([1], -1.0, 1.0))
        b = tf.Variable(tf.zeros([1]))
        y = W * x_data + b
        loss = tf.reduce_mean(tf.square(y - y_data))
        
        global_step = tf.Variable(0, name="global_step", trainable=False)
        optimizer = tf.train.GradientDescentOptimizer(0.1)
        train_op = optimizer.minimize(loss, global_step=global_step)
        
        tf.summary.scalar('cost', loss)
        summary_op = tf.summary.merge_all()
        init_op = tf.global_variables_initializer()
    # The StopAtStepHook handles stopping after running given steps.
    hooks = [ tf.train.StopAtStepHook(last_step=1000000)]
    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when done
    # or an error occurs.
    with tf.train.MonitoredTrainingSession(master="grpc://" + worker_hosts[FLAGS.task_index],
                                           is_chief=(FLAGS.task_index==0), # 咱们制定task_index为0的任务为主任务,用于负责变量初始化、作checkpoint、保存summary和复原
                                           checkpoint_dir="/tmp/tf_train_logs",
                                           save_checkpoint_secs=None,
                                           hooks=hooks) as mon_sess:
        while not mon_sess.should_stop():
            # Run a training step asynchronously.
            # See `tf.train.SyncReplicasOptimizer` for additional details on how to
            # perform *synchronous* training.
            # mon_sess.run handles AbortedError in case of preempted PS.
            train_x = np.random.rand(100).astype(np.float32)
            train_y = train_x * 0.1 + 0.3
            _, step, loss_v, weight, biase = mon_sess.run([train_op, global_step, loss, W, b], feed_dict={x_data: train_x, y_data: train_y})
            if step % 100 == 0:
                print "step: %d, weight: %f, biase: %f, loss: %f" %(step, weight, biase, loss_v)
        print "Optimization finished."

if __name__ == "__main__":
    tf.app.run()

代码中,tf.train.replica_device_setter()会根据job名,将with内的Variable op放到ps tasks,将其余计算op放到worker tasks。默认分配策略是轮询。

在属于集群的一台机器中执行上面的代码,屏幕会开始输出每轮迭代的训练参数和损失

python train.py --task_index=0

在另外一台机器上执行下面你的代码,再启动一个任务,会看到屏幕开始输出每轮迭代的训练参数和损失,注意,step再也不是从0开始,而是在启动时刻上一个启动任务的step后继续。此时观察两个任务,会发现他们同时在对同一参数进行更新。

python train.py --task_index=2

思考

分布式TensorFlow与Spark对比:

  • 分布式的级别不一样:TensorFlow的Tensor、Variable和Op不是分布式的,分布式执行的是subgraph. Spark的op和变量都是构建在RDD上,RDD自己是分布式的。
  • 异步训练:TensorFlow支持同步和异步的分布式训练;Spark原生的API只支持同步训练
  • 分布式存储:Spark在底层封装好了worker和分布式数据之间的关系;TensorFlow须要自行维护。
  • Parameter Server:TensorFlow支持,Spark暂不支持。
  • TF分布式部署起来仍是比较繁琐的,须要定义好每一个任务的ip:port,手工启动每一个task,不提供一个界面能够对集群进行维护。

参考资料

相关文章
相关标签/搜索