但在使用这些共享API的时候,咱们要注意下面几点:
1)在UNIX平台上,当某个进程终结以后。该进程需要被其父进程调用wait,不然进程成为僵尸进程(Zombie)。python
因此。有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来讲,由于仅仅有一个进程。因此不存在此必要性。
2)multiprocessing提供了threading包中没有的IPC(比方Pipe和Queue),效率上更高。shell
应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占领的不是用户进程的资源)。
3)多进程应该避免共享资源。数组
在多线程中,咱们可以比較easy地共享资源,比方使用全局变量或者传递參数。
在多进程状况下。由于每个进程有本身独立的内存空间。以上方法并不合适。此时咱们可以经过共享内存和Manager的方法来共享资源。
但这样作提升了程序的复杂度,并因为同步的需要而减小了程序的效率。Process.PID中保存有PID,假设进程尚未start()。则PID为None。服务器
咱们可以从如下的程序中看到Thread对象和Process对象在使用上的类似性与结果上的不一样。各个线程和进程都作一件事:打印PID。
但问题是,所有的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一块儿,没法阅读。
使用Lock同步,在一个任务输出完毕以后,再赞成还有一个任务输出,可以避免多个任务同一时候向终端输出。网络
# Similarity and difference of multi thread vs. multi process import os import threading import multiprocessing # worker function def worker(sign, lock): lock.acquire() print(sign, os.getpid()) lock.release() # Main print('Main:',os.getpid()) # Multi-thread record = [] lock = threading.Lock() for i in range(5): thread = threading.Thread(target=worker,args=('thread',lock)) thread.start() record.append(thread) for thread in record: thread.join() # Multi-process record = [] lock = multiprocessing.Lock() for i in range(5): process = multiprocessing.Process(target=worker,args=('process',lock)) process.start() record.append(process) for process in record: process.join()
咱们经过mutiprocessing.Pipe(duplex=False)建立单向管道 (默以为双向)。
一个进程从PIPE一端输入对象,而后被PIPE还有一端的进程接收,单向管道仅仅赞成管道一端的进程输入,而双向管道则赞成从两端输入。多线程
如下的程序展现了Pipe的使用:
app
# Multiprocessing with Pipe import multiprocessing as mul def proc1(pipe): pipe.send('hello') print('proc1 rec:',pipe.recv()) def proc2(pipe): print('proc2 rec:',pipe.recv()) pipe.send('hello, too') # Build a pipe pipe = mul.Pipe() # Pass an end of the pipe to process 1 p1 = mul.Process(target=proc1, args=(pipe[0],)) # Pass the other end of the pipe to process 2 p2 = mul.Process(target=proc2, args=(pipe[1],)) p1.start() p2.start() p1.join() p2.join()
import os import multiprocessing import time #================== # input worker def inputQ(queue): info = str(os.getpid()) + '(put):' + str(time.time()) queue.put(info) # output worker def outputQ(queue,lock): info = queue.get() lock.acquire() print (str(os.getpid()) + '(get):' + info) lock.release() #=================== # Main record1 = [] # store input processes record2 = [] # store output processes lock = multiprocessing.Lock() # To prevent messy print queue = multiprocessing.Queue(3) # input processes for i in range(10): process = multiprocessing.Process(target=inputQ,args=(queue,)) process.start() record1.append(process) # output processes for i in range(10): process = multiprocessing.Process(target=outputQ,args=(queue,lock)) process.start() record2.append(process) for p in record1: p.join() queue.close() # No more object will come, close the queue for p in record2: p.join()
一个进程池中可以容纳多个待命的士兵。async
比方如下的程序:
函数
import multiprocessing as mul def f(x): return x**2 pool = mul.Pool(5) rel = pool.map(f,[1,2,3,4,5,6,7,8,9,10]) print(rel)
1)apply_async(func,args)从进程池中取出一个进程运行func。args为func的參数。
它将返回一个AsyncResult的对象。你可以对该对象调用get()方法以得到结果。ui
2)close()进程池再也不建立新的进程
3)join()wait进程池中的全部进程。
必须对Pool先调用close()方法才干join。
import multiprocessing def f(n, a): n.value = 3.14 a[0] = 5 num = multiprocessing.Value('d', 0.0) arr = multiprocessing.Array('i', range(10)) p = multiprocessing.Process(target=f, args=(num, arr)) p.start() p.join() print num.value print arr[:]
咱们用一个进程做为server,创建Manager来真正存放资源。其余的进程可以经过參数传递或者依据地址来訪问Manager,创建链接后。操做server上的资源。
在防火墙赞成的状况下,咱们全然可以将Manager运用于多计算机。从而模仿了一个真实的网络情境。
如下的样例中,咱们对Manager的使用相似于shared memory。但可以共享更丰富的对象类型。
import multiprocessing def f(x, arr, l): x.value = 3.14 arr[0] = 5 l.append('Hello') server = multiprocessing.Manager() x = server.Value('d', 0.0) arr = server.Array('i', range(10)) l = server.list() proc = multiprocessing.Process(target=f, args=(x, arr, l)) proc.start() proc.join() print(x.value) print(arr) print(l)
后者自己已经实现了进程共享)等。 这样Manager就赞成咱们共享不少其它样的对象。