Python: 多进程的分布式进程multiprocessing.managers

multiprocessing.managers

 

在Thread和Process中,应当优选Process,由于Process更稳定,并且,Process能够分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。html

Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程能够做为调度者,将任务分布到其余多个进程中,依靠网络通讯。因为managers模块封装很好,没必要了解网络通讯的细节,就能够很容易地编写分布式多进程程序。python

 

 

Server process

Manager()返回一个manager对象。它控制一个服务器进程,这个进程会管理Python对象并容许其余进程经过代理的方式来操做这些对象。git

manager对象支持多种类型。例子见下:github

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = "1"
    d["2"] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()       #产生一个代理对象d
        l = manager.list(range(10))

        p = Process(target=f, args=(d,l))
        p.start()
        p.join()

        print(d)
        print(l)

解释:bootstrap

with语句:见这篇文章

with 语句是从 Python 2.5 开始引入的一种与异常处理相关的功能(2.5 版本中要经过 from __future__ import with_statement 导入后才可使用),从 2.6 版本开始缺省可用(参考 What's new in Python 2.6? 中 with 语句相关部分介绍)。windows

with 语句适用于对资源进行访问的场合,确保无论使用过程当中是否发生异常都会执行必要的“清理”操做,释放资源,好比文件使用后自动关闭、线程中锁的自动获取acquire和release等。服务器

⚠️,with语句的实现相似try..finally。网络

 

代理对象:

  • 指向其余共享对象的对象。
  • 共享对象也能够说是代理 指涉 的对象。
  • 多个代理对象可能指向同一个指涉对象。

代理对象代理了指涉对象的一系列方法调用(虽然并非指涉对象的每一个方法都有必要被代理)。经过这种方式,代理的使用方法能够和它的指涉对象同样:app

>>> from multiprocessing import Manager >>> manager = Manager() >>> l = manager.list([i*i for i in range(10)]) >>> print(l) [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] >>> print(repr(l)) <ListProxy object, typeid 'list' at 0x...> >>> l[4] 16 >>> l[2:5] [4, 9, 16]

上面使用了list(), dict()方法dom

 

管理器的特色:

服务器进程管理器比使用共享内存对象更灵活,它们支持二进制对象类型。

同时,一个单独的manager能够被网络上的不一样计算机的进程共享。

缺点是比使用shared memory慢。

 

使用manager对象能够建立一个共享queue。具体见下一章节:


 

 

Managers

 

Managers提供了建立一种数据的方法,这个数据能够被不一样的进程共享。这种共享也包括经过网络在不一样计算机的进程上共享。

 

multiprocessing.Manager()

返回一个已启动的SyncManager对象(BaseManager的子类的实例对象),用于在进程之间分享数据。

SyncManager对象(点击查看方法)对应一个已经启动的子进程,它拥有一系列方法,能够为大部分经常使用数据类型建立并返回 代理对象 代理,用于进程间同步。甚至包括共享列表和字典。(👆的代码例子)

 

当管理器被垃圾回收或者父进程退出时,管理器进程会当即退出。

 

class multiprocessing.managers.BaseManager([address[, authkey]])

建立一个BaseManager对象。建立后,须要调用start()或get_server().server_forever()确保对象对于的管理器进程已经启动。

  • address参数,管理器服务进程监听的地址。若是值是None,则任意主机的请求都能创建链接。
  • authkey参数,byte类的字符串。认真标识(验证码)

 start(), 为管理器开启子进程。

 get_server(),返回一个Server对象。

 connect(), 链接本地管理器对象到一个远程管理器进程

 shutdown() 中止管理器的进程。配合start()。

 register(typid, callable)⚠️最重要的类方法,凡是注册到管理器的类型/对象,就能够被网络上的不一样进程共享了。

  


 

例子

下面是一个简单的Master/Worker模型,实现一个简单的分布计算。若是要启动多个worker,就能够把任务分配到多台机器上了,

好比把计算n*n的代码替换成发送邮件,就实现了邮件队列的异步发送。

 

经过manager模块的支持,多进程分布到多台机器上。一个服务进程能够做为调度者,将任务分布到其余多个进程中。

 

⚠️

注意Queue的做用是用来传递任务和接收结果,每一个任务的描述数据量要尽可能小。

好比发送一个处理日志文件的任务,就不要发送几百兆的日志文件自己,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。

 

案例代码 参考了https://www.liaoxuefeng.com/wiki/1016959663602400/1017631559645600#0,但这个代码不适合python3.8版本的了。

会报告2个错误。

第一个❌

#_pickle.PicklingError: Can't pickle <function <lambda> at 0x107ef8670>: attribute lookup <lambda> on __main__ failed

网上查了一下,https://github.com/scikit-learn/scikit-learn/issues/9467这篇文章指出pickle模块不指出lambda函数。

看文档,https://docs.python.org/zh-cn/3/library/pickle.html ,被封存对象不能是lambda函数返回的对象。只能是def定义返回的对象。

第二个❌

RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

看文档https://docs.python.org/zh-cn/3/library/multiprocessing.html, 3.8版本增长了freeze_support()函数。主要是为了支持windows可执行文件。毕竟multiprocessing可用于分布式进程。

因此必须引入freeze_support:

看代码:

服务器上的代码:

import random, time, queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

# 创建2个队列,一个发送,一个接收
task_queue = queue.Queue()
result_queue = queue.Queue()

def get_task():
    return task_queue

def get_result():
    return result_queue

class QueueManager(BaseManager): pass
# 服务器的管理器上注册2个共享队列
QueueManager.register('get_task', callable=get_task)
QueueManager.register('get_result', callable=get_result)
# 设置端口,地址默认为空。验证码authkey须要设定。
manager = QueueManager(address=('', 5000), authkey=b'abc')

def manager_run():
    manager.start()
    # 经过管理器访问共享队列。
    task = manager.get_task()
    result = manager.get_result()

    #对队列进行操做, 往task队列放进任务。
    for value in range(10):
        n = random.randint(0,100)
        print('Put task %d' % n)
        task.put(n)
    # 从result队列取出结果
    print('Try get result...')
    try:
        for value in range(10):
            r = result.get(timeout=10)
            print('Result: %s' % r)
    except queue.Empty:
        print('result is empty')
    # 关闭管理器。
    manager.shutdown()
    print('master exit.')

if __name__ == '__main__':
    freeze_support()
    manager_run()

 

另外一台机器(或本机启动也能够):

import time, sys, queue
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager): pass

# 从网络上的服务器上获取Queue,因此注册时只提供服务器上管理器注册的队列的名字:
QueueManager.register('get_task')
QueueManager.register('get_result')

server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# b'abc'至关于'abc'.encode('ascii'),类型是bytes
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 链接服务器
m.connect()
# 得到服务器上的队列对象
task = m.get_task()
result = m.get_result()

for value in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n , n, n*n)
        time.sleep(1)
        result.put(r)
    except queue.Empty:
        print('task queue is empty')

print('worker exit.')
相关文章
相关标签/搜索