进程间通讯html
进程彼此之间互相隔离,要实现进程间通讯(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。python
进程队列queuegit
不一样于线程queue,进程queue的生成是用multiprocessing模块生成的。github
在生成子进程的时候,会将代码拷贝到子进程中执行一遍,及子进程拥有和主进程内容同样的不一样的名称空间。网络
示例1:并发
1 import multiprocessing 2 def foo(): 3 q.put([11,'hello',True]) 4 print(q.qsize()) 5 6 q=multiprocessing.Queue() #全局定义一个q进程队列,在产生子进程时候会在子进程里生成,能够指定最大数,限制队列长度 7 if __name__ == '__main__': 8 p=multiprocessing.Process(target=foo,args=()) #由于名称空间不一样,子进程的主线程建立的q队列,主进程get不到,因此会阻塞住 9 p.start() 10 # foo() #主进程执行一下函数就能够访问到了 11 print(q.get())
示例2:app
1 import multiprocessing 2 3 def foo(): 4 q.put([11,'hello',True]) 5 print(q.qsize()) 6 7 if __name__ == '__main__': 8 q = multiprocessing.Queue() #主进程建立一个q进程队列 9 p=multiprocessing.Process(target=foo,args=()) #由于名称空间不一样,子进程的主线程找不到q队列,因此会报错提示没有q 10 p.start() 11 print(q.get())
示例3:框架
1 import multiprocessing 2 3 def foo(argument): #定义函数处理进程队列 4 argument.put([11,'hello',True]) 5 print(argument.qsize()) 6 q = multiprocessing.Queue() #全局定义一个进程队列 7 print('test') 8 9 if __name__ == '__main__': 10 x = multiprocessing.Queue() #主进程定义一个进程队列 11 p=multiprocessing.Process(target=foo,args=(x,)) #主进程把值传给子进程就能够处理了 12 p.start() 13 print(x.get()) 14 # foo(q) 15 # print(q.get())
经常使用方法异步
q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。若是超时,会抛出Queue.Full异常。若是blocked为False,但该Queue已满,会当即抛出Queue.Full异常。 q.get方法能够从队列读取而且删除一个元素。一样,get方法有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。若是blocked为False,有两种状况存在,若是Queue有一个值可用,则当即返回该值,不然,若是队列为空,则当即抛出Queue.Empty异常. q.get_nowait():同q.get(False) q.put_nowait():同q.put(False) q.empty():调用此方法时q为空则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中又加入了项目。 q.full():调用此方法时q已满则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中的项目被取走。 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()同样
其余方法socket
q.cancel_join_thread():不会在进程退出时自动链接后台线程。能够防止join_thread()方法阻塞 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但还没有写入的数据,但将在此方法完成时立刻关闭。若是q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,若是某个使用者正在被阻塞在get()操做上,关闭生产者中的队列不会致使get()方法返回错误。 q.join_thread():链接队列的后台线程。此方法用于在调用q.close()方法以后,等待全部队列项被消耗。默认状况下,此方法由不是q的原始建立者的全部进程调用。调用q.cancel_join_thread方法能够禁止这种行为
另外一个建立进程队列的类
http://www.cnblogs.com/zero527/p/7211909.html
管道pipe
管道就是管道,就像生活中的管道,两头都能进能出
默认管道是全双工的,若是建立管道的时候映射成False,左边只能用于接收,右边只能用于发送,相似于单行道
最简单的管道双向通讯示例:
1 import multiprocessing 2 3 def foo(sk): 4 sk.send('hello world') 5 print(sk.recv()) 6 7 if __name__ == '__main__': 8 conn1,conn2=multiprocessing.Pipe() #开辟两个口,都是能进能出,括号中若是False即单向通讯 9 p=multiprocessing.Process(target=foo,args=(conn1,)) #子进程使用sock口,调用foo函数 10 p.start() 11 print(conn2.recv()) #主进程使用conn口接收 12 conn2.send('hi son') #主进程使用conn口发送
经常使用方法
conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接收,recv方法会一直阻塞。若是链接的另一端已经关闭,那么recv方法会抛出EOFError。 conn1.send(obj):经过链接发送对象。obj是与序列化兼容的任意对象
注意:send()和recv()方法使用pickle模块对对象进行序列化
其余方法
conn1.close():关闭链接。若是conn1被垃圾回收,将自动调用此方法,不用的时候两边都要close conn1.fileno():返回链接使用的整数文件描述符 conn1.poll([timeout]):若是链接上的数据可用,返回True。timeout指定等待的最长时限。若是省略此参数,方法将当即返回结果。若是将timeout射成None,操做将无限期地等待数据到达。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。若是进入的消息,超过了这个最大值,将引起IOError异常,而且在链接上没法进行进一步读取。若是链接的另一端已经关闭,不再存在任何数据,将引起EOFError异常。 conn.send_bytes(buffer [, offset [, size]]):经过链接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,而后调用c.recv_bytes()函数进行接收 conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或相似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。若是消息长度大于可用的缓冲区空间,将引起BufferTooShort异常。
注意:生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。若是忘记执行这些步骤,程序可能再消费者中的recv()操做上挂起。管道是由操做系统进行引用计数的,必须在全部进程中关闭管道后才能生产EOFError异常。所以在生产者中关闭管道不会有任何效果,付费消费者中也关闭了相同的管道端点。
1 from multiprocessing import Process,Pipe 2 3 import time,os 4 def consumer(p,name): 5 left,right=p 6 left.close() 7 while True: 8 try: 9 baozi=right.recv() 10 print('%s 收到包子:%s' %(name,baozi)) 11 except EOFError: 12 right.close() 13 break 14 def producer(seq,p): 15 left,right=p 16 right.close() 17 for i in seq: 18 left.send(i) 19 # time.sleep(1) 20 else: 21 left.close() 22 if __name__ == '__main__': 23 left,right=Pipe() 24 c1=Process(target=consumer,args=((left,right),'c1')) 25 c1.start() 26 seq=(i for i in range(10)) 27 producer(seq,(left,right)) 28 right.close() 29 left.close() 30 c1.join() 31 print('主进程') 32 33 生产者消费者关闭某端点
共享数据manage
Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另外一个进程的数据。
注:进程间通讯应该尽可能避免使用共享数据的方式
共享数据:列表
1 from multiprocessing import Manager,Process 2 def foo(l,i): 3 l.append(i**i) 4 if __name__ == '__main__': 5 man=Manager() 6 ml=man.list([11,22,33]) 7 l=[] 8 for i in range(5): 9 p=Process(target=foo,args=(ml,i)) 10 p.start() 11 l.append(p) 12 for i in l: #必需要join,否则会执行报错,处理一个数据必需要一个个来,不能同时处理一个数据 13 i.join() 14 print(ml)
共享数据:字典
1 from multiprocessing import Manager,Process 2 def foo(d,k,v): 3 d[k]=v 4 if __name__ == '__main__': 5 man=Manager() 6 md=man.dict({'name':'bob'}) 7 l=[] 8 for i in range(5): 9 p=Process(target=foo,args=(md,i,'a')) 10 p.start() 11 l.append(p) 12 for i in l: #必需要join,否则会执行报错,处理一个数据必需要一个个来,不能同时处理一个数据 13 i.join() 14 print(md)
进程池
开多进程是为了并发,一般有几个cpu核心就开几个进程,可是进程开多了会影响效率,主要体如今切换的开销,因此引入进程池限制进程的数量。
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,若是进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
示例:
1 from multiprocessing import Pool 2 import time 3 4 def foo(n): 5 print(n) 6 time.sleep(1) 7 8 if __name__ == '__main__': 9 pool_obj=Pool(5) # 10 for i in range(47): 11 # pool_obj.apply_async(func=foo,args=(i,)) 12 pool_obj.apply(func=foo,args=(i,)) #子进程的生成是靠进程池对象维护的 13 # apply同步,子进程一个个执行 14 # apply_async异步,多个子进程一块儿执行 15 pool_obj.close() 16 pool_obj.join() 17 print('ending')
经常使用方法:
pool_obj.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async() pool_obj.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。 pool_obj.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成 pool_obj.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用
其余方法:
方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具备如下方法 obj.get():返回结果,若是有必要则等待结果到达。timeout是可选的。若是在指定时间内尚未到达,将引起一场。若是远程操做中引起了异常,它将在调用此方法时再次被引起。 obj.ready():若是调用完成,返回True obj.successful():若是调用完成且没有引起异常,返回True,若是在结果就绪以前调用此方法,引起异常 obj.wait([timeout]):等待结果变为可用。 obj.terminate():当即终止全部工做进程,同时不执行任何清理或结束任何挂起工做。若是p被垃圾回收,将自动调用此函数
协程
协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。
一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序本身控制调度的。
协程能保留上一次调用时的状态(即全部局部状态的一个特定组合),每次过程重入时,就至关于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
注意:
1. python的线程属于内核级别的,即由操做系统控制调度(如单线程一旦遇到io就被迫交出cpu执行权限,切换其余线程运行)
2. 单线程内开启协程,一旦遇到io,从应用程序级别(而非操做系统)控制切换
协程优势:
1. 协程的切换开销更小,属于程序级别的切换,操做系统彻底感知不到,于是更加轻量级
2. 单线程内就能够实现并发的效果,最大限度地利用cpu
协程缺点:
1.协程的本质是单线程下,没法利用多核,能够是一个程序开启多个进程,每一个进程内开启多个线程,每一个线程内开启协程
2.协程指的是单个线程,于是一旦协程出现阻塞,将会阻塞整个线程
yield实现协程并发
1 import time 2 def consumer(): 3 r='' 4 while True: 5 n=yield r 6 if not n: 7 return 8 print('[CONSUMER] ←← Consuming %s...' % n) 9 time.sleep(1) 10 r='200 Ok' 11 12 def produce(c): 13 next(c) #1.启动生成器 14 n=0 15 while n < 5: 16 n=n+1 17 print('[PRODUCER] →→ Producing %s...' % n) 18 cr=c.send(n) 19 #2.将n传入到consumer的对象,yield接收到传入值开始执行代码,遇到yield执行代码返回r的值 20 print('[PRODUCER] Consumer return: %s' % cr) 21 #3.produce没有值了,关闭整个过程 22 c.close() 23 24 if __name__ == '__main__': 25 c=consumer() #生成生成器对象 26 produce(c) #执行调用
greenlet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操做进行恢复为止。可使用一个调度器循环在一组生成器函数之间协做多个任务。greentlet是python中实现咱们所谓的"Coroutine(协程)"的一个基础库。
示例1:
1 from greenlet import greenlet 2 def foo(): 3 print('ok1') 4 g2.switch() #阻断 5 print('ok3') 6 g2.switch() 7 def bar(): 8 print('ok2') 9 g1.switch() 10 print('ok4') 11 12 g1=greenlet(foo) #生成foo函数的greenlet对象 13 g2=greenlet(bar) #生成bar函数的greenlet对象 14 g1.switch() #一、执行g1对象,打印ok1 15 #二、遇到g2.switch(),转到g2执行打印ok2 16 #三、遇到g1.switch(),转到g1的阻断处继续执行打印ok3 17 #四、遇到g2.switch(),转到g2执行打印ok4
示例2:
1 def eat(name): 2 print('%s eat food 1' %name) 3 gr2.switch('bob') 4 print('%s eat food 2' %name) 5 gr2.switch() 6 def play_phone(name): 7 print('%s play 1' %name) 8 gr1.switch() 9 print('%s play 2' %name) 10 11 gr1=greenlet(eat) 12 gr2=greenlet(play_phone) 13 gr1.switch(name='natasha')#能够在第一次switch时传入参数,之后都不须要
这种方法不会节省时间,由于不是io操做,而greenlet遇到io操做不会跳转,仍然要io阻断
基于greenlet框架的高级库gevent模块
gevent是第三方库,经过greenlet实现协程,其基本思想是:
当一个greenlet遇到IO操做时,好比访问网络,就自动切换到其余的greenlet,等到IO操做完成,再在适当的时候切换回来继续执行。因为IO操做很是耗时,常常使程序处于等待状态,有了gevent为咱们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
因为切换是在IO操做时自动完成,因此gevent须要修改Python自带的一些标准库,这一过程在启动时经过monkey patch完成:
简单示例:
1 import gevent 2 def foo(): 3 print('ok1') 4 gevent.sleep(4) #模拟io操做 5 print('ok3') 6 def bar(): 7 print('ok2') 8 gevent.sleep(2) 9 print('ok4') 10 11 g1=gevent.spawn(foo) 12 g2=gevent.spawn(bar) 13 gevent.joinall([g1,g2]) #所有阻塞,或者单独一个个join
spawn括号内第一个参数是函数名,如foo,后面能够有多个参数,能够是位置实参或关键字实参,都是传给函数foo的
注意:
gevent.sleep(4)模拟的是gevent能够识别的io阻塞,
而time.sleep(2)或其余的阻塞,gevent是不能直接识别的须要用下面一行代码,打补丁,就能够识别了
1 #补丁 2 from gevent import monkey 3 monkey.patch_all()
必须放到被打补丁者的前面,如time,socket模块以前
或者咱们干脆记忆成:要用gevent,须要将补丁放到文件的开头
爬虫示例:
1 from gevent import monkey;monkey.patch_all() 2 import gevent 3 import requests 4 import time 5 6 def get_page(url): 7 print('GET: %s' %url) 8 response=requests.get(url) 9 if response.status_code == 200: 10 print('%d bytes received from %s' %(len(response.text),url)) 11 12 13 start_time=time.time() 14 gevent.joinall([ 15 gevent.spawn(get_page,'https://www.python.org/'), 16 gevent.spawn(get_page,'https://www.yahoo.com/'), 17 gevent.spawn(get_page,'https://github.com/'), 18 ]) 19 stop_time=time.time() 20 print('run time is %s' %(stop_time-start_time))