做者|DR. VAIBHAV KUMAR
编译|VK
来源|Analytics In Diamagpython
PyTorch经过提供大量强大的工具和技术,一直在推进计算机视觉和深度学习领域的发展。git
在计算机视觉领域,基于深度学习的执行须要处理大量的图像数据集,所以须要一个加速的环境来加快执行过程以达到可接受的精度水平。github
PyTorch经过XLA(加速线性代数)提供了这一特性,XLA是一种线性代数编译器,能够针对多种类型的硬件,包括GPU和TPU。PyTorch/XLA环境与Google云TPU集成,实现了更快的执行速度。网络
在本文中,咱们将在PyTorch中使用TPU演示一种深卷积神经网络ResNet50的实现。app
该模型将在PyTorch/XLA环境中进行训练和测试,以完成CIFAR10数据集的分类任务。咱们还将检查在50个epoch训练所花费的时间。dom
为了利用TPU的功能,这个实现是在Google Colab中完成的。首先,咱们须要从Notebook设置下的硬件加速器中选择TPU。curl
选择TPU后,咱们将使用下面的行验证环境代码:机器学习
import os assert os.environ['COLAB_TPU_ADDR']
若是启用了TPU,它将成功执行,不然它将抛出‘KeyError: ‘COLAB_TPU_ADDR’’。你也能够经过打印TPU地址来检查TPU。ide
TPU_Path = 'grpc://'+os.environ['COLAB_TPU_ADDR'] print('TPU Address:', TPU_Path)
在下一步中,咱们将安装XLA环境以加快执行过程。咱们在上一篇文章中实现了卷积神经网络。函数
VERSION = "20200516" !curl https://raw.githubusercontent.com/pytorch/xla/master/contrib/scripts/env-setup.py -o pytorch-xla-env-setup.py !python pytorch-xla-env-setup.py --version $VERSION
如今,咱们将在这里导入全部必需的库。
from matplotlib import pyplot as plt import numpy as np import os import time import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim import torch_xla import torch_xla.core.xla_model as xm import torch_xla.debug.metrics as met import torch_xla.distributed.parallel_loader as pl import torch_xla.distributed.xla_multiprocessing as xmp import torch_xla.utils.utils as xu import torchvision from torchvision import datasets, transforms import time from google.colab.patches import cv2_imshow import cv2
导入库以后,咱们将定义并初始化所需的参数。
# 定义参数 FLAGS = {} FLAGS['data_dir'] = "/tmp/cifar" FLAGS['batch_size'] = 128 FLAGS['num_workers'] = 4 FLAGS['learning_rate'] = 0.02 FLAGS['momentum'] = 0.9 FLAGS['num_epochs'] = 50 FLAGS['num_cores'] = 8 FLAGS['log_steps'] = 20 FLAGS['metrics_debug'] = False
在下一步中,咱们将定义ResNet50模型。
class BasicBlock(nn.Module): expansion = 1 def __init__(self, in_planes, planes, stride=1): super(BasicBlock, self).__init__() self.conv1 = nn.Conv2d( in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False) self.bn1 = nn.BatchNorm2d(planes) self.conv2 = nn.Conv2d( planes, planes, kernel_size=3, stride=1, padding=1, bias=False) self.bn2 = nn.BatchNorm2d(planes) self.shortcut = nn.Sequential() if stride != 1 or in_planes != self.expansion * planes: self.shortcut = nn.Sequential( nn.Conv2d( in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False), nn.BatchNorm2d(self.expansion * planes)) def forward(self, x): out = F.relu(self.bn1(self.conv1(x))) out = self.bn2(self.conv2(out)) out += self.shortcut(x) out = F.relu(out) return out class ResNet(nn.Module): def __init__(self, block, num_blocks, num_classes=10): super(ResNet, self).__init__() self.in_planes = 64 self.conv1 = nn.Conv2d( 3, 64, kernel_size=3, stride=1, padding=1, bias=False) self.bn1 = nn.BatchNorm2d(64) self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1) self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2) self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2) self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2) self.linear = nn.Linear(512 * block.expansion, num_classes) def _make_layer(self, block, planes, num_blocks, stride): strides = [stride] + [1] * (num_blocks - 1) layers = [] for stride in strides: layers.append(block(self.in_planes, planes, stride)) self.in_planes = planes * block.expansion return nn.Sequential(*layers) def forward(self, x): out = F.relu(self.bn1(self.conv1(x))) out = self.layer1(out) out = self.layer2(out) out = self.layer3(out) out = self.layer4(out) out = F.avg_pool2d(out, 4) out = torch.flatten(out, 1) out = self.linear(out) return F.log_softmax(out, dim=1) def ResNet50(): return ResNet(BasicBlock, [3, 4, 6, 4, 3])
下面的代码片断将定义加载CIFAR10数据集、准备训练和测试数据集、训练过程和测试过程的函数。
SERIAL_EXEC = xmp.MpSerialExecutor() # 只在内存中实例化一次模型权重。 WRAPPED_MODEL = xmp.MpModelWrapper(ResNet50()) def train_resnet50(): torch.manual_seed(1) def get_dataset(): norm = transforms.Normalize( mean=(0.4914, 0.4822, 0.4465), std=(0.2023, 0.1994, 0.2010)) transform_train = transforms.Compose([ transforms.RandomCrop(32, padding=4), transforms.RandomHorizontalFlip(), transforms.ToTensor(), norm, ]) transform_test = transforms.Compose([ transforms.ToTensor(), norm, ]) train_dataset = datasets.CIFAR10( root=FLAGS['data_dir'], train=True, download=True, transform=transform_train) test_dataset = datasets.CIFAR10( root=FLAGS['data_dir'], train=False, download=True, transform=transform_test) return train_dataset, test_dataset # 使用串行执行器能够避免多个进程 # 下载相同的数据。 train_dataset, test_dataset = SERIAL_EXEC.run(get_dataset) train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=xm.xrt_world_size(), rank=xm.get_ordinal(), shuffle=True) train_loader = torch.utils.data.DataLoader( train_dataset, batch_size=FLAGS['batch_size'], sampler=train_sampler, num_workers=FLAGS['num_workers'], drop_last=True) test_loader = torch.utils.data.DataLoader( test_dataset, batch_size=FLAGS['batch_size'], shuffle=False, num_workers=FLAGS['num_workers'], drop_last=True) # 将学习率缩放 learning_rate = FLAGS['learning_rate'] * xm.xrt_world_size() # 获取损失函数、优化器和模型 device = xm.xla_device() model = WRAPPED_MODEL.to(device) optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=FLAGS['momentum'], weight_decay=5e-4) loss_fn = nn.NLLLoss() def train_loop_fn(loader): tracker = xm.RateTracker() model.train() for x, (data, target) in enumerate(loader): optimizer.zero_grad() output = model(data) loss = loss_fn(output, target) loss.backward() xm.optimizer_step(optimizer) tracker.add(FLAGS['batch_size']) if x % FLAGS['log_steps'] == 0: print('[xla:{}]({}) Loss={:.2f} Time={}'.format(xm.get_ordinal(), x, loss.item(), time.asctime()), flush=True) def test_loop_fn(loader): total_samples = 0 correct = 0 model.eval() data, pred, target = None, None, None for data, target in loader: output = model(data) pred = output.max(1, keepdim=True)[1] correct += pred.eq(target.view_as(pred)).sum().item() total_samples += data.size()[0] accuracy = 100.0 * correct / total_samples print('[xla:{}] Accuracy={:.2f}%'.format( xm.get_ordinal(), accuracy), flush=True) return accuracy, data, pred, target # 训练和评估的循环 accuracy = 0.0 data, pred, target = None, None, None for epoch in range(1, FLAGS['num_epochs'] + 1): para_loader = pl.ParallelLoader(train_loader, [device]) train_loop_fn(para_loader.per_device_loader(device)) xm.master_print("Finished training epoch {}".format(epoch)) para_loader = pl.ParallelLoader(test_loader, [device]) accuracy, data, pred, target = test_loop_fn(para_loader.per_device_loader(device)) if FLAGS['metrics_debug']: xm.master_print(met.metrics_report(), flush=True) return accuracy, data, pred, target
如今,咱们将开始ResNet50的训练。训练将在咱们在参数中定义的50个epoch内完成。训练开始前,咱们会记录训练时间,训练结束后,咱们将打印总时间。
start_time = time.time() # 启动训练流程 def training(rank, flags): global FLAGS FLAGS = flags torch.set_default_tensor_type('torch.FloatTensor') accuracy, data, pred, target = train_resnet50() if rank == 0: # 检索TPU核心0上的张量并绘制。 plot_results(data.cpu(), pred.cpu(), target.cpu()) xmp.spawn(training, args=(FLAGS,), nprocs=FLAGS['num_cores'], start_method='fork')
训练结束后,咱们会打印训练过程所花费的时间。
最后,在训练过程当中,咱们将模型对样本测试数据的预测可视化。
end_time = time.time() print("Time taken = ", end_time-start_time)
原文连接:https://analyticsindiamag.com/hands-on-guide-to-implement-resnet50-in-pytorch-with-tpu/
欢迎关注磐创AI博客站:
http://panchuang.net/
sklearn机器学习中文官方文档:
http://sklearn123.com/
欢迎关注磐创博客资源汇总站:
http://docs.panchuang.net/