Python学习笔记18:标准库之多进程(multiprocessing包)

咱们可以使用subprocess包来建立子进程。但这个包有两个很是大的局限性:
1) 咱们老是让subprocess执行外部的程序,而不是执行一个Python脚本内部编写的函数。
2) 进程间仅仅经过管道进行文本交流。
以上限制了咱们将subprocess包应用到更普遍的多进程任务。
这种比較实际是不公平的,因为subprocessing自己就是设计成为一个shell,而不是一个多进程管理包。



一 threading和multiprocessing

multiprocessing包是Python中的多进程管理包。
与threading.Thread相似。它可以利用multiprocessing.Process对象来建立一个进程。
该进程可以执行在Python程序内部编写的函数。
该Process对象与Thread对象的使用方法一样,也有start(), run(), join()的方法。
此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,经过參数传递给各个进程),用以同步进程。其使用方法与threading包中的同名类一致。
因此。multiprocessing的很是大一部份与threading使用同一套API,仅仅只是换到了多进程的情境。




但在使用这些共享API的时候,咱们要注意下面几点:
1)在UNIX平台上,当某个进程终结以后。该进程需要被其父进程调用wait,不然进程成为僵尸进程(Zombie)。python


因此。有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来讲,由于仅仅有一个进程。因此不存在此必要性。
2)multiprocessing提供了threading包中没有的IPC(比方Pipe和Queue),效率上更高。shell


应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占领的不是用户进程的资源)。
3)多进程应该避免共享资源。数组

在多线程中,咱们可以比較easy地共享资源,比方使用全局变量或者传递參数。
在多进程状况下。由于每个进程有本身独立的内存空间。以上方法并不合适。此时咱们可以经过共享内存和Manager的方法来共享资源。
但这样作提升了程序的复杂度,并因为同步的需要而减小了程序的效率。Process.PID中保存有PID,假设进程尚未start()。则PID为None。服务器




咱们可以从如下的程序中看到Thread对象和Process对象在使用上的类似性与结果上的不一样。各个线程和进程都作一件事:打印PID。
但问题是,所有的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一块儿,没法阅读。
使用Lock同步,在一个任务输出完毕以后,再赞成还有一个任务输出,可以避免多个任务同一时候向终端输出。网络




# Similarity and difference of multi thread vs. multi process
 
import os
import threading
import multiprocessing
 
# worker function
def worker(sign, lock):
    lock.acquire()
    print(sign, os.getpid())
    lock.release()
 
# Main
print('Main:',os.getpid())
 
# Multi-thread
record = []
lock  = threading.Lock()
for i in range(5):
    thread = threading.Thread(target=worker,args=('thread',lock))
    thread.start()
    record.append(thread)
 
for thread in record:
    thread.join()
 
# Multi-process
record = []
lock = multiprocessing.Lock()
for i in range(5):
    process = multiprocessing.Process(target=worker,args=('process',lock))
    process.start()
    record.append(process)
 
for process in record:
    process.join()

所有Thread的PID都与主程序一样,而每个Process都有一个不一样的PID。

二 Pipe和Queue

管道PIPE和消息队列message queue,multiprocessing包中有Pipe类和Queue类来分别支持这两种IPC机制。Pipe和Queue可以用来传送常见的对象。


1) Pipe可以是单向(half-duplex)。也可以是双向(duplex)。


咱们经过mutiprocessing.Pipe(duplex=False)建立单向管道 (默以为双向)。
一个进程从PIPE一端输入对象,而后被PIPE还有一端的进程接收,单向管道仅仅赞成管道一端的进程输入,而双向管道则赞成从两端输入。多线程


如下的程序展现了Pipe的使用:
app

# Multiprocessing with Pipe
 
import multiprocessing as mul
 
def proc1(pipe):
    pipe.send('hello')
    print('proc1 rec:',pipe.recv())
 
def proc2(pipe):
    print('proc2 rec:',pipe.recv())
    pipe.send('hello, too')
 
# Build a pipe
pipe = mul.Pipe()
 
# Pass an end of the pipe to process 1
p1   = mul.Process(target=proc1, args=(pipe[0],))


# Pass the other end of the pipe to process 2
p2   = mul.Process(target=proc2, args=(pipe[1],))


p1.start()
p2.start()
p1.join()
p2.join()

这里的Pipe是双向的。
Pipe对象创建的时候,返回一个含有两个元素的表。每个元素表明Pipe的一端(Connection对象)。
咱们对Pipe的某一端调用send()方法来传送对象,在还有一端使用recv()来接收。


2) Queue与Pipe相相似,都是先进先出的结构。但Queue赞成多个进程放入,多个进程从队列取出对象。
Queue使用mutiprocessing.Queue(maxsize)建立。maxsize表示队列中可以存放对象的最大数量。
如下的程序展现了Queue的使用:
import os
import multiprocessing
import time
#==================
# input worker
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.time())
    queue.put(info)
 
# output worker
def outputQ(queue,lock):
    info = queue.get()
    lock.acquire()
    print (str(os.getpid()) + '(get):' + info)
    lock.release()
#===================
# Main
record1 = []   # store input processes
record2 = []   # store output processes
lock  = multiprocessing.Lock()    # To prevent messy print
queue = multiprocessing.Queue(3)
 
# input processes
for i in range(10):
    process = multiprocessing.Process(target=inputQ,args=(queue,))
    process.start()
    record1.append(process)
 
# output processes
for i in range(10):
    process = multiprocessing.Process(target=outputQ,args=(queue,lock))
    process.start()
    record2.append(process)
 
for p in record1:
    p.join()
 
queue.close()  # No more object will come, close the queue
 
for p in record2:
    p.join()

一些进程使用put()在Queue中放入字符串,这个字符串中包括PID和时间。
还有一些进程从Queue中取出,并打印本身的PID以及get()的字符串。


三 进程池

进程池 (Process Pool)可以建立多个进程。这些进程就像是随时待命的士兵,准备运行任务(程序)。

一个进程池中可以容纳多个待命的士兵。async


比方如下的程序:
函数

import multiprocessing as mul
 
def f(x):
    return x**2
 
pool = mul.Pool(5)
rel  = pool.map(f,[1,2,3,4,5,6,7,8,9,10])
print(rel)

咱们建立了一个允许5个进程的进程池 (Process Pool) 。Pool执行的每个进程都执行f()函数。
咱们利用map()方法,将f()函数做用到表的每个元素上。这与built-in的map()函数相似。仅仅是这里用5个进程并行处理。
假设进程执行结束后,还有需要处理的元素。那么的进程会被用于又一次执行f()函数。除了map()方法外。Pool还有如下的常常用法。


1)apply_async(func,args)从进程池中取出一个进程运行func。args为func的參数。
它将返回一个AsyncResult的对象。你可以对该对象调用get()方法以得到结果。ui


2)close()进程池再也不建立新的进程
3)join()wait进程池中的全部进程。

必须对Pool先调用close()方法才干join。


四 共享资源

多进程共享资源一定会带来进程间相互竞争。而这样的竞争又会形成race condition,咱们的结果有可能被竞争的不肯定性所影响。
但假设需要,咱们依旧可以经过共享内存和Manager对象这么作。


1 共享内存

依据共享内存(shared memory)的原理,这里给出用Python实现的样例:
import multiprocessing
 
def f(n, a):
    n.value   = 3.14
    a[0]      = 5
 
num   = multiprocessing.Value('d', 0.0)
arr   = multiprocessing.Array('i', range(10))
 
p = multiprocessing.Process(target=f, args=(num, arr))
p.start()
p.join()
 
print num.value
print arr[:]

这里咱们实际上仅仅有主进程和Process对象表明的进程。
咱们在主进程的内存空间中建立共享的内存,也就是Value和Array两个对象。对象Value被设置成为双精度数(d), 并初始化为0.0。
而Array则相似于C中的数组,有固定的类型(i, 也就是整数)。在Process进程中,咱们改动了Value和Array对象。
回到主程序。打印出结果,主程序也看到了两个对象的改变,说明资源确实在两个进程之间共享。


 

2 Manager

Manager对象相似于服务器与客户之间的通讯 (server-client),与咱们在Internet上的活动很是相似。


咱们用一个进程做为server,创建Manager来真正存放资源。其余的进程可以经过參数传递或者依据地址来訪问Manager,创建链接后。操做server上的资源。


在防火墙赞成的状况下,咱们全然可以将Manager运用于多计算机。从而模仿了一个真实的网络情境。
如下的样例中,咱们对Manager的使用相似于shared memory。但可以共享更丰富的对象类型。

import multiprocessing
 
def f(x, arr, l):
    x.value = 3.14
    arr[0] = 5
    l.append('Hello')
 
server = multiprocessing.Manager()
x    = server.Value('d', 0.0)
arr  = server.Array('i', range(10))
l    = server.list()
 
proc = multiprocessing.Process(target=f, args=(x, arr, l))
proc.start()
proc.join()
 
print(x.value)
print(arr)
print(l)

Manager利用list()方法提供了表的共享方式。
实际上你可以利用dict()来共享词典,Lock()来共享threading.Lock(注意,咱们共享的是threading.Lock。而不是进程的mutiprocessing.Lock。

后者自己已经实现了进程共享)等。 这样Manager就赞成咱们共享不少其它样的对象。

相关文章
相关标签/搜索