文 / Zalando Research 研究科学家 Kashif Rasulpython
来源 | TensorFlow 公众号git
与大多数 AI 研究部门同样,Zalando Research 也意识到了对创意进行尝试和快速原型设计的重要性。随着数据集变得愈来愈庞大,了解如何利用咱们拥有的共享资源来高效快速地训练深度学习模型变得大有用处。github
TensorFlow 的估算器 API 对于在分布式环境中使用多个 GPU 来训练模型很是有用。本文将主要介绍这一工做流程。咱们先使用 Fashion-MNIST 小数据集训练一个用 tf.keras 编写的自定义估算器,而后在文末介绍一个较实际的用例。api
请注意:TensorFlow 团队一直在开发另外一项很酷的新功能(在我写这篇文章时,该功能仍处于 Master 阶段),使用这项新功能,您只需多输入几行代码便可训练 tf.keras 模型, 而无需先将该模型转化为估算器!其工做流程也很赞。下面我着重讲讲估算器 API。选择哪个由您本身决定! 注:功能连接 github.com/tensorflow/…数组
TL; DR:基本上,咱们须要记住,对于 tf.keras. 模型,咱们只要经过 tf.keras.estimator.model_to_estimator 方法将其转化为 tf.estimator.Estimator 对象,便可使用 tf.estimator API 来进行训练。转化完成后,咱们可使用估算器提供的机制用不一样的硬件配置训练模型。bash
您能够今后笔记本下载本文中的代码并亲自运行。 注:笔记本连接 github.com/kashif/tf-k…网络
import os
import time
#!pip install -q -U tensorflow-gpu
import tensorflow as tf
import numpy as np
复制代码
导入 Fashion-MNIST 数据集架构
咱们用 Fashion-MNIST 数据集随手替换一下 MNIST,这里面包含几千张 Zalando 时尚文章的灰度图像。获取训练和测试数据很是简单,以下所示:app
(train_images, train_labels), (test_images, test_labels) =
tf.keras.datasets.fashion_mnist.load_data()
复制代码
咱们想把这些图像的像素值从 0 到 255 之间的一个数字转换为 0 到 1 之间的一个数字,并将该数据集转换为 [B, H, W ,C] 格式,其中 B 表明批处理的图像数,H 和 W 分别是高度和宽度,C 是咱们数据集的通道数(灰度为 1):分布式
TRAINING_SIZE = len(train_images)
TEST_SIZE = len(test_images)
train_images = np.asarray(train_images, dtype=np.float32) / 255
# Convert the train images and add channels
train_images = train_images.reshape((TRAINING_SIZE, 28, 28, 1))
test_images = np.asarray(test_images, dtype=np.float32) / 255
# Convert the test images and add channels
test_images = test_images.reshape((TEST_SIZE, 28, 28, 1))
复制代码
接下来,咱们想将标签从整数编号(例如,2 或套衫)转换为独热编码(例如,0,0,1,0,0,0,0,0,0,0)。为此,咱们要使用 tf.keras.utils.to_categorical 函数:
# How many categories we are predicting from (0-9)
LABEL_DIMENSIONS = 10
train_labels = tf.keras.utils.to_categorical(train_labels,
LABEL_DIMENSIONS)
test_labels = tf.keras.utils.to_categorical(test_labels,
LABEL_DIMENSIONS)
# Cast the labels to floats, needed later
train_labels = train_labels.astype(np.float32)
test_labels = test_labels.astype(np.float32)
复制代码
构建 tf.keras 模型
咱们会使用 Keras 功能 API 来建立神经网络。Keras 是一个高级 API,可用于构建和训练深度学习模型,其采用模块化设计,使用方便,易于扩展。tf.keras 是 TensorFlow 对这个 API 的实现,其支持 Eager Execution、tf.data 管道和估算器等。
在架构方面,咱们会使用 ConvNet。一个很是笼统的说法是,ConvNet 是卷积层 (Conv2D) 和池化层 (MaxPooling2D) 的堆栈。但最重要的是,ConvNet 将每一个训练示例看成一个 3D 形状张量(高度、宽度、通道),对于灰度图像,张量从通道 = 1 开始,而后返回一个 3D 张量。
所以,在 ConvNet 部分以后,咱们须要将张量平面化,并添加密集层,其中最后一个返回 LABEL_DIMENSIONS 大小的向量,并附带 tf.nn.softmax 激活:
inputs = tf.keras.Input(shape=(28,28,1)) # Returns a placeholder
x = tf.keras.layers.Conv2D(filters=32,
kernel_size=(3, 3),
activation=tf.nn.relu)(inputs)
x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)
x = tf.keras.layers.Conv2D(filters=64,
kernel_size=(3, 3),
activation=tf.nn.relu)(x)
x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)
x = tf.keras.layers.Conv2D(filters=64,
kernel_size=(3, 3),
activation=tf.nn.relu)(x)
x = tf.keras.layers.Flatten()(x)
x = tf.keras.layers.Dense(64, activation=tf.nn.relu)(x)
predictions = tf.keras.layers.Dense(LABEL_DIMENSIONS,
activation=tf.nn.softmax)(x)
复制代码
如今,咱们能够定义学习模型,请选择优化器(咱们从 TensorFlow 中选择一个,而不使用来自 tf.keras. optimizers 的优化器)并进行编译:
model = tf.keras.Model(inputs=inputs, outputs=predictions)
optimizer = tf.train.AdamOptimizer(learning_rate=0.001)
model.compile(loss='categorical_crossentropy',
optimizer=optimizer,
metrics=['accuracy'])
复制代码
建立估算器
使用已编译的 Keras 模型建立估算器,也就是咱们所说的 model_to_estimator 方法。请注意,Keras 模型的初始模型状态保存在建立的估算器中。
那估算器有哪些优势呢?首先要提如下几点:
您能够在本地主机或分布式多 GPU 环境中运行基于估算器的模型,而无需更改您的模型; 估算器可以简化模型开发者之间的共享实现; 估算器可以为您构建图形,因此有点像 Eager Execution,没有明确的会话。
那么咱们要如何训练简单的 tf.keras 模型来使用多 GPU?咱们可使用 tf.contrib.distribute.MirroredStrategy 范式,经过同步训练进行图形内复制。如需了解更多关于此策略的信息,请观看分布式 TensorFlow 训练讲座。 注:分布式 TensorFlow 连接 www.youtube.com/watch?v=bRM…
基本上,每一个工做器 GPU 都有一个网络拷贝,并会获取一个数据子集,据以计算本地梯度,而后等待全部工做器以同步方式结束。而后,工做器经过 Ring All-reduce 运算互相传递其本地梯度,这一般要进行优化,以减小网络带宽并增长吞吐量。在全部梯度到达后,每一个工做器会计算其平均值并更新参数,而后开始下一步。理想状况下,您在单个节点上有多个高速互联的 GPU。
要使用此策略,咱们首先要用已编译的 tf.keras 模型建立一个估算器,而后经过 RunConfig config 赋予其 MirroredStrategy 配置。默认状况下,该配置会使用所有 GPU,但您也能够赋予其一个 num_gpus 选项,以使用特定数量的 GPU:
NUM_GPUS = 2
strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)
config = tf.estimator.RunConfig(train_distribute=strategy)
estimator = tf.keras.estimator.model_to_estimator(model,
config=config)
复制代码
建立估算器输入函数
要经过管道将数据传递到估算器,咱们须要定义一个数据导入函数,该函数返回批量数据的 tf.data 数据集(图像、标签)。下面的函数接收 numpy 数组,并经过 ETL 过程返回数据集。
请注意,最后咱们还调用了预读取方法,该方法会在训练时将数据缓冲到 GPU,以便下一批数据准备就绪并等待 GPU,而不是在每次迭代时让 GPU 等待数据。GPU 可能仍然没有获得充分利用,要改善这一点,咱们可使用融合版转换运算(如 shuffle_and_repeat),而不是两个单独的运算。不过,我在这里选用的是简单用例。
def input_fn(images, labels, epochs, batch_size):
# Convert the inputs to a Dataset. (E)
ds = tf.data.Dataset.from_tensor_slices((images, labels))
# Shuffle, repeat, and batch the examples. (T)
SHUFFLE_SIZE = 5000
ds = ds.shuffle(SHUFFLE_SIZE).repeat(epochs).batch(batch_size)
ds = ds.prefetch(2)
# Return the dataset. (L)
return ds
复制代码
训练估算器
首先,咱们定义一个 SessionRunHook 类,用于记录随机梯度降低法每次迭代的次数:
class TimeHistory(tf.train.SessionRunHook):
def begin(self):
self.times = []
def before_run(self, run_context):
self.iter_time_start = time.time()
def after_run(self, run_context, run_values):
self.times.append(time.time() - self.iter_time_start)
复制代码
亮点在这里!咱们能够对估算器调用 train 函数,并经过 hooks 参数,向其赋予咱们定义的 input_fn (包含批次大小和咱们但愿的训练回合次数)和 TimeHistory 实例:
time_hist = TimeHistory()
BATCH_SIZE = 512
EPOCHS = 5
estimator.train(lambda:input_fn(train_images,
train_labels,
epochs=EPOCHS,
batch_size=BATCH_SIZE),
hooks=[time_hist])
复制代码
性能
如今,咱们可使用时间钩子来计算训练的总时间和平均每秒训练的图像数量(平均吞吐量):
total_time = sum(time_hist.times)
print(f"total time with {NUM_GPUS} GPU(s): {total_time} seconds")
avg_time_per_batch = np.mean(time_hist.times)
print(f"{BATCH_SIZE*NUM_GPUS/avg_time_per_batch} images/second with {NUM_GPUS} GPU(s)")
复制代码
使用两块 K80 GPU 进行训练时的 Fashion-MNIST 训练吞吐量和总时间,采用不一样 NUM_GPUS,显示缩放不良
评估估算器
为了检验模型的性能,咱们要对估算器调用评估方法:
estimator.evaluate(lambda:input_fn(test_images,
test_labels,
epochs=1,
batch_size=BATCH_SIZE))
复制代码
视网膜 OCT (光学相干断层成像术)图像示例
为了测试模型在处理较大数据集时的扩展性能,咱们使用 视网膜 OCT 图像数据集,这是 Kaggle 众多大型数据集中的一个。该数据集由活人视网膜的横截面 X 光图像组成,分为四个类别:NORMAL、CNV、DME 和 DRUSEN:
光学相干断层成像术的表明图像,选自 Kermany 等人所著的《经过基于图像的深度学习技术肯定医学诊断和可治疗疾病》(Identifying Medical Diagnoses and Treatable Diseases by Image-Based Deep Learning)
该数据集共有 84,495 张 JPEG 格式的 X 光图像,尺寸多为 512x496,能够经过 Kaggle CLI 下载: 注:CLI 连接 github.com/Kaggle/kagg…
#!pip install kaggle
#!kaggle datasets download -d paultimothymooney/kermany2018
复制代码
下载完成后,训练集和测试集图像类位于各自的文件夹内,所以咱们能够将模式定义为:
labels = ['CNV', 'DME', 'DRUSEN', 'NORMAL']
train_folder = os.path.join('OCT2017', 'train', '**', '*.jpeg')
test_folder = os.path.join('OCT2017', 'test', '**', '*.jpeg')
复制代码
接下来,咱们要编写估算器的输入函数,该函数能够提取任何文件模式,并返回已缩放图像和独热编码标签做为 tf.data.Dataset。此次,咱们遵循输入管道性能指南中的最佳实践。请特别注意,若是 prefetch 的 buffer_size 为 None,则 TensorFlow 会自动使用最优的预读取缓冲区大小: 注:输入管道性能指南连接 www.tensorflow.org/performance…
1 def input_fn(file_pattern, labels,
2 image_size=(224,224),
3 shuffle=False,
4 batch_size=64,
5 num_epochs=None,
6 buffer_size=4096,
7 prefetch_buffer_size=None):
8
9 table = tf.contrib.lookup.index_table_from_tensor(mapping=tf.constant(labels))
10 num_classes = len(labels)
11
12 def _map_func(filename):
13 label = tf.string_split([filename], delimiter=os.sep).values[-2]
14 image = tf.image.decode_jpeg(tf.read_file(filename), channels=3)
15 image = tf.image.convert_image_dtype(image, dtype=tf.float32)
16 image = tf.image.resize_images(image, size=image_size)
17 return (image, tf.one_hot(table.lookup(label), num_classes))
18
19 dataset = tf.data.Dataset.list_files(file_pattern, shuffle=shuffle)
20
21 if num_epochs is not None and shuffle:
22 dataset = dataset.apply(
23 tf.contrib.data.shuffle_and_repeat(buffer_size, num_epochs))
24 elif shuffle:
25 dataset = dataset.shuffle(buffer_size)
26 elif num_epochs is not None:
27 dataset = dataset.repeat(num_epochs)
28
29 dataset = dataset.apply(
30 tf.contrib.data.map_and_batch(map_func=_map_func,
31 batch_size=batch_size,
32 num_parallel_calls=os.cpu_count()))
33 dataset = dataset.prefetch(buffer_size=prefetch_buffer_size)
34
35 return dataset
复制代码
此次训练该模型时,咱们将使用一个通过预训练的 VGG16,而且只从新训练其最后 5 层:
keras_vgg16 = tf.keras.applications.VGG16(input_shape=(224,224,3),
include_top=False)
output = keras_vgg16.output
output = tf.keras.layers.Flatten()(output)
prediction = tf.keras.layers.Dense(len(labels),
activation=tf.nn.softmax)(output)
model = tf.keras.Model(inputs=keras_vgg16.input,
outputs=prediction)
for layer in keras_vgg16.layers[:-4]:
layer.trainable = False
复制代码
如今,咱们万事皆备,能够按照上述步骤进行,并使用 NUM_GPUS GPU 在几分钟内训练咱们的模型:
model.compile(loss='categorical_crossentropy', optimizer=tf.train.AdamOptimizer(), metrics=['accuracy'])
NUM_GPUS = 2
strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)
config = tf.estimator.RunConfig(train_distribute=strategy)
estimator = tf.keras.estimator.model_to_estimator(model, config=config)
BATCH_SIZE = 64
EPOCHS = 1
estimator.train(input_fn=lambda:input_fn(train_folder, labels, shuffle=True, batch_size=BATCH_SIZE, buffer_size=2048, num_epochs=EPOCHS, prefetch_buffer_size=4), hooks=[time_hist])
复制代码
训练结束后,咱们能够评估测试集的准确度,应该在 95% 左右(对初始基线来讲还不错):
estimator.evaluate(input_fn=lambda:input_fn(test_folder,
labels,
shuffle=False,
batch_size=BATCH_SIZE,
buffer_size=1024,
num_epochs=1))
复制代码
使用两块 K80 GPU 进行训练时的 Fashion-MNIST 训练吞吐量和总时间,采用不一样 NUM_GPUS,显示线性缩放
总结
咱们在上文中介绍了如何使用估算器 API 在多个 GPU 上轻松训练 Keras 深度学习模型,如何编写符合最佳实践的输入管道,以充分利用咱们的资源(线性缩放),以及如何经过钩子为咱们的训练吞吐量计时。
请务必注意,最后咱们主要关注的是测试集错误。您可能会注意到,测试集的准确度会随着 NUM_GPUS 值的增长而降低。其中一个缘由多是,使用 BATCH_SIZE*NUM_GPUS 的批量大小时,MirroredStrategy 可以有效地训练模型,而当咱们增长 GPU 数量时,可能须要调整 BATCH_SIZE 或学习率。为便于制图,文中除 NUM_GPUS 以外的全部其余超参数均保持不变,但实际上咱们须要调整这些超参数。
数据集和模型的大小也会影响这些方案的缩放效果。在读取或写入小数据时,GPU 的带宽较差,若是是较为老旧的 GPU(如 K80),则情形尤为如此,并且可能会形成上面 Fashion-MNIST 图中所示状况。
致谢
感谢 TensorFlow 团队,特别是 Josh Gordon,以及 Zalando Research 的各位同事,特别是 Duncan Blythe、Gokhan Yildirim 和 Sebastian Heinz,感谢他们帮忙修改草稿。