IPC(Inter-Process Communication)python
IPC机制:实现进程之间通信编程
管道:pipe 基于共享的内存空间安全
队列:pipe+锁的概念--->queue多线程
建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。并发
Queue([maxsize])
建立共享的进程队列。
参数 :maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。dom
底层队列使用管道和锁定实现。函数
是从队列里面取值而且把队列面的取出来的值删掉,没有参数的状况下就是是默认一直等着取值测试
就算是队列里面没有可取的值的时候,程序也不会结束,就会卡在哪里,一直等着操作系统
from multiprocessing import Queue q = Queue() # 生成一个队列对象 # put方法是往队列里面放值 q.put('Cecilia陈') q.put('xuchen') q.put('喜陈') # get方法是从队列里面取值 print(q.get()) print(q.get()) print(q.get()) q.put(5) q.put(6) print(q.get())
Cecilia陈
xuchen
喜陈
5线程
Queue加参数之后,参数是数值
参数实几就表示实例化的这个Queue队列能够放几个值
当队列已经满的时候,再放值,程序会阻塞,但不会结束
from multiprocessing import Queue q = Queue(3) q.put('Cecilia陈') q.put('xuchen') q.put('喜陈') print(q.full()) # 判断队列是否满了 返回的是True/False q.put(2) # 当队列已经满的时候,再放值,程序会阻塞,但不会结束
True 队列已经满了
q.put(self, obj, block=True, timeout=None)
self :put就至关因而Queue里的一个方法,这个时候q.put就至关因而队列对象q来调用对象的绑定方法,这个参数能够省略便可
obj:是咱们须要往队列里面放的值
block=True :队列若是满了的话,再往队列里放值的话会等待,程序不会结束
timeout=None:是再block这个参数的基础上的,当block的值为真的时候,timeout是用来等待多少秒,若是再这个时间里,队列一直是满的,那么程序就会报错并结束(Queue.Full异常)
from multiprocessing import Queue q = Queue(3) q.put('zhao',block=True,timeout=2) q.put('zhao',block=True,timeout=2) q.put('zhao',block=True,timeout=2) q.put('zhao',block=True,timeout=5) # 此时程序将对等待5秒之后报错了
q.get(self,block=True, timeout=None)
self :get就至关因而Queue里的一个方法,这个时候q.get就至关因而队列对象q来调用对象的绑定方法,这个参数能够省略便可
block=True :从队列q对象里面取值,若是娶不到值的话,程序不会结束
timeout=None:是再block这个参数的基础上的,当block的值为真的时候,timeout是用来等待多少秒,若是再这个时间里,get取不到队列里面的值的话,那么程序就会报错并结束(queue.Empty异常)
from multiprocessing import Queue q = Queue() q.put('Cecilia陈') print(q.get()) q.get(block=True,timeout=2) # 此时程序会等待2秒后,报错了,队列里面没有值了
1.put()的block=False
from multiprocessing import Queue q = Queue(2) q.put('Cecilia陈') q.put('喜陈') print(q.full()) q.put('xichen',block=False) # 队列已经满了,我不等待了,直接报错
2.get()的block=Flase
from multiprocessing import Queue q = Queue(2) q.put('Cecilia陈') q.put('喜陈') print(q.get()) print(q.get()) print(q.get(block=False)) # 队列已经没有值了,我不等待了,直接报错
1.put_nowait() 至关于bolok=False,队列满的时候,再放值的时候,程序不等待,不阻塞,直接报错
from multiprocessing import Queue q = Queue(2) q.put('Cecilia陈') q.put('喜陈') print(q.full()) q.put_nowait('xichen') # 程序不等待,不阻塞,直接报错
2.get_nowait() 至关于bolok=False,当队列里没有值的时候,再取值的时候,程序不等待,不阻塞,程序直接报错
from multiprocessing import Queue q = Queue(2) q.put('Cecilia陈') q.put('喜陈') print(q.get()) print(q.get()) print(q.full()) q.get_nowait()# 再取值的时候,程序不等待,不阻塞,程序直接报错
这个例子尚未加入进程通讯,只是先来看看队列为咱们提供的方法,以及这些方法的使用和现象。
''' multiprocessing模块支持进程间通讯的两种主要形式:管道和队列 都是基于消息传递实现的,可是队列接口 ''' from multiprocessing import Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) # q.put(3) # 若是队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。 # 若是队列中的数据一直不被取走,程序就会永远停在这里。 try: q.put_nowait(3) # 可使用put_nowait,若是队列满了不会阻塞,可是会由于队列满了而报错。 except: # 所以咱们能够用一个try语句来处理这个错误。这样程序不会一直阻塞下去,可是会丢掉这个消息。 print('队列已经满了') # 所以,咱们再放入数据以前,能够先看一下队列的状态,若是已经满了,就不继续put了。 print(q.full()) #满了 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法同样,若是队列已经空了,那么继续取就会出现阻塞。 try: q.get_nowait(3) # 可使用get_nowait,若是队列满了不会阻塞,可是会由于没取到值而报错。 except: # 所以咱们能够用一个try语句来处理这个错误。这样程序不会一直阻塞下去。 print('队列已经空了') print(q.empty()) #空了
这是一个queue的简单应用,使用队列q对象调用get函数来取得队列中最早进入的数据。
from multiprocessing import Process, Queue def f(q,name,age): q.put(name,age) #调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。 if __name__ == '__main__': q = Queue() #建立一个Queue对象 p = Process(target=f, args=(q,'Cecilia陈',18)) #建立一个进程 p.start() print(q.get()) p.join()
['Cecilia陈', 18]
生产者: 生产数据的任务
消费者: 处理数据的任务
生产者--队列(盆)-->消费者
生产者能够不停的生产,达到了本身最大的生产效率,消费者能够不停的消费,也达到了本身最大的消费效率.
生产者消费者模型大大提升了生产者生产的效率和消费者消费的效率.
补充: queue不适合传大文件,通产传一些消息.
在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生产线程和消费线程的工做能力来提升程序的总体处理数据的速度。
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。
生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。
from multiprocessing import Queue,Process # 生产者 def producer(q,name,food): for i in range(3): print(f'{name}生产了{food}{i}') res = f'{food}{i}' q.put(res) # 消费者 def consumer(q,name): while True: res = q.get(timeout=5) print(f'{name}吃了{res}') if __name__ == '__main__': q = Queue() # 为的是让生产者和消费者使用同一个队列,使用同一个队列进行通信 p1 = Process(target=producer,args=(q,'Cecilia陈','巧克力')) c1 = Process(target=consumer,args=(q,'Tom')) p1.start() c1.start()
此时的问题是主进程永远不会结束,缘由是:生产者p在生产完后就结束了,可是消费者c在取空了q以后,则一直处于死循环中且卡在q.get()这一步。
解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就能够break出死循环。
注意:结束信号None,不必定要由生产者发,主进程里一样能够发,但主进程须要等生产者结束后才应该发送该信号
from multiprocessing import Queue,Process def producer(q,name,food): for i in range(3): print(f'{name}生产了{food}{i}') res = f'{food}{i}' q.put(res) q.put(None) # 当生产者结束生产的的时候,咱们再队列的最后再作一个表示,告诉消费者,生产者已经不生产了,让消费者不要再去队列里拿东西了 def consumer(q,name): while True: res = q.get(timeout=5) if res == None:break # 判断队列拿出的是否是生产者放的结束生产的标识,若是是则不取,直接退出,结束程序 print(f'{name}吃了{res}') if __name__ == '__main__': q = Queue() # 为的是让生产者和消费者使用同一个队列,使用同一个队列进行通信 p1 = Process(target=producer,args=(q,'Cecilia陈','巧克力')) c1 = Process(target=consumer,args=(q,'Tom')) p1.start() c1.start()
使用这个方法的话,是很low的,有几个消费者就要在主进程中向队列中put几个结束信号
from multiprocessing import Queue,Process import time,random def producer(q,name,food): for i in range(3): print(f'{name}生产了{food}{i}') time.sleep((random.randint(1,3))) res = f'{food}{i}' q.put(res) # q.put(None) # 当生产者结束生产的的时候,咱们再队列的最后再作一个表示,告诉消费者,生产者已经不生产了,让消费者不要再去队列里拿东西了 def consumer(q,name): while True: res = q.get(timeout=5) if res == None:break # 判断队列拿出的是否是生产者放的结束生产的标识,若是是则不取,直接退出,结束程序 time.sleep((random.randint(1, 3))) print(f'{name}吃了{res}') if __name__ == '__main__': q = Queue() # 为的是让生产者和消费者使用同一个队列,使用同一个队列进行通信 # 多个生产者进程 p1 = Process(target=producer,args=(q,'Cecilia陈','巧克力')) p2 = Process(target=producer,args=(q,'xichen','冰激凌')) p3 = Process(target=producer,args=(q,'喜陈','可乐')) # 多个消费者进程 c1 = Process(target=consumer,args=(q,'Tom')) c2 = Process(target=consumer,args=(q,'jack')) # 告诉操做系统启动生产者进程 p1.start() p2.start() p3.start() # 告诉操做系统启动消费者进程 c1.start() c2.start() p1.join() p2.join() p3.join() q.put(None) # 几个消费者put几回 q.put(None)
建立可链接的共享进程队列。这就像是一个Queue对象,但队列容许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
JoinableQueue的实例p除了与Queue对象相同的方法以外,还具备如下方法:
q.task_done()
:使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。若是调用此方法的次数大于从队列中删除的项目数量,将引起ValueError异常。
q.join()
:生产者将使用此方法进行阻塞,直到队列中全部项目均被处理。阻塞将持续到为队列中的每一个项目均调用q.task_done()方法为止。
from multiprocessing import Queue,Process,JoinableQueue import time,random def producer(q,name,food): for i in range(3): print(f'{name}生产了{food}{i}') # time.sleep((random.randint(1,3))) res = f'{food}{i}' q.put(res) # q.put(None) # 当生产者结束生产的的时候,咱们再队列的最后再作一个表示,告诉消费者,生产者已经不生产了,让消费者不要再去队列里拿东西了 q.join() def consumer(q,name): while True: res = q.get(timeout=5) # if res == None:break # 判断队列拿出的是否是生产者放的结束生产的标识,若是是则不取,直接退出,结束程序 # time.sleep((random.randint(1, 3))) print(f'{name}吃了{res}') q.task_done()#向q.join()发送一次信号,证实一个数据已经被取走了 if __name__ == '__main__': q = JoinableQueue() # 为的是让生产者和消费者使用同一个队列,使用同一个队列进行通信 # 多个生产者进程 p1 = Process(target=producer,args=(q,'Cecilia陈','巧克力')) p2 = Process(target=producer,args=(q,'xichen','冰激凌')) p3 = Process(target=producer,args=(q,'喜陈','可乐')) # 多个消费者进程 c1 = Process(target=consumer,args=(q,'Tom')) c2 = Process(target=consumer,args=(q,'jack')) # 告诉操做系统启动生产者进程 p1.start() p2.start() p3.start() # 把生产者设为守护进程 c1.daemon = True c2.daemon = True # 告诉操做系统启动消费者进程 c1.start() c2.start() p1.join() p2.join() p3.join() # 等待生产者生产完毕 print('主进程') ### 分析 # 生产者生产完毕--这是主进程最后一行代码结束--q.join()消费者已经取干净了,没有存在的意义了 # 这是主进程最后一行代码结束,消费者已经取干净了,没有存在的意义了.守护进程的概念.
from multiprocessing import Process,Queue,JoinableQueue q = JoinableQueue() q.put('zhao') # 放队列里一个任务 q.put('qian') print(q.get()) q.task_done() # 完成了一次任务 print(q.get()) q.task_done() # 完成了一次任务 q.join() #计数器不为0的时候 阻塞等待计数器为0后经过 # 想象成一个计数器 :put +1 task_done -1