A Thorough Guide to Multi-GPU Programming in TensorFlow

There are already quite a few articles on using multiple GPUs with TensorFlow, but most of them are disorganized, and many appear to borrow heavily from one another. After spending many days working through the details, I have finally untangled the threads, and I wrote this article to share the result.

1. Background

By default, TensorFlow grabs all the memory on every visible GPU but only computes on the first one. The figure below shows a typical GPU training run: although the machine has two GPUs, only one is actually doing work, and unless we put it to use, the other is simply wasted. GPUs are a relatively expensive computing resource; even with the crypto-mining crash bringing prices well below the eight-to-nine-thousand-yuan peak of a 1080 Ti, they are not something most people can afford to waste. Making effective use of GPUs, especially under TensorFlow, is therefore an important concern.

2. The Naive Solutions

One commonly mentioned approach is to limit which GPUs are visible by setting CUDA_VISIBLE_DEVICES. If you run from a shell, add export CUDA_VISIBLE_DEVICES=0 (or whichever GPU id you want, from 0 to n-1, where n is the number of GPU cards in the machine) before launching the script. In a Python script, you can instead put the following before any other code:

import os
os.environ["CUDA_VISIBLE_DEVICES"]="0"
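One caveat: the assignment only takes effect if it runs before TensorFlow initializes CUDA, i.e. before the first session touches a GPU. A tiny helper makes the intent explicit (set_visible_gpus is my own illustrative name, not a TensorFlow API):

```python
import os

def set_visible_gpus(gpu_ids):
    """Restrict this process to the given GPU ids (illustrative helper).

    Must be called before TensorFlow initializes CUDA. The visible devices
    are remapped, so gpu_ids[0] shows up as "/gpu:0", gpu_ids[1] as
    "/gpu:1", and so on, whatever their physical ids are.
    """
    os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(i) for i in gpu_ids)

set_visible_gpus([0])  # equivalent to the snippet above
```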

In addition, to mitigate the problems caused by TensorFlow grabbing the whole GPU up front, you can pass extra options when creating the session for finer control over GPU usage:

config = tf.ConfigProto(allow_soft_placement=True)
config.gpu_options.allow_growth = True  # allocate memory on demand instead of all at once
config.gpu_options.per_process_gpu_memory_fraction = 0.9  # use at most 90% of GPU memory
sess = tf.Session(config=config)

3. A Guide to Multi-GPU Programming

The methods above do solve certain problems, for example several users sharing a few cards with each person assigned a different one for experiments, or running a different hyper-parameter setting on each card. But sometimes what we really want is to exploit all the cards we already have, to get a near-linear speedup and obtain results in less time. For that, the methods above are powerless; we have to write the code ourselves, and so far no article seems to give a worked example of converting single-GPU code to multi-GPU code.

When it comes to multi-GPU programming in TensorFlow, the story is a somewhat painful one. Anyone who has used Caffe knows how simple multi-GPU is there: switching from one GPU to several requires no code changes at all, just enabling USE_NCCL in the build options, and Caffe handles the rest automatically. TensorFlow is a different matter: you have to take care of everything yourself, including where the variables live and how to average the gradients. The upside, of course, is that you end up understanding the training process much more thoroughly.

The TensorFlow models repository does contain a multi-GPU example, at tutorials/image/cifar10/cifar10_multi_gpu_train.py, but it is merely a runnable demo: it explains neither what the code means nor why it is written that way, and it is quite a mess.

Finding a usable, readable example online is harder than you might expect. I combed through essentially every relevant article on Baidu and Google, and the best one I found is Multi-GPU Basic, which uses a small example to contrast single-GPU and multi-GPU runs.

Another example, based on MNIST, is the focus of this article, though it too contains code that gets in the way of understanding, such as the assign_to_device function.

In fact, using multiple GPUs in TensorFlow is quite simple: essentially you wrap the relevant ops in tf.device("/gpu:0") and the like, while paying attention to where the variables are placed.

Let us start from a single-GPU MNIST training example:

import tensorflow as tf
import numpy as np
from tensorflow.contrib import slim
from tensorflow.examples.tutorials.mnist import input_data
 
mnist=input_data.read_data_sets("/tmp/mnist/",one_hot=True)
 
num_gpus=2
num_steps=200
learning_rate=0.001
batch_size=1024
display_step=10
 
num_input=784
num_classes=10
def conv_net(x,is_training):
    # "updates_collections": None is very important; without it, accuracy stays around 0.10
    batch_norm_params = {"is_training": is_training, "decay": 0.9, "updates_collections": None}
    #,'variables_collections': [ tf.GraphKeys.TRAINABLE_VARIABLES ]
    with slim.arg_scope([slim.conv2d, slim.fully_connected],
                        activation_fn=tf.nn.relu,
                        weights_initializer=tf.truncated_normal_initializer(mean=0.0, stddev=0.01),
                        weights_regularizer=slim.l2_regularizer(0.0005),
                        normalizer_fn=slim.batch_norm, normalizer_params=batch_norm_params):
        with tf.variable_scope("ConvNet",reuse=tf.AUTO_REUSE):
            x = tf.reshape(x, [-1, 28, 28, 1])
            net = slim.conv2d(x, 6, [5,5], scope="conv_1")
            net = slim.max_pool2d(net, [2, 2],scope="pool_1")
            net = slim.conv2d(net, 12, [5,5], scope="conv_2")
            net = slim.max_pool2d(net, [2, 2], scope="pool_2")
            net = slim.flatten(net, scope="flatten")
            net = slim.fully_connected(net, 100, scope="fc")
            net = slim.dropout(net,is_training=is_training)
            net = slim.fully_connected(net, num_classes, scope="prob", activation_fn=None,normalizer_fn=None)
            return net
def train_single():
    X = tf.placeholder(tf.float32, [None, num_input])
    Y = tf.placeholder(tf.float32, [None, num_classes])
    logits=conv_net(X,True)
    loss=tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=Y,logits=logits))
    opt=tf.train.AdamOptimizer(learning_rate)
    train_op=opt.minimize(loss)
    logits_test=conv_net(X,False)
    correct_prediction = tf.equal(tf.argmax(logits_test, 1), tf.argmax(Y, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        for step in range(1,num_steps+1):
            batch_x, batch_y = mnist.train.next_batch(batch_size)
            sess.run(train_op,feed_dict={X:batch_x,Y:batch_y})
            if step%display_step==0 or step==1:
                loss_value,acc=sess.run([loss,accuracy],feed_dict={X:batch_x,Y:batch_y})
                print("Step:" + str(step) + ":" + str(loss_value) + " " + str(acc))
        print("Done")
        print("Testing Accuracy:",np.mean([sess.run(accuracy, feed_dict={X: mnist.test.images[i:i + batch_size],
              Y: mnist.test.labels[i:i + batch_size]}) for i in
              range(0, len(mnist.test.images), batch_size)]))

if __name__ == "__main__":
    train_single()

In these few lines of code we download and build the MNIST dataset, fetch one batch at a time via mnist.train.next_batch(batch_size), feed it into the conv_net network, compute the loss, train with the Adam optimizer, and finally test the accuracy.

只须要几分钟,在个人一块1080Ti卡上就能运行完毕,而且获得至关好的结果:

Step:1:2.213318 0.20703125
Step:10:0.46338645 0.88183594
Step:20:0.18729115 0.9550781
Step:30:0.17860937 0.9628906
Step:40:0.11540267 0.97558594
Step:50:0.081396215 0.9824219
Step:60:0.097750194 0.9746094
Step:70:0.060169913 0.984375
Step:80:0.059070613 0.98828125
Step:90:0.060746174 0.9892578
Step:100:0.057775088 0.9892578
Step:110:0.038614694 0.98828125
Step:120:0.0369242 0.9921875
Step:130:0.035249908 0.9941406
Step:140:0.03395287 0.9902344
Step:150:0.03798459 0.98828125
Step:160:0.052775905 0.99121094
Step:170:0.017296169 0.99609375
Step:180:0.026407585 0.9951172
Step:190:0.044104658 0.9941406
Step:200:0.025472593 0.99121094
Done
('Testing Accuracy:', 0.99301463)

Next we take this as our starting point and convert it to multi-GPU training. The figure below nicely illustrates the whole flow.

Multi-GPU parallelism falls into two broad categories, model parallelism and data parallelism; the figure above shows data parallelism, which is the approach we usually take. Data parallelism in turn comes in synchronous and asynchronous flavors. Since we typically install identical cards, we choose the synchronous flavor here: the data is split across the cards, and once every GPU has computed its gradients, they are averaged and only then applied as an update.
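Before writing any TensorFlow code, it is worth convincing ourselves that this synchronous scheme is mathematically sound: for a loss defined as a mean over the batch, averaging per-card gradients computed on equal-sized shards reproduces the full-batch gradient exactly. A NumPy sketch using a made-up mean-squared-error model (all names and sizes here are illustrative):

```python
import numpy as np

np.random.seed(0)
num_gpus, batch_size, dim = 2, 8, 4
X = np.random.randn(num_gpus * batch_size, dim)
y = np.random.randn(num_gpus * batch_size)
w = np.random.randn(dim)

def grad_mse(X, y, w):
    # gradient of the mean squared error (1/n) * ||Xw - y||^2 w.r.t. w
    return 2.0 / len(y) * X.T.dot(X.dot(w) - y)

# one device: gradient over the full combined batch
full_grad = grad_mse(X, y, w)

# data parallelism: each "GPU" takes a contiguous batch_size slice,
# computes its own gradient, and the results are averaged
tower_grads = [grad_mse(X[i * batch_size:(i + 1) * batch_size],
                        y[i * batch_size:(i + 1) * batch_size], w)
               for i in range(num_gpus)]
avg_grad = np.mean(tower_grads, axis=0)

print(np.allclose(avg_grad, full_grad))  # -> True
```

This is why the recipe below can simply average the tower gradients and apply them once: with identical cards and equal shards, the synchronous update is equivalent to training on the combined batch.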

The first thing to change is the data-reading code. Since we now have several cards and each must receive different data, the batch fetch becomes batch_x, batch_y = mnist.train.next_batch(batch_size * num_gpus), pulling enough data in one call so that every card gets batch_size examples. We then split the fetched data: with i denoting the GPU index, each GPU receives a contiguous chunk of batch_size examples:

_x = X[i * batch_size:(i + 1) * batch_size]
_y = Y[i * batch_size:(i + 1) * batch_size]
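A quick NumPy check (shapes chosen arbitrarily) confirms that these contiguous slices hand each GPU exactly batch_size rows and together cover the combined batch with no overlap or gaps:

```python
import numpy as np

num_gpus, batch_size, num_input = 2, 4, 784
# stand-in for batch_x returned by mnist.train.next_batch(batch_size * num_gpus)
X = np.arange(num_gpus * batch_size * num_input, dtype=np.float32)
X = X.reshape(num_gpus * batch_size, num_input)

shards = [X[i * batch_size:(i + 1) * batch_size] for i in range(num_gpus)]

print(all(s.shape == (batch_size, num_input) for s in shards))  # -> True
print(np.array_equal(np.concatenate(shards), X))                # -> True
```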

Because the same graph is shared across the GPUs, it is best to use name_scope to keep the names apart and avoid confusion, in the following form:

for i in range(num_gpus):
    with tf.device("/gpu:%d" % i):
        with tf.name_scope("tower_%d" % i):
            _x = X[i * batch_size:(i + 1) * batch_size]
            _y = Y[i * batch_size:(i + 1) * batch_size]
            logits = conv_net(_x, True)

We need a list to hold the gradients from every GPU, as well as a variable-reuse flag, so before the loop we define these two values:

tower_grads = []
reuse_vars = False

With all the preparation done, we can compute the gradients on each GPU:

opt = tf.train.AdamOptimizer(learning_rate)  # create the optimizer once, before the tower loop
# then, inside each tower:
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=_y, logits=logits))
grads = opt.compute_gradients(loss)
reuse_vars = True
tower_grads.append(grads)

Now tower_grads holds the gradients of every variable on every GPU, and what remains is to average them. This is the one function that appears almost unchanged in every example I have seen:

def average_gradients(tower_grads):
    average_grads = []
    # iterate per variable; grad_and_vars holds one (grad, var) pair from each GPU
    for grad_and_vars in zip(*tower_grads):
        grads = []
        for g, _ in grad_and_vars:
            expend_g = tf.expand_dims(g, 0)  # add a leading "GPU" axis
            grads.append(expend_g)
        grad = tf.concat(grads, 0)       # stack the per-GPU gradients
        grad = tf.reduce_mean(grad, 0)   # average over GPUs
        v = grad_and_vars[0][1]          # the variable is shared, so take it from GPU 0
        grad_and_var = (grad, v)
        average_grads.append(grad_and_var)
    return average_grads

tower_grads stores the gradients in the form (gradients from GPU 0, gradients from GPU 1, ..., gradients from GPU N-1). The one subtlety is zip(*): it transposes that list into the form ((grad0_gpu0, var0_gpu0), ..., (grad0_gpuN, var0_gpuN)), i.e. column-wise access, so that each item collects the values of a single variable across all GPUs.
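The transposition is easiest to see on a toy tower_grads built from plain strings (the names g0_gpu0, var0, etc. are placeholders standing in for real tensors and variables):

```python
# one entry per GPU; each entry lists (gradient, variable) pairs, one per variable
tower_grads = [
    [("g0_gpu0", "var0"), ("g1_gpu0", "var1")],  # gradients computed on GPU 0
    [("g0_gpu1", "var0"), ("g1_gpu1", "var1")],  # gradients computed on GPU 1
]

# zip(*tower_grads) yields one tuple per *variable*, collecting that
# variable's gradient from every GPU -- exactly what averaging needs
for grad_and_vars in zip(*tower_grads):
    print(grad_and_vars)
# prints:
# (('g0_gpu0', 'var0'), ('g0_gpu1', 'var0'))
# (('g1_gpu0', 'var1'), ('g1_gpu1', 'var1'))
```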

Finally, we apply the averaged gradients:

grads = average_gradients(tower_grads)
train_op = opt.apply_gradients(grads)

The walkthrough above is somewhat piecemeal, so here is the complete version for easy testing:

import tensorflow as tf
import numpy as np
from tensorflow.contrib import slim
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets("/tmp/mnist/", one_hot=True)

num_gpus = 2
num_steps = 1000
learning_rate = 0.001
batch_size = 1000
display_step = 10

num_input = 784
num_classes = 10

# an alternative network built with tf.layers, defined for reference but not used below
def conv_net_with_layers(x,is_training,dropout = 0.75):
    with tf.variable_scope("ConvNet", reuse=tf.AUTO_REUSE):
        x = tf.reshape(x, [-1, 28, 28, 1])
        x = tf.layers.conv2d(x, 12, 5, activation=tf.nn.relu)
        x = tf.layers.max_pooling2d(x, 2, 2)
        x = tf.layers.conv2d(x, 24, 3, activation=tf.nn.relu)
        x = tf.layers.max_pooling2d(x, 2, 2)
        x = tf.layers.flatten(x)
        x = tf.layers.dense(x, 100)
        x = tf.layers.dropout(x, rate=dropout, training=is_training)
        out = tf.layers.dense(x, 10)
        out = tf.nn.softmax(out) if not is_training else out
    return out

def conv_net(x,is_training):
    # "updates_collections": None is very important; without it, accuracy stays around 0.10
    batch_norm_params = {"is_training": is_training, "decay": 0.9, "updates_collections": None}
    #,'variables_collections': [ tf.GraphKeys.TRAINABLE_VARIABLES ]
    with slim.arg_scope([slim.conv2d, slim.fully_connected],
                        activation_fn=tf.nn.relu,
                        weights_initializer=tf.truncated_normal_initializer(mean=0.0, stddev=0.01),
                        weights_regularizer=slim.l2_regularizer(0.0005),
                        normalizer_fn=slim.batch_norm, normalizer_params=batch_norm_params):
        with tf.variable_scope("ConvNet",reuse=tf.AUTO_REUSE):
            x = tf.reshape(x, [-1, 28, 28, 1])
            net = slim.conv2d(x, 6, [5,5], scope="conv_1")
            net = slim.max_pool2d(net, [2, 2],scope="pool_1")
            net = slim.conv2d(net, 12, [5,5], scope="conv_2")
            net = slim.max_pool2d(net, [2, 2], scope="pool_2")
            net = slim.flatten(net, scope="flatten")
            net = slim.fully_connected(net, 100, scope="fc")
            net = slim.dropout(net,is_training=is_training)
            net = slim.fully_connected(net, num_classes, scope="prob", activation_fn=None,normalizer_fn=None)
            return net

def average_gradients(tower_grads):
    average_grads = []
    for grad_and_vars in zip(*tower_grads):
        grads = []
        for g, _ in grad_and_vars:
            expend_g = tf.expand_dims(g, 0)
            grads.append(expend_g)
        grad = tf.concat(grads, 0)
        grad = tf.reduce_mean(grad, 0)
        v = grad_and_vars[0][1]
        grad_and_var = (grad, v)
        average_grads.append(grad_and_var)
    return average_grads


def train():
    with tf.device("/cpu:0"):
        global_step=tf.train.get_or_create_global_step()
        tower_grads = []
        X = tf.placeholder(tf.float32, [None, num_input])
        Y = tf.placeholder(tf.float32, [None, num_classes])
        opt = tf.train.AdamOptimizer(learning_rate)
        with tf.variable_scope(tf.get_variable_scope()):
            for i in range(num_gpus):
                with tf.device("/gpu:%d" % i):
                    with tf.name_scope("tower_%d" % i):
                        _x = X[i * batch_size:(i + 1) * batch_size]
                        _y = Y[i * batch_size:(i + 1) * batch_size]
                        logits = conv_net(_x, True)
                        tf.get_variable_scope().reuse_variables()
                        loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=_y, logits=logits))
                        grads = opt.compute_gradients(loss)
                        tower_grads.append(grads)
                        if i == 0:
                            logits_test = conv_net(_x, False)
                            correct_prediction = tf.equal(tf.argmax(logits_test, 1), tf.argmax(_y, 1))
                            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
        grads = average_gradients(tower_grads)
        train_op = opt.apply_gradients(grads)
        with tf.Session() as sess:
            sess.run(tf.global_variables_initializer())
            for step in range(1, num_steps + 1):
                batch_x, batch_y = mnist.train.next_batch(batch_size * num_gpus)
                sess.run(train_op, feed_dict={X: batch_x, Y: batch_y})
                if step % display_step == 0 or step == 1:
                    loss_value, acc = sess.run([loss, accuracy], feed_dict={X: batch_x, Y: batch_y})
                    print("Step:" + str(step) + ":" + str(loss_value) + " " + str(acc))
            print("Done")
            print("Testing Accuracy:",
                  np.mean([sess.run(accuracy, feed_dict={X: mnist.test.images[i:i + batch_size],
                                                         Y: mnist.test.labels[i:i + batch_size]}) for i in
                           range(0, len(mnist.test.images), batch_size)]))
def train_single():
    X = tf.placeholder(tf.float32, [None, num_input])
    Y = tf.placeholder(tf.float32, [None, num_classes])
    logits=conv_net(X,True)
    loss=tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=Y,logits=logits))
    opt=tf.train.AdamOptimizer(learning_rate)
    train_op=opt.minimize(loss)
    logits_test=conv_net(X,False)
    correct_prediction = tf.equal(tf.argmax(logits_test, 1), tf.argmax(Y, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        for step in range(1,num_steps+1):
            batch_x, batch_y = mnist.train.next_batch(batch_size)
            sess.run(train_op,feed_dict={X:batch_x,Y:batch_y})
            if step%display_step==0 or step==1:
                loss_value,acc=sess.run([loss,accuracy],feed_dict={X:batch_x,Y:batch_y})
                print("Step:" + str(step) + ":" + str(loss_value) + " " + str(acc))
        print("Done")
        print("Testing Accuracy:",np.mean([sess.run(accuracy, feed_dict={X: mnist.test.images[i:i + batch_size],
              Y: mnist.test.labels[i:i + batch_size]}) for i in
              range(0, len(mnist.test.images), batch_size)]))

if __name__ == "__main__":
    #train_single()
    train()

The results on my machine:

Step:1:2.1529438 0.27929688
Step:10:0.4463266 0.89941406
Step:20:0.18885617 0.9580078
Step:30:0.12153786 0.96484375
Step:40:0.07257775 0.98339844
Step:50:0.07452829 0.98535156
Step:60:0.048265547 0.99121094
Step:70:0.02948389 0.9873047
Step:80:0.048876762 0.9902344
Step:90:0.06494201 0.9902344
Step:100:0.024681691 0.98535156
Step:110:0.025596365 0.99316406
Step:120:0.02779768 0.9941406
Step:130:0.02191917 0.9980469
Step:140:0.022235561 0.9951172
Step:150:0.0124597605 0.9970703
Step:160:0.011557209 0.9902344
Step:170:0.014788041 0.99609375
Step:180:0.015063373 0.99609375
Step:190:0.012394376 0.99902344
Step:200:0.017424839 0.99609375
Done
('Testing Accuracy:', 0.99301463)

As you can see, both GPUs are now in use.