asyncio是一个使用async / await语法编写并发代码的库。html
asyncio用做多个Python异步框架的基础,这些框架提供高性能的网络和Web服务器,数据库链接库,分布式任务队列等。python
asyncio一般很是适合IO绑定和高级 结构化网络代码。数据库
asyncio提供了一组高级 API:express
此外,还有一些用于库和框架开发人员的低级 API :api
networking
,运行subprocesses
,处理等;OS signals
使用async / await语法声明的协同程序是编写asyncio应用程序的首选方法。例如,如下代码片断(须要Python 3.7+)打印“hello”,等待1秒,而后打印“world”:安全
import asyncio async def main(): print('hello') await asyncio.sleep(1) print('world') asyncio.run(main()) # hello # world
上面代码等同于下面(不推荐使用基于生成器的协同程序的支持,并计划在Python 3.10中删除。)服务器
import asyncio @asyncio.coroutine def main(): print('hello') yield from asyncio.sleep(1) print('world') asyncio.run(main())
asyncio实际等同于下面的工做(参数为An asyncio.Future, a coroutine or an awaitable is required)网络
import asyncio @asyncio.coroutine def main(): print('hello') yield from asyncio.sleep(1) print('world') # asyncio.run(main()) loop = asyncio.events.new_event_loop() asyncio.events.set_event_loop(loop) loop.run_until_complete(main()) # hello # world
1 This function runs the passed coroutine, taking care of 2 managing the asyncio event loop and finalizing asynchronous 3 generators. 4 5 This function cannot be called when another asyncio event loop is 6 running in the same thread. 7 8 If debug is True, the event loop will be run in debug mode. 9 10 This function always creates a new event loop and closes it at the end. 11 It should be used as a main entry point for asyncio programs, and should 12 ideally only be called once.
实际运行协程asyncio提供了三种主要机制:多线程
一、The asyncio.run()函数来运行顶层入口点“main()”函数(见上面的例子)并发
二、Awaiting on a coroutine 如下代码片断将在等待1秒后打印“hello”,而后在等待另外 2秒后打印“world” :
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") asyncio.run(main()) # started at 11:54:48 # hello # world # finished at 11:54:51
三、asyncio.create_task()
与asyncio同时运行协同程序的功能Tasks;
让咱们修改上面的例子并同时运行两个say_after
协同程序 :
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(f"{what} at {time.strftime('%X')}") async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await task1 await task2 print(f"finished at {time.strftime('%X')}") asyncio.run(main()) # started at 14:27:22 # hello at 14:27:23 # world at 14:27:24 # finished at 14:27:24
稍微改变一下形式,能够理解的更多
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(f"{what} at {time.strftime('%X')}") async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await asyncio.sleep(3) # await task1 # await task2 print(f"finished at {time.strftime('%X')}") asyncio.run(main()) # started at 14:29:41 # hello at 14:29:42 # world at 14:29:43 # finished at 14:29:44
咱们说若是一个对象能够在表达式中使用,那么它就是一个等待对象await
。许多asyncio API旨在接受等待。
有三种主要类型的等待对象:coroutines, Tasks, and Futures.
Coroutines
Python coroutines are awaitables and therefore can be awaited from other coroutines:
import asyncio async def nested(): return 42 async def main(): # Nothing happens if we just call "nested()". # A coroutine object is created but not awaited, # so it *won't run at all*. nested() # Let's do it differently now and await it: print(await nested()) # will print "42". asyncio.run(main()) # 42
重要
在本文档中,术语“coroutine”可用于两个密切相关的概念:
async def
Tasks
任务用于调度协同程序并发。
当一个协程被包装到一个Task中时,会像asyncio.create_task()
同样 conroutine自动安排很快运行:
import asyncio async def nested(): return 42 async def main(): # Schedule nested() to run soon concurrently # with "main()". task = asyncio.create_task(nested()) # "task" can now be used to cancel "nested()", or # can simply be awaited to wait until it is complete: await task asyncio.run(main())
Futures
A Future
是一个特殊的低级别等待对象,它表示异步操做的最终结果。
当等待 Future对象时,它意味着协程将等到Future在其余地方解析。
须要asyncio中的将来对象以容许基于回调的代码与async / await一块儿使用。
一般,不须要在应用程序级代码中建立Future对象。
能够等待有时经过库和一些asyncio API公开的将来对象:
async def main(): await function_that_returns_a_future_object() # this is also valid: await asyncio.gather( function_that_returns_a_future_object(), some_python_coroutine() )
返回Future对象的低级函数的一个很好的例子是loop.run_in_executor()
。
一、运行asyncio程序
asyncio.run(coro,*,debug = False )
此函数运行传递的协同程序,负责管理asyncio事件循环并最终肯定异步生成器。
当另外一个asyncio事件循环在同一个线程中运行时,没法调用此函数。
若是是debugTrue
,则事件循环将以调试模式运行。
此函数始终建立一个新的事件循环并在结束时将其关闭。它应该用做asyncio程序的主要入口点,理想状况下应该只调用一次。
版本3.7中的新功能:重要:此功能已临时添加到Python 3.7中的asyncio中。
二、建立任务
asyncio.create_task(coro)
将coro coroutine包装成a Task
并安排执行。返回Task对象。
任务在返回的循环中执行,若是当前线程中没有运行循环get_running_loop()
, RuntimeError
则引起该任务。
Python 3.7中添加了此功能。在Python 3.7以前,asyncio.ensure_future()
可使用低级函数:
async def coro(): ... # In Python 3.7+ task = asyncio.create_task(coro()) ... # This works in all Python versions but is less readable task = asyncio.ensure_future(coro())
三、sleeping
coroutine asyncio.
sleep
(delay, result=None, *, loop=None)
阻止 delay seconds.。
若是提供了result ,则在协程完成时将其返回给调用者。
leep()
始终挂起当前任务,容许其余任务运行。
该loop 参数已被弃用,并定于去除在Python 3.10。
协程示例每秒显示当前日期5秒:
import asyncio import datetime async def display_date(): loop = asyncio.get_running_loop() end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1) asyncio.run(display_date())
四、同时运行任务
awaitable asyncio.
gather
(*aws, loop=None, return_exceptions=False)
同时在aws 序列中运行 awaitable objects
若是在aws中任何awaitable 是协程,它将自动安排为任务
若是全部等待成功完成,则结果是返回值的汇总列表。结果值的顺序对应于aws中的等待顺序
若是return_exceptions是False
(默认),则第一个引起的异常会当即传播到等待的任务gather()
。
若是return_exceptions是True
,异常的处理方式同样成功的结果,并在结果列表汇总。
若是gather()
被取消,全部提交的awaitables(还没有完成)也被取消。
若是aws序列中的Task or Future被取消,则将其视为已引起CancelledError
- 在这种状况下不会取消gather()
呼叫。这是为了防止取消一个提交的Tasks/Futures 以致使其余任务/期货被取消。
import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({i})...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") async def main(): # Schedule three calls *concurrently*: await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), ) asyncio.run(main()) # Expected output: # # Task A: Compute factorial(2)... # Task B: Compute factorial(2)... # Task C: Compute factorial(2)... # Task A: factorial(2) = 2 # Task B: Compute factorial(3)... # Task C: Compute factorial(3)... # Task B: factorial(3) = 6 # Task C: Compute factorial(4)... # Task C: factorial(4) = 24
获取返回结果,异常状况
import asyncio async def factorial(name, number): print(name) if name == 'A': return name elif name == 'B': raise SyntaxError(name) await asyncio.sleep(number) async def main(): # Schedule three calls *concurrently*: result = await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), return_exceptions=True ) print(result) asyncio.run(main()) # A # B # C # ['A', SyntaxError('B'), None]
版本3.7中已更改:若是取消了汇集自己,则不管return_exceptions如何,都会传播取消。
五、Shielding From Cancellation
awaitable asyncio.
shield
(aw, *, loop=None)
Protect an awaitable object from being cancelled
.
If aw is a coroutine it is automatically scheduled as a Task.
The statement:
res = await shield(something())
等同于
res = await something()
除非取消包含它的协程,不然something()
不会取消运行的任务。从观点来看something()
,取消没有发生。虽然它的来电者仍然被取消,因此“等待”表达仍然提出了一个CancelledError
。
若是something()
经过其余方式取消(即从内部取消)也会取消shield()
。
若是但愿彻底忽略取消(不推荐),则该shield()
函数应与try / except子句结合使用,以下所示:
try: res = await shield(something()) except CancelledError: res = None
六、超时
coroutine asyncio.
wait_for
(aw, timeout, *, loop=None)
Wait for the aw awaitable to complete with a timeout.
If aw is a coroutine it is automatically scheduled as a Task.
timeout能够是None
或等待的float或int秒数。若是超时是None
,将等到完成
若是发生超时,它将取消任务并加注 asyncio.TimeoutError
。
要避免该任务cancellation
,请将其包装shield()
。
该函数将一直等到未来实际取消,所以总等待时间可能会超过超时。
若是等待被取消,则将来的aw也会被取消。
该循环参数已被弃用,并定于去除在Python 3.10。
async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print('yay!') async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except asyncio.TimeoutError: print('timeout!') asyncio.run(main()) # Expected output: # # timeout!
改变在3.7版本:当AW被取消,因为超时,wait_for
等待AW被取消。之前,它asyncio.TimeoutError
当即提出 。
七、超时原语
coroutine asyncio.
wait
(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
同时运行aws中的等待对象 并阻塞,直到return_when指定的条件为止。
若是在aws中任何等待的是协程,它将自动安排为任务。wait()
直接传递协同程序对象 已被弃用,由于它会致使 混乱的行为。
返回两组任务/期货:。(done, pending)
用法:
done, pending = await asyncio.wait(aws)
该循环参数已被弃用,并定于去除在Python 3.10。
timeout(浮点数或整数),若是指定,可用于控制返回前等待的最大秒数。
请注意,此功能不会引起asyncio.TimeoutError
。超时发生时未完成的期货或任务仅在第二组中返回。
return_when表示此函数什么时候返回。它必须是如下常量之一:
不变 | 描述 |
---|---|
FIRST_COMPLETED |
当任何将来完成或取消时,该函数将返回。 |
FIRST_EXCEPTION |
当任何将来经过引起异常完成时,函数将返回。若是没有将来引起异常则等同于 ALL_COMPLETED 。 |
ALL_COMPLETED |
全部期货结束或取消时,该功能将返回。 |
不像wait_for()
,wait()
当发生超时不会取消期货。
注意 wait()
将协同程序自动调度为任务,而后在 集合中返回隐式建立的任务对象。所以,如下代码将没法按预期方式工做:(done, pending)
async def foo(): return 42 coro = foo() done, pending = await asyncio.wait({coro}) if coro in done: # This branch will never be run!
如下是如何修复上述代码段:
async def foo(): return 42 task = asyncio.create_task(foo()) done, pending = await asyncio.wait({task}) if task in done: # Everything will work as expected now.
wait()
不推荐直接传递协程对象。
八、 Scheduling From Other Threads
asyncio.
run_coroutine_threadsafe
(coro, loop)
将协程提交给给定的事件循环。线程安全的。
返回a concurrent.futures.Future
以等待另外一个OS线程的结果。
此函数旨在从与运行事件循环的OS线程不一样的OS线程调用。例:
# Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout) == 3
若是在协程中引起异常,则会通知返回的Future。它还能够用于取消事件循环中的任务:
try: result = future.result(timeout) except asyncio.TimeoutError: print('The coroutine took too long, cancelling the task...') future.cancel() except Exception as exc: print(f'The coroutine raised an exception: {exc!r}') else: print(f'The coroutine returned: {result!r}')
请参阅 文档的并发和多线程部分。
与其余asyncio函数不一样,此函数须要 显式传递循环参数。
3.5.1版中的新功能。
九、自省
asyncio.
current_task
(loop = None )
返回当前正在运行的Task
实例,或者None
没有正在运行的任务。
若是loop是None
get_running_loop()
用来获取loop。
版本3.7中的新功能。
asyncio.
all_tasks
(loop = None )
返回Task
循环运行的一组还没有完成的对象。
若是loop是None
,get_running_loop()
则用于获取当前循环。
版本3.7中的新功能。
class asyncio.
Task
(coro,*,loop = None )
A Future-like
object that runs a Python coroutine. Not thread-safe.
任务用于在事件循环中运行协同程序。若是一个协程在Future上等待,则Task暂停执行协程并等待Future的完成。当Future 完成后,包装协程的执行将恢复。
事件循环使用协做调度:事件循环一次运行一个任务。当一个Task等待完成Future时,事件循环运行其余任务,回调或执行IO操做。
使用高级asyncio.create_task()
功能建立任务,或低级别loop.create_task()
或 ensure_future()
功能。不鼓励手动实例化任务。
要取消正在运行的任务,请使用该cancel()
方法。调用它将致使Task将CancelledError
异常抛出到包装的协同程序中。若是在取消期间协程正在等待Future对象,则Future对象将被取消。
cancelled()
可用于检查任务是否被取消。True
若是包装的协程没有抑制CancelledError
异常而且实际上被取消,则该方法返回。
asyncio.Task
继承自Future
其全部API,除了Future.set_result()
和 Future.set_exception()
。
任务支持该contextvars
模块。建立任务时,它会复制当前上下文,而后在复制的上下文中运行其协程。
版本3.7中已更改:添加了对contextvars
模块的支持。
cancel
()
请求取消任务。
这会安排CancelledError
在事件循环的下一个循环中将异常抛入包装的协程。
协程则有机会经过抑制异常与清理,甚至拒绝请求try
... ... ... ... ... ... 块。所以,不一样于,不保证任务将被取消,尽管彻底抑制取消并不常见,而且积极地不鼓励。exceptCancelledError
finally
Future.cancel()
Task.cancel()
如下示例说明了协同程序如何拦截取消请求:
async def cancel_me(): print('cancel_me(): before sleep') try: # Wait for 1 hour await asyncio.sleep(3600) except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): # Create a "cancel_me" Task task = asyncio.create_task(cancel_me()) # Wait for 1 second await asyncio.sleep(1) task.cancel() try: await task except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main()) # Expected output: # # cancel_me(): before sleep # cancel_me(): cancel sleep # cancel_me(): after sleep # main(): cancel_me is cancelled now
cancelled
()
True
若是任务被
取消,则返回。
cancel()
而且包装的协同程序将
CancelledError
异常传播 到其中。
done
()
True
若是任务
完成则返回。
result
()
CancelledError
异常。
InvalidStateError
异常。
exception
()
None
。
CancelledError
异常。
InvalidStateError
异常。
add_done_callback
(回调,*,上下文=无)
Future.add_done_callback()
详细信息,请参阅文档。
remove_done_callback
(回调)
Future.remove_done_callback()
详细信息,请参阅文档。
get_stack
(*,limit = None )
print_stack
(*,limit = None,file = None )
get_stack()
。
get_stack()
直接。
sys.stderr
。
all_tasks
(loop = None )
None
,则该
get_event_loop()
函数用于获取当前循环。
asyncio.all_tasks()
功能。
current_task
(loop = None )
None
。
None
,则该
get_event_loop()
函数用于获取当前循环。
asyncio.current_task()
功能。
一、async for 运用
import asyncio class AsyncIter: def __init__(self, items): self.items = items async def __aiter__(self): for item in self.items: await asyncio.sleep(1) yield item async def print_iter(things): async for item in things: print(item) if __name__ == '__main__': loop = asyncio.get_event_loop() things = AsyncIter([1, 2, 3]) loop.run_until_complete(print_iter(things)) loop.close()
Python异步IO实现全过程1 https://mp.weixin.qq.com/s/fJaXmfHfYEk6XL2y8NmKmQ
Python异步IO实现全过程2 https://mp.weixin.qq.com/s/RjDh7AITty92jxC8jIOiPA
Python异步IO实现全过程3 https://mp.weixin.qq.com/s/vlH_2S2JIJpf3N0WRNcIJQ