Python之路--Python基础10--并发编程之进程

1、multiprocessing模块介绍

  Python中的多线程没法利用多核优点,若是想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分状况须要使用多进程。html

  Python提供了multiprocessing。multiprocessing模块用来开启子进程,并在子进程中执行咱们定制的任务(好比函数),该模块与多线程模块threading的编程接口相似。multiprocessing模块的功能众多:支持子进程、通讯和共享数据、执行不一样形式的同步,提供了Process、Queue、Pipe、Lock等组件。须要再次强调的一点是:与线程不一样,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。python

 

2、Process类介绍

建立进程的类:git

  Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化获得的对象,表示一个子进程中的任务(还没有启动)github

强调:数据库

  1. 须要使用关键字的方式来指定参数编程

  2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号。json

参数介绍:windows

  group参数未使用,值始终为None数组

  target表示调用对象,即子进程要执行的任务安全

  args表示调用对象的位置参数元组,args=(1,2,'egon',)

  kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}

  name为子进程的名称

方法介绍:

  p.start():启动进程,并调用该子进程中的p.run() 。
  p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法。

  p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁。
  p.is_alive():若是p仍然运行,返回True。

  p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程。

属性介绍:

  p.daemon:默认值为False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设定为True后,p不能建立本身的新进程,必须在p.start()以前设置

  p.name:进程的名称

  p.pid:进程的pid

  p.exitcode:进程在运行时为None、若是为–N,表示被信号N结束(了解便可)

  p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络链接的底层进程间通讯提供安全性,这类链接只有在具备相同的身份验证键时才能成功(了解便可)

 

2、Process类介绍

注意:在windows中Process()必须放到# if __name__ == '__main__':下,否则会报错。

一、建立并开启子进程的两种方式

#开进程的方法一:
import time import random from multiprocessing import Process def work(name): print('%s working' %name) time.sleep(random.randrange(1,5)) print('%s work end' %name) p1=Process(target=work,args=('egon',)) #必须加,号
p2=Process(target=work,args=('alex',)) p1.start() p2.start() print('主线程')
#开进程的方法二:
import time import random from multiprocessing import Process class Work(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print('%s working' %self.name) time.sleep(random.randrange(1,5)) print('%s work end' %self.name) p1=Work('egon') p2=Work('alex') p1.start() #start会自动调用run
p2.start() print('主线程')

进程直接的内存空间是隔离的:

from multiprocessing import Process def work(): global n n = 0 print('子进程内: ', n) if __name__ == '__main__': n = 100 p = Process(target=work) p.start() print('主进程内: ', n) # 输出: # 主进程内: 100 # 子进程内: 0

 

二、Process对象的join方法

from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.start() p.join(0.0001) #等待p中止,等0.0001秒就再也不等了
print('开始')
from multiprocessing import Process import time import random def piao(name): print('%s is piaoing' %name) time.sleep(random.randint(1,3)) print('%s is piao end' %name) p1=Process(target=piao,args=('egon',)) p2=Process(target=piao,args=('alex',)) p3=Process(target=piao,args=('yuanhao',)) p4=Process(target=piao,args=('wupeiqi',)) p1.start() p2.start() p3.start() p4.start() #有的同窗会有疑问:既然join是等待进程结束,那么我像下面这样写,进程不就又变成串行的了吗? #固然不是了,必须明确:p.join()是让谁等? #很明显p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p,

#详细解析以下: #进程只要start就会在开始运行了,因此p1-p4.start()时,系统中已经有四个并发的进程了 #而咱们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键 #join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其他p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接经过检测,无需等待 #因此4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间
p1.join() p2.join() p3.join() p4.join() print('主线程') #上述启动进程与join进程能够简写为 # p_l=[p1,p2,p3,p4] #  # for p in p_l: # p.start() #  # for p in p_l: # p.join()

 

三、Process对象的其余方法或属性

#进程对象的其余方法一:terminate,is_alive
from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,5)) print('%s is piao end' %self.name) p1=Piao('egon1') p1.start() p1.terminate()     #关闭进程,不会当即关闭,因此is_alive马上查看的结果可能仍是存活
print(p1.is_alive())  #结果为True

print('开始') print(p1.is_alive()) #结果为False
from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): # self.name=name
        # super().__init__() #Process的__init__方法会执行self.name=Piao-1,
        # #因此加到这里,会覆盖咱们的self.name=name

        #为咱们开启的进程设置名字的作法
        super().__init__() self.name=name #这两个换下位子就行了 def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.start() print('开始') print(p.pid) #查看pid

 

主进程建立守护进程

  1:守护进程会在主进程代码执行结束后就终止

  2:守护进程内没法再开启子进程,不然抛出异常:AssertionError: daemonic processes are not allowed to have children

from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.daemon=True #必定要在p.start()前设置,设置p为守护进程,禁止p建立子进程,而且父进程代码执行结束,p即终止运行
p.start() print('')

 

3、进程同步(锁)

  进程之间数据不共享,可是共享同一套文件系统,因此访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。

part1:多个进程共享同一打印终端

#并发运行,效率高,但竞争同一打印终端,带来了打印错乱
from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) if __name__ == '__main__': for i in range(3): p=Process(target=work) p.start()
#由并发变成了串行,牺牲了运行效率,但避免了竞争
from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,)) p.start()

 

part2:多个进程共享同一文件

文件当数据库,模拟抢票

#并发运行,效率高,但竞争写同一文件,数据写入错乱

#文件db的内容为:{"count":1} #注意必定要用双引号,否则json没法识别
from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db.txt')) print('剩余票数%s' %dic['count']) def get(): dic=json.load(open('db.txt')) time.sleep(0.1) #模拟读数据的网络延迟
    if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模拟写数据的网络延迟
        json.dump(dic,open('db.txt','w')) print('购票成功') def task(lock): search() #lock.acquire()
 get() #lock.release()

if __name__ == '__main__': lock=Lock() for i in range(5): #模拟并发5个客户端抢票
        p=Process(target=task,args=(lock,)) p.start()

输出:

剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
购票成功
购票成功
购票成功
购票成功
购票成功

解决办法:加锁:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全

#文件db的内容为:{"count":1} #注意必定要用双引号,否则json没法识别
from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db.txt')) print('剩余票数%s' %dic['count']) def get(): dic=json.load(open('db.txt')) time.sleep(0.1) #模拟读数据的网络延迟
    if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模拟写数据的网络延迟
        json.dump(dic,open('db.txt','w')) print('购票成功') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock=Lock() for i in range(5): #模拟并发5个客户端抢票
        p=Process(target=task,args=(lock,)) p.start()
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
购票成功

总结:

  加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

虽然能够用文件共享数据实现进程间通讯,但问题是:

  一、效率低(共享数据基于文件,而文件是硬盘上的数据)

  二、须要本身加锁处理

所以咱们最好找寻一种解决方案可以兼顾:一、效率高(多个进程共享一块内存的数据)二、帮咱们处理好锁问题。这就是mutiprocessing模块为咱们提供的基于消息的IPC通讯机制:队列和管道。

 

一、队列和管道都是将数据存放于内存中

二、队列又是基于(管道+锁)实现的,可让咱们从复杂的锁问题中解脱出来,

咱们应该尽可能避免使用共享数据,尽量使用消息传递和队列,避免处理复杂的同步和锁问题,并且在进程数目增多时,每每能够得到更好的可获展性。

 

4、队列

进程彼此之间互相隔离,要实现进程间通讯(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

建立队列的类(底层就是以管道和锁定的方式实现)

  Queue([maxsize]):建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。

参数介绍:

  maxsize是队列中容许最大项数,省略则无大小限制。   

主要方法介绍:

  q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。若是超时,会抛出Queue.Full异常。若是blocked为False,但该Queue已满,会当即抛出Queue.Full异常。

  q.get方法能够从队列读取而且删除一个元素。一样,get方法有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。若是blocked为False,有两种状况存在,若是Queue有一个值可用,则当即返回该值,不然,若是队列为空,则当即抛出Queue.Empty异常.
  q.get_nowait():同q.get(False)
  q.put_nowait():同q.put(False)

  q.empty():调用此方法时q为空则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中又加入了项目。
  q.full():调用此方法时q已满则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中的项目被取走。
  q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()同样

  q.cancel_join_thread():不会在进程退出时自动链接后台线程。能够防止join_thread()方法阻塞

  q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但还没有写入的数据,但将在此方法完成时立刻关闭。若是q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,若是某个使用者正在被阻塞在get()操做上,关闭生产者中的队列不会致使get()方法返回错误。

  q.join_thread():链接队列的后台线程。此方法用于在调用q.close()方法以后,等待全部队列项被消耗。默认状况下,此方法由不是q的原始建立者的全部进程调用。调用q.cancel_join_thread方法能够禁止这种行为

 

栗子:

''' multiprocessing模块支持进程间通讯的两种主要形式:管道和队列 都是基于消息传递实现的,可是队列接口 '''

from multiprocessing import Process,Queue import time q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty
q.put(3) q.put(3) q.put(3) print(q.full()) #满了

print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #空了

 

生产者消费者模型

  在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生产线程和消费线程的工做能力来提升程序的总体处理数据的速度。

为何要使用生产者和消费者模式

  在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。

什么是生产者消费者模式

  生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。

#基于队列实现生产者消费者模型

from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们
    p1=Process(target=producer,args=(q,)) #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,)) #开始
 p1.start() c1.start() print('')

此时的问题是主进程永远不会结束,缘由是:生产者p在生产完后就结束了,可是消费者c在取空了q以后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就能够break出死循环

from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) q.put(None) #发送结束信号

if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print('')

 

注意:结束信号None,不必定要由生产者发,主进程里一样能够发,但主进程须要等生产者结束后才应该发送该信号

#主进程在生产者生产完毕后发送结束信号None

from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(2): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们
    p1=Process(target=producer,args=(q,)) #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,)) #开始
 p1.start() c1.start() p1.join() q.put(None) #发送结束信号
    print('')

但上述解决方式,在有多个生产者和多个消费者时,咱们则须要用一个很low的方式去解决,有几个消费者就须要发送几回结束信号:至关low。其实咱们的思路无非是发送结束信号而已,有另一种队列提供了这种机制。

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

#参数介绍:

  maxsize是队列中容许最大项数,省略则无大小限制。    

#方法介绍:

  JoinableQueue的实例p除了与Queue对象相同的方法以外还具备:

  q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常

  q.join():生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止

from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) q.task_done() #向q.join()发送一次信号,证实一个数据已经被取走了

def producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) q.join() if __name__ == '__main__': q=JoinableQueue() #生产者们:即厨师们
    p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨头',q)) p3=Process(target=producer,args=('泔水',q)) #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True c2.daemon=True #开始
    p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() p2.join() p3.join() print('') #主进程等--->p1,p2,p3等---->c1,c2
    #p1,p2,p3结束了,证实c1,c2确定全都收完了p1,p2,p3发到队列的数据
    #于是c1,c2也没有存在的价值了,应该随着主进程的结束而结束,因此设置成守护进程

 

5、共享数据

  展望将来,基于消息传递的并发编程是大势所趋,即使是使用线程,推荐作法也是将程序设计为大量独立的线程集合,经过消息队列交换数据。这样极大地减小了对使用锁定和其余同步手段的需求,还能够扩展到分布式系统中,进程间通讯应该尽可能避免使用本节所讲的共享数据的方式。

from multiprocessing import Manager,Process,Lock import os def work(d,lock): # with lock: #不加锁而操做共享的数据,确定会出现数据错乱
        d['count']-=1

if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic) #{'count': 94}

 

6、进程池

 

  在利用Python进行系统管理的时候,特别是同时操做多个文件目录,或者远程控制多台主机,并行操做能够节约大量的时间。多进程是实现并发的手段之一,须要注意的问题是:

  一、很明显须要并发执行的任务一般要远大于核数

  二、一个操做系统不可能无限开启进程,一般有几个核就开几个进程

  三、进程开启过多,效率反而会降低(开启进程是须要占用系统资源的,并且开启多余核数目的进程也没法作到并行)

  例如当被操做对象数目不大时,能够直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但若是是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时能够发挥进程池的功效。

  咱们就能够经过维护一个进程池来控制进程数目,好比httpd的进程模式,规定最小进程数和最大进程数... 
  ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool能够提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,若是池尚未满,那么就会建立一个新的进程用来执行该请求;但若是池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

  建立进程池的类:若是指定numprocess为3,则进程池会从无到有建立三个进程,而后自始至终使用这三个进程去执行全部任务,不会开启其余进程

 

Pool([numprocess [,initializer [, initargs]]]):建立进程池 

参数介绍:

  numprocess:要建立的进程数,若是省略,将默认使用cpu_count()的值

  initializer:是每一个工做进程启动时要执行的可调用对象,默认为None

  initargs:是要传给initializer的参数组

方法介绍:

  p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async()
  p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。
  p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成
  p.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用

 

方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具备如下方法

  obj.get():返回结果,若是有必要则等待结果到达。timeout是可选的。若是在指定时间内尚未到达,将引起一场。若是远程操做中引起了异常,它将在调用此方法时再次被引起。

  obj.ready():若是调用完成,返回True

  obj.successful():若是调用完成且没有引起异常,返回True,若是在结果就绪以前调用此方法,引起异常

  obj.wait([timeout]):等待结果变为可用。

  obj.terminate():当即终止全部工做进程,同时不执行任何清理或结束任何挂起工做。若是p被垃圾回收,将自动调用此函数

栗子:

#同步调用apply,一个一个一执行

from multiprocessing import Pool import os,time def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2

if __name__ == '__main__': p=Pool(3) #进程池中从无到有建立三个进程,之后一直是这三个进程在执行任务
    res_l=[] for i in range(10): res=p.apply(work,args=(i,)) #同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程当中可能有阻塞也可能没有阻塞,但无论该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程当中如果任务发生了阻塞就会被夺走cpu的执行权限
 res_l.append(res) print(res_l)
#异步调用apply_async,三个三个执行

from multiprocessing import Pool import os,time def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2

if __name__ == '__main__': p=Pool(3) #进程池中从无到有建立三个进程,之后一直是这三个进程在执行任务
    res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res
 res_l.append(res) #异步apply_async用法:若是使用异步提交的任务,主进程须要使用jion,等待进程池内任务都处理完,而后能够用get收集结果,不然,主进程结束,进程池可能还没来得及执行,也就跟着一块儿结束了
 p.close() p.join() for res in res_l: print(res.get()) #使用get来获取apply_aync的结果,若是是apply,则没有get方法,由于apply是同步执行,马上获取结果,也根本无需get
# 详解:apply_async与apply

#一:使用进程池(异步调用,apply_async) #coding: utf-8
from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
 res_l.append(res) print("==============================>") #没有后面的join,或get,则程序总体结束,进程池中的任务还没来得及所有执行完也都跟着主进程一块儿结束了
 pool.close() #关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成
    pool.join()   #调用join以前,先调用close函数,不然会出错。执行完close后不会有新的进程加入到pool,join函数等待全部子进程结束

    print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>对象组成的列表,而非最终的结果,但这一步是在join后执行的,证实结果已经计算完毕,剩下的事情就是调用每一个对象下的get方法去获取结果
    for i in res_l: print(i.get()) #使用get来获取apply_aync的结果,若是是apply,则没有get方法,由于apply是同步执行,马上获取结果,也根本无需get

#二:使用进程池(同步调用,apply) #coding: utf-8
from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(0.1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另一个
    print("==============================>") pool.close() pool.join() #调用join以前,先调用close函数,不然会出错。执行完close后不会有新的进程加入到pool,join函数等待全部子进程结束

    print(res_l) #看到的就是最终的结果组成的列表
    for i in res_l: #apply是同步的,因此直接获得结果,没有get()方法
        print(i)

 

栗子2:

服务端:

#Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count()) #开启6个客户端,会发现2个客户端处于等待状态 #在每一个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
from socket import *
from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): print('进程pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break

if __name__ == '__main__': p=Pool() while True: conn,client_addr=server.accept() p.apply_async(talk,args=(conn,client_addr)) # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问

 

客户端:

from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))

发现:并发开启多个客户端,服务端同一时间只有3个不一样的pid,干掉一个客户端,另一个客户端才会进来,被3个进程之一处理。

 

回掉函数:

  须要回调函数的场景:进程池中任何一个任务一旦处理完了,就当即告知主进程:我好了额,你能够处理个人结果了。主进程则调用一个函数去处理该结果,该函数即回调函数。咱们能够把耗时间(阻塞)的任务放到进程池中,而后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

from multiprocessing import Pool import requests import json import os def get_page(url): print('<进程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<进程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) #拿到的是get_page的结果,其实彻底不必拿该结果,该结果已经传给回调函数处理了

''' 打印结果: <进程3388> get https://www.baidu.com <进程3389> get https://www.python.org <进程3390> get https://www.openstack.org <进程3388> get https://help.github.com/ <进程3387> parse https://www.baidu.com <进程3389> get http://www.sina.com.cn/ <进程3387> parse https://www.python.org <进程3387> parse https://help.github.com/ <进程3387> parse http://www.sina.com.cn/ <进程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] '''
#爬虫案例

from multiprocessing import Pool import time,random import requests import re def get_page(url,pattern): response=requests.get(url) if response.status_code == 200: return (response.text,pattern) def parse_page(info): page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } print(dic) if __name__ == '__main__': pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get() # res=requests.get('http://maoyan.com/board/7')
    # print(re.findall(pattern,res.text))

若是在主进程中等待进程池中全部任务都执行完毕后,再统一处理结果,则无需回调函数

from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2
if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待进程池中全部进程执行完毕
 nums=[] for res in res_l: nums.append(res.get()) #拿到全部结果
    print(nums) #主进程拿到全部的处理结果,能够在主进程中进行统一进行处理

 

进程池的其余实现方式:https://docs.python.org/dev/library/concurrent.futures.html

相关文章
相关标签/搜索