感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154.html 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。html
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):node
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:python
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。git
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。github
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。后端
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。网络
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。多线程
普通单卡训练
首先,导入所须要的库:框架
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:tcp
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html感谢参考原文-http://bjbsair.com/2020-03-27/tech-info/7154/ 神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操做(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集愈来愈大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,须要一种在不一样GPU之间对模型和数据进行切分和调度的方法。
PyTorch是很是流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法能够在多个GPU上切分模型和数据:nn.DataParallel和nn.distributedataparallel。DataParallel更易于使用(只需简单包装单GPU模型)。然而,因为它使用一个进程来计算模型权重,而后在每一个批处理期间将分发到每一个GPU,所以通讯很快成为一个瓶颈,GPU利用率一般很低。并且,nn.DataParallel要求全部的GPU都在同一个节点上(不支持分布式),并且不能使用Apex进行混合精度训练。nn.DataParallel和nn.distributedataparallel的主要差别能够总结为如下几点(译者注):
- DistributedDataParallel支持模型并行,而DataParallel并不支持,这意味若是模型太大单卡显存不足时只能使用前者;
- DataParallel是单进程多线程的,只用于单卡状况,而DistributedDataParallel是多进程的,适用于单机和多机状况,真正实现分布式训练;
- DistributedDataParallel的训练更高效,由于每一个进程都是独立的Python解释器,避免GIL问题,并且通讯成本低其训练速度更快,基本上DataParallel已经被弃用;
- 必需要说明的是DistributedDataParallel中每一个进程都有独立的优化器,执行本身的更新过程,可是梯度经过通讯传递到每一个进程,全部执行的内容是相同的;
总的来讲,Pytorch文档是至关完备和清晰的,尤为是在1.0x版本后。可是关于DistributedDataParallel的介绍却较少,主要的文档有如下三个:
- Writing Distributed Applications with PyTorch:主要介绍分布式API,分布式配置,不一样通讯机制以及内部机制,可是说实话大部分人不太赞成看懂,并且不多会直接用这些;
- Getting Started with Distributed Data Parallel:简单介绍了如何使用DistributedDataParallel,可是用例并不清晰完整;
- ImageNet training in PyTorch:比较完整的使用实例,可是仅有代码,缺乏详细说明;(apex也提供了一个相似的训练用例Mixed Precision ImageNet Training in PyTorch)
- (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亚马逊云上进行分布式训练,可是估计不少人用不到。
这篇教程将经过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,并且也包括任何使用apex进行混合精度训练。
DistributedDataParallel内部机制
DistributedDataParallel经过多进程在多个GPUs间复制模型,每一个GPU都由一个进程控制(固然可让每一个进程控制多个GPU,但这显然比每一个进程有一个GPU要慢;也能够多个进程在一个GPU上运行)。GPU能够都在同一个节点上,也能够分布在多个节点上。每一个进程都执行相同的任务,而且每一个进程都与全部其余进程通讯。进程或者说GPU之间只传递梯度,这样网络通讯就再也不是瓶颈。
在训练过程当中,每一个进程从磁盘加载batch数据,并将它们传递到其GPU。每个GPU都有本身的前向过程,而后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,因此梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每一个节点都获得了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通讯。Pytorch经过distributed.init_process_group函数来实现这一点。他须要知道进程0位置以便全部进程均可以同步,以及预期的进程总数。每一个进程都须要知道进程总数及其在进程中的顺序,以及使用哪一个GPU。一般将进程总数称为world_size.Pytorch提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
实例讲解
这里经过一个MNIST实例来说解,咱们先将其改为分布式训练,而后增长混合精度训练。
普通单卡训练
首先,导入所须要的库:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而后咱们定义一个简单的CNN模型处理MNIST数据:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函数main()接受参数,执行训练:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中训练部分主函数为:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
经过启动主函数来开始训练:
if __name__ == '__main__': main()
你可能注意到有些参数是多余的,可是对后面的分布式训练是有用的。咱们经过执行如下语句就能够在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
分布式训练
使用多进程进行分布式训练,咱们须要为每一个GPU启动一个进程。每一个进程须要知道本身运行在哪一个GPU上,以及自身在全部进程中的序号。对于多节点,咱们须要在每一个节点启动脚本。
首先,咱们要配置基本的参数:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是节点总数,而args.gpus是每一个节点的GPU总数(每一个节点GPU数是同样的),而args.nr 是当前节点在全部节点的序号。节点总数乘以每一个节点的GPU数能够获得world_size,也即进程总数。全部的进程须要知道进程0的IP地址以及端口,这样全部进程能够在开始时同步,通常状况下称进程0是master进程,好比咱们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点全部进程,每一个进程运行train(i, args),其中i从0到args.gpus - 1。
一样,咱们要修改训练函数:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
这里咱们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,而后就是经过dist.init_process_group初始化分布式环境,其中backend参数指定通讯后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通讯框架,相对比较高效。mpi也是高性能计算经常使用的通讯协议,不过你须要本身安装MPI实现框架,好比OpenMPI。gloo却是内置通讯后端,可是不够高效。init_method指的是如何初始化,以完成刚开始的进程同步;这里咱们设置的是env://,指的是环境变量初始化方式,须要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数咱们已经配置,后面两个参数也能够经过dist.init_process_group函数中world_size和rank参数配置。其它的初始化方式还包括共享文件系统以及TCP,好比init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待全部进程来同步,若是任何一个进程出错,就会失败。
对于模型侧,咱们只须要用DistributedDataParallel包装一下原来的model便可,在背后它会支持梯度的All-Reduce操做。对于数据侧,咱们nn.utils.data.DistributedSampler来给各个进程切分数据,只须要在dataloader中使用这个sampler就好,值得注意的一点是你要训练循环过程的每一个epoch开始时调用train_sampler.set_epoch(epoch),(主要是为了保证每一个epoch的划分是不一样的)其它的训练代码都保持不变。
最后就能够执行代码了,好比咱们是4节点,每一个节点是8卡,那么须要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size实际上是batch_size_per_gpu * world_size,对于有BN的模型还能够采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实一样适用于评估或者测试过程,好比咱们把数据划分到不一样的进程中进行预测,这样能够加速预测过程。实现代码和上述过程彻底同样,不过咱们想计算某个指标,那就须要从各个进程的统计结果进行All-Reduce,由于每一个进程仅是计算的部分数据的内容。好比咱们要计算分类准确度,咱们能够统计每一个进程的数据总数total和分类正确的数量count,而后进行聚合。这里要提的一点,当用dist.init_process_group初始化分布式环境时,其实就是创建一个默认的分布式进程组(distributed process group),这个group同时会初始化Pytorch的torch.distributed包。这样咱们能够直接用torch.distributed的API就能够进行分布式基本操做了,下面是具体实现:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度训练(采用apex)
混合精度训练(混合FP32和FP16训练)能够适用更大的batch_size,并且能够利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练很是简单,只须要修改部分代码:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其实就两处变化,首先是采用amp.initialize来包装model和optimizer以支持混合精度训练,其中opt_level指的是优化级别,若是为O0或者O3不是真正的混合精度,可是能够用来肯定模型效果和速度的baseline,而O1和O2是混合精度的两种设置,能够选择某个进行混合精度训练。另一处是在进行根据梯度更新参数前,要先经过amp.scale_loss对梯度进行scale以防止梯度下溢(underflowing)。此外,你还能够用apex.parallel.DistributedDataParallel替换nn.DistributedDataParallel。
题外话
我以为PyTorch官方的分布式实现已经比较完善,并且性能和效果都不错,能够替代的方案是horovod,不只支持PyTorch还支持TensorFlow和MXNet框架,实现起来也是比较容易的,速度方面应该不相上下。
参考
- Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部份内容来自此处)
- torch.distributed https://pytorch.org/docs/stable/distributed.html