Multi-GPU training in PyTorch is essentially data parallelism: every GPU holds a full copy of the model parameters, a batch of data is split evenly into N shares, each GPU processes one share, the per-GPU gradients are then aggregated into the gradient of the whole batch, and the aggregated gradient is used to update the parameters on all GPUs, completing one iteration.
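To see why averaging per-GPU gradients reproduces the full-batch gradient, here is a small CPU-only sketch of my own (an illustration, not PyTorch internals): splitting a batch into equal chunks, taking each chunk's mean-loss gradient, and averaging gives the same result as one pass over the whole batch.

```python
import torch

# Toy check: averaging the mean-loss gradients of N equal chunks
# equals the gradient of the mean loss over the full batch.
torch.manual_seed(0)
w = torch.randn(5, requires_grad=True)
x = torch.randn(30, 5)

# Gradient from the full batch.
(x @ w).pow(2).mean().backward()
full_grad = w.grad.clone()

# "Per-GPU" gradients on two equal chunks, then averaged.
chunk_grads = []
for chunk in x.chunk(2):
    (grad,) = torch.autograd.grad((chunk @ w).pow(2).mean(), w)
    chunk_grads.append(grad)
avg_grad = torch.stack(chunk_grads).mean(dim=0)

print(torch.allclose(full_grad, avg_grad))  # True
```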
There are two schemes for multi-GPU training. The first uses nn.DataParallel; it was the earliest one introduced into PyTorch, is simple and convenient to use, and involves no multiprocessing. The second combines torch.nn.parallel.DistributedDataParallel with torch.utils.data.distributed.DistributedSampler in a multiprocess setup. The second scheme is more efficient, though somewhat harder to implement, and it also supports multi-node distributed training. It is more efficient than scheme one even on a single compute node; see the PyTorch docs:
In the single-machine synchronous case, torch.distributed or the torch.nn.parallel.DistributedDataParallel() wrapper may still have advantages over other approaches to data parallelism, including torch.nn.DataParallel():
This post describes both schemes in detail, restricted to a single machine; multi-node distributed training is more involved and is left for the next post.
Scheme 1: nn.DataParallel

Wrapping the model takes a single line:

```python
model = nn.DataParallel(model)
```

`os.environ["CUDA_VISIBLE_DEVICES"] = "0"` specifies which GPU device numbers the current program may use; if it is not set, all GPUs on the machine are used. For example:

```python
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2"  # use 3 GPUs
```
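One caveat (my own note): CUDA_VISIBLE_DEVICES only takes effect if it is set before CUDA is initialized in the process, so set it near the top of the script. A quick sanity check:

```python
import os
# Set this before any CUDA work happens, or it is silently ignored.
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2"

import torch
# Only the three listed devices are visible, renumbered as cuda:0..cuda:2.
print(torch.cuda.device_count())  # expected: 3 (on a machine with >= 3 GPUs)
```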
The training loop is the same as with a single GPU. With this approach, PyTorch automatically splits each batch into N shares (N being the number of GPUs specified via os.environ), runs forward and backward on each share separately, then automatically aggregates the gradients from all GPUs, updates the parameters on one GPU, and finally broadcasts the updated parameters to the other GPUs, which completes one iteration.
The code:

```python
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import os
import time

# dataset
class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

# model define
class Model(nn.Module):
    # Our model
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        print("\tIn Model: input size", input.size(),
              "output size", output.size())
        return output

if __name__ == "__main__":
    # Parameters
    input_size = 5
    output_size = 2
    batch_size = 30
    data_size = 100

    dataset = RandomDataset(input_size, data_size)
    # dataloader define
    rand_loader = DataLoader(dataset=dataset,
                             batch_size=batch_size, shuffle=True)

    # model init
    model = Model(input_size, output_size)

    # cuda devices
    os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    if torch.cuda.device_count() > 1:
        print("Let's use", torch.cuda.device_count(), "GPUs!")
        # dim = 0 [30, xxx] -> [10, ...], [10, ...], [10, ...] on 3 GPUs
        model = nn.DataParallel(model)
    model.to(device)

    for data in rand_loader:
        input = data.to(device)
        output = model(input)
        # loss
        # backward
        # update
        time.sleep(1)  # simulate a fairly long batch time
        print("Outside: input size", input.size(),
              "output_size", output.size())

    torch.save(model.module.state_dict(), "model.pth")
```
With a single visible GPU, each batch is processed whole:

```
In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])
Outside: input size torch.Size([10, 5]) output_size torch.Size([10, 2])
```
With two GPUs, each batch of 30 is split into two halves of 15 (and the final batch of 10 into two shares of 5):

```
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([5, 5]) output size torch.Size([5, 2])
In Model: input size torch.Size([5, 5]) output size torch.Size([5, 2])
Outside: input size torch.Size([10, 5]) output_size torch.Size([10, 2])
```
Scheme 2: DistributedDataParallel

Scheme two is implemented with multiple processes. "Distributed" really just means multiple processes, spread across one or more machines and coordinating with each other over the network; the distributed details are covered in the next post. Here I only describe how scheme two differs from scheme one on a single machine. First, every process runs an independent training loop; after each iteration the gradients are shared and aggregated, and every process updates its own parameters independently. No parameters are passed around during the iterations (the parameters of all processes are synchronized once at initialization). Second, inter-process communication uses NCCL, which PyTorch supports internally, so normally you don't have to deal with it yourself. The distributed details are left for the next post; here only the simplest implementation is given.
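The gradient sharing boils down to an all-reduce across processes (DDP actually buckets gradients and overlaps the all-reduce with the backward pass, but the arithmetic is the same). Below is a minimal standalone sketch of an averaging all-reduce, using the gloo backend so it runs on CPU without GPUs; this is my own illustration, not DDP's internals, and the NCCL version in real training behaves the same way:

```python
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def worker(rank, world_size):
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"
    # gloo backend so the sketch runs on CPU; DDP on GPUs would use nccl.
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    grad = torch.tensor([float(rank + 1)])  # pretend local gradient
    dist.all_reduce(grad, op=dist.ReduceOp.SUM)
    grad /= world_size                      # average across processes
    print(f"rank {rank}: averaged grad = {grad.item()}")
    dist.destroy_process_group()

if __name__ == "__main__":
    mp.spawn(worker, args=(2,), nprocs=2)
```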
Two components are involved (see the test code for concrete usage):

- torch.utils.data.distributed.DistributedSampler
- torch.nn.parallel.DistributedDataParallel

The code is similar to scheme one, except that the process group has to be initialized, marking the program as distributed training. The processes are created by launching with `python -m torch.distributed.launch --nproc_per_node=2 --nnodes=1`: nnodes is 1 because we are on a single compute node, and nproc_per_node=2 means two training processes are created. Each process is assigned a rank that uniquely identifies it; rank 0 is the master and the rest are slaves. You generally need two GPUs for this; the test program assigns GPUs by rank, i.e. the rank 0 process uses GPU 0 and the rank 1 process uses GPU 1. A distributed sampler has to be built from the dataset and passed to the DataLoader at construction; the distributed wrapping of the model is shown in the code.
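One detail the minimal test below does not exercise but real training needs: DistributedSampler derives its shuffle order from the epoch number, so call set_epoch at the start of every epoch, otherwise every epoch replays the same order. A fragment of a sketch, reusing the variable names (train_sampler, rand_loader) from the test code below; num_epochs is hypothetical:

```python
# Without set_epoch(), a DistributedSampler yields the same shuffled
# order in every epoch on every rank.
for epoch in range(num_epochs):  # num_epochs: hypothetical epoch count
    train_sampler.set_epoch(epoch)
    for data in rand_loader:
        pass  # forward / backward / update as usual
```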
The code:

```python
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import os
import torch.distributed as dist
import torch.utils.data.distributed
import sys
import time

# dataset
class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

# model define
class Model(nn.Module):
    # Our model
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        # print("\tIn Model: input size", input.size(),
        #       "output size", output.size())
        return output

if __name__ == "__main__":
    # Parameters
    input_size = 5
    output_size = 2
    batch_size = 30
    data_size = 100

    # check the nccl backend
    if not dist.is_nccl_available():
        print("Error: nccl backend not available.")
        sys.exit(1)

    # init group
    dist.init_process_group(backend="nccl", init_method="env://")

    # get the process rank and the world size
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # prepare the dataset
    dataset = RandomDataset(input_size, data_size)
    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset)
    rand_loader = DataLoader(dataset,
                             batch_size=batch_size // world_size,
                             shuffle=(train_sampler is None),
                             sampler=train_sampler)

    # model init
    model = Model(input_size, output_size)

    # distributed model define
    device = torch.device('cuda', rank)
    model = model.to(device)
    model = torch.nn.parallel.DistributedDataParallel(model,
                                                      device_ids=[rank],
                                                      output_device=rank)

    print("From rank %d: start training, time:%s" % (rank, time.strftime("%Y-%m-%d %H:%M:%S")))
    for data in rand_loader:
        input = data.to(device)
        output = model(input)
        # loss
        # backward
        # update
        time.sleep(1)  # simulate a fairly long batch time
        print("From rank %d: Outside: input size %s, output size %s" % (rank, str(input.size()), str(output.size())), flush=True)

    torch.save(model.module.state_dict(), "model_%d.pth" % rank)
    print("From rank %d: end training, time: %s" % (rank, time.strftime("%Y-%m-%d %H:%M:%S")))
```
Launch the script with:

```shell
python -m torch.distributed.launch --nproc_per_node=2 --nnodes=1 simple_test.py
```
```
From rank 0: start training, time:2019-09-26 13:20:13
From rank 1: start training, time:2019-09-26 13:20:13
From rank 0: Outside: input size torch.Size([15, 5]), output size torch.Size([15, 2])
From rank 1: Outside: input size torch.Size([15, 5]), output size torch.Size([15, 2])
From rank 0: Outside: input size torch.Size([15, 5]), output size torch.Size([15, 2])
From rank 1: Outside: input size torch.Size([15, 5]), output size torch.Size([15, 2])
From rank 1: Outside: input size torch.Size([15, 5]), output size torch.Size([15, 2])From rank 0: Outside: input size torch.Size([15, 5]), output size torch.Size([15, 2])
From rank 0: Outside: input size torch.Size([5, 5]), output size torch.Size([5, 2])
From rank 0: end training, time: 2019-09-26 13:20:17
From rank 1: Outside: input size torch.Size([5, 5]), output size torch.Size([5, 2])
From rank 1: end training, time: 2019-09-26 13:20:17
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
*****************************************
```
I pasted the test output directly, and you can see it is a bit messy; that is caused by the processes running in parallel. Looking carefully, you can see two processes training in parallel, each handling half of each batch. The OMP_NUM_THREADS message at the end is printed by the PyTorch launcher: because I did not specify the number of OMP threads, it thoughtfully set it to 1 to keep the system from being overloaded.
Saving and loading the model differs from the single-GPU case. Here I always save the parameters as CPU tensors, because if you save parameters that live on a GPU, the pth file records which GPU each parameter belongs to, and loading will try to put them back on those GPUs. If you have fewer GPUs at load time, loading the model then fails, like this:

```
RuntimeError: Attempting to deserialize object on CUDA device 1 but torch.cuda.device_count() is 1. Please use torch.load with map_location to map your storages to an existing device.
```
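As the error message says, a checkpoint saved from GPU tensors can still be rescued at load time with map_location. A sketch (assuming model is the plain, unwrapped module):

```python
# Remap every saved tensor to the CPU, regardless of which GPU it lived on.
state = torch.load("model.pth", map_location=torch.device("cpu"))
# Or remap device-to-device, e.g. tensors saved on cuda:1 onto cuda:0:
# state = torch.load("model.pth", map_location={"cuda:1": "cuda:0"})
model.load_state_dict(state)
```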
Saving the model works the same way in both schemes, but keep in mind that in scheme two several processes run at once, so several model files get written to storage; with shared storage, watch out for filename collisions. In general it is enough to save the parameters from the rank 0 process only, since the parameters of all processes stay synchronized.
```python
torch.save(model.module.cpu().state_dict(), "model.pth")
```
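A sketch of the rank-0-only variant, reusing the rank and model variables from the test code above. Note that model.module.cpu() in the line above moves the live model off the GPU in place, which would break any further training; copying the state dict tensor by tensor (my own adjustment, not from the original line) avoids that:

```python
# Save one CPU-side copy of the parameters from rank 0 only; all ranks hold
# identical parameters after each iteration, so a single copy suffices.
if rank == 0:
    cpu_state = {k: v.cpu() for k, v in model.module.state_dict().items()}
    torch.save(cpu_state, "model.pth")
```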
Loading the model:

```python
param = torch.load("model.pth")  # CPU tensors, so no device mapping needed
model.load_state_dict(param)
```
That's it for today; it has been a while since I wrote a post this carefully. There are still a few loose ends, for example verifying that the model parameters really stay synchronized across processes. If you have questions, or think something here is wrong, please leave a comment. Thanks ^=^