Tensorflow的一个特点就是分布式计算。分布式Tensorflow是由高性能的gRPC框架做为底层技术来支持的。这是一个通讯框架gRPC(google remote procedure call),是一个高性能、跨平台的RPC框架。RPC协议,即远程过程调用协议,是指经过网络从远程计算机程序上请求服务。git
Tensorflow分布式是由多个服务器进程和客户端进程组成。有几种部署方式,例如单机多卡和多机多卡(分布式)。算法
单机多卡是指单台服务器有多块GPU设备。假设一台机器上有4块GPU,单机多GPU的训练过程以下:shell
而分布式是指有多台计算机,充分使用多台计算机的性能,处理数据的能力。能够根据不一样计算机划分不一样的工做节点。当数据量或者计算量达到超过一台计算机处理能力的上限的话,必须使用分布式。api
当咱们知道的基本的分布式原理以后,咱们来看看分布式的架构的组成。分布式架构的组成能够说是一个集群的组成方式。那么通常咱们在进行Tensorflow分布式时,须要创建一个集群。一般是咱们分布式的做业集合。一个做业中又包含了不少的任务(工做结点),每一个任务由一个工做进程来执行。服务器
通常来讲,在分布式机器学习框架中,咱们会把做业分红参数做业(parameter job)和工做结点做业(worker job)。运行参数做业的服务器咱们称之为参数服务器(parameter server,PS),负责管理参数的存储和更新,工做结点做业负责主要从事计算的任务,如运行操做。网络
参数服务器,当模型愈来愈大时,模型的参数愈来愈多,多到一台机器的性能不够完成对模型参数的更新的时候,就须要把参数分开放到不一样的机器去存储和更新。参数服务器能够是由多台机器组成的集群。工做节点是进行模型的计算的。Tensorflow的分布式实现了做业间的数据传输,也就是参数做业到工做结点做业的前向传播,以及工做节点到参数做业的反向传播。session
在训练一个模型的过程当中,有哪些部分能够分开,放在不一样的机器上运行呢?在这里就要接触到数据并行的概念。架构
数据并总的原理很简单。其中CPU主要负责梯度平均和参数更新,而GPU主要负责训练模型副本。app
每个设备的计算速度不同,有的快有的满,那么CPU在更新变量的时候,是应该等待每个设备的一个batch进行完成,而后求和取平均来更新呢?仍是让一部分先计算完的就先更新,后计算完的将前面的覆盖呢?这就由同步更新和异步更新的问题。
更新参数分为同步和异步两种方式,即异步随机梯度降低法(Async-SGD)和同步随机梯度降低法(Sync-SGD)
建立集群的方法是为每个任务启动一个服务,这些任务能够分布在不一样的机器上,也能够同一台机器上启动多个任务,使用不一样的GPU等来运行。每一个任务都会建立完成如下工做
Tensorflow的分布式API使用以下:
建立ClusterSpec,表示参与分布式TensorFlow计算的一组进程
cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222", /job:worker/task:0 "worker1.example.com:2222", /job:worker/task:1 "worker2.example.com:2222"],/job:worker/task:2 "ps": ["ps0.example.com:2222", /job:ps/task:0 "ps1.example.com:2222"]}) /job:ps/task:1
建立Tensorflow的集群描述信息,其中ps和worker为做业名称,经过指定ip地址加端口建立
建立一个服务(主节点或者工做节点服务),用于运行相应做业上的计算任务,运行的任务在task_index指定的机器上启动,例如在不一样的ip+端口上启动两个工做任务
tf.device(device_name_or_function):选择指定设备或者设备函数
import tensorflow as tf from tensorflow.examples.tutorials.mnist import input_data FLAGS = tf.app.flags.FLAGS tf.app.flags.DEFINE_string("job_name", "worker", "启动服务类型,ps或者worker") tf.app.flags.DEFINE_integer("task_index", 0, "指定是哪一台服务器索引") def main(argv): # 集群描述 cluster = tf.train.ClusterSpec({ "ps": ["127.0.0.1:4466"], "worker": ["127.0.0.1:4455"] }) # 建立不一样的服务 server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) if FLAGS.job_name == "ps": server.join() else: work_device = "/job:worker/task:0/cpu:0" with tf.device(tf.train.replica_device_setter( worker_device=work_device, cluster=cluster )): # 全局计数器 global_step = tf.train.get_or_create_global_step() # 准备数据 mnist = input_data.read_data_sets("./data/mnist/", one_hot=True) # 创建数据的占位符 with tf.variable_scope("data"): x = tf.placeholder(tf.float32, [None, 28 * 28]) y_true = tf.placeholder(tf.float32, [None, 10]) # 创建全链接层的神经网络 with tf.variable_scope("fc_model"): # 随机初始化权重和偏重 weight = tf.Variable(tf.random_normal([28 * 28, 10], mean=0.0, stddev=1.0), name="w") bias = tf.Variable(tf.constant(0.0, shape=[10])) # 预测结果 y_predict = tf.matmul(x, weight) + bias # 全部样本损失值的平均值 with tf.variable_scope("soft_loss"): loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=y_true, logits=y_predict)) # 梯度降低 with tf.variable_scope("optimizer"): train_op = tf.train.GradientDescentOptimizer(0.1).minimize(loss, global_step=global_step) # 计算准确率 with tf.variable_scope("acc"): equal_list = tf.equal(tf.argmax(y_true, 1), tf.argmax(y_predict, 1)) accuracy = tf.reduce_mean(tf.cast(equal_list, tf.float32)) # 建立分布式会话 with tf.train.MonitoredTrainingSession( checkpoint_dir="./temp/ckpt/test", master="grpc://127.0.0.1:4455", is_chief=(FLAGS.task_index == 0), config=tf.ConfigProto(log_device_placement=True), hooks=[tf.train.StopAtStepHook(last_step=100)] ) as mon_sess: while not mon_sess.should_stop(): mnist_x, mnist_y = mnist.train.next_batch(4000) mon_sess.run(train_op, feed_dict={x: mnist_x, y_true: mnist_y}) print("训练第%d步, 准确率为%f" % (global_step.eval(session=mon_sess), mon_sess.run(accuracy, feed_dict={x: mnist_x, y_true: mnist_y}))) if __name__ == '__main__': tf.app.run()
运行参数服务器:
$ python zfx.py --job_name=ps
运行worker服务器:
$ python zfx.py --job_name=worker