Python标准库10 多进程初步 (multiprocessing包)

做者:Vamei 出处:http://www.cnblogs.com/vamei 欢迎转载,也请保留这段声明。谢谢!html

 

咱们已经见过了使用subprocess包来建立子进程,但这个包有两个很大的局限性:1) 咱们老是让subprocess运行外部的程序,而不是运行一个Python脚本内部编写的函数。2) 进程间只经过管道进行文本交流。以上限制了咱们将subprocess包应用到更普遍的多进程任务。(这样的比较实际是不公平的,由于subprocessing自己就是设计成为一个shell,而不是一个多进程管理包)python

 

threading和multiprocessing

(请尽可能先阅读Python多线程与同步)shell

multiprocessing包是Python中的多进程管理包。与threading.Thread相似,它能够利用multiprocessing.Process对象来建立一个进程。该进程能够运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象能够像多线程那样,经过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。因此,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。多线程

但在使用这些共享API的时候,咱们要注意如下几点:app

  • 在UNIX平台上,当某个进程终结以后,该进程须要被其父进程调用wait,不然进程成为僵尸进程(Zombie)。因此,有必要对每一个Process对象调用join()方法 (实际上等同于wait)。对于多线程来讲,因为只有一个进程,因此不存在此必要性。函数

  • multiprocessing提供了threading包中没有的IPC(好比Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (由于它们占据的不是用户进程的资源)。ui

  • 多进程应该避免共享资源。在多线程中,咱们能够比较容易地共享资源,好比使用全局变量或者传递参数。在多进程状况下,因为每一个进程有本身独立的内存空间,以上方法并不合适。此时咱们能够经过共享内存Manager的方法来共享资源。但这样作提升了程序的复杂度,并由于同步的须要而下降了程序的效率。
    spa

Process.PID中保存有PID,若是进程尚未start(),则PID为None。线程

 

咱们能够从下面的程序中看到Thread对象和Process对象在使用上的类似性与结果上的不一样。各个线程和进程都作一件事:打印PID。但问题是,全部的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一块儿,没法阅读。使用Lock同步,在一个任务输出完成以后,再容许另外一个任务输出,能够避免多个任务同时向终端输出。设计

# Similarity and difference of multi thread vs. multi process
# Written by Vamei

import os
import threading
import multiprocessing

# worker function
def worker(sign, lock):
    lock.acquire()
    print(sign, os.getpid())
    lock.release()

# Main
print('Main:',os.getpid())

# Multi-thread
record = []
lock  = threading.Lock()
for i in range(5):
    thread = threading.Thread(target=worker,args=('thread',lock))
    thread.start()
    record.append(thread)

for thread in record:
    thread.join()

# Multi-process
record = []
lock = multiprocessing.Lock()
for i in range(5):
    process = multiprocessing.Process(target=worker,args=('process',lock))
    process.start()
    record.append(process)

for process in record:
    process.join()

全部Thread的PID都与主程序相同,而每一个Process都有一个不一样的PID。

(练习: 使用mutiprocessing包将Python多线程与同步中的多线程程序更改成多进程程序)

 

Pipe和Queue

正如咱们在Linux多线程中介绍的管道PIPE和消息队列message queue,multiprocessing包中有PipeQueue类来分别支持这两种IPC机制。Pipe和Queue能够用来传送常见的对象。

 

1) Pipe能够是单向(half-duplex),也能够是双向(duplex)。咱们经过mutiprocessing.Pipe(duplex=False)建立单向管道 (默认为双向)。一个进程从PIPE一端输入对象,而后被PIPE另外一端的进程接收,单向管道只容许管道一端的进程输入,而双向管道则容许从两端输入。

下面的程序展现了Pipe的使用:

# Multiprocessing with Pipe
# Written by Vamei

import multiprocessing as mul

def proc1(pipe):
    pipe.send('hello')
    print('proc1 rec:',pipe.recv())

def proc2(pipe):
    print('proc2 rec:',pipe.recv())
    pipe.send('hello, too')

# Build a pipe
pipe = mul.Pipe()

# Pass an end of the pipe to process 1
p1   = mul.Process(target=proc1, args=(pipe[0],))
# Pass the other end of the pipe to process 2
p2   = mul.Process(target=proc2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()

这里的Pipe是双向的。

Pipe对象创建的时候,返回一个含有两个元素的表,每一个元素表明Pipe的一端(Connection对象)。咱们对Pipe的某一端调用send()方法来传送对象,在另外一端使用recv()来接收。

 

2) Queue与Pipe相相似,都是先进先出的结构。但Queue容许多个进程放入,多个进程从队列取出对象。Queue使用mutiprocessing.Queue(maxsize)建立,maxsize表示队列中能够存放对象的最大数量。

下面的程序展现了Queue的使用:

# Written by Vamei
import os
import multiprocessing
import time
#==================
# input worker
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.time())
    queue.put(info)

# output worker
def outputQ(queue,lock):
    info = queue.get()
    lock.acquire()
    print (str(os.getpid()) + '(get):' + info)
    lock.release()
#===================
# Main
record1 = []   # store input processes
record2 = []   # store output processes
lock  = multiprocessing.Lock()    # To prevent messy print
queue = multiprocessing.Queue(3)

# input processes
for i in range(10):
    process = multiprocessing.Process(target=inputQ,args=(queue,))
    process.start()
    record1.append(process)

# output processes
for i in range(10):
    process = multiprocessing.Process(target=outputQ,args=(queue,lock))
    process.start()
    record2.append(process)

for p in record1:
    p.join()

queue.close()  # No more object will come, close the queue

for p in record2:
    p.join()

一些进程使用put()在Queue中放入字符串,这个字符串中包含PID和时间。另外一些进程从Queue中取出,并打印本身的PID以及get()的字符串。

 

总结

Process, Lock, Event, Semaphore, Condition

Pipe, Queue

相关文章
相关标签/搜索