建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。python
Queue([maxsize])
建立共享的进程队列。
参数 :maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。安全
底层队列使用管道和锁定实现。服务器
q.get( [ block [ ,timeout ] ] )
:返回q中的一个项目。若是q为空,此方法将阻塞,直到队列中有项目可用为止。dom
q.put(item [, block [,timeout ] ] )
:将item放入队列。若是队列已满,此方法将阻塞至有空间可用为止。线程
q.close()
:关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但还没有写入的数据,但将在此方法完成时立刻关闭。code
q.empty()
:若是调用此方法时 q为空,返回True。继承
q.get_nowait()
:取值 没有值不等待直接报错队列
q.full()
:若是q已满,返回为True. 因为线程的存在,结果也多是不可靠的进程
若是其余进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。这时候empty,full,get_nowait取到结果和实际管道的结果就可能不一致了!ip
""" 队列:先进先出 堆栈:先进后出 """ from multiprocessing import Queue q = Queue(5) # 括号内能够传参数 表示的是这个队列的最大存储数 # 往队列中添加数据 q.put(1) q.put(2) # print(q.full()) # 判断队列是否满了 q.put(3) q.put(4) q.put(5) # print(q.full()) # q.put(6) # 当队列满了以后 再放入数据 不会报错 会原地等待 直到队列中有数据被取走(阻塞态) print(q.get()) print(q.get()) print(q.get()) print(q.empty()) # 判断队列中的数据是否取完 print(q.get()) print(q.get()) print(q.empty()) # print(q.get_nowait()) # 取值 没有值不等待直接报错 # print(q.get()) # 当队列中的数据被取完以后 再次获取 程序会阻塞 直到有人往队列中放入值 """ full get_nowait empty 都不适用于多进程的状况 """
from multiprocessing import Process,Queue def producer(q): q.put('hello GF~') def consumer(q): print(q.get()) if __name__ == '__main__': q = Queue() p = Process(target=producer,args=(q,)) c = Process(target=consumer, args=(q,)) p.start() c.start() """ 子进程放数据 主进程获取数据 两个子进程相互放 取数据 """
生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。
生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('%s 吃 %s' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('生产了 %s' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print('主')
此时的问题是主进程永远不会结束,缘由是:生产者p在生产完后就结束了,可是消费者c在取空了q以后,则一直处于死循环中且卡在q.get()这一步。
解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就能够break出死循环。
由于数据没有,get()会一直阻塞,根据队列先进先出的原理,这样生产者在发完全部数据后发个标志数据,好比None,而消费者进行判断,当最后结果为None时候结束循环.
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('%s 吃 %s' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('生产了 %s' %(os.getpid(),res)) q.put(None) #发送结束信号 if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print('主')
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('%s 吃 %s' %(os.getpid(),res)) def producer(q): for i in range(2): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('生产了 %s' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() p1.join() q.put(None) #发送结束信号 print('主')
注意:结束信号None,不必定要由生产者发,主进程里一样能够发,但主进程须要等生产者结束后才应该发送该信号。
建立可链接的共享进程队列。但队列容许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
本质就是继承了Queue,而后基础上添加了task_done和join方法.
q.task_done()
:使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。q.join()
:生产者将使用此方法进行阻塞,直到队列中全部项目均被处理。阻塞将持续到为队列中的每一个项目均调用q.task_done()方法为止。""" 生产者:生产/制造数据的 消费者:消费/处理数据的 例子:作包子的,买包子的 1.作包子远比买包子的多 2.作包子的远比包子的少 供需不平衡的问题 """ from multiprocessing import Process,Queue,JoinableQueue import random import time def producer(name,food,q): for i in range(10): data = '%s生产了%s%s'%(name,food,i) time.sleep(random.random()) q.put(data) print(data) def consumer(name,q): while True: data = q.get() if data == None:break print('%s吃了%s'%(name,data)) time.sleep(random.random()) q.task_done() # 告诉队列你已经从队列中取出了一个数据 而且处理完毕了 if __name__ == '__main__': q = JoinableQueue() p = Process(target=producer,args=('大厨egon','馒头',q)) p1 = Process(target=producer,args=('跟班tank','生蚝',q)) c = Process(target=consumer,args=('许兆龙',q)) c1 = Process(target=consumer,args=('吃货jerry',q)) p.start() p1.start() c.daemon = True c1.daemon = True c.start() c1.start() p.join() p1.join() q.join() # 等到队列中数据所有取出 # q.put(None) # q.put(None)
基于管道实现进程间通讯(与队列的方式是相似的,队列就是管道加锁实现的)
管道能够用于双向通讯,利用一般在客户端/服务器中使用的请求/响应模型或远程过程调用,就可使用管道编写与进程交互的程序