python中的多线程没法利用多核优点,若是想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分状况须要使用多进程。Python提供了multiprocessing。
multiprocessing模块用来开启子进程,并在子进程中执行咱们定制的任务(好比函数),该模块与多线程模块threading的编程接口相似。python
multiprocessing模块的功能众多:支持子进程、通讯和共享数据、执行不一样形式的同步,提供了Process、Queue、Pipe、Lock等组件。编程
须要再次强调的一点是:与线程不一样,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。windows
建立进程的类:数组
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化获得的对象,表示一个子进程中的任务(还没有启动) 强调: 1. 须要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数介绍:安全
group参数未使用,值始终为None target表示调用对象,即子进程要执行的任务 args表示调用对象的位置参数元组,args=(1,2,'egon',) kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} name为子进程的名称
方法介绍:网络
p.start():启动进程,并调用该子进程中的p.run() p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法 p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁 p.is_alive():若是p仍然运行,返回True p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
属性介绍:多线程
p.daemon:默认值为False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设定为True后,p不能建立本身的新进程,必须在p.start()以前设置 p.name:进程的名称 p.pid:进程的pid p.exitcode:进程在运行时为None、若是为–N,表示被信号N结束(了解便可) p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络链接的底层进程间通讯提供安全性,这类链接只有在具备相同的身份验证键时才能成功(了解便可)
注:p.pid和p.ppid分别查询当前进程id和父进程id同:os.getpid()和os.getppid()并发
注意:windows建立进程必定要写在if __name__ == '__main__':的里面,由于子进程会执行一遍整个文件,若是放在外面会无限建立子进程,爆栈app
第一种方式dom
def func(): print('child') if __name__ == '__main__': p = Process(target=func) # 建立一个进程对象 p.start() # 告诉操做系统我要建立一个进程,直到执行了start才有进程
第二种方式:
import time import os import random from multiprocessing import Process class Piao(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print('%s piaoing' % self.name) time.sleep(random.randrange(1, 5)) print('%s piao end' % self.name) self.walk() # 只有这种才是真正的进程本身调用walk def walk(self): print("子进程:", os.getpid()) if __name__ == '__main__': p1 = Piao('egon') p2 = Piao('alex') p3 = Piao('wupeiqi') p4 = Piao('yuanhao') p1.start() # start会自动调用run p2.start() p3.start() p4.start() p1.walk() # 这样写至关于在主进程调用类中的方法,只有在类中run中写才是子进程本身调用 print('主线程')
join详解:
正常状况下父进程会等子进程结束后再结束进程(尽管父进程早已结束),可是子进程和父进程是异步的,若想父进程等子进程结束后再进行下一步则用join()
from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.start() p.join(0.0001) #等待p中止,等0.0001秒就再也不等了 print('开始') join:主进程等,等待子进程结束
这里就会有个优化问题,多个子进程能够装在列表里,而后循环join
import time,os from multiprocessing import Process def desk(i): time.sleep(1) print('%d:子进程%d干的事,父进程是:%d' %(i,os.getpid(),os.getppid())) if __name__ == '__main__': p_list = [] for i in range(10): p = Process(target=desk, args=(i,)) p.start() # p.join() p_list.append(p) for p in p_list: p.join() print('+++++++++++++++++++++++++++')
在windows中,父进程会等子进程结束才结束,若让父进程不等子进程而可自行结束(子进程也要跟着结束),则须要将子进程设置为守护进程
主进程建立守护进程
其一:守护进程会在主进程代码执行结束后就终止,注意不是主进程所有结束,而是主进程的代码执行结束,守护进程不等子进程
其二:守护进程内没法再开启子进程,不然抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.daemon=True #必定要在p.start()前设置,设置p为守护进程,禁止p建立子进程,而且父进程代码执行结束,p即终止运行 p.start() print('主')
进程之间数据不共享,可是共享同一套文件系统,因此访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。
加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
acqurie()和release()中间的程序被锁住,只有有钥匙才能访问
#由并发变成了串行,牺牲了运行效率,但避免了竞争 from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() # 请求锁,阻塞 print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) lock.release() # 用完还锁 if __name__ == '__main__': lock=Lock() # 实例化锁 for i in range(3): p=Process(target=work,args=(lock,)) # 添加锁 p.start() 加锁:由并发变成了串行,牺牲了运行效率,但避免了竞争
信号量(至关于一把锁上好几把钥匙,这几把钥匙的拥有者可同时访问)
互斥锁 同时只容许一个线程更改数据,而Semaphore是同时容许必定数量的线程更改数据 ,好比厕全部3个坑,那最多只容许3我的上厕所,后面的人只能等里面有人出来了才能再进去,若是指定信号量为3,那么来一我的得到一把锁,计数加1,当计数等于3时,后面的人均须要等待。一旦释放,就有人能够得到一把锁 信号量与进程池的概念很像,可是要区分开,信号量涉及到加锁的概念 from multiprocessing import Process,Semaphore import time,random def go_wc(sem,user): sem.acquire() print('%s 占到一个茅坑' %user) time.sleep(random.randint(0,3)) #模拟每一个人拉屎速度不同,0表明有的人蹲下就起来了 sem.release() if __name__ == '__main__': sem=Semaphore(5) p_l=[] for i in range(13): p=Process(target=go_wc,args=(sem,'user%s' %i,)) p.start() p_l.append(p) for i in p_l: i.join() print('============》') 信号量Semahpore(同线程同样)
# python线程的事件用于主线程控制其余线程的执行,事件主要提供了三个方法 set、wait、clear。 # # 事件处理的机制:全局定义了一个“Flag”,若是“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,若是“Flag”值为True,那么event.wait 方法时便再也不阻塞。 # # clear:将“Flag”设置为False # set:将“Flag”设置为True # e.is_set是判断是否Flag是True from multiprocessing import Process,Event import time,random def traffic_light(e): while True: if e.is_set(): time.sleep(3) print('红灯亮') e.clear() # 绿变红 else: time.sleep(3) print('绿灯亮') e.set() # 红变绿 def car(i,e): e.wait() # 触发事件 print("%s车经过"%i) if __name__ == '__main__': e = Event() # 生成事件对象 # 起一个进程,只执行红绿灯转换 p = Process(target=traffic_light, args=(e,)) p.start() for i in range(100): if i%6 == 0: time.sleep(random.randint(1,3)) car_pro = Process(target=car, args=(i,e)) car_pro.start()
from multiprocessing import Process,Queue Queue([maxsize]):建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。 maxsize是队列中容许最大项数,省略则无大小限制。 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。若是超时,会抛出Queue.Full异常。若是blocked为False,但该Queue已满,会当即抛出Queue.Full异常。 q.get方法能够从队列读取而且删除一个元素。一样,get方法有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。若是blocked为False,有两种状况存在,若是Queue有一个值可用,则当即返回该值,不然,若是队列为空,则当即抛出Queue.Empty异常. q.get_nowait():同q.get(False) q.put_nowait():同q.put(False) q.empty():调用此方法时q为空则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中又加入了项目。 q.full():调用此方法时q已满则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中的项目被取走。 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()同样 #了解 1 q.cancel_join_thread():不会在进程退出时自动链接后台线程。能够防止join_thread()方法阻塞 2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但还没有写入的数据,但将在此方法完成时立刻关闭。若是q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,若是某个使用者正在被阻塞在get()操做上,关闭生产者中的队列不会致使get()方法返回错误。 3 q.join_thread():链接队列的后台线程。此方法用于在调用q.close()方法以后,等待全部队列项被消耗。默认状况下,此方法由不是q的原始建立者的全部进程调用。调用q.cancel_join_thread方法能够禁止这种行为
实例
from multiprocessing import Process, Queue def q_get(q): print(q.get()) def q_put(q): q.put('hello') if __name__ == '__main__': q = Queue() p1 = Process(target=q_put, args=(q,)) p1.start() p2 = Process(target=q_get, args=(q,)) p2.start()
生产者消费者模型:
生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。
基于队列实现生产者消费者模型
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print('主')
此时的问题是主进程永远不会结束,缘由是:生产者p在生产完后就结束了,可是消费者c在取空了q以后,则一直处于死循环中且卡在q.get()这一步。
解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就能够break出死循环
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) q.put(None) #发送结束信号 if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print('主') 生产者在生产完毕后发送结束信号None
注意:结束信号None,不必定要由生产者发,主进程里一样能够发,但主进程须要等生产者结束后才应该发送该信号
但上述解决方式,在有多个生产者和多个消费者时,咱们则须要用一个很low的方式去解决
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(name,q): for i in range(2): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨头',q)) p3=Process(target=producer,args=('泔水',q)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) #开始 p1.start() p2.start() p3.start() c1.start() p1.join() #必须保证生产者所有生产完毕,才应该发送结束信号 p2.join() p3.join() q.put(None) #有几个消费者就应该发送几回结束信号None q.put(None) #发送结束信号 print('主') 有几个消费者就须要发送几回结束信号:至关low
其实咱们的思路无非是发送结束信号而已,有另一种队列提供了这种机制
JoinableQueue([maxsize]):这就像是一个Queue对象,但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
参数介绍: maxsize是队列中容许最大项数,省略则无大小限制。
方法介绍:JoinableQueue的实例p除了与Queue对象相同的方法以外还具备:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止
注意:消费者进程要设置成守护进程!
from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) q.task_done() #向q.join()发送一次信号,证实一个数据已经被取走了 def producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) q.join() # 阻塞到q队列中全部的项目均被q.task_done()方法为止 if __name__ == '__main__': q=JoinableQueue() #生产者们:即厨师们 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨头',q)) p3=Process(target=producer,args=('泔水',q)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True # 设置为守护进程 c2.daemon=True # 设置为守护进程 #开始 p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() p2.join() p3.join() # 当全部生产者结束后(队列中也被取完)就能够直接结束了(守护进程) print('主') #主进程等--->p1,p2,p3等---->c1,c2 #p1,p2,p3结束了,证实c1,c2确定全都收完了p1,p2,p3发到队列的数据 #于是c1,c2也没有存在的价值了,应该随着主进程的结束而结束,因此设置成守护进程
#建立管道的类:
Pipe([duplex]):在进程之间建立一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的链接对象,强调一点:必须在产生Process对象以前产生管道
#参数介绍:
dumplex:默认管道是全双工的,若是将duplex射成False,conn1只能用于接收,conn2只能用于发送。
#主要方法:
conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接收,recv方法会一直阻塞。若是同一Pipe对象生成的另一端已经关闭,那么recv方法会抛出EOFError。
注意:
foo,son = Pipe()
p = Process(target=func,args=((foo,son),))这种传参进去的管道双端不是同一Pipe对象,相似于形参,可能关一端,另外一端不受影响
conn1.send(obj):经过链接发送对象。obj是与序列化兼容的任意对象
其余方法:
conn1.close():关闭链接。若是conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回链接使用的整数文件描述符
conn1.poll([timeout]):若是链接上的数据可用,返回True。timeout指定等待的最长时限。若是省略此参数,方法将当即返回结果。若是将timeout射成None,操做将无限期地等待数据到达。
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。若是进入的消息,超过了这个最大值,将引起IOError异常,而且在链接上没法进行进一步读取。若是链接的另一端已经关闭,不再存在任何数据,将引起EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):经过链接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,而后调用c.recv_bytes()函数进行接收
conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或相似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。若是消息长度大于可用的缓冲区空间,将引起BufferTooShort异常。
基于管道实现进程间通讯(与队列的方式是相似的,队列就是管道加锁实现的)
from multiprocessing import Process,Pipe import time,os def consumer(p,name): left,right=p left.close() while True: try: baozi=right.recv() print('%s 收到包子:%s' %(name,baozi)) except EOFError: right.close() break def producer(seq,p): left,right=p right.close() for i in seq: left.send(i) # time.sleep(1) else: left.close() if __name__ == '__main__': left,right=Pipe() c1=Process(target=consumer,args=((left,right),'c1')) c1.start() seq=(i for i in range(10)) producer(seq,(left,right)) right.close() left.close() c1.join() print('主进程')
注意:生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。若是忘记执行这些步骤,程序可能再消费者中的recv()操做上挂起。管道是由操做系统进行引用计数的,必须在全部进程中关闭管道后才能生产EOFError异常。所以在生产者中关闭管道不会有任何效果,付费消费者中也关闭了相同的管道端点。
进程间通讯应该尽可能避免使用本节所讲的共享数据的方式
进程间数据是独立的,能够借助于队列或管道实现通讯,两者都是基于消息传递的
虽然进程间数据独立,但能够经过Manager实现数据共享,事实上Manager的功能远不止于此
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,
from multiprocessing import Manager,Process,Lock import os def work(d,lock): # with lock: #不加锁而操做共享的数据,确定会出现数据错乱 d['count']-=1 if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic) #{'count': 94} 进程之间操做共享的数据
假如16核的cpu,那么最多也就只能跑16个进程,若是跑100个进程,则按时间轮转片来分,每一个进程分到的时间就不多,进程池简单说就是开一个只有固定可执行进程数的池子,而后若是有100个任务,轮着用着这四个进程,这就避免了开过多进程形成的浪费
Pool([numprocess [,initializer [, initargs]]]):建立进程池
参数:
numprocess:要建立的进程数,若是省略,将默认使用cpu_count()的值
initializer:是每一个工做进程启动时要执行的可调用对象,默认为None
initargs:是要传给initializer的参数组
主要方法:
p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。
p.map():传两个参数,一个是须要执行的方法,一个是可迭代对象(range(100))
p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成
P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用
其余方法(了解部分)
方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具备如下方法
obj.get():返回结果,若是有必要则等待结果到达。timeout是可选的。若是在指定时间内尚未到达,将引起一场。若是远程操做中引起了异常,它将在调用此方法时再次被引起。
obj.ready():若是调用完成,返回True
obj.successful():若是调用完成且没有引起异常,返回True,若是在结果就绪以前调用此方法,引起异常
obj.wait([timeout]):等待结果变为可用。
obj.terminate():当即终止全部工做进程,同时不执行任何清理或结束任何挂起工做。若是p被垃圾回收,将自动调用此函数
异步进程池
from multiprocessing import Pool import os,time def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 # 正常的进程无法拿到返回值,进程池能够 if __name__ == '__main__': p=Pool(3) #进程池中从无到有建立三个进程,之后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res res_l.append(res) #异步apply_async用法:若是使用异步提交的任务,主进程须要使用jion,等待进程池内任务都处理完,而后能够用get收集结果,不然,主进程结束,进程池可能还没来得及执行,也就跟着一块儿结束了 p.close() # 必定要close一下,防止再向进程池添加 p.join() # 异步进程池主进程不等子进程,因此要join一下,否则进程池跟着主进程结束了 for res in res_l: print(res.get()) #使用get来获取apply_aync的结果,若是是apply,则没有get方法,由于apply是同步执行,马上获取结果,也根本无需get 异步调用apply_async
同步进程池
#一:使用进程池(异步调用,apply_async) #coding: utf-8 from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply_async(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 res_l.append(res) print("==============================>") #没有后面的join,或get,则程序总体结束,进程池中的任务还没来得及所有执行完也都跟着主进程一块儿结束了 pool.close() #关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成 pool.join() #调用join以前,先调用close函数,不然会出错。执行完close后不会有新的进程加入到pool,join函数等待全部子进程结束 print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>对象组成的列表,而非最终的结果,但这一步是在join后执行的,证实结果已经计算完毕,剩下的事情就是调用每一个对象下的get方法去获取结果 for i in res_l: print(i.get()) #使用get来获取apply_aync的结果,若是是apply,则没有get方法,由于apply是同步执行,马上获取结果,也根本无需get #二:使用进程池(同步调用,apply) #coding: utf-8 from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(0.1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另一个 print("==============================>") pool.close() pool.join() #调用join以前,先调用close函数,不然会出错。执行完close后不会有新的进程加入到pool,join函数等待全部子进程结束 print(res_l) #看到的就是最终的结果组成的列表 for i in res_l: #apply是同步的,因此直接获得结果,没有get()方法 print(i) 详解:apply_async与apply
须要回调函数的场景:进程池中任何一个任务一旦处理完了,就当即告知主进程:我好了额,你能够处理个人结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
咱们能够把耗时间(阻塞)的任务放到进程池中,而后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
from multiprocessing import Pool import time,random import requests import re def get_page(url,pattern): response=requests.get(url) if response.status_code == 200: return (response.text,pattern) def parse_page(info): page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } print(dic) if __name__ == '__main__': pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get() # res=requests.get('http://maoyan.com/board/7') # print(re.findall(pattern,res.text))