一 multiprocessing模块介绍python
python中的多线程没法利用多核优点,若是想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分状况须要使用多进程。Python提供了multiprocessing。
multiprocessing模块用来开启子进程,并在子进程中执行咱们定制的任务(好比函数),该模块与多线程模块threading的编程接口相似。数据库
multiprocessing模块的功能众多:支持子进程、通讯和共享数据、执行不一样形式的同步,提供了Process、Queue、Pipe、Lock等组件。编程
须要再次强调的一点是:与线程不一样,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。json
建立进程的类:windows
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化获得的对象,表示一个子进程中的任务(还没有启动) 强调: 1. 须要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数介绍:数组
group参数未使用,值始终为None target表示调用对象,即子进程要执行的任务 args表示调用对象的位置参数元组,args=(1,2,) kwargs表示调用对象的字典,kwargs={'name':'abc'} name为子进程的名称
方法介绍:安全
p.start():启动进程,并调用该子进程中的p.run() p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法 p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁 p.is_alive():若是p仍然运行,返回True p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
属性介绍:服务器
p.daemon:默认值为False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设定为True后,p不能建立本身的新进程,必须在p.start()以前设置 p.name:进程的名称 p.pid:进程的pid p.exitcode:进程在运行时为None、若是为–N,表示被信号N结束(了解便可) p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络链接的底层进程间通讯提供安全性,这类链接只有在具备相同的身份验证键时才能成功(了解便可)
注意:在windows中Process()必须放到# if __name__ == '__main__':下网络
建立并开启子进程的两种方式多线程
from multiprocessing import Process import time import random def task(name): print("start %s"%name) time.sleep(random.randint(3)) print("end %s"name) if __name__ == '__main__': p = Process(target=task,args=("abc",)) p.start() print("主线程")
from multiprocessing import Process import time import random class MyProcess(Process) def __init__(self,name): super().__init__() self.name = name def run(): print("start %s"%self.name) time.sleep(random.randint(3)) print("end %s"self.name) if __name__ == '__main__': p = MyProcess("abc") p.start() print("主线程")
进程直接的内存空间是隔离的
from multiprocessing import Process import os n = 100 def func(): global n n += 10 print("%s:%d"%(os.getpid(),n)) if __name__ == '__main__': p = Process(target=func) p.start() p.join() print("%s:%d"%(os.getpid(),n)) print("主") 运行结果 17424:110 15080:100 主
Process对象的join方法
#没有加join from multiprocessing import Process import time import os def func(): time.sleep(1) print("start:%s"%(os.getpid())) if __name__ == '__main__': p = Process(target=func) p.start() print("主:%s"%os.getpid()) 运行结果 主:18064 start:7064 #主进程先运行完毕,不会等待子进程 #加join后 from multiprocessing import Process import time import os def func(): time.sleep(1) print("start:%s"%(os.getpid())) if __name__ == '__main__': p = Process(target=func) p.start() p.join() print("主:%s"%os.getpid()) 运行结果 start:16988 主:1716 #主进程会堵塞等待子进程运行完毕后,才继续运行
from multiprocessing import Process import time import os def func(name): time.sleep(1) print("start:%s"%name) if __name__ == '__main__': start_time = time.time() p = Process(target=func,args=("p",)) p2 = Process(target=func,args=("p2",)) p.start() p2.start() p.join() p2.join() end_time = time.time() print("运行时间[%s]"%(end_time-start_time)) print("主:%s"%os.getpid()) 运行结果 start:p start:p2 运行时间[1.2825298309326172] #耗时是运行时间最长的进程,而不是之和 主:12056 #join是让主进程进行堵塞等待,对于其余的进程是不影响的 上述代码中同时开启了进程p和p2在进程p运行的时候,进程p2也是在运行的,等待的只是主进程。
Process对象的其余方法或属性(了解)
from multiprocessing import Process import time import random class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print("start %s" % self.name) time.sleep(random.randint(3)) if __name__ == '__main__': p = MyProcess("abc") p.start() p.terminate() # 关闭进程,不会当即关闭,因此is_alive马上查看的结果可能仍是存活 print(p.is_alive()) # 结果为True time.sleep(0.2) print("主线程") print(p.is_alive()) #结果为False
主进程建立守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内没法再开启子进程,不然抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
from multiprocessing import Process import time import random class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print("start %s" % self.name) time.sleep(random.randint(3)) if __name__ == '__main__': p = MyProcess("abc") p.daemon = True #设置要在p.start()以前 p.start() print("主线程") 运行结果 主进程 #将p设置为子进程以后,主进程一旦运行完毕,其守护进程无论是否运行完毕,都会被终止。
#主进程代码运行完毕,守护进程就会结束 from multiprocessing import Process from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,由于主进程打印main----时,p1也执行了,可是随即被终止
进程之间数据不共享,可是共享同一套文件系统,因此访问同一个文件,或同一个打印终端,是没有问题的,
而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理
一、多个进程共享同一打印终端
from multiprocessing import Process import time import random def foo(name): print("%s start print"%name) time.sleep(0.2) print("%s end print" % name) if __name__ == '__main__': l = [] for i in range(3): p = Process(target=foo,args=(i,)) l.append(p) for i in l: i.start() print("主") 运行结果 主 start print start print start print end print end print end print #进程间竞争打印终端,形成数据穿插
from multiprocessing import Process,Lock import time import random def foo(name,lock): lock.acquire() print("%s start print"%name) time.sleep(0.2) print("%s end print" % name) lock.release() if __name__ == '__main__': l = [] lock = Lock() for i in range(3): p = Process(target=foo,args=(i,lock,)) l.append(p) for i in l: i.start() print("主") 运行结果 主 0 start print 0 end print 1 start print 1 end print 2 start print 2 end print #程序变成串行运行,效率下降,但却保证了数据的安全
二、多个进程共享同一文件
文件当数据库,模拟抢票
#文件db的内容为:{"count":1} #注意必定要用双引号,否则json没法识别 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db.txt')) print('\033[43m剩余票数%s\033[0m' %dic['count']) def get(): dic=json.load(open('db.txt')) time.sleep(0.1) #模拟读数据的网络延迟 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模拟写数据的网络延迟 json.dump(dic,open('db.txt','w')) print('\033[43m购票成功\033[0m') def task(lock): search() get() if __name__ == '__main__': lock=Lock() for i in range(100): #模拟并发100个客户端抢票 p=Process(target=task,args=(lock,)) p.start() 并发运行,效率高,但竞争写同一文件,数据写入错乱
#文件db的内容为:{"count":1} #注意必定要用双引号,否则json没法识别 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db.txt')) print('\033[43m剩余票数%s\033[0m' %dic['count']) def get(): dic=json.load(open('db.txt')) time.sleep(0.1) #模拟读数据的网络延迟 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模拟写数据的网络延迟 json.dump(dic,open('db.txt','w')) print('\033[43m购票成功\033[0m') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock=Lock() for i in range(100): #模拟并发100个客户端抢票 p=Process(target=task,args=(lock,)) p.start() 加锁:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全
总结
#加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。 虽然能够用文件共享数据实现进程间通讯,但问题是: 1.效率低(共享数据基于文件,而文件是硬盘上的数据) 2.须要本身加锁处理 #所以咱们最好找寻一种解决方案可以兼顾:一、效率高(多个进程共享一块内存的数据)二、帮咱们处理好锁问题。这就是mutiprocessing模块为咱们提供的基于消息的IPC通讯机制:队列和管道。 队列和管道都是将数据存放于内存中 队列又是基于(管道+锁)实现的,可让咱们从复杂的锁问题中解脱出来, 咱们应该尽可能避免使用共享数据,尽量使用消息传递和队列,避免处理复杂的同步和锁问题,并且在进程数目增多时,每每能够得到更好的可获展性。
进程彼此之间互相隔离,要实现进程间通讯(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。
建立队列的类(底层就是以管道和锁定的方式实现):
1 Queue([maxsize]):建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。
参数介绍:
1 maxsize是队列中容许最大项数,省略则无大小限制。
方法介绍:
主要方法
q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。若是超时,会抛出Queue.Full异常。若是blocked为False,但该Queue已满,会当即抛出Queue.Full异常。
q.get方法能够从队列读取而且删除一个元素。一样,get方法有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。若是blocked为False,有两种状况存在,若是Queue有一个值可用,则当即返回该值,不然,若是队列为空,则当即抛出Queue.Empty异常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():调用此方法时q为空则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中又加入了项目。
q.full():调用此方法时q已满则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中的项目被取走。
q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()同样
其余方法
1 q.cancel_join_thread():不会在进程退出时自动链接后台线程。能够防止join_thread()方法阻塞 2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但还没有写入的数据,但将在此方法完成时立刻关闭。若是q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,若是某个使用者正在被阻塞在get()操做上,关闭生产者中的队列不会致使get()方法返回错误。 3 q.join_thread():链接队列的后台线程。此方法用于在调用q.close()方法以后,等待全部队列项被消耗。默认状况下,此方法由不是q的原始建立者的全部进程调用。调用q.cancel_join_thread方法能够禁止这种行为
应用
''' multiprocessing模块支持进程间通讯的两种主要形式:管道和队列 都是基于消息传递实现的,可是队列接口 ''' from multiprocessing import Process,Queue import time q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) print(q.full()) #满了 print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #空了
管道
进程间通讯(IPC)方式二:管道(不推荐使用,了解便可)
#建立管道的类: Pipe([duplex]):在进程之间建立一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的链接对象,强调一点:必须在产生Process对象以前产生管道 #参数介绍: dumplex:默认管道是全双工的,若是将duplex射成False,conn1只能用于接收,conn2只能用于发送。 #主要方法: conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接收,recv方法会一直阻塞。若是链接的另一端已经关闭,那么recv方法会抛出EOFError。 conn1.send(obj):经过链接发送对象。obj是与序列化兼容的任意对象 #其余方法: conn1.close():关闭链接。若是conn1被垃圾回收,将自动调用此方法 conn1.fileno():返回链接使用的整数文件描述符 conn1.poll([timeout]):若是链接上的数据可用,返回True。timeout指定等待的最长时限。若是省略此参数,方法将当即返回结果。若是将timeout射成None,操做将无限期地等待数据到达。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。若是进入的消息,超过了这个最大值,将引起IOError异常,而且在链接上没法进行进一步读取。若是链接的另一端已经关闭,不再存在任何数据,将引起EOFError异常。 conn.send_bytes(buffer [, offset [, size]]):经过链接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,而后调用c.recv_bytes()函数进行接收 conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或相似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。若是消息长度大于可用的缓冲区空间,将引起BufferTooShort异常。
from multiprocessing import Process,Pipe import time,os def consumer(p,name): left,right=p left.close() while True: try: baozi=right.recv() print('%s 收到包子:%s' %(name,baozi)) except EOFError: right.close() break def producer(seq,p): left,right=p right.close() for i in seq: left.send(i) # time.sleep(1) else: left.close() if __name__ == '__main__': left,right=Pipe() c1=Process(target=consumer,args=((left,right),'c1')) c1.start() seq=(i for i in range(10)) producer(seq,(left,right)) right.close() left.close()
注意:生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。若是忘记执行这些步骤,程序可能在消费者中的recv()操做上挂起。管道是由操做系统进行引用计数的,必须在全部进程中关闭管道后才能生产EOFError异常。所以在生产者中关闭管道不会有任何效果,付费消费者中也关闭了相同的管道端点。
from multiprocessing import Process,Pipe import time,os def adder(p,name): server,client=p client.close() while True: try: x,y=server.recv() except EOFError: server.close() break res=x+y server.send(res) print('server done') if __name__ == '__main__': server,client=Pipe() c1=Process(target=adder,args=((server,client),'c1')) c1.start() server.close() client.send((10,20)) print(client.recv()) client.close() c1.join() print('主进程') #注意:send()和recv()方法使用pickle模块对对象进行序列化。 管道能够用于双向通讯,利用一般在客户端/服务器中使用的请求/响应模型或远程过程调用,就可使用管道编写与进程交互的程序
共享数据
展望将来,基于消息传递的并发编程是大势所趋
即使是使用线程,推荐作法也是将程序设计为大量独立的线程集合
经过消息队列交换数据。这样极大地减小了对使用锁定和其余同步手段的需求,
还能够扩展到分布式系统中
进程间通讯应该尽可能避免使用本节所讲的共享数据的方式
Value、Array是经过共享内存的方式共享数据
Manager是经过共享进程的方式共享数据
Value\Array
from multiprocessing import Process,Lock import multiprocessing #Value/Array def func1(a,arr): a.value=3.14 for i in range(len(arr)): arr[i]=-arr[i] if __name__ == '__main__': num=multiprocessing.Value('d',1.0)#num=0 arr=multiprocessing.Array('i',range(10))#arr=range(10) p=multiprocessing.Process(target=func1,args=(num,arr)) p.start() p.join() print (num.value) print (arr[:]) #执行结果 3.14 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
Manager管理的共享数据类型有:Value、Array、dict、list、Lock、Semaphore等等,同时Manager还能够共享类的实例对象。
实例代码:
from multiprocessing import Process,Manager def func1(shareList,shareValue,shareDict,lock): with lock: shareValue.value+=1 shareDict[1]='1' shareDict[2]='2' for i in range(len(shareList)): shareList[i]+=1 if __name__ == '__main__': manager=Manager() list1=manager.list([1,2,3,4,5]) dict1=manager.dict() array1=manager.Array('i',range(10)) value1=manager.Value('i',1) lock=manager.Lock() proc=[Process(target=func1,args=(list1,value1,dict1,lock)) for i in range(20)] for p in proc: p.start() for p in proc: p.join() print (list1) print (dict1) print (array1) print (value1) #运行结果 [21, 22, 23, 24, 25] {1: '1', 2: '2'} array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) Value('i', 21)
进程池
在利用Python进行系统管理的时候,特别是同时操做多个文件目录,或者远程控制多台主机,并行操做能够节约大量的时间。多进程是实现并发的手段之一,须要注意的问题是:
例如当被操做对象数目不大时,能够直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但若是是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时能够发挥进程池的功效。
咱们就能够经过维护一个进程池来控制进程数目,好比httpd的进程模式,规定最小进程数和最大进程数...
ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool能够提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,若是池尚未满,那么就会建立一个新的进程用来执行该请求;但若是池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。
建立进程池的类:若是指定numprocess为3,则进程池会从无到有建立三个进程,而后自始至终使用这三个进程去执行全部任务,不会开启其余进程
Pool([numprocess [,initializer [, initargs]]]):建立进程池
参数介绍
numprocess:要建立的进程数,若是省略,将默认使用cpu_count()的值
initializer:是每一个工做进程启动时要执行的可调用对象,默认为None
initargs:是要传给initializer的参数组
方法介绍
p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async() p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。 p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成 P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用
应用
from multiprocessing import Pool import os,time def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #进程池中从无到有建立三个进程,之后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) #同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程当中可能有阻塞也可能没有阻塞,但无论该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程当中如果任务发生了阻塞就会被夺走cpu的执行权限 res_l.append(res) print(res_l) 同步调用apply
from multiprocessing import Pool import os,time def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #进程池中从无到有建立三个进程,之后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res res_l.append(res) #异步apply_async用法:若是使用异步提交的任务,主进程须要使用jion,等待进程池内任务都处理完,而后能够用get收集结果,不然,主进程结束,进程池可能还没来得及执行,也就跟着一块儿结束了 p.close() p.join() for res in res_l: print(res.get()) #使用get来获取apply_aync的结果,若是是apply,则没有get方法,由于apply是同步执行,马上获取结果,也根本无需get 异步调用apply_async