目录python
是一个解耦合的过程
补充: queue不适合传大文件,通产传一些消息.编程
生产者: 生产数据的任务多线程
消费者: 处理数据的任务
生产者能够不停的生产,达到了本身最大的生产效率,消费者能够不停的消费,也达到了本身最大的消费效率.生产者消费者模型大大提升了生产者生产的效率和消费者消费的效率.并发
生产者--队列(盆)-->消费者dom
在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生产线程和消费线程的工做能力来提升程序的总体处理数据的速度。线程
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。code
生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。对象
from multiprocessing import Process,Queue def producer(q,name,food): """生产者""" for i in range(1,11): print(f'{name}生产{food}{i}') res = f'{food}{i}' q.put(res) q.put(None) def consumer(q,name): """消费者""" while True: res = q.get(timeout=3) if res is None:break print(f'{name}吃了{res}') if __name__ == '__main__': q = Queue() p1 = Process(target=producer,args=(q,'ocean','包子')) c1 = Process(target=consumer,args=(q,'chen')) p1.start() c1.start() ##################### ocean生产包子1 ocean生产包子2 ocean生产包子3 ocean生产包子4 ocean生产包子5 ocean生产包子6 ocean生产包子7 ocean生产包子8 ocean生产包子9 ocean生产包子10 chen吃了包子1 chen吃了包子2 chen吃了包子3 chen吃了包子4 chen吃了包子5 chen吃了包子6 chen吃了包子7 chen吃了包子8 chen吃了包子9 chen吃了包子10
from multiprocessing import Process,Queue import time,random def producer(q,name,food): """生产者""" for i in range(1,4): print(f'{name}生产了{food}{i}') time.sleep(random.randint(1,3)) res = f'{food}{i}' q.put(res) def consumer(q,name): """消费者""" while True: res = q.get(timeout=3) if res is None:break time.sleep(random.randint(1,3)) print(f'{name}吃了{res}') if __name__ == '__main__': q = Queue() p1 = Process(target=producer,args=(q,'ocean','包子')) p2 = Process(target=producer,args=(q,'sky','韭菜')) p3 = Process(target=producer,args=(q,'rocky','酒')) c1 = Process(target=consumer,args=(q,'nick')) c2 = Process(target=consumer,args=(q,'mac')) p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join()#生产者生产完毕 q.put(None)#存在几个消费者,put几回None q.put(None) # for i in range(2): # q.put(None) ############################# ocean生产了包子1 sky生产了韭菜1 rocky生产了酒1 ocean生产了包子2 sky生产了韭菜2 ocean生产了包子3 sky生产了韭菜3 mac吃了韭菜1 rocky生产了酒2 nick吃了包子1 mac吃了包子2 rocky生产了酒3 nick吃了韭菜2 mac吃了酒1 nick吃了包子3 nick吃了酒2 mac吃了韭菜3 nick吃了酒3
建立可链接的共享进程队列。这就像是一个Queue对象,但队列容许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。队列
q.task_done()
:使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。若是调用此方法的次数大于从队列中删除的项目数量,将引起ValueError异常。进程
q.join()
:生产者将使用此方法进行阻塞,直到队列中全部项目均被处理。阻塞将持续到为队列中的每一个项目均调用q.task_done()方法为止。
from multiprocessing import JoinableQueue #用法和Queue类似 q = JoinableQueue() q.put('ocean')#队列放入一个任务,内存在一个计数机制,+1 q.put('can')#计数机制 +1 print(q.get()) q.task_done()#完成一次任务,计数机制-1 print(q.get()) q.task_done()#计数机制 -1 q.join()#计数机制不为0的时候,阻塞等待计数器为0后经过 ################## ocean can
from multiprocessing import Process, JoinableQueue import time, random def producer(q, name, food): """生产者""" for i in range(3): print(f'{name}生产了{food}{i}') time.sleep(random.randint(1, 3)) res = f'{food}{i}' q.put(res) def consumer(q, name): """消费者""" while True: res = q.get() time.sleep(random.randint(1, 3)) print(f'{name}吃了{res}') q.task_done() # 用来计数 if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer, args=(q, 'ocean', '包子')) p2 = Process(target=producer, args=(q, 'chen', '菜')) p3 = Process(target=producer, args=(q, 'nick', '酒')) c1 = Process(target=consumer, args=(q, 'pocky')) c2 = Process(target=consumer, args=(q, 'mac')) p1.start() p2.start() p3.start() c1.daemon = True # 定义为收回进程 c2.daemon = True c1.start() c2.start() p1.join() p2.join() p3.join() # 生产者生产完毕 q.join() # 生产者生产完毕--这是主程序的最后一行代码,执行后结束---q.join()消费者已经取干净了,没有存在的意思了。 # 这个是主程序的最后一行代码结束,守护进程结束,也是守护进程的概念 ############################### ocean生产了包子0 chen生产了菜0 nick生产了酒0 chen生产了菜1 nick生产了酒1 ocean生产了包子1 chen生产了菜2 pocky吃了菜0 nick生产了酒2 ocean生产了包子2 mac吃了酒0 pocky吃了包子0 mac吃了菜1 mac吃了包子1 mac吃了菜2 pocky吃了酒1 pocky吃了包子2 mac吃了酒2