Python之queue模块以及生产消费者模型

队列

队列相似于一条管道,元素先进先出,进put(arg),取get()
有一点须要注意的是:队列都是在内存中操做,进程退出,队列清空,另外,队列也是一个阻塞的形态.python

队列分类

队列有不少中,但都依赖模块queue
|队列方式|特色|
|---|---|
|queue.Queue|先进先出队列|
|queue.LifoQueue|后进先出队列|
|queue.PriorityQueue|优先级队列|
|queue.deque|双线队列|并发

队列的方法

方法 用法说明
put 放数据,Queue.put()默认有block=True和timeout两个参数。当block=True时,写入是阻塞式的,阻塞时间由timeout肯定。当队列q被(其余线程)写满后,这段代码就会阻塞,直至其余线程取走数据。Queue.put()方法加上 block=False 的参数,便可解决这个隐蔽的问题。但要注意,非阻塞方式写队列,当队列满时会抛出 exception Queue.Full 的异常
get 取数据(默认阻塞),Queue.get([block[, timeout]])获取队列,timeout等待时间
empty 若是队列为空,返回True,反之False
qsize 显示队列中真实存在的元素长度
maxsize 最大支持的队列长度,使用时无括号
join 实际上意味着等到队列为空,再执行别的操做
task_done 在完成一项工做以后,Queue.task_done()函数向任务已经完成的队列发送一个信号
full 若是队列满了,返回True,反之False

单向队列

import queue

q=queue.Queue(5)    #若是不设置长度,默认为无限长
print(q.maxsize)    #注意没有括号
q.put(123)
q.put(456)
q.put(789)
q.put(100)
q.put(111)
q.put(233)
print(q.get())
print(q.get())

如此打印时候是阻塞的,为何呢,由于建立了5个元素长度的队列,但我put进去了6个,因此就阻塞了.若是少写一个能显示出正确的123.app

后进先出队列

q = queue.LifoQueue()
q.put(12)
q.put(34)
print(q.get())

优先级队列

须要注意的是,优先级队列put的是一个元组,(优先级,数据),优先级数越小,级别越高框架

q = queue.PriorityQueue()
q.put((3,'aaaaa'))
q.put((3,'bbbbb'))
q.put((1,'ccccc'))
q.put((3,'ddddd'))
print(q.get())
print(q.get())

out:函数

(1, 'ccccc')
(3, 'aaaaa')

双线队列

q = queue.deque()
q.append(123)
q.append(456)
q.appendleft(780)
print(q.pop())
print(q.popleft())

out:高并发

456
780

生产消费者模型

解决什么问题,使用场景

从下面图中能够发现生产者和消费者之间用中间相似一个队列同样的东西串起来。这个队列能够想像成一个存放产品的“仓库”,生产者只须要关心这个“仓库”,并不须要关心具体的消费者,对于生产者而言甚至都不知道有这些消费者存在。对于消费者而言他也不须要关心具体的生产者,到底有多少生产者也不是他关心的事情,他只要关心这个“仓库”中还有没有东西。这种模型是一种松耦合模型。这样能够回答我上面提出的第一个问题。这个模型的产生就是为了复用和解耦。好比常见的消息框架(很是经典的一种生产者消费者模型的使用场景)ActiveMQ。发送端和接收端用Topic进行关联。这个Topic能够理解为咱们这里“仓库”的地址,这样就能够实现点对点和广播两种方式进行消息的分发。
生产消费者模型图线程

一句话总结

解决程序解耦,较少的资源解决高并发的问题code

import queue,threading,time

q=queue.Queue()

def product(arg):
    while True:
        q.put(str(arg)+'包子')

def consumer(arg):
    while True:
        print(arg,q.get())
        time.sleep(2)
for i in range(3):
    t=threading.Thread(target=product,args=(i,))
    t.start()
for j in range(20):
    t=threading.Thread(target=consumer,args=(j,))
    t.start()
相关文章
相关标签/搜索