在软件开发的过程当中,常常碰到这样的场景:
某些模块负责生产数据,这些数据由其余模块来负责处理(此处的模块多是:函数、线程、进程等)。产生数据的模块称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式。python
结构图以下:编程
为了你们容易理解,咱们举一个寄信的例子。假设你要寄一封信,大体过程以下:
一、你把信写好——至关于生产者生产数据多线程
二、你把信放入邮箱——至关于生产者把数据放入缓冲区
三、邮递员把信从邮箱取出,作相应处理——至关于消费者把数据取出缓冲区,处理数据并发
解耦
假设生产者和消费者分别是两个线程。若是让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。若是将来消费者的代码发生变化,可能会影响到生产者的代码。而若是二者都依赖于某个缓冲区,二者之间不直接依赖,耦合也就相应下降了。dom
举个例子,咱们去邮局投递信件,若是不使用邮箱(也就是缓冲区),你必须得把信直接交给邮递员。有同窗会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须 得认识谁是邮递员,才能把信给他。这就产生了你和邮递员之间的依赖(至关于生产者和消费者的强耦合)。万一哪天邮递员 换人了,你还要从新认识一下(至关于消费者变化致使修改生产者代码)。而邮箱相对来讲比较固定,你依赖它的成本就比较低(至关于和缓冲区之间的弱耦合)。函数
并发
因为生产者与消费者是两个独立的并发体,他们之间是用缓冲区通讯的,生产者只须要往缓冲区里丢数据,就能够继续生产下一个数据,而消费者只须要从缓冲区拿数据便可,这样就不会由于彼此的处理速度而发生阻塞。性能
继续上面的例子,若是咱们不使用邮箱,就得在邮局等邮递员,直到他回来,把信件交给他,这期间咱们啥事儿都不能干(也就是生产者阻塞)。或者邮递员得挨家挨户问,谁要寄信(至关于消费者轮询)。学习
支持忙闲不均
当生产者制造数据快的时候,消费者来不及处理,未处理的数据能够暂时存在缓冲区中,慢慢处理掉。而不至于由于消费者的性能形成数据丢失或影响生产者生产。spa
咱们再拿寄信的例子,假设邮递员一次只能带走1000封信,万一碰上情人节(或是圣诞节)送贺卡,须要寄出去的信超过了1000封,这时候邮箱这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮箱中,等下次过来时再拿走。操作系统
经过上面的介绍你们应该已经明白了生产者消费者模式。
在实现生产者消费者模式以前,咱们先学习下Python中的多线程编程。
线程是操做系统直接支持的执行单元,高级语言一般都内置多线程的支持,Python也不例外,而且Python的线程是真正的Posix Thread,而不是模拟出来的线程。
Python的标准库提供了两个模块:_thread和threading,_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数状况下,咱们只须要使用threading这个高级模块。
下面咱们先看一段在Python中实现多线程的代码。
import time,threading #线程代码 class TaskThread(threading.Thread): def __init__(self,name): threading.Thread.__init__(self,name=name) def run(self): print('thread %s is running...' % self.getName()) for i in range(6): print('thread %s >>> %s' % (self.getName(), i)) time.sleep(1) print('thread %s finished.' % self.getName()) taskthread = TaskThread('TaskThread') taskthread.start() taskthread.join()
下面是程序的执行结果:
thread TaskThread is running... thread TaskThread >>> 0 thread TaskThread >>> 1 thread TaskThread >>> 2 thread TaskThread >>> 3 thread TaskThread >>> 4 thread TaskThread >>> 5 thread TaskThread finished.
TaskThread类继承自threading模块中的Thread线程类。构造函数的name参数指定线程的名字,经过重载基类run函数实现具体任务。
在简单熟悉了Python的线程后,下面咱们实现一个生产者消费者模shi。
from Queue import Queue import random,threading,time #生产者类 class Producer(threading.Thread): def __init__(self, name,queue): threading.Thread.__init__(self, name=name) self.data=queue def run(self): for i in range(5): print("%s is producing %d to the queue!" % (self.getName(), i)) self.data.put(i) time.sleep(random.randrange(10)/5) print("%s finished!" % self.getName()) #消费者类 class Consumer(threading.Thread): def __init__(self,name,queue): threading.Thread.__init__(self,name=name) self.data=queue def run(self): for i in range(5): val = self.data.get() print("%s is consuming. %d in the queue is consumed!" % (self.getName(),val)) time.sleep(random.randrange(10)) print("%s finished!" % self.getName()) def main(): queue = Queue() producer = Producer('Producer',queue) consumer = Consumer('Consumer',queue) producer.start() consumer.start() producer.join() consumer.join() print 'All threads finished!' if __name__ == '__main__': main()
执行结果可能以下:
Producer is producing 0 to the queue! Consumer is consuming. 0 in the queue is consumed! Producer is producing 1 to the queue! Producer is producing 2 to the queue! Consumer is consuming. 1 in the queue is consumed! Consumer is consuming. 2 in the queue is consumed! Producer is producing 3 to the queue! Producer is producing 4 to the queue! Producer finished! Consumer is consuming. 3 in the queue is consumed! Consumer is consuming. 4 in the queue is consumed! Consumer finished! All threads finished!
由于多线程是抢占式执行的,因此打印出的运行结果不必定和上面的彻底一致。
本例经过Python实现了一个简单的生产者消费者模型。Python中的Queue模块已经提供了对线程同步的支持,因此本文并无涉及锁、同步、死锁等多线程问题。