group
Process group. By default there is only one group: one job is one group, and that group is also one world.
world size
The total number of processes in the job.
rank
The index of a process, used for communication between processes. The host with rank=0 is the master node.
local rank
The GPU index inside a process. It is not an explicit parameter; it is assigned internally by torch.distributed.launch. For example, rank=3, local_rank=0 refers to the 1st GPU used by the process with rank 3, as illustrated in the sketch below.
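To make these terms concrete, here is a minimal sketch (not from the original code) of the values one process would see in a job with 2 nodes and 4 GPUs per node; it assumes the process group has already been initialized as shown further below.

import torch
import torch.distributed as dist

# minimal sketch, assuming dist.init_process_group() has already been called
world_size = dist.get_world_size()             # e.g. 8 for 2 nodes x 4 GPUs each
rank = dist.get_rank()                         # 0..7; rank 0 is the master
local_rank = rank % torch.cuda.device_count()  # 0..3, the GPU index inside this node
print(f"world_size={world_size} rank={rank} local_rank={local_rank}")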
import argparse

parser = argparse.ArgumentParser(description='PyTorch distributed training')
# --local_rank is passed in automatically by torch.distributed.launch
parser.add_argument("--local_rank", type=int, default=0)
# note: argparse's type=bool turns any non-empty string (even "False") into True;
# here the flag is simply left at its default
parser.add_argument("--dist", type=bool, default=True)
parser.add_argument("--gpu_ids", type=int, nargs="+", default=[0, 1, 2, 3])
args = parser.parse_args()
import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def init_dist(backend="nccl", **kwargs):
    """Initialization for distributed training."""
    if mp.get_start_method(allow_none=True) != "spawn":
        # CUDA needs the 'spawn' start method ('spawn' is already the default on Windows)
        mp.set_start_method("spawn", force=True)
    rank = int(os.environ["RANK"])           # process rank set by the launcher
    num_gpus = torch.cuda.device_count()     # number of GPUs visible on this host
    torch.cuda.set_device(rank % num_gpus)   # bind this process to a single GPU
    dist.init_process_group(backend=backend, **kwargs)  # join the default process group


if args.dist:
    init_dist()
    world_size = torch.distributed.get_world_size()  # number of processes in the group
    rank = torch.distributed.get_rank()              # rank of the current process
else:
    rank = -1  # non-distributed training

torch.backends.cudnn.benchmark = True
Logging (and similar one-off work) should only happen in the master process:

if rank <= 0:
    logger.info('something to log')
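The logger itself is not shown in the post. One common setup (a sketch, not the author's code; the helper name get_logger is assumed) attaches handlers only on rank <= 0 so the other ranks stay silent:

import logging


def get_logger(rank, log_file="train.log"):
    """Create a logger that only produces output on rank <= 0 (illustrative helper)."""
    logger = logging.getLogger("train")
    logger.setLevel(logging.INFO)
    if rank <= 0:  # master process, or a non-distributed run where rank == -1
        fmt = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
        for handler in (logging.StreamHandler(), logging.FileHandler(log_file)):
            handler.setFormatter(fmt)
            logger.addHandler(handler)
    else:  # all other ranks: discard log records
        logger.addHandler(logging.NullHandler())
        logger.propagate = False
    return logger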
import math
from torch.utils.data import DataLoader

dataset_ratio = 200  # virtually enlarge the dataset so the dataloader is restarted less often

if train:
    train_set = define_Dataset(train_dataset)
    train_size = int(math.ceil(len(train_set) / batch_size))
    total_epochs = int(math.ceil(total_iters / train_size))
    if args.dist:
        world_size = torch.distributed.get_world_size()
        assert batch_size % world_size == 0
        batch_size = batch_size // world_size  # split one batch across the GPUs
        train_sampler = DistIterSampler(train_set, world_size, rank, dataset_ratio)
        total_epochs = int(math.ceil(total_iters / (train_size * dataset_ratio)))
        train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=False,
                                  num_workers=num_workers, drop_last=True,
                                  pin_memory=True, sampler=train_sampler)
    else:
        train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True,
                                  num_workers=num_workers, drop_last=True,
                                  pin_memory=True)
else:
    test_set = define_Dataset(test_dataset)
    test_loader = DataLoader(test_set, batch_size=1, shuffle=False,
                             num_workers=1, drop_last=False, pin_memory=True)
A few things to note:
♠ world_size can be thought of as the number of GPUs; batch_size must be divisible by world_size, because the original batch is split across the GPUs (e.g. batch_size=32 on 4 GPUs gives 8 samples per GPU)
♣ With distributed training, shuffle in the DataLoader must be False; shuffling is handled by the sampler instead
♥ Testing is done on a single GPU
♦ Distributed training requires an explicit sampler
The code of DistIterSampler is as follows:
""" Modified from torch.utils.data.distributed.DistributedSampler Support enlarging the dataset for *iter-oriented* training, for saving time when restart the dataloader after each epoch """ import math import torch import torch.distributed as dist from torch.utils.data.sampler import Sampler class DistIterSampler(Sampler): """Sampler that restricts data loading to a subset of the dataset. It is especially useful in conjunction with :class:`torch.nn.parallel.DistributedDataParallel`. In such case, each process can pass a DistributedSampler instance as a DataLoader sampler, and load a subset of the original dataset that is exclusive to it. .. note:: Dataset is assumed to be of constant size. Arguments: dataset: Dataset used for sampling. num_replicas (optional): Number of processes participating in distributed training. rank (optional): Rank of the current process within num_replicas. """ def __init__(self, dataset, num_replicas=None, rank=None, ratio=100): if num_replicas is None: if not dist.is_available(): raise RuntimeError("Requires distributed package to be available") num_replicas = dist.get_world_size() if rank is None: if not dist.is_available(): raise RuntimeError("Requires distributed package to be available") rank = dist.get_rank() self.dataset = dataset self.num_replicas = num_replicas self.rank = rank self.epoch = 0 self.num_samples = int(math.ceil(len(self.dataset) * ratio / self.num_replicas)) self.total_size = self.num_samples * self.num_replicas def __iter__(self): # deterministically shuffle based on epoch g = torch.Generator() g.manual_seed(self.epoch) indices = torch.randperm( self.total_size, generator=g ).tolist() # Returns a random permutation of integers from 0 to n - 1 dsize = len(self.dataset) indices = [v % dsize for v in indices] # subsample indices = indices[self.rank : self.total_size : self.num_replicas] assert len(indices) == self.num_samples return iter(indices) def __len__(self): return self.num_samples def set_epoch(self, epoch): self.epoch = epoch
import torch
import torch.nn as nn
from torch.nn.parallel import DataParallel, DistributedDataParallel

device = torch.device('cuda' if opt['gpu_ids'] is not None else 'cpu')
net = define_net().to(device)

if args.dist:
    rank = torch.distributed.get_rank()
    # device_ids must contain only the single GPU bound to this process
    net = DistributedDataParallel(net, device_ids=[torch.cuda.current_device()])
else:
    rank = -1  # non-distributed training
    net = DataParallel(net)

# the input must live on the same GPU as the (first) model replica
input = input.to(f'cuda:{net.device_ids[0]}')

# when saving, unwrap the DataParallel / DistributedDataParallel container
# so the checkpoint can later be loaded without it
network = net
if isinstance(network, (nn.DataParallel, DistributedDataParallel)):
    network = network.module
state_dict = network.state_dict()
for key, param in state_dict.items():
    state_dict[key] = param.cpu()
torch.save(state_dict, save_path)
In other words, the model definition, checkpoint loading and saving, and moving inputs onto the right GPU all need to be adapted as above.
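For loading there is a matching pitfall: if a checkpoint was saved while the model was still wrapped, every key carries a "module." prefix. A sketch of a loading helper that handles both cases (the name load_network and its signature are illustrative, not from the post):

import torch
import torch.nn as nn
from torch.nn.parallel import DataParallel, DistributedDataParallel


def load_network(load_path, network, strict=True):
    """Load a checkpoint into a possibly wrapped network (illustrative helper)."""
    if isinstance(network, (nn.DataParallel, DistributedDataParallel)):
        network = network.module  # unwrap so parameter names match the checkpoint
    state_dict = torch.load(load_path, map_location="cpu")
    # strip a leading 'module.' prefix in case the checkpoint was saved while wrapped
    state_dict = {k[len("module."):] if k.startswith("module.") else k: v
                  for k, v in state_dict.items()}
    network.load_state_dict(state_dict, strict=strict)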
CUDA_VISIBLE_DEVICES=0,1,2,3 python3 -m torch.distributed.launch --nproc_per_node=4 --master_port=3210 train.py
Explanation of the arguments:
♠ CUDA_VISIBLE_DEVICES specifies which GPUs are visible to the job
♣ nproc_per_node specifies the number of processes to create on the current host; it is usually set to the number of GPUs on that host
♥ master_addr / master_port specify the master node's IP and port, respectively; for a single-node run only the port needs to be set (see the multi-node example below)
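For a multi-machine run the same launcher is used with --nnodes, --node_rank and --master_addr added. The following is only a sketch, assuming two nodes with 4 GPUs each and a placeholder master address:

# on node 0 (the master, assumed reachable at 192.168.1.1)
CUDA_VISIBLE_DEVICES=0,1,2,3 python3 -m torch.distributed.launch --nnodes=2 --node_rank=0 \
    --nproc_per_node=4 --master_addr=192.168.1.1 --master_port=3210 train.py

# on node 1
CUDA_VISIBLE_DEVICES=0,1,2,3 python3 -m torch.distributed.launch --nnodes=2 --node_rank=1 \
    --nproc_per_node=4 --master_addr=192.168.1.1 --master_port=3210 train.py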
Beyond that, it is just a matter of debugging whatever breaks.