协调生产者消费者协程.html
from tornado import gen from tornado.ioloop import IOLoop from tornado.queues import Queue q = Queue(maxsize=2) @gen.coroutine def consumer(): while True: item = yield q.get() try: print('Doing work on %s' % item) yield gen.sleep(0.01) finally: q.task_done() @gen.coroutine def producer(): for item in range(5): yield q.put(item) print('Put %s' % item) @gen.coroutine def main(): # Start consumer without waiting (since it never finishes). IOLoop.current().spawn_callback(consumer) yield producer() # Wait for producer to put all tasks. yield q.join() # Wait for consumer to finish all tasks. print('Done') IOLoop.current().run_sync(main) # Put 0 # Put 1 # Doing work on 0 # Put 2 # Doing work on 1 # Put 3 # Doing work on 2 # Put 4 # Doing work on 3 # Doing work on 4 # Done
在Python 3.5, Queue
实现了异步迭代器协议, 因此 consumer()
能够被重写为:python
async def consumer(): async for item in q: try: print('Doing work on %s' % item) yield gen.sleep(0.01) finally: q.task_done()
在 4.3 版更改: 为Python 3.5添加 async for
支持 in Python 3.5.异步
maxsize
队列中容许的最大项目数.async
qsize
()[源代码]
当前队列中的项目数.tornado
put
(item, timeout=None)[源代码]
将一个项目放入队列中, 可能须要等待直到队列中有空间.oop
返回一个Future对象, 若是超时会抛出 tornado.gen.TimeoutError
.spa
get
(timeout=None)[源代码]
从队列中删除并返回一个项目.htm
返回一个Future对象, 当项目可用时resolve, 或者在超时后抛出 tornado.gen.TimeoutError
.
get_nowait
()[源代码]
非阻塞的从队列中删除并返回一个项目.
若是有项目是当即可用的则返回该项目, 不然抛出 QueueEmpty
.
task_done
()[源代码]
代表前面排队的任务已经完成.
被消费者队列使用. 每一个 get
用来获取一个任务, 随后(subsequent) 调用 task_done
告诉队列正在处理的任务已经完成.
若是 join
正在阻塞, 它会在全部项目都被处理完后调起; 即当每一个 put
都被一个 task_done
匹配.
若是调用次数超过 put
将会抛出 ValueError
.
join
(timeout=None)[源代码]
阻塞(block)直到队列中的全部项目都处理完.
返回一个Future对象, 超时后会抛出 tornado.gen.TimeoutError
异常.
QueueEmpty
tornado.queues.
QueueEmpty
[源代码]
当队列中没有项目时, 由 Queue.get_nowait
抛出.
QueueFull
tornado.queues.
QueueFull
[源代码]
当队列为最大size时, 由 Queue.put_nowait
抛出.