Pytorch多进程最佳实践

 预备知识

模型并行( model parallelism ):即把模型拆分放到不一样的设备进行训练,分布式系统中的不一样机器(GPU/CPU等)负责网络模型的不一样部分 —— 例如,神经网络模型的不一样网络层被分配到不一样的机器,或者同一层内部的不一样参数被分配到不一样机器,如AlexNet的训练。html

数据并行( data parallelism ):即把数据切分,输入到不一样的机器有同一个模型的多个副本,每一个机器分配到不一样的数据,而后将全部机器的计算结果按照某种方式合并。python

多进程最佳实践

torch.multiprocessing 是 Python 的 multiprocessing 多进程模块的替代品。它支持彻底相同的操做,但对其进行了扩展,以便全部经过多进程队列 multiprocessing.Queue 发送的张量都能将其数据移入共享内存,并且仅将其句柄发送到另外一个进程。git

注意:github

当张量 Tensor 被发送到另外一个进程时,张量的数据和梯度 torch.Tensor.grad 都将被共享。网络

这一特性容许实现各类训练方法,如 Hogwild,A3C 或任何其余须要异步操做的训练方法。app

1、CUDA 张量的共享

仅 Python 3 支持进程之间共享 CUDA 张量,咱们可使用 spawnforkserver 启动此类方法。 Python 2 中的 multiprocessing 多进程处理只能使用 fork 建立子进程,而且CUDA运行时不支持多进程处理。异步

警告:async

CUDA API 规定输出到其余进程的共享张量,只要它们被这些进程使用时,都将持续保持有效。您应该当心并确保您共享的 CUDA 张量不会超出它应该的做用范围(不会出现做用范围延伸的问题)。这对于共享模型的参数应该不是问题,但应该当心地传递其余类型的数据。请注意,此限制不适用于共享的 CPU 内存。分布式

也能够参阅: 使用 nn.DataParallel 替代多进程处理优化

2、最佳实践和技巧

一、避免和防止死锁

产生新进程时会出现不少错误,致使死锁最多见的缘由是后台线程。若是有任何持有锁或导入模块的线程,而且 fork 被调用,则子进程极可能处于崩溃状态,而且会以不一样方式死锁或失败。请注意,即便您没有这样作,Python 中内置的库也可能会,更没必要说 多进程处理 了。multiprocessing.Queue 多进程队列其实是一个很是复杂的类,它产生了多个用于序列化、发送和接收对象的线程,而且它们也可能致使上述问题。若是您发现本身处于这种状况,请尝试使用multiprocessing.queues.SimpleQueue ,它不使用任何其余额外的线程。

咱们正在尽量的为您提供便利,并确保这些死锁不会发生,但有些事情不受咱们控制。若是您有任何问题暂时没法应对,请尝试到论坛求助,咱们会查看是否能够解决问题。

二、重用经过队列发送的缓冲区

请记住,每次将张量放入多进程队列 multiprocessing.Queue 时,它必须被移动到共享内存中。若是它已经被共享,将会是一个空操做,不然会产生一个额外的内存拷贝,这会减慢整个过程。即便您有一组进程将数据发送到单个进程,也可让它将缓冲区发送回去,这几乎是不占资源的,而且能够在发送下一批时避免产生拷贝动做。

三、异步多进程训练(如: Hogwild)

使用多进程处理 torch.multiprocessing,能够异步地训练一个模型,参数既能够一直共享,也能够周期性同步。在第一种状况下,咱们建议发送整个模型对象,而在后者中,咱们建议只发送状态字典 state_dict()

咱们建议使用多进程处理队列 multiprocessing.Queue 在进程之间传递各类 PyTorch 对象。使用 fork 启动一个方法时,它也可能会继承共享内存中的张量和存储空间,但这种方式也很是容易出错,应谨慎使用,最好只能让高阶用户使用。而队列,尽管它们有时候不太优雅,却能在任何状况下正常工做。

警告:

你应该留意没有用 if __name__ =='__main__' 来保护的全局语句。若是使用了不一样于 fork 启动方法,它们将在全部子进程中执行。

四、Hogwild

具体的 Hogwild 实现能够在 示例库 中找到,但为了展现代码的总体结构,下面还有一个最简单的示例:

import torch.multiprocessing as mp
from model import MyModel

def train(model):
    # 构建 data_loader,优化器等
    for data, labels in data_loader:
        optimizer.zero_grad()
        loss_fn(model(data), labels).backward()
        optimizer.step()  # 更新共享的参数

if __name__ == '__main__':
    num_processes = 4
    model = MyModel()
    # 注意:这是 "fork" 方法工做所必需的
    model.share_memory()
    processes = []
    for rank in range(num_processes):
        p = mp.Process(target=train, args=(model,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

 Reference

https://ptorch.com/news/176.html

相关文章
相关标签/搜索