Python开发【异步】:asyncio

异步asyncio

asyncio是一个使用async / await语法编写并发代码的库html

asyncio用做多个Python异步框架的基础,这些框架提供高性能的网络和Web服务器,数据库链接库,分布式任务队列等。python

asyncio一般很是适合IO绑定和高级 结构化网络代码。数据库

 

asyncio提供了一组高级 API:express

此外,还有一些用于库和框架开发人员的低级 API api

 

Conroutines

使用async / await语法声明的协同程序是编写asyncio应用程序的首选方法。例如,如下代码片断(须要Python 3.7+)打印“hello”,等待1秒,而后打印“world”:安全

import asyncio

async def main():
     print('hello')
     await asyncio.sleep(1)
     print('world')

asyncio.run(main())

# hello
# world

上面代码等同于下面(不推荐使用基于生成器的协同程序的支持,并计划在Python 3.10中删除。服务器

import asyncio

@asyncio.coroutine
def main():
     print('hello')
     yield  from asyncio.sleep(1)
     print('world')

asyncio.run(main())

asyncio实际等同于下面的工做(参数为An asyncio.Future, a coroutine or an awaitable is required)网络

import asyncio

@asyncio.coroutine
def main():
     print('hello')
     yield  from asyncio.sleep(1)
     print('world')

# asyncio.run(main())
loop = asyncio.events.new_event_loop()
asyncio.events.set_event_loop(loop)
loop.run_until_complete(main())

# hello
# world
 1     This function runs the passed coroutine, taking care of
 2     managing the asyncio event loop and finalizing asynchronous
 3     generators.
 4 
 5     This function cannot be called when another asyncio event loop is
 6     running in the same thread.
 7 
 8     If debug is True, the event loop will be run in debug mode.
 9 
10     This function always creates a new event loop and closes it at the end.
11     It should be used as a main entry point for asyncio programs, and should
12     ideally only be called once.
asyncio.run功能介绍

实际运行协程asyncio提供了三种主要机制:多线程

一、The asyncio.run()函数来运行顶层入口点“main()”函数(见上面的例子)并发

二、Awaiting on a coroutine 如下代码片断将在等待1秒后打印“hello”,而后在等待另外 2秒后打印“world” 

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# started at 11:54:48
# hello
# world
# finished at 11:54:51

三、asyncio.create_task()与asyncio同时运行协同程序功能Tasks让咱们修改上面的例子并同时运行两个say_after协同程序 :

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%X')}")

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    # Wait until both tasks are completed (should take
    # around 2 seconds.)

    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# started at 14:27:22
# hello at 14:27:23
# world at 14:27:24
# finished at 14:27:24

 稍微改变一下形式,能够理解的更多

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%X')}")

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await asyncio.sleep(3)

    # await task1
    # await task2
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# started at 14:29:41
# hello at 14:29:42
# world at 14:29:43
# finished at 14:29:44

  

 Awaitables

咱们说若是一个对象能够在表达式中使用,那么它就是一个等待对象await许多asyncio API旨在接受等待。

有三种主要类型的等待对象:coroutinesTasks, and Futures.

Coroutines

Python coroutines are awaitables and therefore can be awaited from other coroutines:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

# 42

重要

在本文档中,术语“coroutine”可用于两个密切相关的概念:

  • 一个协程功能:一个功能;async def
  • 一个协程对象:经过调用协同程序函数返回的对象 

Tasks

任务用于调度协同程序并发。

当一个协程被包装到一个Task中时,会像asyncio.create_task()同样  conroutine自动安排很快运行:

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Futures

Future是一个特殊的低级别等待对象,它表示异步操做最终结果

等待 Future对象时,它意味着协程将等到Future在其余地方解析。

须要asyncio中的将来对象以容许基于回调的代码与async / await一块儿使用。

一般,不须要在应用程序级代码中建立Future对象。

能够等待有时经过库和一些asyncio API公开的将来对象:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

返回Future对象的低级函数的一个很好的例子是loop.run_in_executor()

 

Asyncio方法

一、运行asyncio程序 

 asyncio.runcoro*debug = False 

  此函数运行传递的协同程序,负责管理asyncio事件循环并最终肯定异步生成器

  当另外一个asyncio事件循环在同一个线程中运行时,没法调用此函数。

  若是debugTrue,则事件循环将以调试模式运行。

  此函数始终建立一个新的事件循环并在结束时将其关闭。它应该用做asyncio程序的主要入口点,理想状况下应该只调用一次。

  版本3.7中的新功能重要:此功能已临时添加到Python 3.7中的asyncio中

 

二、建立任务 

asyncio.create_task(coro)

  将coro coroutine包装成a Task 并安排执行。返回Task对象。

  任务在返回的循环中执行,若是当前线程中没有运行循环get_running_loop(), RuntimeError则引起该任务

  Python 3.7中添加了此功能在Python 3.7以前,asyncio.ensure_future()可使用低级函数:

async def coro():
    ...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())

 

三、sleeping 

coroutine asyncio.sleep(delayresult=None*loop=None)

  阻止 delay seconds.

  若是提供了result ,则在协程完成时将其返回给调用者。

  leep() 始终挂起当前任务,容许其余任务运行。

  该loop 参数已被弃用,并定于去除在Python 3.10。

  协程示例每秒显示当前日期5秒:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

  

四、同时运行任务

awaitable asyncio.gather(*awsloop=Nonereturn_exceptions=False)

  同时aws 序列中运行 awaitable objects

  若是在aws中任何awaitable 是协程,它将自动安排为任务

  若是全部等待成功完成,则结果是返回值的汇总列表。结果值的顺序对应于aws中的等待顺序

  若是return_exceptionsFalse(默认),则第一个引起的异常会当即传播到等待的任务gather()

  若是return_exceptionsTrue,异常的处理方式同样成功的结果,并在结果列表汇总。

  若是gather()取消,全部提交的awaitables(还没有完成)也被取消

  若是aws序列中的Task or Future取消,则将其视为已引起CancelledError在这种状况下不会取消gather() 呼叫这是为了防止取消一个提交的Tasks/Futures 以致使其余任务/期货被取消。

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

 获取返回结果,异常状况

import asyncio

async def factorial(name, number):

    print(name)
    if name == 'A':
        return name
    elif name == 'B':
        raise SyntaxError(name)
    await asyncio.sleep(number)


async def main():
    # Schedule three calls *concurrently*:
    result = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
        return_exceptions=True
    )
    print(result)

asyncio.run(main())

# A
# B
# C
# ['A', SyntaxError('B'), None]

版本3.7中已更改:若是取消汇集自己,则不管return_exceptions如何,都会传播取消

 

五、Shielding From Cancellation

awaitable asyncio.shield(aw*loop=None)

  Protect an awaitable object from being cancelled.

  If aw is a coroutine it is automatically scheduled as a Task.

  The statement:

res = await shield(something())

  等同于

res = await something()

  除非取消包含它的协程,不然something()不会取消运行的任务从观点来看something(),取消没有发生。虽然它的来电者仍然被取消,因此“等待”表达仍然提出了一个CancelledError

  若是something()经过其余方式取消(即从内部取消)也会取消shield()

  若是但愿彻底忽略取消(不推荐),则该shield()函数应与try / except子句结合使用,以下所示:

try:
    res = await shield(something())
except CancelledError:
    res = None

 

六、超时

coroutine asyncio.wait_for(aw, timeout, *, loop=None)

  Wait for the aw awaitable to complete with a timeout.

  If aw is a coroutine it is automatically scheduled as a Task.

  timeout能够是None或等待的float或int秒数。若是超时None,将等到完成

  若是发生超时,它将取消任务并加注 asyncio.TimeoutError

  要避免该任务cancellation,请将其包装shield()

  该函数将一直等到未来实际取消,所以总等待时间可能会超过超时

  若是等待被取消,则将来的aw也会被取消。

  该循环参数已被弃用,并定于去除在Python 3.10。

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

改变在3.7版本:AW被取消,因为超时,wait_for等待AW被取消。之前,它asyncio.TimeoutError当即提出 

 

七、超时原语

coroutine asyncio.wait(aws*loop=Nonetimeout=Nonereturn_when=ALL_COMPLETED)

  同时运行aws中的等待对象 并阻塞,直到return_when指定的条件为止

  若是在aws中任何等待的是协程,它将自动安排为任务。wait()直接传递协同程序对象 已被弃用,由于它会致使 混乱的行为

  返回两组任务/期货:(done, pending)

  用法:

done, pending = await asyncio.wait(aws)

  该循环参数已被弃用,并定于去除在Python 3.10。

  timeout(浮点数或整数),若是指定,可用于控制返回前等待的最大秒数。

  请注意,此功能不会引起asyncio.TimeoutError超时发生时未完成的期货或任务仅在第二组中返回。

  return_when表示此函数什么时候返回。它必须是如下常量之一:

不变 描述
FIRST_COMPLETED 当任何将来完成或取消时,该函数将返回。
FIRST_EXCEPTION 当任何将来经过引起异常完成时,函数将返回。若是没有将来引起异常则等同于 ALL_COMPLETED
ALL_COMPLETED 全部期货结束或取消时,该功能将返回。

  不像wait_for()wait()当发生超时不会取消期货。   

  注意 wait()将协同程序自动调度为任务,而后在 集合中返回隐式建立的任务对象。所以,如下代码将没法按预期方式工做:(done, pending)

async def foo():
    return 42

coro = foo()
done, pending = await asyncio.wait({coro})

if coro in done:
    # This branch will never be run!

  如下是如何修复上述代码段:

async def foo():
    return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
    # Everything will work as expected now.

  wait()不推荐直接传递协程对象。

 

八、 Scheduling From Other Threads

asyncio.run_coroutine_threadsafe(coroloop)

  将协程提交给给定的事件循环。线程安全的。

  返回a concurrent.futures.Future以等待另外一个OS线程的结果。

  此函数旨在从与运行事件循环的OS线程不一样的OS线程调用。例:

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

  若是在协程中引起异常,则会通知返回的Future。它还能够用于取消事件循环中的任务:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

  请参阅 文档并发和多线程部分。

  与其余asyncio函数不一样,此函数须要 显式传递循环参数。

  3.5.1版中的新功能。

 

九、自省

asyncio.current_taskloop = None 

  返回当前正在运行的Task实例,或者None没有正在运行的任务。

  若是loopNone get_running_loop()用来获取loop。

  版本3.7中的新功能。

asyncio.all_tasksloop = None 

  返回Task循环运行的一组还没有完成的对象。

  若是loopNoneget_running_loop()则用于获取当前循环。

  版本3.7中的新功能。

 

任务对象

class asyncio.Task(coro,*,loop = None 

Future-like object that runs a Python coroutine. Not thread-safe.

任务用于在事件循环中运行协同程序。若是一个协程在Future上等待,则Task暂停执行协程并等待Future的完成。当Future 完成后,包装协程的执行将恢复。

事件循环使用协做调度:事件循环一次运行一个任务。当一个Task等待完成Future时,事件循环运行其余任务,回调或执行IO操做。

使用高级asyncio.create_task()功能建立任务,或低级别loop.create_task()或 ensure_future()功能。不鼓励手动实例化任务。

要取消正在运行的任务,请使用该cancel()方法。调用它将致使Task将CancelledError异常抛出到包装的协同程序中。若是在取消期间协程正在等待Future对象,则Future对象将被取消。

cancelled()可用于检查任务是否被取消。True若是包装的协程没有抑制CancelledError异常而且实际上被取消,则该方法返回

asyncio.Task继承自Future其全部API,除了Future.set_result()和 Future.set_exception()

任务支持该contextvars模块。建立任务时,它会复制当前上下文,而后在复制的上下文中运行其协程。

版本3.7中已更改:添加了对contextvars模块的支持

cancel

  请求取消任务。

  这会安排CancelledError在事件循环的下一个循环中将异常抛入包装的协程。

  协程则有机会经过抑制异常与清理,甚至拒绝请求try... ... ... ... ... ... 块。所以,不一样于不保证任务将被取消,尽管彻底抑制取消并不常见,而且积极地不鼓励。exceptCancelledErrorfinallyFuture.cancel()Task.cancel()

  如下示例说明了协同程序如何拦截取消请求:

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
cancelled
   True若是任务被 取消,则返回。
  请求 取消时取消任务,  cancel()而且包装的协同程序将 CancelledError异常传播 到其中。
done
   True若是任务 完成则返回。
  一个任务 完成时,包裹协程要么返回的值,引起异常,或者任务被取消。
result
  返回任务的结果。
  若是任务 完成,则返回包装协程的结果(或者若是协程引起异常,则从新引起该异常。)
  若是已 取消任务,则此方法会引起 CancelledError异常。
  若是Task的结果尚不可用,则此方法会引起 InvalidStateError异常。
exception
  返回Task的例外。
  若是包装的协同程序引起异常,则返回异常。若是包装的协程正常返回,则此方法返回 None
  若是已 取消任务,则此方法会引起  CancelledError异常。
  若是还没有 完成任务,则此方法会引起  InvalidStateError异常。
add_done_callback回调*上下文=无
  添加要在任务 完成时运行的回调。
  此方法仅应在基于低级回调的代码中使用。 
  有关 Future.add_done_callback() 详细信息,请参阅文档。
remove_done_callback回调
  从回调列表中删除 回调
  此方法仅应在基于低级回调的代码中使用。
  有关 Future.remove_done_callback() 详细信息,请参阅文档。
get_stack*limit = None 
  返回此任务的堆栈帧列表。
  若是未完成包装的协同程序,则会返回挂起它的堆栈。若是协程已成功完成或被取消,则返回一个空列表。若是协程被异常终止,则返回回溯帧列表。
  帧始终从最旧到最新排序。
  对于挂起的协程,只返回一个堆栈帧。
  可选的 limit参数设置要返回的最大帧数; 默认状况下,返回全部可用的帧。返回列表的排序取决因而返回堆栈仍是返回:返回堆栈的最新帧,但返回最旧的回溯帧。(这与回溯模块的行为相匹配。)
print_stack*limit = Nonefile = None 
  打印此任务的堆栈或回溯。
  这会为检索到的帧生成相似于回溯模块的输出 get_stack()
  该 极限参数传递给 get_stack()直接。
  的 文件参数是其中输出被写入的I / O流; 默认输出写入 sys.stderr
classmethod all_tasks(loop = None 
  返回一组事件循环的全部任务。
  默认状况下,返回当前事件循环的全部任务。若是是 loop None,则该 get_event_loop()函数用于获取当前循环。
   不推荐使用此方法,将在Python 3.9中删除。请改用此 asyncio.all_tasks()功能。
classmethod current_task(loop = None 
  返回当前正在运行的任务或 None
  若是是 loop None,则该 get_event_loop()函数用于获取当前循环。
   不推荐使用此方法,将在Python 3.9中删除。请改用此 asyncio.current_task()功能。

 其余

一、async for 运用

import asyncio


class AsyncIter:
    def __init__(self, items):
        self.items = items

    async def __aiter__(self):
        for item in self.items:
            await asyncio.sleep(1)
            yield item


async def print_iter(things):
    async for item in things:
        print(item)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    things = AsyncIter([1, 2, 3])
    loop.run_until_complete(print_iter(things))
    loop.close()

  

 

 

资料

Python异步IO实现全过程1 https://mp.weixin.qq.com/s/fJaXmfHfYEk6XL2y8NmKmQ

Python异步IO实现全过程2 https://mp.weixin.qq.com/s/RjDh7AITty92jxC8jIOiPA

Python异步IO实现全过程3 https://mp.weixin.qq.com/s/vlH_2S2JIJpf3N0WRNcIJQ

相关文章
相关标签/搜索