multiprocessing并不是是python的一个模块,而是python中多进程管理的一个包,在学习的时候能够与threading这个模块做类比,正如咱们在上一篇转载的文章中所提,python的多线程并不能作到真正的并行处理,只能完成相对的并发处理,那么咱们须要的就是python的多进程来完成并行处理,把全部的cpu资源都利用起来。multiprocessing的很大一部分与threading使用同一套API,只不过换到了多进程的环境。这里面要注意,对于多进程来讲,win32平台和unix平台差异很大,咱们最好在linux上完成实现。html
使用这些共享API时,咱们应该注意如下问题(目前这是我能想到的,之后遇到再扩充):python
一、对join的处理linux
根据Unix环境高级编程中对进程控制一章的描述,当某个进程fork一个子进程后,该进程必需要调用wait等待子进程结束发送的sigchld信号,对子进程进行资源回收等相关工做,不然,子进程会成为僵死进程,被init收养。因此,在multiprocessing.Process实例化一个对象以后,该对象有必要调用join方法,由于在join方法中完成了对底层wait的处理,源码以下:web
def join(self, timeout=None): ''' Wait until child process terminates ''' assert self._parent_pid == os.getpid(), 'can only join a child process' assert self._popen is not None, 'can only join a started process' res = self._popen.wait(timeout) if res is not None: _current_process._children.discard(self)
不过,调用该方法,要注意join的位置(threading模块有提到),是在每一个子进程中阻塞仍是在父进程中阻塞,若是在子进程中阻塞可能达不到并行处理的目的,因此要根据具体需求。而对于多线程来讲,因为只有一个进程,全部子线程共享同一片内存,因此不是必需要进行join调用。例子以下:shell
#!/usr/bin/env python __author__ = 'webber' import os,time import multiprocessing # worker function def worker(sign, lock): lock.acquire() print sign, 'pid:',os.getpid() lock.release() time.sleep(1) # Main print 'Main:',os.getpid() plist = [] lock = multiprocessing.Lock() for j in range(5): p = multiprocessing.Process(target=worker,args=('process',lock)) p.start() plist.append(p) p.join() #for process in record: # process.join()
此外,还有一点关于GIL锁的说明,在python多进程中,一样须要全局解释器锁,由于每一个子进程都有一把GIL,那么当它们向stdout输出时,能够同时争抢stdout资源,致使在每一个子进程输出时会把不一样子进程的输出字符混合在一块儿,没法阅读,影响代码的标志位判断,因此上例子中使用了Lock同步,在一个子进程输出完成以后再容许另外一个子进程获得stdout的权限,这样避免了多个任务同时向终端输出。编程
二、对IPC的处理服务器
multiprocessing包与threading模块的另外一个差别特性体如今IPC上,python的multiprocessing包自带了对Pipe和Queue的管理,效率上更高,而threading模块须要与Queue模块或os.popen()、subprocess.Popen()等配合使用。
根据Unix环境高级编程的第15章进程间通讯的描述,经典的IPC包括管道、FIFO、消息队列、信号量、以及共享存储。不过应用最多的仍是管道。书中指出咱们应该把管道当作是半双工的,而且只能在具备公共祖先的两个进程之间使用。
下面咱们用一下Pipe()和Queue()方法:网络
a、关于Pipe()多线程
对照书中给出的底层pipe接口函数,咱们看到Pipe方法在Unix平台上实现源码以下:并发
def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe ''' if duplex: s1, s2 = socket.socketpair() s1.setblocking(True) s2.setblocking(True) c1 = _multiprocessing.Connection(os.dup(s1.fileno())) c2 = _multiprocessing.Connection(os.dup(s2.fileno())) s1.close() s2.close() else: fd1, fd2 = os.pipe() c1 = _multiprocessing.Connection(fd1, writable=False) c2 = _multiprocessing.Connection(fd2, readable=False) return c1, c2
首先,Pipe能够是单向(half-duplex),也能够是双向的(duplex),默认为双向的。咱们能够经过multiprocessing.Pipe(duplex=False)建立单向的管道。该方法返回一个元祖,包含两个文件描述符,若是为单向的,则为(read-only connection,write-only connection);若是为双向的,则为(read-write Connection, read-write Connection)。一个进程从Pipe一端输入对象(fd[1]),而后被Pipe另外一端的进程接收(fd[0]),两个进程要有同一个父进程或者其中一个是父进程。单向管道只容许管道一端的进程输入,而双向管道则容许从两端输入。这里的双向管道相似于书中提到的“协同进程”的概念。
例如:
#!/usr/bin/env python import multiprocessing as mul def proc1(pipe): # pipe.send('hello') print 'proc1 rec:',pipe.recv() def proc2(pipe): # print 'proc2 rec:',pipe.recv() pipe.send('hello too') pipe = mul.Pipe(duplex=False) #pipe = mul.Pipe() p1 = mul.Process(target=proc1,args=(pipe[0],)) # 读管道 p2 = mul.Process(target=proc2,args=(pipe[1],)) # 写管道 # 因为管道是单向的,对象pipe[0]只有读的权限(recv),而pipe[1]只有写的权限(send)。 #print pipe p1.start() p2.start() p1.join() p2.join()
b、关于Queue()
Queue与Pipe相相似,都是先进先出的结构,但Queue容许多个进程放入,多个进程从队列取出对象。这里能够与Queue模块相类比学习。Queue方法实际上是Unix环境高级编程IPC中FIFO命名管道的实现方法。FIFO可用于有如下两种状况:
---shell命令使用FIFO将数据从一条管道传送到另外一条时,无需建立中间临时文件。
---客户进程-服务器进程应用程序中,FIFO用做汇聚点,在客户进程和服务器进程两者之间传递数据。
如下就FIFO的第二种状况写一个python例子:
#!/usr/bin/env python # -*- coding:utf-8 -*- import multiprocessing import time import os # 客户进程,向众所周知的FIFO服务器进程发送请求 def client_proc(queue,msg): request = 'I am client ' + str(msg) + ' pid: '+ str(os.getpid()) + ' time:' + str(time.time()) # 注意信息的格式,都统一为字符串类型 queue.put(request) def server_proc(queue,lock): msg = queue.get() lock.acquire() print msg + '--------------->I am server ' + 'pid: ' + str(os.getpid()) lock.release() plist_cli = [] plist_ser = [] lock = multiprocessing.Lock() queue = multiprocessing.Queue() # 参数为空,默认为队列可无限长 for i in range(10): p1 = multiprocessing.Process(target=client_proc,args=(queue,i)) p2 = multiprocessing.Process(target=server_proc,args=(queue,lock)) p1.start() p2.start() plist_cli.append(p1) plist_ser.append(p2) for proc in plist_cli: proc.join() for proc in plist_ser: proc.join() queue.close()
输出以下:
I am client 2 pid: 9867 time:1482489226.77--------------->I am server pid: 9879
I am client 0 pid: 9865 time:1482489226.77--------------->I am server pid: 9881
I am client 4 pid: 9869 time:1482489226.77--------------->I am server pid: 9884
I am client 1 pid: 9866 time:1482489226.77--------------->I am server pid: 9886
I am client 3 pid: 9868 time:1482489226.78--------------->I am server pid: 9888
I am client 7 pid: 9872 time:1482489226.78--------------->I am server pid: 9889
I am client 5 pid: 9870 time:1482489226.78--------------->I am server pid: 9892
I am client 6 pid: 9871 time:1482489226.78--------------->I am server pid: 9891
I am client 9 pid: 9878 time:1482489226.78--------------->I am server pid: 9893
I am client 8 pid: 9875 time:1482489226.78--------------->I am server pid: 9894
从输出能够看出,10个客户端进程把生产信息放入队列,10个服务端进程从队列取出信息而且打印,从打印时间和msg的子进程编号来看,10个服务端进程争夺stdout,经过Lock使它们有序输出,不至于输出信息混乱,msg编号没有从0排至9正是由于它们被分配给了不一样的cpu资源,不一样cpu资源在处理速度上不会彻底同样,因此争夺stdout的能力也不一样。
三、共享内存和Manager管理
众所周知,在处理多进程时,每一个进程都有本身独立的内存空间,因此在多进程环境中咱们应该尽可能避免共享资源,不然要依赖与IPC。python的多进程除了上面提到的经常使用的依赖于管道和FIFO以外,还能够经过共享内存和Manager的方法来共享资源。这个不经常使用,因为共享内存涉及同步的问题,会下降程序的效率而不推荐使用。之后涉及到再扩展。
四、进程池
参考博客:http://www.cnblogs.com/kaituorensheng/p/4465768.html
当咱们在编写网络服务端时,Unix网络编程一书中提到服务端须要fork子进程,用子进程来处理监听到的链接请求,创建链接套接字,并在子进程中关闭监听套接字,父进程中关闭链接套接字。那么,当链接的并发不是很大时,咱们能够利用进程池的方式来处理到来的链接。multiprocessing.Pool能够提供指定数量的进程供用户调用,当有新的请求提交到pool中时,若是进程池尚未满,那么就会建立一个新的进程用来执行该请求;若是池中的进程数已经达到最大值,那么该请求将会阻塞等待,直到池中有进程结束,才会建立新的进程来处理该请求。
Pool方法默认的初始值以下:
def __init__(self, processes=None, initializer=None, initargs=(),maxtasksperchild=None)
一般,咱们应该指定进程池的大小,若是不指定,默认为cpu的个数,即processes=cpu_count(),咱们能够用该模块自带的方法查看本机的cpu个数,
print multiprocessing.cpu_count()。下面看个进程池的例子:
#!/usr/bin/env python # -*- coding:utf-8 -*- import multiprocessing import time def func(msg): print 'msg:',msg time.sleep(3) print 'end' pool = multiprocessing.Pool(processes=3) for i in xrange(4): msg = 'hello %d' % (i) pool.apply_async(func,(msg,)) #非阻塞 # pool.apply(func,(msg,)) #阻塞,apply()源自内建函数,用于间接的调用函数,而且按位置把元祖或字典做为参数传入。 # pool.imap(func,[msg,]) #非阻塞, 注意与apply传的参数的区别 # pool.map(func,[msg,]) #阻塞 print 'Mark~~~~~~~~~~~~~~~' pool.close() pool.join() # 调用join以前,先调用close函数,不然会出错。执行完close后不会有新的进程加入到pool,join函数等待全部子进程结束 print 'sub-process done'
注意apply_async和apply的差异,此外,进程池请求函数处理还能够用map,imap,注意传递参数的区别。
#!/usr/bin/env python # -*- coding:utf-8 -*- import multiprocessing import time def func(msg): print 'msg:',msg time.sleep(3) print 'end' pool = multiprocessing.Pool(50) msg = range(50) #pool.imap(func,msg) #非阻塞, 注意与apply传的参数的区别 pool.map(func,msg) #阻塞 print 'Mark~~~~~~~~~~~~~~~' pool.close() pool.join() print 'sub-process done'
此外,若是子进程的处理函数中包含返回值,咱们能够在父进程中对子进程调用get方法,将返回值取出,这里注意,要调用get方法的时候,进程池必须采用apply_async调用函数。例如:
if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) result = [] for i in xrange(3): msg = "hello %d" %(i) result.append(pool.apply_async(func, (msg, ))) pool.close() pool.join() for res in result: print ":::", res.get() print "Sub-process(es) done."
最后,调用close()以后,进程池再也不建立新的进程;
调用join()以后,wait进程池中的所有进程。必须对Pool先调用close()方法才能join。
参考博客:http://www.lxway.com/4488626156.htm