生产者指的是生产数据的任务,消费者指的是处理数据的任务,html
生产数据目的,是为了给消费者处理。编程
在并发编程中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。并发
import time def producer(): '''生产者是厨师''' for i in range(1,4): # 模拟生产三个数据 res = "包子%s" % i time.sleep(2) print("生产者生产%s" % res) consumer(res) def consumer(res): '''消费者吃包子''' time.sleep(1) print("消费者消费%s" % res) if __name__ == "__main__": producer() ''' 生产者生产包子1 消费者消费包子1 生产者生产包子2 消费者消费包子2 生产者生产包子3 消费者消费包子3 '''
生产者必须等消费者,消费者也必须等生产者!spa
生产者消费者模型是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。code
这个阻塞队列就是用来给生产者和消费者解耦的htm
经过队列blog
from multiprocessing import Process from multiprocessing import Queue import time def producer(q): for i in range(1,6): res = "包子%s" %i time.sleep(0.5) print("生产者生产%s" % res) # 如今不是给消费者了 放入数据 res 把包子放到队列 q.put(res) def consumer(q): while True: # 一直接取数据 res = q.get() time.sleep(1) print("消费者消费%s" % res) if __name__ == "__main__": # 容器 q = Queue() # 生产者们 # 须要传参数 把生产后数据的往队列里丢 p = Process(target=producer, args=(q,)) # 消费者们 c = Process(target=consumer, args=(q,)) p.start() c.start() print("主")
执行结果,不是生产一个消费一个,生成者与消费者互相不影响队列
主
生产者生产包子1
生产者生产包子2
生产者生产包子3
消费者消费包子1
生产者生产包子4
生产者生产包子5
消费者消费包子2
消费者消费包子3
消费者消费包子4
消费者消费包子5
缘由是:生产者p在生产完后就结束了,可是消费者c在取空了队列以后,则一直处于死循环中且卡在q.get()这一步。(生产者从队列取不到数据就卡住)进程
一旦队列取空了 会被锁住 就卡住了,程序卡在消费者ip
解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就能够break出死循环
from multiprocessing import Process from multiprocessing import Queue import time def producer(q): for i in range(1,6): res = "包子%s" %i time.sleep(0.5) print("生产者生产%s" % res) # 如今不是给消费者了 放入数据 res 把包子放到队列 q.put(res) def consumer(q): while True: # 接取数据 res = q.get() # 消费者在最后一次收到None后退出 if res is None:break time.sleep(1) print("消费者消费%s" % res) if __name__ == "__main__": # 容器 q = Queue() # 生产者们 # 须要传参数 把生产后数据的往队列里丢 p = Process(target=producer, args=(q,)) # 消费者们 c = Process(target=consumer, args=(q,)) p.start() c.start() # 保证全部子进程执行完 主进程才能工做,否则一直阻塞 p.join() # 保证生产者把全部数据扔到队列后 而后发送结束信号 q.put(None) print("主") ''' 生产者生产包子1 生产者生产包子2 生产者生产包子3 消费者消费包子1 生产者生产包子4 生产者生产包子5 消费者消费包子2 主 消费者消费包子3 消费者消费包子4 消费者消费包子5 '''
若是有多个消费者 生产者时候
有几个消费者就须要发送几回结束信号
from multiprocessing import Process, Queue import time def producer(q): for i in range(5): res = "包子%s" %i time.sleep(0.5) print("生产者生产%s" % res) # 如今不是给消费者了 放入数据 res 把包子放到队列 q.put(res) def consumer(q): while True: # 接取数据 res = q.get() if res is None:break time.sleep(1) print("消费者消费%s" % res) if __name__ == "__main__": # 容器 q = Queue() # 生产者们 # 须要传参数 把生产后数据的往队列里丢 p1 = Process(target=producer, args=(q,)) p2 = Process(target=producer, args=(q,)) p3= Process(target=producer, args=(q,)) # 消费者们 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) p1.start() p2.start() p3.start() c1.start() c2.start() # 保证全部子进程执行完 主进程才能工做,否则一直阻塞 p1.join() p2.join() p3.join() # 发完数据后 发送结束信号 有几个消费者就应该来几个信号 q.put(None) q.put(None) print("主")
结束信号必定是跟在正常数据后面,保证全部消费者把正常数据取走之后,接下来取的是结束信号
结束信号应该在主进程里面确保全部的生产者,都生产完毕之后,才发送结束信号
其实咱们的思路无非是发送结束信号而已,有另一种队列提供了这种机制 能够点击这个文章