python中的多线程没法利用多核优点,若是想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分状况须要使用多进程。Python提供了multiprocessing。
html
multiprocessing模块用来开启子进程,并在子进程中执行咱们定制的任务(好比函数),该模块与多线程模块threading的编程接口相似。python
multiprocessing模块的功能众多:支持子进程、通讯和共享数据、执行不一样形式的同步,提供了Process、Queue、Pipe、Lock等组件。linux
须要再次强调的一点是:与线程不一样,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。数据库
1.建立进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化获得的对象,表示一个子进程中的任务(还没有启动)
强调:
1. 须要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
2. 参数介绍:
1 group参数未使用,值始终为None
3 target表示调用对象,即子进程要执行的任务
5 args表示调用对象的位置参数元组,args=(1,2,'jame',)
7 kwargs表示调用对象的字典,kwargs={'name':'jame','age':18}
9 name为子进程的名称编程
3.方法介绍:
p.start():启动进程,并调用该子进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法 .
p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死 锁。
p.is_alive():若是p仍然运行,返回True
p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。json
timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程 windows
4.属性介绍:
p.daemon:默认值为False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设定为True后,p不能建立本身的新进程,必须在p.start()以前设置
p.name:进程的名称
p.pid:进程的pid
p.exitcode:进程在运行时为None、若是为–N,表示被信号N结束(了解便可)
p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。数组
这个键的用途是为涉及网络链接的底层进程间通讯提供安全性,这类链接只有在具备相同的身份验证键时才能成功(了解便可)安全
1.建立并开启子进程的两种方法: 网络
from multiprocessing import Process import os import time def task1(name): print('%s: %s is start,父进程pid:[%s]'%(name,os.getpid(),os.getppid())) time.sleep(1) print('%s :%s is stop,父进程Pid:[%s]'%(name,os.getpid(),os.getppid())) def task2(name): print('%s: %s is start,父进程pid:[%s]'%(name,os.getpid(),os.getppid())) time.sleep(1) print('%s :%s is stop,父进程Pid:[%s]'%(name,os.getpid(),os.getppid())) if __name__ == '__main__': p1=Process(name='001子进程',target=task1,args=('子进程1',)) p2=Process(target=task2,kwargs={'name':'子进程2'}) print(p1.name) print(p2.name) p2.start() print('若是有join则最后打印父进程.pid:%s,父进程的父进程:%s'%(os.getpid(),os.getppid()))
#开进程的方法二:#在linux下运行,windows下会提示要在main方法下运行 from multiprocessing import Process import time class Myp(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print('%s is running'%self.name) time.sleep(1) print('%s is stop '%self.name) p1=Myp('egon') p1.start() print('主进程')
2.进程之间的内存是隔离的
from multiprocessing import Process n=100 #在windows系统中应该把全局变量定义在if __name__ == '__main__'之上就能够了 y=111 def work(): global n n=0 print('子进程内n,y: ',n,y) if __name__ == '__main__': p=Process(target=work) p.start() y=222 print('主进程内n,y:',n,y) ''' 主进程内n,y: 100 222 子进程内n,y: 0 111 总结:子进程是复制父进程的变量, 子进程改更本身的变量后,不影响父进程的变量的值。 '''
3.用多进程方式使socket有并发的做用?
#Author http://www.cnblogs.com/Jame-mei from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg: break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': while True: conn,client_addr=server.accept() print(conn,client_addr) p=Process(target=talk,kwargs={'conn':conn,'client_addr':client_addr}) p.start()
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf
问题:每来一个客户端,都在服务端开启一个进程,若是并发来一个万个客户端,要开启一万个进程吗,你本身尝试着在你本身的机器上开启一万个,10万个进程试一试。
解决方法:进程池!
4.Process对象的join方法
#Author http://www.cnblogs.com/Jame-mei from multiprocessing import Process import time import random def task(name): print('%s is start' %name) time.sleep(random.randrange(1,3)) print('%s is stop' %name) if __name__ == '__main__': p=Process(target=task,args=('jame',)) p.start() p.join() #p.join(0.0001) #等待p中止,等0.0001秒就再也不等了 print('开始') ''' jame is start jame is stop 开始 #因此是主进程等待子进程结束,并非p等着p结束噢 '''
#Author http://www.cnblogs.com/Jame-mei from multiprocessing import Process import time import random def piao(name): print('%s is piaoing' %name) time.sleep(random.randint(1,3)) print('%s is piao end' %name) p1=Process(target=piao,args=('egon',)) p2=Process(target=piao,args=('alex',)) p3=Process(target=piao,args=('yuanhao',)) p4=Process(target=piao,args=('wupeiqi',)) p1.start() p2.start() p3.start() p4.start() #有的同窗会有疑问:既然join是等待进程结束,那么我像下面这样写,进程不就又变成串行的了吗? #固然不是了,必须明确:p.join()是让谁等? #很明显p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p, #详细解析以下: #进程只要start就会在开始运行了,因此p1-p4.start()时,系统中已经有四个并发的进程了 #而咱们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键 #join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其他p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接经过检测,无需等待 # 因此4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间 p1.join() p2.join() p3.join() p4.join() print('主线程') #上述启动进程与join进程能够简写为 # p_l=[p1,p2,p3,p4] # # for p in p_l: # p.start() # # for p in p_l: # p.join() #有了join,程序不就是串行了吗???
5.Process对象的其余方法或属性(了解)
from multiprocessing import Process import time #terminate与is_alive def task(name): print('%s is start'%name) time.sleep(1) print('%s is start'%name) if __name__ == '__main__': p1=Process(target=task,args=('子进程',)) p1.start() p1.terminate() #关闭进程,不会当即关闭,因此is_alive马上查看的结果可能仍是存活 print(p1.is_alive()) #True time.sleep(2) #先先等2秒,看看p1是否关闭 print(p1.is_alive()) #False
#Author http://www.cnblogs.com/Jame-mei from multiprocessing import Process import time,os #name pid def task(name): print('%s is start pid:%s,父进程:%s'%(name,os.getpid(),os.getppid())) time.sleep(1) def task2(name): print('%s is start pid:%s,父进程:%s'%(name,os.getpid(),os.getppid())) time.sleep(1) if __name__ == '__main__': p1=Process(target=task,args=('子进程task',)) p2=Process(name='子进程2',target=task,args=('子进程task2',)) p1.start() p2.start() print('p1进程的名字:',p1.name) print('p2进程的名字:',p2.name) ''' p1进程的名字: Process-1 p2进程的名字: 子进程2 子进程task is start pid:2540,父进程:9524 子进程task2 is start pid:10332,父进程:9524 '''
注意:在windows中Process()必须放到# if __name__ == '__main__':下建立与执行进程.....
if __name__ == "__main__" since statements inside this if-statement will not get called upon import. 因为Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 若是在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。
主进程建立守护进程:
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内没法再开启子进程,不然抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
# @Time : 2018/9/4 9:39 # @Author : Jame from multiprocessing import Process import time,random def task(name): print('%s is start'%name) time.sleep(random.randrange(1,3)) print('%s is stop'%name) if __name__ == '__main__': p=Process(target=task,args=('jame',)) p.daemon=True #要在p.start()以前设置,设置p为守护进程,禁止p建立子进程,而且父进程代码执行结束,p也要终止运行。 p.start() print('主进程')
# @Time : 2018/9/4 9:45 # @Author : Jame #主进程代码运行结束,守护进程就会结束! from multiprocessing import Process import time def foo(): print(123) time.sleep(1) print('end123') def bar(): print(456) time.sleep(3) print('end456') if __name__ == '__main__': p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print('main----------------------') # 打印该行则主进程代码结束,则守护进程p1应该被终止。 # 可能会有p1任务执行的打印信息123,由于主进程打印main----时,p1也执行了,可是随即被终止!
进程之间数据不共享,可是共享同一套文件系统,因此访问同一个文件或者同一个打印终端是没问题的.
而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理!
1.多进程共享同一个打印终端
#并发运行,效率高,可是竞争同一个打印终端,带来了打印错乱
# @Time : 2018/9/4 9:57 # @Author : Jame #并发运行,效率高,可是竞争同一个打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running'%os.getpid()) time.sleep(2) print('%s is done'%os.getpid()) if __name__ == '__main__': for i in range(3): p=Process(target=work) p.start() ''' 4896 is running 7876 is running 7712 is running 4896 is done 7876 is done 7712 is done '''
#由并发变成了串行,牺牲了运行效率,可是避免了竞争
# @Time : 2018/9/4 10:05 # @Author : Jame #由并发变成了串行,牺牲了运行效率,可是避免了竞争 from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() #加锁 print('%s is running'%os.getpid()) time.sleep(1) print('%s is done'%os.getpid()) lock.release() #释放锁 if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,)) p.start() ''' 4928 is running 4928 is done 9284 is running 9284 is done 4412 is running 4412 is done '''
2.多进程共享同一文件
文件当数据库,模拟抢票
1):并发运行,并发效率高,可是竞争写入统一文件,数据写入错乱
# @Time : 2018/9/4 10:13 # @Author : Jame from multiprocessing import Process,Lock import time,json,random #文件db的内容:{"count":1} #注意必定要用双引号,否则json没法识别 def search(): dic=json.load(open('db.txt')) print('\033[43m剩余票数%s\033[0m' %dic['count']) def get(): dic=json.load(open('db.txt')) time.sleep(0.1) #模拟读取数据的网络延迟 if dic['count']>0: dic['count']-=1 time.sleep(0.2) #模拟写数据的网络延迟 json.dump(dic,open('db.txt','w')) print('\003[43m购票成功\003[0m') def task(lock): search() get() if __name__ == '__main__': lock=Lock() for i in range(100): #模拟100个并发的客户端抢票 p=Process(target=task,args=(lock,)) p.start()
2):加锁:购票行为变成了串行,牺牲了运行效率,可是保证了数据的安全。
#文件db的内容为:{"count":1} #注意json的格式必定是双引号,否则没法识别. from multiprocessing import Process,Lock import time,json,random #1.查询剩余票数 def search(): dic=json.load(open('db2.txt')) print('\033[43m剩余票数%s\033[0m' %dic['count']) #2.模拟抢票 def get(): dic=json.load(open('db2.txt')) time.sleep(0.1) #模拟读取数据的网络延迟 if dic['count']>0: dic['count']-=1 time.sleep(0.2) #模拟写数据的网络延迟 json.dump(dic,open('db2.txt','w')) print('\003[43m购票成功\003[0m') def task(lock): search() lock.acquire() #加锁处理 get() lock.release() #释放锁处理 if __name__ == '__main__': lock=Lock() for i in range(100): #模拟100个并发的客户端抢票 p=Process(target=task,args=(lock,)) p.start()
总结:
1.加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,便是串行的修改,速度慢了,可是牺牲了速度保证了数据的安全性和一致性。
虽然能够用文件共享数据实现进程间通讯,可是问题是:
*效率低(共享数据基于文件,而文件是硬盘上的数据)
*须要本身加锁处理
2.所以,multiprocessing为咱们提供了基于消息的IPC通讯机制:队列和管道。
我应该尽可能避免使用共享数据,尽量的使用消息传递和队列,避免处理复杂的同步和问题,并且在进程数目增多时,每每能够得到更好的可扩展性。
*效率高(队列和管道都是将数据放在内存中的)
*帮咱们处理好锁的问题(队列又是基于管道+锁实现的,可让咱们从复杂的锁问题中解放出来)
进程间是彼此隔离的,要实现进程间通讯(IPC),multiprocessing模块支持2种形式:队列和管道,这2中方式都是使用消息传递的。
1.建立队列的类(底层就是以管道和锁定的方式实现的):
1 Queue([maxsize]):建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。
2.参数介绍
maxsize 是队列中容许最大项数,省略则无大小限制。
3.方法介绍
3.1 主要方法:
def put(self, obj, block=True, timeout=None): ''' 做用:能够插入数据队列中。 参数及做用:2个可选参数:blocked和timeout。 若是block=True(默认参数),而且timeout为正值,则该方法会阻塞timeout指定的时间,直到该队列有剩余空间。 若是超时会抛出Queue.Full异常。 若是block=False,可是该Queue已满,会马上抛出Queue.Full异常。 ''' def get(self, block=True, timeout=None): ''' 做用:从队列读取并删除一个元素。 参数及做用:2个可选参数:blocked和timeout。 若是block=True(默认参数),而且timeout为正值,那么在等待时间没有取到任何元素,会抛出Queue.Full异常。 若是block=False,有两种状况存在,若是Queue有一个值可取,则当即返回该值,不然,若是队列为空,会马上抛出Queue.Full异常。 ''' def put_nowait(self, obj): ''' 同put(False) 做用:能够插入数据队列中。 ''' def get_nowait(self): ''' 同get(False) 做用:从队列读取并删除一个元素。 ''' def empty(self): ''' 做用:调用此方法为空时,返回True。 能否可靠:否! 好比在返回True的过程当中,若是队列中又加入了项目。 ''' def full(self): ''' 做用:调用此方法已满,返回True。 能否可靠:否! 好比在返回True的过程当中,若是队列中的项目被取走。 ''' def qsize(self): ''' 做用:返回队列中目前的正确数量。 能否可靠:否! 好比在返回True的过程当中,若是队列中又加入了项目,或者项目被取走等。 '''
3.2 其余方法(了解)
def cancel_join_thread(self): ''' 做用:不会在进程退出时,自动链接后台线程。 能够防止join_thread()方法阻塞。 ''' def close(self): ''' 做用:关闭队列,防止队列中加入更多数据。 调用此方法,后台线程将继续写入那些已经入队列但还没有写入数据,而后将在此方法完成后立刻关闭。 若是q被垃圾收集,将调用此方法。 关闭队列不会在队列使用者中产生任何类型的数据结束信号或者异常。 例如:若是某个使用者正在被阻塞在get()操做上,关闭生产者中的队列不会致使get()方法返回错误操做。 ''' def join_thread(self): ''' 做用:链接队列的后台线程。 '''
4.队列应用
4.1 简单实例
# @Time : 2018/9/4 11:24 # @Author : Jame from multiprocessing import Queue ''' multiprocessing 模块支持进程间通讯的两种形式:管道和队列 都是基于消息传递实现的,可是队列接口 ''' q=Queue(3) #put get put_nowait get_nowait full empty q.put(3) q.put(3) q.put(3) print(q.full()) #True print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #True
4.2.生产者消费者模型
在并发编程中使用生产者和消费者模式可以解决绝大数并发问题。该模式经过平衡 生产线程和消费线程的工做能力来提升程序的总体处理数据的速度。
4.2.1.为何要生产者 和 消费者?
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。
在线程开发过程当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等消费者处理完,才能继续生产数据。
同理,若是消费者的处理能力大于生产者,那么消费者必须等待生产者。为了解决这个问题因而就引入了生者产和消费者模式。
4.2.2 什么是生产者消费者模式?
生产者消费者模式就是经过一个容器来解决生产和消费者的强耦合问题。
生产者和消费者彼此之间不直接通信,而是经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列
里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。
1):基于队列实现生产者消费者模型!
# @Time : 2018/9/4 14:45 # @Author : Jame from multiprocessing import Process,Queue import time,random,os #1消费者 def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m'%(os.getpid(),res)) #2 生产者 def producer(q): for i in range(5): time.sleep(random.randint(1,3)) res='包子%s'%i q.put(res) print('\033[44m%s 生产了 %s\033[0m'%(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:厨师 p1=Process(target=producer,args=(q,)) #消费者:吃货顾客 c1=Process(target=consumer,args=(q,)) #开始运转 p1.start() c1.start() print('主进程.....')
可是这里的主进程永远不会结束,缘由是:生产者p在生产完后就结束了,可是消费者c在取空了q以后,则一直处于死循环中且卡在q.get()这一步。如何解决,请看下一步。
2):让生产者在生产完后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就能够break出死循环。
# @Time : 2018/9/4 14:45 # @Author : Jame from multiprocessing import Process,Queue import time,random,os #1消费者 def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m'%(os.getpid(),res)) #2 生产者 def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s'%i q.put(res) print('\033[44m%s 生产了 %s\033[0m'%(os.getpid(),res)) q.put(None) if __name__ == '__main__': q=Queue() #生产者们:厨师 p1=Process(target=producer,args=(q,)) #消费者:吃货顾客 c1=Process(target=consumer,args=(q,)) #开始运转 p1.start() c1.start() print('主进程.....')
注意:这里的结束信号None,不必定要由生产者发,主进程里一样能够发,可是主进程须要等生产者结束后才应该发送该信号。
# @Time : 2018/9/4 14:45 # @Author : Jame from multiprocessing import Process,Queue import time,random,os #1消费者 def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m'%(os.getpid(),res)) #2 生产者 def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s'%i q.put(res) print('\033[44m%s 生产了 %s\033[0m'%(os.getpid(),res)) #q.put(None) if __name__ == '__main__': q=Queue() #生产者们:厨师 p1=Process(target=producer,args=(q,)) #消费者:吃货顾客 c1=Process(target=consumer,args=(q,)) #开始运转 p1.start() c1.start() p1.join() q.put(None) #发送结束信号 print('主进程.....')
注意:若是有多个生产者和消费者的时候,咱们则须要发送多个结束信号None给消费者。(方式比较Low,了解一下)
# @Time : 2018/9/4 14:45 # @Author : Jame from multiprocessing import Process,Queue import time,random,os #1消费者 def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m'%(os.getpid(),res)) #2 生产者 def producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res='%s %s'%(name,i) q.put(res) print('\033[44m%s 生产了 %s\033[0m'%(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:厨师们 p1=Process(target=producer,args=('包子',q,)) p2=Process(target=producer,args=('骨头',q)) #消费者:吃货顾客们 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) #开始运转 p1.start() p2.start() c1.start() c2.start() p1.join() #必须保证生产者所有生产完,才能发送结束信号 p2.join() q.put(None) #有几个消费者就应该发送几个结束信号None q.put(None) #发送结束信号 print('主进程.....')
最后,咱们无非就是发送结束信号而已,有另一种队列提供了这种机制。
JoinableQueue([maxsize]) :这就像一个Queue对象,但队列容许项目的使用者 ,通知生成者 项目已经被成功处理。
通知进程是使用共享的信息和条件变量来实现的。
参数:maxsize 是队列中容许最大项数,省略则无大小限制。
方法:
JoinableQueue 的实例p 除了与Queue对象相同的方法以外,还具备:
q.task_done() :使用者使用此方法发出信号,表示q.get() 的返回项目已经被处理。若是调用此方法的次数大于从队列中删除的项目的数量,将引起ValueError异常。
q.join() :生产者调用此方法进行阻塞,知道队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目调用q.task_done()方法为止。
应用:
#主进程等--->p1,p2等---->c1,c2 #p1,p2结束了,证实c1,c2确定全都收完了p1,p2发到队列的数据
#于是c1,c2也没有存在的价值了,应该随着主进程的结束而结束,因此设置成守护进程。
# @Time : 2018/9/4 14:45 # @Author : Jame from multiprocessing import Process,Queue,JoinableQueue import time,random,os #1消费者 def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m'%(os.getpid(),res)) q.task_done() #向q.join()发送一次信号,证实一个数据已经被取走了! #2 生产者 def producer(name,q): for i in range(5): time.sleep(random.randint(1,3)) res='%s %s'%(name,i) q.put(res) print('\033[44m%s 生产了 %s\033[0m'%(os.getpid(),res)) q.join() if __name__ == '__main__': #q=Queue() q=JoinableQueue() #生产者们:厨师们 p1=Process(target=producer,args=('包子',q,)) p2=Process(target=producer,args=('骨头',q)) #消费者:吃货顾客们 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True c2.daemon=True #开始运转 ''' p1.start() p2.start() c1.start() c2.start() ''' p_l=[p1,p2,c1,c2] for p in p_l: p.start() p1.join() p2.join() print('主进程.....')
总结:生产者消费者模型
#程序中有两类角色
一类负责生产数据(生产者)
一类负责处理数据(消费者)
#引入生产者消费者模型为了解决的问题是
平衡生产者和消费者之间的工做能力,从而提升程序总体处理数据的速度。
#如何实现:
生产者<------>队列<------>消费者
生产者消费者模型实现类程序的解耦合
进程间通讯IPC方式二:管道(不推荐使用,了解便可)
1.管道的类,参数,方法介绍
1.建立管道的类:
Pipe([duplex]):在进程之间建立一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的链接对象。
强调一点:必须在产生Process对象以前产生管道。
2.参数介绍:
dumplex:默认管道是全双工的,若是将duplex射成False,conn1只能用于接收,conn2只能用于发送。
3.主要方法:
conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接收,recv方法会一直阻塞。
若是链接的另一端已经关闭,那么recv方法会抛出EOFError。
conn1.send(obj):经过链接发送对象。
obj是与序列化兼容的任意对象
#其余方法:
conn1.close():关闭链接。若是conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回链接使用的整数文件描述符
conn1.poll([timeout]):若是链接上的数据可用,返回True。timeout指定等待的最长时限。
若是省略此参数,方法将当即返回结果。若是将timeout射成None,操做将无限期地等待数据到达。
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。
若是进入的消息,超过了这个最大值,将引起IOError异常,而且在链接上没法进行进一步读取。若是链接的另一端已经关闭,不再存在任何数据,将引起EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):
经过链接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。
结果数据以单条消息的形式发出,而后调用c.recv_bytes()函数进行接收
conn1.recv_bytes_into(buffer [, offset]):
接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或相似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。若是消息长度大于可用的缓冲区空间,将引起BufferTooShort异常。
2.基于管道实现进程间通讯(与队列的方式是相似的,队列就是管道加锁实现的)
# @Time : 2018/9/4 17:55 # @Author : Jame from multiprocessing import Process,Pipe import time,os def consumer(p,name): left,right=p left.close() while True: try: baozi=right.recv() print('%s 收到包子:%s'%(name,baozi)) except EOFError: right.close() break def producer(seq,p): left,right=p right.close() for i in seq: left.send(i) else: left.close() if __name__ == '__main__': left,right=Pipe() c1=Process(target=consumer,args=((left,right),'c1')) c1.start() seq=(i for i in range(10)) producer(seq,(left,right)) right.close() left.close() c1.join() print('主进程...')
注意:
生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。
若是忘记了执行这些步骤,程序可能会再消费中的recv()操做上挂起。
管道是由操做系统进行引用计数的,必须再全部进程中关闭管道后才能生产EOFError异常。
所以在生产者中关闭管道不会有任何效果,付费消费者中也关闭了相同的管道端点。
管道能够用于双向通讯,利用一般在客户端 // 服务端中使用情的请求 // 响应模型或者远程调用,就可使用管道编写与进程交互的程序:
# @Time : 2018/9/5 9:19 # @Author : Jame from multiprocessing import Process,Pipe import time,os def adder(p,name): server,client=p client.close() while True: try: x,y=server.recv() except EOFError: server.close() break res=x+y server.send(res) print('server done') if __name__ == '__main__': server,client=Pipe() c1=Process(target=adder,args=((server,client),'c1')) c1.start() server.close() client.send((10,20)) print(client.recv()) client.close() c1.join() print('主进程')
将来基于消息传递的并发编程是大势所趋,即使是使用线程,推荐的作法也是将程序设计为大量独立的线程集合。
经过消息队列交换数据,这样极大地减小了对使用锁定和其余同步手段的需求,还能够扩展到分布式系统中。
进程间通讯应该尽可能避免使用本节所讲的共享数据的方式。
# @Time : 2018/9/5 14:21 # @Author : Jame from multiprocessing import Manager,Process,Lock import os def work(dic,lock): dic['count']-=1 print(dic['count']) if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':10}) p_l=[] for i in range(10): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
进程间数据是独立的,能够借助于队列或管道实现通讯,两者都是基于消息传递的 虽然进程间数据独立,但能够经过Manager实现数据共享,事实上Manager的功能远不止于此 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,
互斥锁 同时只容许一个线程更改数据,而Semaphore是同时容许必定数量的线程更改数据 ,好比厕全部3个坑,那最多只容许3我的上厕所,后面的人只能等里面有人出来了才能再进去,若是指定信号量为3,那么来一我的得到一把锁,计数加1,当计数等于3时,后面的人均须要等待。
一旦释放,就有人能够得到一把锁 信号量与进程池的概念很像,可是要区分开,信号量涉及到加锁的概念。
# @Time : 2018/9/5 14:49 # @Author : Jame from multiprocessing import Process,Semaphore import time,random def go_wc(sem,user): sem.acquire() start = time.time() print('%s 占到一个坑位...'%user) time.sleep(random.randint(0,3)) #模拟每一个人蹲坑的速度,因人而异... stop=time.time() print('=========================%s 占用时间:%s'%(user,(stop-start))) sem.release() if __name__ == '__main__': sem=Semaphore(2) p_l=[] for i in range(10): p=Process(target=go_wc,args=(sem,'user:%s'%i,)) p.start() p_l.append(p) for i in p_l: i.join() print('=================>>>>>>>>>>>>') ''' user:1 占到一个坑位... user:4 占到一个坑位... =========================user:4 占用时间:0.0 user:2 占到一个坑位... =========================user:2 占用时间:1.0000574588775635 user:3 占到一个坑位... =========================user:3 占用时间:0.0 user:0 占到一个坑位... =========================user:1 占用时间:2.0001144409179688 user:5 占到一个坑位... =========================user:0 占用时间:1.0000569820404053 user:7 占到一个坑位... =========================user:5 占用时间:1.0000574588775635 user:8 占到一个坑位... =========================user:8 占用时间:0.0 user:6 占到一个坑位... =========================user:6 占用时间:2.0001144409179688 user:9 占到一个坑位... =========================user:7 占用时间:3.000171422958374 =========================user:9 占用时间:1.0000569820404053 =================>>>>>>>>>>>> '''
python线程的事件用于主线程控制其余线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,若是“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,若是“Flag”值为True,那么event.wait 方法时便再也不阻塞。
clear:将“Flag”设置为False set:将“Flag”设置为True
# @Time : 2018/9/5 15:08 # @Author : Jame from multiprocessing import Process,Event import time,random def car(e,n): while True: if not e.is_set(): #False print('\033[31m红灯亮\033[0m,car%s等着'%n) e.wait() print('\033[32m车%s 看见绿灯亮了\033[0m'%n) time.sleep(random.randint(1,3)) if not e.is_set(): continue print('走你,car',n) break def police_car(e,n): while True: if not e.is_set(): print('\033[31m红灯亮\033[0m,car%s等着'%n) e.wait() print('灯的是 %s,警车走了,car%s'%(e.is_set(),n)) break def traffic_lights(e,inverval): while True: time.sleep(inverval) if e.is_set(): e.clear() #e.is_set()--->False else: e.set() if __name__ == '__main__': e=Event() for i in range(5): p=Process(target=police_car,args=(e,i,)) p.start() t=Process(target=traffic_lights,args=(e,3)) t.start() print('主======================>>>>')
在利用python进行系统管理的时候,特别是同时操做多个文件目录,或者远程控制多台主机,而且操做能够节约大量时间。多进程是实现并发的手段之一,须要注意的问题:
0.1很明显须要并发执行的任务通畅要远大于核数;
0.2一个操做系统不可能无限开启进程,一般有几个核就开几个进程;
0.3进程开启过多,效率反而会降低(开启进程后须要占用系统资源的,并且开启多余核数目的进程也没法作到并行);
例如:当咱们操做对象的数目不太对的时候,能够直接利用multiprocessing中的Process动态生成多个进程,十几个还好,可是若是成百上千个....手动的去限制进程数量却又太过繁琐,
所以能够发挥进程池的功效。
咱们就能够经过维护一个进程池来控制进程数目,好比httpd的进程模式,规定最小进程数和最大进程数。
注意:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool能够提供指定数量的进程,供用户调用,当有新的请求提交到pool中时候,若是池子还没满,那么就应该建立一个新的进程
用来执行该请求;但若是池中的进程数量已经达到最大值了,那么该请求会等待,知道池中有进程结束,就会重用进程池中的进程。
1.建立进程池的类
若是指定numprocess 为3,则进程池会从无到有建立3个进程,而后自始自终的使用这个3个进程去执行全部任务,不会开启其余进程。
Pool(numprocess, processes=None, initializer=None, initargs(),maxtasksperchild=None, context=None)
2.参数介绍
numprocess #要建立的进程数,若是省略,则默认使用cpu_count()的值。
initializer #是每一个工做进程启动时要执行的可调用对象,默认None。
initargs #是要传给initializer的参数组
3.方法介绍
3.1主要方法
p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。须要强调的是:此操做并不会在全部池工做进程中并执行func函数。
若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。
此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做 中的结果。
p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成
P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用
3.2其余方法
apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具备如下方法
obj.get():返回结果,若是有必要则等待结果到达。timeout是可选的。若是在指定时间内尚未到达,将引起一场。若是远程操做中引起了异常,它将在调用此方法时再次被引起。
obj.ready():若是调用完成,返回True
obj.successful():若是调用完成且没有引起异常,返回True,若是在结果就绪以前调用此方法,引起异常
obj.wait([timeout]):等待结果变为可用。
obj.terminate():当即终止全部工做进程,同时不执行任何清理或结束任何挂起工做。若是p被垃圾回收,将自动调用此函数
4.进程池应用
练习1:
# @Time : 2018/9/5 17:36 # @Author : Jame from multiprocessing import Pool import os,time def work(n): print('%s run'%os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #进程池建立3个进程,之后都是这3个进程执行任务。 res_l=[] for i in range(5): res=p.apply(work,args=(i,)) #同步调用,知道本次任务执行完毕拿到res。 # 等待任务work任务的过程当中可能有阻塞也可能没有阻塞。 # 可是无论任务是否存在阻塞,同步调用都会在原地等待,只是等待中如有任务发生了阻塞就会被夺去cpu的执行权限。 res_l.append(res) print(res_l)
# @Time : 2018/9/5 17:36 # @Author : Jame from multiprocessing import Pool import os,time def work(n): print('%s run'%os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #进程池建立3个进程,之后都是这3个进程执行任务。 res_l=[] for i in range(5): res=p.apply_async(work,args=(i,)) #若是使用异步提交任务,主进程须要使用join,等待进程池任务都处理完了,而后能够用get收集结果。 #不然,主进程结束,进程池还没来得及执行,也就跟着一块儿结束了。 res_l.append(res) p.close() p.join() for res in res_l: print(res.get()) #使用get来获取apply_async的结果,若是是apply,则没有get方法,由于apply时同步执行,马上后去结果,无需get
练习2:使用进程池维护固定数目的进程(重写练习1)
# @Time : 2018/9/6 9:18 # @Author : Jame import socket from multiprocessing import Pool import os server=socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): print('进程pid:%s'%os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p=Pool(processes=2) while True: conn,client_addr=server.accept() p.apply_async(talk,args=(conn,client_addr)) #p.apply(talk,args=(conn,client_addr)) #通过测试,同步的时候同一时间只有一个客户端能够收到服务端的回复 #按照前后顺序,断开client1后,client2收到了服务端回复。 ''' 进程pid:8488 进程pid:9160 进程pid:8876 进程pid:9364 #Pool 内的进程数量默认是cpu 的核数,假设为4(查看方法os.cpu_count()) #开启6个客户端,会发现2个客户端处理等待状态 #每一个进程内查看Pid,会发现pid使用为4个,即多个客户端公用4个进程. 当把Pool中调整到p=Pool(processes=2),会发现客户端同一时间只有2个进程进来。 '''
# @Time : 2018/9/6 9:18 # @Author : Jame import socket client=socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>>:').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
# @Time : 2018/9/6 10:57 # @Author : Jame from multiprocessing import Process,Pool import time def func(msg): print('msg:',msg) time.sleep(1) return msg if __name__ == '__main__': pool=Pool(processes=3) #同一时间进程是数量是3个 res_l=[] for i in range(10): msg='hello %d'%i res=pool.apply_async(func,(msg,)) #异步执行 res_l.append(res) print('=====================>>>>>>>>>>>') pool.close() pool.join() # 调用join以前,先调用close函数,不然会出错,执行完close后不会有新的进程加入到pool中, # join函数等待全部子进程结束。 print(res_l) #这里和同步调用结果不同,看到的是一组对象列表。。。 #而非跟同步调用同样的结果列表,但这一步是join后执行的,证实结果已经计算完毕,剩下的事情就是调用每一个对象下的get方法获取结果! for i in res_l: print(i.get()) #使用get来获取apply_async的结果,若是是apply,则没有get方法,由于apply时同步执行的,马上获取结果根本不须要get。
# @Time : 2018/9/6 10:57 # @Author : Jame from multiprocessing import Process,Pool import time def func(msg): print('msg:',msg) time.sleep(1) return msg if __name__ == '__main__': pool=Pool(processes=3) res_l=[] for i in range(10): msg='hello %d'%i res=pool.apply(func,(msg,)) #同步执行,便是执行完一个拿到结果,再去执行另一个。 res_l.append(res) print('=====================>>>>>>>>>>>') pool.close() pool.join() #调用join以前,先调用close函数,不然会出错,执行完close后不会有新的进程加入到pool中, #join函数等待全部子进程结束。 print(res_l)#看到的就是最终的结果组成的列表。 #['hello 0', 'hello 1', 'hello 2', 'hello 3', 'hello 4', 'hello 5', 'hello 6', 'hello 7', 'hello 8', 'hello 9'] for i in res_l: #apply是同步的,全部就直接获得结果,没有get()方法!!! print(i)
5.回调函数
回调函数的场景:进程池中任何一个任务一旦处理完成,就当即告知主进程,我好了哦,你能够处理结果了。
主进程则调用一个函数去处理结果,该函数即回调函数。
咱们能够把耗时间(阻塞)的任务放到进程池中,而后指定回调函数(主进程负责执行),这样主进程在执行回调函数时候就省去了I/O的过程,直接拿到的是任务的结果。
# @Time : 2018/9/6 14:00 # @Author : Jame from multiprocessing import Pool import requests import json import os def get_page(url): print('<进程:%s> get %s'%(os.getpid(),url)) respone=requests.get(url) if respone.status_code==200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<进程:%s> parse %s '%(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n'%(res['url'],len(res['text'])) with open('db3.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.taobao.com', 'http://www.jd.com', 'http://www.sina.com.cn' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() #print([res.get() for res in res_l]) for res in res_l: print(res.get()) #这里拿到的是get_page的结果,其实彻底不必拿该结果,该结果已经传给了回调函数处理了。 ''' <进程:9488> get https://www.baidu.com <进程:8188> get https://www.python.org <进程:9440> get https://www.taobao.com <进程:9440> get http://www.jd.com <进程:6880> parse https://www.taobao.com <进程:9488> get http://www.sina.com.cn <进程:6880> parse https://www.baidu.com <进程:6880> parse https://www.python.org <进程:6880> parse http://www.jd.com <进程:6880> parse http://www.sina.com.cn {'url': 'https://www.baidu.com', {'url': 'https://www.python.org', {'url': 'https://www.taobao.com', {'url': 'http://www.jd.com', {'url': 'http://www.sina.com.cn', '''
# @Time : 2018/9/6 14:00 # @Author : Jame from multiprocessing import Pool import requests import re def get_page(url,pattern): response=requests.get(url) if response.status_code==200: return (response.text,pattern) def pasrse_page(info): page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } print(dic) if __name__ == '__main__': pattern1 = re.compile( r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<', re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, 'http://finance.sina.com.cn/stock/usstock/sector.shtml':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=pasrse_page) res_l.append(res) for i in res_l: print(i.get())
*若是在主进程中等待进程池全部任务执行完毕,再统一处理结果,则无序回调函数。
# @Time : 2018/9/6 15:32 # @Author : Jame from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() nums=[] for res in res_l: nums.append(res.get()) print(nums) #[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
6.进程池的其余实现方法