问题描述:并发
生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消费者能并发执行,在二者之间设置一个具备多个缓冲区的缓冲池,生产者将它生产的产品放入一个缓冲区中,消费者能够从缓冲区中取走产品进行消费,显然生产者和消费者之间必须保持同步,即不容许消费者到一个空的缓冲区中取产品,也不容许生产者向一个已经放入产品的缓冲区中再次投放产品。app
条件变量解决方案:dom
from threading import Lock
from threading import Condition
import threading
class myQueue:
def __init__(self, size):
self.size = size
self.list = list()
self.lock = Lock()
self.notFullCond = Condition(self.lock)
self.notEmptyCond = Condition(self.lock)
def isFull(self):
if self.size == len(self.list):
return True
return False
def isEmpty(self):
if 0 == len(self.list):
return True
return False
def enQueue(self, elem):
self.lock.acquire()
while self.isFull(): #队列满时触发等待notFullCond条件,线程阻塞同时释放互斥锁
print('queue is full, waiting...')
self.notFullCond.wait()
print(threading.current_thread().getName() + ' product ' + str(elem))
self.list.append(elem)
#当有资源进入队列通知全部等待notEmptyCond条件的线程,等释放互斥锁后,等待notEmptyCond条件的线程获取锁,再次判断条件
self.notEmptyCond.notify_all()
self.lock.release()
def deQueue(self):
self.lock.acquire()
while self.isEmpty(): #队列空时触发等待notEmptyCond条件,线程阻塞同时释放互斥锁
print('queue is empty, waiting...')
self.notEmptyCond.wait()
elem = self.list[0]
del(self.list[0])
print(threading.current_thread().getName() + ' consume ' + str(elem))
#当有资源出队列通知全部等待notFullCond条件的线程,等释放互斥锁后,等待notFullCond条件的线程获取锁,再次判断条件
self.notFullCond.notify_all()
self.lock.release()
return elem
信号量解决方案:测试
from threading import Lock from threading import Semaphore import threading class mySemQueue: def __init__(self, size): self.size = size self.list = list() self.lock = Lock() self.writeSem = Semaphore(size)#初始化写信号量 self.readSem = Semaphore(0) #初始化读信号量 def enQueue(self, elem): self.writeSem.acquire() #资源入队申请写信号量,若是为0则阻塞 self.lock.acquire() #互斥锁来保证资源的互斥访问 self.list.append(elem) print(threading.current_thread().getName() + ' product ' + str(elem)) self.lock.release() self.readSem.release() #资源入队后释放一个读信号量,若是其它线程阻塞在这个信号量上,唤醒该线程 def deQueue(self): self.readSem.acquire() #资源出队申请读信号量,若是为0则阻塞 self.lock.acquire() elem = self.list[0] del(self.list[0]) print(threading.current_thread().getName() + ' consume ' + str(elem)) self.lock.release() self.writeSem.release() #资源出队后释放一个写信号量,若是其它线程阻塞在这个信号量上,唤醒该线程 return elem
from threading import Thread import sys import threading class myThread(Thread): def __init__(self, func): Thread.__init__(self) self.func = func def run(self): print(threading.current_thread().getName() + ' start') self.func() from myThread import myThread from myQueue import myQueue import random import sys def producter(): while True: elem =random.randint(1, 100) que.enQueue(elem) def consumer(): while True: que.deQueue() fp = open('log.txt','w') sys.stdout = fp que = myQueue(10) t1 = myThread(producter) t2 = myThread(consumer) t3 = myThread(consumer) t1.start() t2.start() t3.start()