#asyncio之Coroutines,Tasks and Futurehtml
Coroutines and Tasks属于High-level APIs,也就是高级层的api。python
本节概述用于协程和任务的高级异步api。 ###Coroutines Coroutines翻译过来意思是协程, 使用async/await语法声明的协程是编写asyncio应用程序的首选方法。api
import asyncio async def main(): print("hello") await asyncio.sleep(1) print("world") if __name__ == '__main__': # asyncio.run(main()) # 3.7的用法 # 阻塞直到hello world()协程结束时返回 loop = asyncio.get_event_loop() loop.run_until_complete(main())
第一个异步函数是经过建立loop循环去调用,其余异步函数之间经过await进行调用。 像下面的一个例子安全
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") if __name__ == '__main__': loop = asyncio.get_event_loop() # 阻塞直到hello world()协程结束时返回 loop.run_until_complete(main()) loop.close()
或者咱们能够经过asyncio.create_task()将协程say_after封装任务去调用就像下面这样。微信
async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # 等待两个子任务完成 await task1 await task2 print(f"finished at {time.strftime('%X')}")
###Awaitables 咱们说,若是一个对象能够用在await表达式中,那么它就是Awaitables的对象。 可等待对象主要有三种类型:coroutines, Tasks, and Futures. ####Coroutines 前面的代码中演示了协程的运做方式,这里主要强调两点。并发
import asyncio async def nested(): await asyncio.sleep(2) print("等待2s") async def main(): # 将协程包装成任务含有状态 # task = asyncio.create_task(nested()) task = asyncio.ensure_future(nested()) print(task) # "task" can now be used to cancel "nested()", or # can simply be awaited to wait until it is complete: await task print(task) print(task.done()) if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) except KeyboardInterrupt as e: for task in asyncio.Task.all_tasks(): print(task) task.cancel() print(task) loop.run_forever() # restart loop finally: loop.close()
能够看到app
<Task pending coro=<nested() running at /Users/chennan/pythonproject/asyncproject/asyncio-cn/1-2-1.py:9>> 等待2s <Task finished coro=<nested() done, defined at /Users/chennan/pythonproject/asyncproject/asyncio-cn/1-2-1.py:9> result=None> True
建立task后,task在加入事件循环以前是pending状态而后调用nested函数等待2s以后打印task为finished状态。asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)均可以建立一个task,python3.7增长了asyncio.create_task(coro)。其中task是Future的一个子类 ##Future future:表明未来执行或没有执行的任务的结果。它和task上没有本质的区别 一般不须要在应用程序级别代码中建立Future对象。 future对象有几个状态:异步
经过上面的代码能够知道建立future的时候,task为pending,事件循环调用执行的时候是running,调用完毕天然就是done因而调用task.done()打印了true。async
若是在命令行中运行上述代码,ctrl+c后会发现 输出如下内容函数
<Task pending coro=<nested() running at 1-2-1.py:9>> ^C<Task pending coro=<main() running at 1-2-1.py:21> wait_for=<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10d342978>()]> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>> <Task pending coro=<main() running at 1-2-1.py:21> wait_for=<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>> <Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]> <Task cancelling coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>
由于咱们调用了task.cancel() 因此能够看到此时的任务状态为取消状态。 ####并发的执行任务 经过使用await+asyncio.gather能够完成并发的操做。 asyncio.gather用法以下。 *asyncio.gather(aws, loop=None, return_exceptions=False) **aws是一系列协程,协程都成功完成,就返回值一个结果列表。结果值的顺序与aws中添加协程的顺序相对应。 return_exceptions=False,其实就是若是有一个任务失败了,就直接抛出异常。若是等于True就把错误信息做为结果返回回来。 首先来一个正常状况不出错的例子:
import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({i})...") if number == 2: 1 / 0 await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") async def main(): # Schedule three calls *concurrently*: res = await asyncio.gather( *[factorial("A", 2), factorial("B", 3), factorial("C", 4)] , return_exceptions=True) for item in res: print(item) if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) except KeyboardInterrupt as e: for task in asyncio.Task.all_tasks(): print(task) task.cancel() print(task) loop.run_forever() # restart loop finally: loop.close()
输入如下内容:
Task A: Compute factorial(2)... Task B: Compute factorial(2)... Task C: Compute factorial(2)... Task B: Compute factorial(3)... Task C: Compute factorial(3)... Task B: factorial(3) = 6 Task C: Compute factorial(4)... Task C: factorial(4) = 24 division by zero None None
能够发现async.gather最后会返回一系列的结果,若是出现了错误就把错误信息做为返回结果,这里我当数字为2时人为加了异常操做1/0,因而返回告终果division by zero,对于其余的任务由于没有返回值因此是None。这里return_exceptions=True来保证了若是其中一个任务出现异常,其余任务不会受其影响会执行到结束。
asyncio.wait
coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
asyncio.wait和async.gather用法差很少只是async.wait接收的是个列表。 第三个参数和async.gather有点区别.
参数名 | 含义 |
---|---|
FIRST_COMPLETED | 任何一个future完成或取消时返回 |
FIRST_EXCEPTION | 任何一个future出现错误将返回,若是出现异常等价于ALL_COMPLETED |
ALL_COMPLETED | 当全部任务完成或者被取消时返回结果,默认值。 |
###Timeouts | |
经过使用asyncio.wait_for来完成一个超时函数回调操做,若是函数规定时间内未完成则报错。 | |
*asyncio.wait_for(aw, timeout, , loop=None) | |
aw表明一个协程,timeout单位秒。 |
async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print('yay!') async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except asyncio.TimeoutError: print('timeout!') asyncio.run(main()) # Expected output: # # timeout!
1秒内eternity没有完成就报错了。 python3.7中发生更改:当aw因为超时而被取消时,再也不显示异常而是等待aw被取消。 说到timeout的,若是仅仅是对一个代码块作timeout操做而不是等待某个协程此时推荐第三方模块async_timeout ###async_timeout ####安装
pip installa async_timeout
使用方法很简单以下
async with async_timeout.timeout(1.5) as cm: await inner() print(cm.expired)
若是1.5s能够运行完打印true,不然打印false,表示超时。 ###asyncio.as_completed *asyncio.as_completed(aws, , loop=None, timeout=None) 使用as_completed会返回一个能够迭代的future对象,一样能够获取协程的运行结果,使用方法以下:
async def main(): coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] for task in asyncio.as_completed(tasks): result = await task print('Task ret: {}'.format(result)) start = now() loop = asyncio.get_event_loop() done = loop.run_until_complete(main()) print('TIME: ', now() - start)
###协程嵌套 使用async能够定义协程,协程用于耗时的io操做,咱们也能够封装更多的io操做过程,这样就实现了嵌套的协程,即一个协程中await了另一个协程,如此链接起来 官网实例: 图解:
一、run_until_complete运行,会注册task(协程:print_sum)并开启事件循环 →
二、print_sum协程中嵌套了子协程,此时print_sum协程暂停(相似委托生成器),转到子协程(协程:compute)中运行代码,期间子协程需sleep1秒钟,直接将结果反馈到event loop中,即将控制权转回调用方,而中间的print_sum暂停不操做 →
三、1秒后,调用方将控制权给到子协程(调用方与子协程直接通讯),子协程执行接下来的代码,直到再遇到wait(此实例没有)→
四、 最后执行到return语句,子协程向上级协程(print_sum抛出异常:StopIteration),同时将return返回的值返回给上级协程(print_sum中的result接收值),print_sum继续执行暂时时后续的代码,直到遇到return语句 →
五、向 event loop 抛出StopIteration异常,此时协程任务都已经执行完毕,事件循环执行完成(event loop :the loop is stopped),close事件循环。 ###调度线程 asyncio.run_coroutine_threadsafe(coro, loop) 等待其余线程返回一个concurrent.futures.Future对象,这是一个线程安全的方法。 这个函数应该从不一样的OS线程调用,而不是从事件循环所在的线程调用。
def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() async def do_some_work(x): print('Waiting {}'.format(x)) await asyncio.sleep(x) print('Done after {}s'.format(x)) def more_work(x): print('More work {}'.format(x)) time.sleep(x) print('Finished more work {}'.format(x)) start = now() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() print('TIME: {}'.format(time.time() - start)) asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop) asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
上述的例子,主线程中建立一个new_loop,而后在另外的子线程中开启一个无限事件循环。主线程经过run_coroutine_threadsafe新注册协程对象。这样就能在子线程中进行事件循环的并发操做,同时主线程又不会被block。一共执行的时间大概在6s左右。 run_in_executor
import time import asyncio async def main(): print(f'{time.ctime()} Hello') await asyncio.sleep(1.0) print(f'{time.ctime()} Goodbye') loop.stop() def blocking(): # 1 time.sleep(0.5) # 2 print(f'{time.ctime()} Hello from a thread!') loop = asyncio.get_event_loop() loop.create_task(main()) loop.run_in_executor(None, blocking) # 3 loop.run_forever() pending = asyncio.Task.all_tasks(loop=loop) # 4 group = asyncio.gather(*pending) loop.run_until_complete(group) loop.close()
输出
Fri Jan 4 15:32:03 2019 Hello Fri Jan 4 15:32:04 2019 Hello from a thread! Fri Jan 4 15:32:04 2019 Goodbye
下面对上面的函数的序号进行讲解:
1 这个函数调用了常规的sleep(),这会阻塞主线程并阻止loop运行,咱们不能使这个函数变成协程,更糟糕的是不能在主线程运行loop时调用它,解决办法是用一个executor来运行它; 2 注意一点,这个sleep运行时间比协程中的sleep运行时间要短,后文再讨论若是长的话会发生什么; 3 该方法帮助咱们在事件loop里用额外的线程或进程执行函数,这个方法的返回值是一个Future对象,意味着能够用await来切换它; 4 挂起的task中不包含前面的阻塞函数,而且这个方法只返回task对象,绝对不会返回Future对象。 ###绑定回调 绑定回调,在task执行完毕的时候能够获取执行的结果,回调的最后一个参数是future对象,经过该对象能够获取协程返回值。若是回调须要多个参数,能够经过偏函数导入
import time import asyncio now = lambda : time.time() async def do_some_work(x): print('Waiting: ', x) return 'Done after {}s'.format(x) def callback(future): # 回调函数 print('Callback: ', future.result()) start = now() coroutine = do_some_work(2) loop = asyncio.get_event_loop() get_future = asyncio.ensure_future(coroutine) task.add_done_callback(callback) # 添加回调函数 loop.run_until_complete(get_future) print('TIME: ', now() - start)
回调函数须要多个参数时,future参数要放最后。执行完成,咱们能够经过参数future获取协程的执行结果:future.result()
import functools # functools.partial:偏函数,能将带参数的函数包装成一个新的函数 def callback(t, future): # 回调函数 ,future放最后 print('Callback:', t, future.result()) task.add_done_callback(functools.partial(callback, 2)
###asyncio.iscoroutine(obj) Return True if obj is a coroutine object. 判断是否为coroutine对象,若是是返回True
###asyncio.iscoroutinefunction(func) 判断是否为coroutine函数,若是是返回True
https://docs.python.org/3.7/library/asyncio-task.html https://www.jianshu.com/p/b5e347b3a17c
微信公众号:python学习开发 加微信italocxa 入群。
原文出处:https://www.cnblogs.com/c-x-a/p/10220398.html