[译]Python中的异步IO:一个完整的演练

原文:Async IO in Python: A Complete Walkthrough 原文做者: Brad Solomon 原文发布时间:2019年1月16日 翻译:Tacey Wong 翻译时间:2019年7月22日javascript

翻译仅便于我的学习,熟悉英语的请阅读原文html


目录java


Async IO是一种并发编程设计,Python中已经有了独立的支持,而且从Python3.4到Python3.7获得了快速发展。node

你可能疑惑,“并发、并行、线程、多处理”。MMP这已经不少了,异步IO是哪根葱?”python

本教程旨在帮助你回答这个问题,让你更牢固地掌握Python的异步IO。ios

如下是要介绍的内容:git

  • 异步IO:一种与语言无关的范例(模型),它具备许多跨编程语言的实现
  • async/await:两个 用于定义协程的新Python关键字
  • asyncio:为运行和管理协程提供基础和API的Python包/库

协程(专用生成器函数)是Python中异步IO的核心,稍后咱们将深刻研究它们。程序员

注意:在本文中,使用术语异步IO来表示与语言无关的异步IO设计,而asyncio指的是Python包。github

开始以前,你须要确保已经配置搭建了可使用asyncio及其余库的实验环境。golang

搭建本身的实验环境

你须要安装Python 3.7+以及aiohttpaiofiles包才能完整地跟随本文进行实验。

$ python3.7 -m venv ./py37async
$ source ./py37async/bin/activate  # Windows: .\py37async\Scripts\activate.bat
$ pip install --upgrade pip aiohttp aiofiles  # 可选项: aiodns

有关安装Python 3.7和设置虚拟环境的帮助,请查看Python 3安装和设置指南虚拟环境基础

ok,let's go!

异步IO鸟瞰图

相较于它久经考验的表亲(多进程和多线程)来讲,异步IO不太为人所知。本节将从高层全面地介绍异步IO是什么,以及哪些场景适合用它。

哪些场景适合异步IO?

并发和并行是个很是普遍的主题。由于本文重点介绍异步IO及其在Python中的实现,如今值得花一点时间将异步IO与其对应物进行比较,以了解异步IO如何适应更大、有时使人眼花缭乱的难题。

并行:同时执行多个操做。 多进程:是一种实现并行的方法,它须要将任务分散到计算机的中央处理单元(cpu或核心)上。多进程很是适合cpu密集的任务:密集for循环和密集数学计算一般属于这一类。 并发:并发是一个比并行更普遍的术语。 它代表多个任务可以以重叠方式运行。 (有一种说法是并发并不意味着并行。) 线程:是一种并发执行模型,多个线程轮流执行任务。 一个进程能够包含多个线程。 因为GIL(全局解释器锁)的存在,Python与线程有着复杂的关系,但这超出了本文的范围。

了解线程的重要之处是它更适合于io密集的任务。cpu密集型任务的特色是计算机核心从开始到结束都在不断地工做,而一个IO密集型任务更多的是等待IO的完成。

综上所述,并发既包括多进程(对于CPU密集任务来讲是理想的),也包括线程(对于IO密集型任务来讲是理想的)。多进程是并行的一种形式,并行是并发的一种特定类型(子集)。Python经过multiprocessing, threading, 和concurrent.futures标准库为这二者提供了长期支持。

如今是时候召集一名新成员了!在过去的几年里,一个独立的设计被更全面地嵌入到了CPython中:经过标准库的asyncio包和新的async/await语言关键字实现异步IO。须要说明的是,异步IO不是一个新发明的概念,它已经存在或正在构建到其余语言和运行时环境中,好比Golang、C#或者Scala。

Python文档将asyncio包称为用于编写并发代码的库。然而,异步IO既不是多线程也不是多进程,它不是创建在其中任何一个之上。事实上异步IO是一种单进程单线程设计:它使用协做式多任务操做方式,在本教程结束时你将理解这个术语。换句话说,尽管在单个进程中使用单个线程,但异步IO给人一种并发的感受。协程(异步IO的一个核心特性)能够并发地调度,但它们本质上不是并发的。

重申一下,异步输入输出是并发编程的一种风格,但不是并行的。与多进程相比,它与线程更紧密地结合在一块儿,但与这二者大相径庭,而且是并发技术包中的独立成员。

如今还留下了一个词没有解释。 异步是什么意思?这不是一个严格的定义,可是对于咱们这里的目的,我能够想到/考虑到两个属性:

  • 异步例程可以在等待其最终结果时“暂停”,并容许其余例程同时运行。
  • 经过上面的机制,异步代码便于并发执行。 换句话说,异步代码提供了并发的外观和感受

下面是一个一个将全部内容组合在一块儿的图表。 白色术语表明概念,绿色术语表明实现或实现它们的方式:

(Concurrencey并发、Threading线程、Async IO异步IO、Parallelism并行、Multiprocessing多进程)

我将在这里中止对并发编程模型的比较。本教程重点介绍异步IO的子组件,如何使用它、以及围绕它建立的API。要深刻研究线程、多处理和异步IO,请暂停这里并查看Jim Anderson对(Python中并发性的概述)[https://realpython.com/python-concurrency/]。Jim比我有趣得多,并且参加的会议也比我多。

译者注:要了解多种并发模型的比较,能够参考(《七周七并发模型》

异步IO释义

异步IO乍一看彷佛违反直觉,自相矛盾。如何使用一个线程和一个CPU内核来简化并发代码?我历来都不擅长编造例子,因此我想借用 Miguel Grinberg2017年PyCon演讲中的一个例子,这个例子很好地解释了一切:

国际象棋大师JuditPolgár举办了一个国际象棋比赛,在那里她扮演多个业余选手。 她有两种方式进行比赛:同步和异步。 假设:

  • 24个对手
  • Judit在5秒钟内完成一个棋子的移动
  • 每一个对手移动一个棋子须要55秒
  • 游戏平均30对移动(总计60次移动) 同步版本:Judit一次只玩一场游戏,从不一样时玩两场,直到游戏结束。每场比赛须要(55 + 5)* 30 == 1800秒,或30分钟。 整个比赛须要24 * 30 == 720分钟,或12小时。 异步版本:Judit从一张桌子走到另外一张桌子,每张桌子走一步。她离开了牌桌,让对手在等待的时间里采起下一步行动。在全部24场比赛中,一个动做须要Judit 24 * 5 == 120秒,即2分钟。整个比赛如今被缩减到120 * 30 == 3600秒,也就是1小时。

只有一个JuditPolgár,她只有两只手,一次只作一次动做。可是,异步进行将展览时间从12小时减小到1小时。所以,协同多任务处理是一种奇特的方式,能够说一个程序的事件循环(稍后会有更多)与多个任务通讯,让每一个任务在最佳时间轮流运行。

异步IO须要很长的等待时间,不然函数将被阻塞,并容许其余函数在停机期间运行。

异步IO使用起来不容易

我听人说过“当你可以的时候使用异步IO;必要时使用线程”。事实是,构建持久的多线程代码可能很难,而且容易出错。异步IO避免了一些线程设计可能遇到的潜在速度障碍。

但这并非说Python中的异步IO很容易。警告:当你稍微深刻其中时,异步编程也会很困难!Python的异步模型是围绕诸如回调,事件,传输,协议和future等概念构建的 -——术语可能使人生畏。事实上,它的API一直在不断变化,这使得它变得比较难。

幸运的是,asyncio已经相对成熟,其大部分功能再也不处于临时性状态,而其文档也有了大规模的改善,而且该主题的一些优质资源也开始出现。

asyncio 包和 async/await

如今你已经对异步输IO做为一种设计有了必定的了解,让咱们来探讨一下Python的实现。Python的asyncio包(在Python 3.4中引入)和它的两个关键字async和wait服务于不一样的目的,可是它们会一块儿帮助你声明、构建、执行和管理异步代码。

async/await 语法和原生协程

警告:当心你在网上读到的东西。Python的异步IO API已经从Python 3.4迅速发展到Python 3.7。一些旧的模式再也不被使用,一些最初不被容许的东西如今经过新的引入被容许。据我所知,本教程也将很快加入过期的行列。

异步IO的核心是协程。协程是Python生成器函数的一个专门版本。让咱们从一个基线定义开始,而后随着你在此处的进展,以此为基础进行构建:协程是一个函数,它能够在到达返回以前暂停执行,而且能够在一段时间内间接将控制权传递给另外一个协程。

稍后,你将更深刻地研究如何将传统生成器从新用于协程。目前,了解协程如何工做的最简单方法是开始编写一些协程代码。

让咱们采用沉浸式方法,编写一些异步输入输出代码。这个简短的程序是异步IO的Hello World,但它对展现其核心功能大有帮助:

#!/usr/bin/env python3
# countasync.py

import asyncio

async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")

async def main():
    await asyncio.gather(count(), count(), count())

if __name__ == "__main__":
    import time
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

当你执行此文件时,请注意与仅用def和time.sleep()定义函数相比,看起来有什么不一样:

$ python3 countasync.py
One
One
One
Two
Two
Two
countasync.py executed in 1.01 seconds.

该输出的顺序是异步IO的核心。与count()的每一个调用通讯是一个事件循环或协调器。当每一个任务到达asyncio.sleep(1)时,函数会向事件循环发出呼叫,并将控制权交还给它,例如,“我将休眠1秒。在这段时间里,作一些有意义的事情吧”。

将此与同步版本进行对比::

#!/usr/bin/env python3
# countsync.py

import time

def count():
    print("One")
    time.sleep(1)
    print("Two")

def main():
    for _ in range(3):
        count()

if __name__ == "__main__":
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

执行时,顺序和执行时间会有轻微但严重的变化:

$ python3 countsync.py
One
Two
One
Two
One
Two
countsync.py executed in 3.01 seconds.

虽然使用time.sleep()asyncio.sleep()看起来很普通,可是它们能够替代任何涉及等待时间的时间密集型进程。(您能够等待的最普通的事情是一个sleep()调用,它基本上什么也不作。)也就是说,time.sleep()能够表示任何耗时的阻塞函数调用,而asyncio.sleep()用于代替非阻塞调用(但也须要一些时间来完成)。

你将在下一节中看到,等待某些东西(包括asyncio.sleep()的好处是,周围的函数能够暂时将控制权交给另外一个更容易当即执行某些操做的函数。相比之下,time.sleep()或任何其余阻塞调用与异步Python代码不兼容,由于它会在睡眠时间内中止全部工做。

异步IO规则

此时,异步、wait和它们建立的协程函数的更正式定义已经就绪。这一节有点密集,可是掌握async/await是颇有帮助的,因此若是须要的话,能够回到这里:

  • 语法async def引入了原生协程或异步生成器。async withasync for表达式也是有效的,稍后你将看到它们。
  • 关键词await将函数控制传递回事件循环(它暂停执行周围的协程)。若是Python在g()的范围内遇到await f()表达式,这就是await告诉事件循环,“暂停执行g()直到我等待的f()的结果 返回 。 与此同时,让其余东西运行。“

在代码中,第二个要点大体是这样的:

async def g():
    # 在这里暂停 ,f()执行完以后再返回到这里。
    return r

关于什么时候以及可否使用async / await,还有一套严格的规则。不管您是在学习语法仍是已经使用async / await,这些都很是方便:

  • 使用async def引入的函数是协程。它可使用waitreturnyield,但全部这些都是可选的。声明async def noop(): pass 是合法的:
    • 使用wait和/或return建立一个coroutine函数。要调用coroutine函数,你必须等待它获得结果。
    • 在异步def块中使用yield不太常见(而且最近才在Python中合法)。这将建立一个异步生成器,您可使用异步生成器进行迭代。 暂时忘掉异步生成器,重点关注使用await和/或return的协程函数的语法。
    • 任何使用async def定义的东西都不能使用yield from,这会引起SyntaxError(语法错误)。
  • 就像在def函数以外使用yield是一个SyntaxError同样,在async def协程以外使用wait也是一个SyntaxError

如下是一些简洁的示例,旨在总结以上几条规则:

async def f(x):
    y = await z(x)  # OK - `await` and `return` allowed in coroutines
    return y

async def g(x):
    yield x  # OK - this is an async generator

async def m(x):
    yield from gen(x)  # No - SyntaxError

def m(x):
    y = await z(x)  # Still no - SyntaxError (no `async def` here)
    return y

最后,当您使用await f()时,它要求f()是一个awaitable对象。嗯,这不是颇有帮助,是吗? 如今,只要知道一个等待对象是(1)另外一个协程或(2)定义返回一个迭代器.__ await __()dunder方法的对象。若是你正在编写一个程序,在大多数状况下,你只须要担忧第一种状况。

这又给咱们带来了一个你可能会看到的技术上的区别:将函数标记为coroutine的一个老方法是用@asyncio.coroutine来修饰一个普通的def函数。结果是基于生成器的协同程序。自从在Python 3.5中引入async/await语法以来,这种结构已通过时了。

这两个协程本质上是等价的(都是可 awaitable的),可是第一个协程是基于生成器的,而第二个协程是一个原生协程:

import asyncio

@asyncio.coroutine
def py34_coro():
    """Generator-based coroutine, older syntax"""
    yield from stuff()

async def py35_coro():
    """Native coroutine, modern syntax"""
    await stuff()

若是你本身编写任何代码,为了显式最好使用本机协程。基于生成器的协程将在Python 3.10中删除

在本教程的后半部分,咱们将仅出于解释的目的来讨论基于生成器的协同程序。引入async / await的缘由是使协同程序成为Python的独立功能,能够很容易地与正常的生成器函数区分开来,从而减小歧义。

不要陷入基于生成器的协程中,这些协同程序已随着async / await的出现而过期了。若是你坚持async/await语法,它们有本身的小规则集(例如,await不能在基于生成器的协同程序中使用),这些规则在很大程度上是不相关的。

废话很少说,让咱们来看几个更复杂的例子。

下面是异步IO如何减小等待时间的一个例子:给定一个协程makerandom(),它一直在[0,10]范围内产生随机整数,直到其中一个超过阈值,你想让这个协程的屡次调用不须要等待彼此连续完成。你能够在很大程度上遵循上面两个脚本的模式,只需稍做修改:

#!/usr/bin/env python3
# rand.py

import asyncio
import random

# ANSI colors
c = (
    "\033[0m",   # End of color
    "\033[36m",  # Cyan
    "\033[91m",  # Red
    "\033[35m",  # Magenta
)

async def makerandom(idx: int, threshold: int = 6) -> int:
    print(c[idx + 1] + f"Initiated makerandom({idx}).")
    i = random.randint(0, 10)
    while i <= threshold:
        print(c[idx + 1] + f"makerandom({idx}) == {i} too low; retrying.")
        await asyncio.sleep(idx + 1)
        i = random.randint(0, 10)
    print(c[idx + 1] + f"---> Finished: makerandom({idx}) == {i}" + c[0])
    return i

async def main():
    res = await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))
    return res

if __name__ == "__main__":
    random.seed(444)
    r1, r2, r3 = asyncio.run(main())
    print()
    print(f"r1: {r1}, r2: {r2}, r3: {r3}")

彩色输出比我能说的多得多,并让你了解这个脚本是如何执行的:

该程序使用一个主协程makerandom(),并在3个不一样的输入上同时运行它。大多数程序将包含小型、模块化的协程和一个包装器函数,用于将每一个较小的协程连接在一块儿。而后,main()用中央协程映射到某个可迭代的池中收集任务(future)。

在这个小例子中,池是range(3)。在稍后介绍的更全面的示例中,它是一组须要同时请求,解析和处理的URL,main()封装了每一个URL的整个例程。

虽然“制做随机整数”(CPU密集比这更复杂)可能不是做为asyncio候选者的最佳选择,可是在示例中存在asyncio.sleep(),旨在模仿不肯定等待时间的IO密集进程 。例如,asyncio.sleep()调用可能表示在消息应用程序中的两个客户端之间发送和接收不那么随机的整数。

异步IO设计模式

Async IO附带了它本身的一组脚本设计,您将在本节中介绍这些脚本设计。

链式协程

协程的一个关键特性是它们能够连接在一块儿(记住,一个协成对象是awaitable的,因此另一个协成能够await它)。这容许你将程序分红更小的、可管理的、可回收的协同程序:

#!/usr/bin/env python3
# chained.py

import asyncio
import random
import time

async def part1(n: int) -> str:
    i = random.randint(0, 10)
    print(f"part1({n}) sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-1"
    print(f"Returning part1({n}) == {result}.")
    return result

async def part2(n: int, arg: str) -> str:
    i = random.randint(0, 10)
    print(f"part2{n, arg} sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-2 derived from {arg}"
    print(f"Returning part2{n, arg} == {result}.")
    return result

async def chain(n: int) -> None:
    start = time.perf_counter()
    p1 = await part1(n)
    p2 = await part2(n, p1)
    end = time.perf_counter() - start
    print(f"-->Chained result{n} => {p2} (took {end:0.2f} seconds).")

async def main(*args):
    await asyncio.gather(*(chain(n) for n in args))

if __name__ == "__main__":
    import sys
    random.seed(444)
    args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])
    start = time.perf_counter()
    asyncio.run(main(*args))
    end = time.perf_counter() - start
    print(f"Program finished in {end:0.2f} seconds.")

请仔细注意输出,其中part1()睡眠时间可变,part2()在结果可用时开始处理结果:

$ python3 chained.py 9 6 3
part1(9) sleeping for 4 seconds.
part1(6) sleeping for 4 seconds.
part1(3) sleeping for 0 seconds.
Returning part1(3) == result3-1.
part2(3, 'result3-1') sleeping for 4 seconds.
Returning part1(9) == result9-1.
part2(9, 'result9-1') sleeping for 7 seconds.
Returning part1(6) == result6-1.
part2(6, 'result6-1') sleeping for 4 seconds.
Returning part2(3, 'result3-1') == result3-2 derived from result3-1.
-->Chained result3 => result3-2 derived from result3-1 (took 4.00 seconds).
Returning part2(6, 'result6-1') == result6-2 derived from result6-1.
-->Chained result6 => result6-2 derived from result6-1 (took 8.01 seconds).
Returning part2(9, 'result9-1') == result9-2 derived from result9-1.
-->Chained result9 => result9-2 derived from result9-1 (took 11.01 seconds).
Program finished in 11.01 seconds.

在此设置中,main()的运行时间将等于它收集和调度的任务的最大运行时间。

使用队列

asyncio包提供了与queue模块的类相似的queue classes类。在咱们到目前为止的示例中,咱们并不真正须要队列结构。在 chained.py中,每一个任务(future)都由一组协同程序组成,这些协同程序显式地相互等待,并在每一个链上传递一个输入。

还有一种替代结构也能够用于异步IO:许多生产者,彼此没有关联,将项目添加到队列中。每一个生产者能够在交错、随机、未宣布的时间向队列添加多个项。当商品出现时,一组消费者贪婪地从队列中取出商品,不等待任何其余信号。

在这种设计中,没有任何个体消费者与生产者的连接。消费者事先不知道生产者的数量,甚至不知道将添加到队列中的累计项目数。

单个生产者或消费者分别从队列中放置和提取项所需的时间是可变的。队列充当一个吞吐量,它能够与生产者和消费者通讯,而不须要它们彼此直接通讯。

注意:虽然队列一般用于线程程序,由于queue.Queue()的线程安全性。在涉及异步IO时,您不须要关心线程安全性(例外状况是当你将二者结合时,但在本教程中没有这样作。)【译者注:这里的二者结合说的是异步IO和多线程结合】。队列的一个用例(如这里的例子)是队列充当生产者和消费者的发送器,不然它们不会直接连接或关联在一块儿。

这个程序的同步版本看起来至关糟糕:一组阻塞生成器按顺序将项添加到队列中,一次一个生产者。只有在全部生产者完成以后,队列才能够由一个消费者一次处理一个项一个项地处理。这种设计有大量的延迟。物品可能会闲置在队列中,而不是当即拿起并处理。

下面是异步版本asyncq.py。 这个工做流程的挑战在于须要向消费者发出生产完成的信号。不然,await q.get()将无限期挂起,由于队列已经被彻底处理,可是消费者并不知道生产已经完成。

(很是感谢StackOverflow用户帮助理顺main():关键是await q.join(),它将一直阻塞到队列中的全部项都被接收和处理,而后取消消费者任务,不然这些任务会挂起并没有休止地等待其余队列项出现)

下面是完整的脚本:

#!/usr/bin/env python3
# asyncq.py

import asyncio
import itertools as it
import os
import random
import time

async def makeitem(size: int = 5) -> str:
    return os.urandom(size).hex()

async def randsleep(a: int = 1, b: int = 5, caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)

async def produce(name: int, q: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = time.perf_counter()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")

async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        print(f"Consumer {name} got element <{i}>"
              f" in {now-t:0.5f} seconds.")
        q.task_done()

async def main(nprod: int, ncon: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for c in consumers:
        c.cancel()

if __name__ == "__main__":
    import argparse
    random.seed(444)
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--nprod", type=int, default=5)
    parser.add_argument("-c", "--ncon", type=int, default=10)
    ns = parser.parse_args()
    start = time.perf_counter()
    asyncio.run(main(**ns.__dict__))
    elapsed = time.perf_counter() - start
    print(f"Program completed in {elapsed:0.5f} seconds.")

前几个协同程序是辅助函数,它返回随机字符串,小数秒性能计数器和随机整数。生产者将1到5个项目放入队列中。 每一个项目是(i,t)的元组,其中i是随机字符串,t是生产者尝试将元组放入队列的时间。

当消费者将项目拉出时,它只使用项目所在的时间戳计算项目在队列中所用的时间。

请记住,asyncio.sleep()是用来模拟其余一些更复杂的协同程序的,若是它是一个常规的阻塞函数,会消耗时间并阻塞全部其余的执行。 .

下面是一个有两个生产者和五个消费者的测试:

$ python3 asyncq.py -p 2 -c 5
Producer 0 sleeping for 3 seconds.
Producer 1 sleeping for 3 seconds.
Consumer 0 sleeping for 4 seconds.
Consumer 1 sleeping for 3 seconds.
Consumer 2 sleeping for 3 seconds.
Consumer 3 sleeping for 5 seconds.
Consumer 4 sleeping for 4 seconds.
Producer 0 added <377b1e8f82> to queue.
Producer 0 sleeping for 5 seconds.
Producer 1 added <413b8802f8> to queue.
Consumer 1 got element <377b1e8f82> in 0.00013 seconds.
Consumer 1 sleeping for 3 seconds.
Consumer 2 got element <413b8802f8> in 0.00009 seconds.
Consumer 2 sleeping for 4 seconds.
Producer 0 added <06c055b3ab> to queue.
Producer 0 sleeping for 1 seconds.
Consumer 0 got element <06c055b3ab> in 0.00021 seconds.
Consumer 0 sleeping for 4 seconds.
Producer 0 added <17a8613276> to queue.
Consumer 4 got element <17a8613276> in 0.00022 seconds.
Consumer 4 sleeping for 5 seconds.
Program completed in 9.00954 seconds.

在这种状况下,项目在几分之一秒内处理。 延迟可能有两个缘由:

  • 标准的,在很大程度上不可避免的开销
  • 当一个项出如今队列中时,全部消费者都在睡觉的状况

关于第二个缘由,幸运的是,扩展到成百上千的消费者是彻底正常的。用python3 asyncq.py -p 5 - c100应该没有问题。这里的要点是,理论上,您可让不一样系统上的不一样用户控制生产者和消费者的管理,队列充当中央吞吐量。

到目前为止,您已经跳进了火坑。了解了三个asyncio调用async和await定义的协程并等待的示例。若是你没有彻底关注或者只是想深刻了解Python中现代协同程序的机制,下一节咱们将开始讨论这个。

生成器中异步IO的Roots

以前,您看到了一个基于生成器的旧式协同程序的例子,它已经被更显式的原生协同程序所淘汰。这个例子值得从新展现一下:

import asyncio

@asyncio.coroutine
def py34_coro():
    """Generator-based coroutine"""
    # No need to build these yourself, but be aware of what they are
    s = yield from stuff()
    return s

async def py35_coro():
    """Native coroutine, modern syntax"""
    s = await stuff()
    return s

async def stuff():
    return 0x10, 0x20, 0x30

做一个实验,若是py34_coro()或py35_coro()调用自身,而不await或不调用asyncio.run()或其余asyncio函数,会发生什么?独调用一个协同程序会返回一个协同程序对象:

>>> py35_coro()
<coroutine object py35_coro at 0x10126dcc8>

这表面上并非颇有趣。 调用协同程序的结果是一个awaitable的协程对象。

测验时间:Python的其余什么功能跟这同样?(Python的哪些特性在单独调用时实际上没有多大做用?)

但愿你将生成器做为这个问题的答案,由于协同程序是加强型生成器。 在这方面的行为相似:

>>> def gen():
...     yield 0x10, 0x20, 0x30
...
>>> g = gen()
>>> g  # Nothing much happens - need to iterate with `.__next__()`
<generator object gen at 0x1012705e8>
>>> next(g)
(16, 32, 48)

正如它所发生的那样,生成器函数是异步IO的基础(不管是否使用async def声明协程而不是旧的@asyncio.coroutine包装器)。从技术上讲,await更接近于yield from而非yield。(但请记住,yield from x()只是替换for i in x():yield i的语法糖)

生成器与异步IO相关的一个关键特性是能够有效地随意中止和从新启动生成器。例如,你能够在生成器对象上进行迭代,而后在剩余的值上继续迭代。当一个生成器函数达到yield时,它会产生该值,但随后它会处于空闲状态,直到它被告知产生其后续值。

这能够经过一个例子来充实:

>>> from itertools import cycle
>>> def endless():
...     """Yields 9, 8, 7, 6, 9, 8, 7, 6, ... forever"""
...     yield from cycle((9, 8, 7, 6))

>>> e = endless()
>>> total = 0
>>> for i in e:
...     if total < 30:
...         print(i, end=" ")
...         total += i
...     else:
...         print()
...         # Pause execution. We can resume later.
...         break
9 8 7 6 9 8 7 6 9 8 7 6 9 8

>>> # Resume
>>> next(e), next(e), next(e)
(6, 9, 8)

await关键字的行为相似,标记了一个断点,协程挂起本身并容许其余协程工做。在这种状况下,“挂起”是指暂时放弃控制但未彻底退出或结束协程。请记住,yield,以及由此产生的yield fromawait是发生器执行过程当中的一个断点。

这是函数和生成器之间的根本区别。一个函数要么全有要么全无。一旦它开始,它就不会中止,直到它到达一个return,而后将该值推给调用者(调用它的函数)。另外一方面,生成器每次达到yield时都会暂停,再也不继续。它不只能够将这个值推入调用堆栈,并且当您经过对它调用next()恢复它时,它还能够保留它的局部变量。

生成器的第二个特征虽然不为人知,却也也很重要。也能够经过其.send()方法将值发送到生成器。这容许生成器(和协同程序)相互调用(await)而不会阻塞。我不会再深刻了解这个功能的细节,由于它主要是为了在幕后实现协同程序,但你不该该真的须要本身直接使用它。

若是你有兴趣了解更多内容,能够从PEP 342/正式引入协同程序开始。 Brett Cannon的Python中异步等待(Async-Await)是如何工做的也是一个很好的读物,asyncio上的PYMOTW文章也是如此。还有David Beazley的[关于协程和并发的有趣课程] 深刻探讨了协同程序运行的机制。

让咱们尝试将上述全部文章压缩成几句话:

这些协同程序其实是经过一种很是规的机制运行的。它们的结果是在调用其.send()方法时抛出异常对象的属性。全部这些都有一些不可靠的细节,可是它可能不会帮助您在实践中使用这部分语言,因此如今让咱们继续。

为了联系在一块儿,如下是关于协同做为生成器这个主题的一些关键点:

  • 协同程序是利用生成器方法的特性的再利用生成器
  • 旧式基于生成器的协同程序使用yield from来等待协程结果。原生协同程序中的现代Python语法只是将yield from等价替换为await做为等待协程结果的方法。await相似于yield,这样想一般是有帮助的。
  • await的使用是标志着断点的信号。它容许协程暂时暂停执行并容许程序稍后返回它。

其余特色: async for and Async Generators + Comprehensions

与纯async/await一块儿,Python还容许经过async for异步迭代异步迭代器。异步迭代器的目的是让它可以在迭代时在每一个阶段调用异步代码。

这个概念的天然延伸是异步发生器。回想一下,你能够在原生协程中使用await,return或yield。在Python 3.6中可使用协程中的yield(经过PEP 525),它引入了异步生成器,目的是容许await和yield在同一个协程函数体中使用:

>>> async def mygen(u: int = 10):
...     """Yield powers of 2."""
...     i = 0
...     while i < u:
...         yield 2 ** i
...         i += 1
...         await asyncio.sleep(0.1)

最后但一样重要的是,Python经过async for来实现异步理解。就像它的同步表兄弟同样,这主要是语法糖:

>>> async def main():
...     # This does *not* introduce concurrent execution
...     # It is meant to show syntax only
...     g = [i async for i in mygen()]
...     f = [j async for j in mygen() if not (j // 3 % 5)]
...     return g, f
...
>>> g, f = asyncio.run(main())
>>> g
[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
>>> f
[1, 2, 16, 32, 256, 512]

这是一个关键的区别:异步生成器和理解都不会使迭代并发。它们所作的就是提供同步对等程序的外观和感受,可是有能力让循环放弃对事件循环的控制,让其余协同程序运行。

换句话说,异步迭代器和异步生成器不是为了在序列或迭代器上同时映射某些函数而设计的。它们仅仅是为了让封闭的协程容许其余任务轮流使用。async for和async with语句仅在使用纯for或with会“破坏”协程中await的性质的状况下才须要。异步性和并发之间的区别是一个须要掌握的关键因素。

事件循环和asyncio.run()

您能够将事件循环视为一段时间的while True循环,它监视协同程序,获取有关闲置内容的反馈,并查找可在此期间执行的内容。当协同程序等待的任何内容变得可用时,它可以唤醒空闲协程。

到目前为止,事件循环的整个管理已由一个函数调用隐式处理:

asyncio.run(main())  # Python 3.7+

Python 3.7中引入的asyncio.run()负责获取事件循环,运行任务直到它们被标记为完成,而后关闭事件循环。

使用get_event_loop()管理asyncio事件循环有一种更加冗长的方式。典型的模式以下所示:

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

你可能会在较旧的示例中看到loop.get_event_loop(),但除非你须要对事件循环管理控制进行特别的微调,不然asyncio.run()应该足以知足大多数程序的须要。

若是确实须要在Python程序中与事件循环交互,loop是一个老式的Python对象,它支持使用loop.is_running()和loop.is_closed()进行内省/introspection 。若是须要得到更精细的控制,能够对其进行操做,例如经过将循环做为参数传递来调度回调

更重要的是要深刻了解事件循环的机制。关于事件循环,这里有几点值得强调。

1:协同程序在与事件循环绑定以前不会自行作不少事情。

你以前在生成器的解释中看到了这一点,但值得重申。若是您有一个主协程在等待其余协程,那么单独调用它几乎没有什么效果:

>>> import asyncio

>>> async def main():
...     print("Hello ...")
...     await asyncio.sleep(1)
...     print("World!")

>>> routine = main()
>>> routine
<coroutine object main at 0x1027a6150>

请记住使用asyncio.run()经过调度main()协程(未来的对象)来实际强制执行,以便在事件循环上执行:

>>> asyncio.run(routine)
Hello ...
World!

(其余协同程序能够经过await执行。一般在asyncio.run()中封装main(),而后从那里调用带有await的链式协程。)

2:默认状况下,异步IO事件循环在单个线程和单个CPU内核上运行。一般,在一个CPU内核中运行一个单线程事件循环是绰绰有余的。还能够跨多个核心运行事件循环。请查看John Reese谈话获取更多内容,顺便提个醒,你的笔记本电脑可能会自发燃烧。

3:事件循环是可插入的。也就是说,若是你真的须要,你能够编写本身的事件循环实现,并让它以相同的方式运行任务。这在uvloop包中获得了很好的演示,这是Cython中事件循环的一个实现。

这就是"可插入事件循环"这个术语的含义:你可使用事件循环的任何工做实现,与协同程序自己的结构无关。asyncio包自己附带两个不一样的事件循环实现,默认状况下基于选择器模块。(第二个实现仅适用于Windows。)

一个完整的程序:异步请求

你已经走了这么远,如今是时候享受快乐和无痛的部分了。在本节中,您将使用aiohttp(一种速度极快的异步http 客户端/服务端 框架)构建一个抓取网页的网址收集器areq.py。(咱们只须要客户端部分。)这种工具能够用来映射一组站点之间的链接,这些连接造成一个有向图

:您可能想知道为何Python的requests包与异步IO不兼容。requests构建在urllib3之上,而urllib3又使用Python的http和socket模块。默认状况下,socket操做是阻塞的。这意味着Python不会想await requests.get(url)这样,由于.get()不是awaitable的。相比之下,aiohttp中几乎全部东西都是一个awaitable的协程,好比,session.request() response.text(). 它是一个很棒的库,可是在异步代码中使用requests是有害的。

高层程序结构以下:

  1. 从本地文件url .txt中读取url序列。
  2. 发送对URL的GET请求并解码生成的内容。 若是这失败了,在那里停下来找一个网址。
  3. 在响应的HTML中搜索href标记内的URL
  4. 将结果写入foundurls.txt。
  5. 尽量异步和并发地执行上述全部操做。(对请求使用aiohttp,对文件附件使用aiofiles。这是IO的两个主要示例,很是适合异步IO模型。)

下是urls.txt的内容。 它并不庞大,而且主要包含高流量的网站:

$ cat urls.txt
https://regex101.com/
https://docs.python.org/3/this-url-will-404.html
https://www.nytimes.com/guides/
https://www.mediamatters.org/
https://1.1.1.1/
https://www.politico.com/tipsheets/morning-money
https://www.bloomberg.com/markets/economics
https://www.ietf.org/rfc/rfc2616.txt

列表中的第二个网址应该返回一个404响应,你须要优雅地处理这个响应。若是你正在运行此程序的扩展版本,你可能须要处理比这更多的问题,例如服务器断开链接和无限重定向。

求自己应该使用单个会话进行,以充分利用会话的内部链接池。

让咱们来看看完整的程序。以后,咱们将一步一步地介绍这些内容:

#!/usr/bin/env python3
# areq.py

"""Asynchronously get links embedded in multiple pages' HMTL."""

import asyncio
import logging
import re
import sys
from typing import IO
import urllib.error
import urllib.parse

import aiofiles
import aiohttp
from aiohttp import ClientSession

logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)
logger = logging.getLogger("areq")
logging.getLogger("chardet.charsetprober").disabled = True

HREF_RE = re.compile(r'href="(.*?)"')

async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
    """GET request wrapper to fetch page HTML.

    kwargs are passed to `session.request()`.
    """

    resp = await session.request(method="GET", url=url, **kwargs)
    resp.raise_for_status()
    logger.info("Got response [%s] for URL: %s", resp.status, url)
    html = await resp.text()
    return html

async def parse(url: str, session: ClientSession, **kwargs) -> set:
    """Find HREFs in the HTML of `url`."""
    found = set()
    try:
        html = await fetch_html(url=url, session=session, **kwargs)
    except (
        aiohttp.ClientError,
        aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logger.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
        return found
    except Exception as e:
        logger.exception(
            "Non-aiohttp exception occured:  %s", getattr(e, "__dict__", {})
        )
        return found
    else:
        for link in HREF_RE.findall(html):
            try:
                abslink = urllib.parse.urljoin(url, link)
            except (urllib.error.URLError, ValueError):
                logger.exception("Error parsing URL: %s", link)
                pass
            else:
                found.add(abslink)
        logger.info("Found %d links for %s", len(found), url)
        return found

async def write_one(file: IO, url: str, **kwargs) -> None:
    """Write the found HREFs from `url` to `file`."""
    res = await parse(url=url, **kwargs)
    if not res:
        return None
    async with aiofiles.open(file, "a") as f:
        for p in res:
            await f.write(f"{url}\t{p}\n")
        logger.info("Wrote results for source URL: %s", url)

async def bulk_crawl_and_write(file: IO, urls: set, **kwargs) -> None:
    """Crawl & write concurrently to `file` for multiple `urls`."""
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                write_one(file=file, url=url, session=session, **kwargs)
            )
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    outpath = here.joinpath("foundurls.txt")
    with open(outpath, "w") as outfile:
        outfile.write("source_url\tparsed_url\n")

    asyncio.run(bulk_crawl_and_write(file=outpath, urls=urls))

这个脚本比咱们最初的玩具程序要长,因此让咱们把它分解一下。

常量HREF RE是一个正则表达式,用于提取咱们最终要搜索的HTML中的HREF标记:

>>> HREF_RE.search('Go to <a href="https://realpython.com/">Real Python</a>')
<re.Match object; span=(15, 45), match='href="https://realpython.com/"'>

协程 fetch html()是一个GET请求的包装器,用于发出请求并解码结果页面html。它发出请求,等待响应,并在非200状态的状况下当即提出:

resp = await session.request(method="GET", url=url, **kwargs)
resp.raise_for_status()

若是状态正常,则fetch_html()返回页面HTML(str)。值得注意的是,这个函数中没有执行异常处理。逻辑是将该异常传播给调用者并让它在那里处理:

html = await resp.text()

咱们等待session.request()resp.text(),由于它们是awaitable的协程。不然,请求/响应周期将是应用程序的长尾、占用时间的部分,可是对于异步输入输出,fetch_html()容许事件循环处理其余可用的做业,例如解析和写入已经获取的URLs。

协程链中的下一个是parse(),它等待fetch html()获取给定的URL,而后从该页面的s html中提取全部的href标记,确保每一个标记都是有效的,并将其格式化为绝对路径。

诚然,parse()的第二部分是阻塞的,但它包括快速正则表达式匹配,并确保发现的连接成为绝对路径。

在这种特殊状况下,这个同步代码应该是快速和不明显的。可是请记住,在给定的协程内的任何一行都会阻塞其余协程,除非该行使用yield、await或return。若是解析是一个更密集的过程,您可能须要考虑使用executor()中的loop.run_in_executor()在本身的进程中运行这部分。

接下来,协程 write()接受一个文件对象和一个URL,并等待parse()返回一组已解析的URL,经过使用aiofiles(一个用于异步文件IO的包)将每一个URL及其源URL异步地写入文件。

最后,bulk_crawl_and_write()做为脚本的协程链的主要入口点。 它使用单个会话,并为最终从urls.txt读取的每一个URL建立任务。

这里还有几点值得一提:

  • 默认的客户机会话有一个最多有100个打开链接的适配器。要更改这一点,请将asyncio.connector.TCPConnector的实例传递给ClientSession。您也能够按主机指定限制。
  • 能够为整个会话和单个请求指定最大超时
  • 此脚本还使用async with,它与异步上下文管理器一块儿使用。 我没有专门讨论这个概念,由于从同步到异步上下文管理器的转换至关简单。后者必须定义.__ aenter __().__ aexit __()而不是.__ exit __().__enter__()。正如您所料,async with只能在使用async def声明的协程函数中使用。

若是您想进一步了解,GitHub上本教程附带的文件有详细的注释。

下面是执行的所有荣耀,由于areq.py能够在一秒钟内获取、解析和保存9个url的结果:

$ python3 areq.py
21:33:22 DEBUG:asyncio: Using selector: KqueueSelector
21:33:22 INFO:areq: Got response [200] for URL: https://www.mediamatters.org/
21:33:22 INFO:areq: Found 115 links for https://www.mediamatters.org/
21:33:22 INFO:areq: Got response [200] for URL: https://www.nytimes.com/guides/
21:33:22 INFO:areq: Got response [200] for URL: https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Got response [200] for URL: https://www.ietf.org/rfc/rfc2616.txt
21:33:22 ERROR:areq: aiohttp exception for https://docs.python.org/3/this-url-will-404.html [404]: Not Found
21:33:22 INFO:areq: Found 120 links for https://www.nytimes.com/guides/
21:33:22 INFO:areq: Found 143 links for https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Wrote results for source URL: https://www.mediamatters.org/
21:33:22 INFO:areq: Found 0 links for https://www.ietf.org/rfc/rfc2616.txt
21:33:22 INFO:areq: Got response [200] for URL: https://1.1.1.1/
21:33:22 INFO:areq: Wrote results for source URL: https://www.nytimes.com/guides/
21:33:22 INFO:areq: Wrote results for source URL: https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Got response [200] for URL: https://www.bloomberg.com/markets/economics
21:33:22 INFO:areq: Found 3 links for https://www.bloomberg.com/markets/economics
21:33:22 INFO:areq: Wrote results for source URL: https://www.bloomberg.com/markets/economics
21:33:23 INFO:areq: Found 36 links for https://1.1.1.1/
21:33:23 INFO:areq: Got response [200] for URL: https://regex101.com/
21:33:23 INFO:areq: Found 23 links for https://regex101.com/
21:33:23 INFO:areq: Wrote results for source URL: https://regex101.com/
21:33:23 INFO:areq: Wrote results for source URL: https://1.1.1.1/

还不算太寒酸! 做为完整性检查,你能够检查输出的行数。 在我作这个实验的时候,它是626,但请记住,这可能会发生变更:

$ wc -l foundurls.txt
     626 foundurls.txt

$ head -n 3 foundurls.txt
source_url  parsed_url
https://www.bloomberg.com/markets/economics https://www.bloomberg.com/feedback
https://www.bloomberg.com/markets/economics https://www.bloomberg.com/notices/tos

下一步:若是你想增长难度,可让这个网络爬虫进行递归。您可使用aio-redis跟踪树中已爬网的URL,以免请求它们两次,并使用Python的networkx库进行连接。 记住要友好一点。将1000个并发请求发送到一个小的、毫无防备的网站是很是糟糕的。有一些方法能够限制您在一个批处理中进行的并发请求数,例如使用asyncio的sempahore对象或使用相似这样的模式。

上下文中的异步IO

既然您已经看到了至关多的代码,让咱们回过头来考虑一下何时异步IO是一个理想的选择,以及如何进行比较来得出这个结论,或者选择其余不一样的并发模型。

什么时候以及为什么异步IO是正确的选择?

本教程不适用于异步IO与线程、多处理的扩展论述。然而,了解异步IO什么时候多是三者中最好的候选是颇有用的。

关于异步IO与多处理之间的斗争实际上根本不是一场战争。事实上,它们能够一块儿使用。若是你有多个至关统一的CPU密集型任务(一个很好的例子是scikit-learn或keras等库中的网格搜索),多进程应该是一个明显的选择。

若是全部函数都使用阻塞调用,那么将async放在每一个函数以前不是一个好主意。(这实际上会下降你的代码速度。)是正如前面提到的,异步IO和多处理能够在一些地方和谐共存

线程的伸缩性也比异步IO要差,由于线程是具备有限可用性的系统资源.在许多机器上建立数千个线程都会失败,我不建议您首先尝试它。建立数千个异步IO任务是彻底可行的。

当您有多个IO绑定任务时,异步IO会闪烁,不然任务将经过阻止IO密集等待时间来控制,例如:

  • 网络IO,不管您的程序是服务器端仍是客户端
  • 无服务器设计,例如点对点,多用户网络,如组聊天室
  • 读/写操做,在这种操做中,您想要模仿“发射后无论”的风格,但没必要担忧锁定正在读写的内容

不使用await的最大缘由是await只支持定义特定方法集的特定对象集。若是要对某个DBMS执行异步读取操做,则不只须要查找该DBMS的Python包,这个包还必须支持python的async / await语法。包含同步调用的协程会阻止其余协程和任务运行。关使用async / await的库的列表,请参阅本教程末尾的列表

Async IO It Is, but Which One?

本教程重点介绍异步IO,async / await语法,以及使用asyncio进行事件循环管理和指定任务。

asyncio固然不是惟一的异步IO库。 Nathaniel J. Smith的观察说了不少:

[在]几年后,asyncio可能会发现本身沦落为精明的开发人员避免使用的stdlib库之一,好比urllib2。……实际上,我所说的是,asyncio是其自身成功的牺牲品:在设计时,它采用了可能的最好方法; 但从那之后,受asyncio启发的工做 - 好比async / await的加入 - 已经改变了局面,让咱们能够作得更好,如今asyncio受到其早期承诺的束缚。via:(来源)【https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/】

尽管使用不一样的api和方法,大名鼎鼎的curiotrio能作asyncio作的事情。就我的而言,我认为若是你正在构建一个中等规模,简单的程序,只需使用asyncio就足够了,并且易于理解,能够避免在Python的标准库以外添加另外一个大的依赖项。

但不管如何,看看curio和trio,你可能会发现他们用一种更直观的方式完成了一样的事情。此处介绍的许多与包不相关的概念也应该渗透到备用异步IO包中。

其余零碎

在接下来的几节中,您将看到asyncio和async/wait的一些杂项部分,这部分到目前为止尚未彻底融入教程,可是对于构建和理解一个完整的程序仍然很重要。

其余顶级asyncio 函数

除了asyncio.run()以外,您还看到了一些其余的包级函数,如asyncio.create_task()和asyncio.gather()

您可使用create task()来调度协调程序对象的执行,后面跟着asyncio.run()

>>> import asyncio

>>> async def coro(seq) -> list:
...     """'IO' wait time is proportional to the max element."""
...     await asyncio.sleep(max(seq))
...     return list(reversed(seq))
...
>>> async def main():
...     # This is a bit redundant in the case of one task
...     # We could use `await coro([3, 2, 1])` on its own
...     t = asyncio.create_task(coro([3, 2, 1]))  # Python 3.7+
...     await t
...     print(f't: type {type(t)}')
...     print(f't done: {t.done()}')
...
>>> t = asyncio.run(main())
t: type <class '_asyncio.Task'>
t done: True

这种模式有一个微妙之处:若是你没有在main()中await t,它可能在main()自己发出信号代表它已完成以前完成。由于在没有await t 的状况下asynio.run(main())调用loop.run_until_complete(main()),事件循环只关心main()是否完成了,而不是main()中建立的任务是否已经完成。没有await t,循环的其余事件可能在它们完成以前会被取消。若是须要获取当前待处理任务的列表,可使用asyncio.Task.all_tasks()。

注意:asyncio.create_task()是在Python 3.7中引入的。在Python 3.6或更低版本中,使用asyncio.ensure_future()代替create_task()。

另外,还有asyncio.gather()。虽然它没有作任何很是特殊的事情,可是gather()的目的是将一组协程(future)整齐地放到一个单一的future。所以,它返回一个单独的future对象,若是await asyncio.gather()并指定多个任务或协同程序,则表示您正在等待这些对象所有完成。(这与前面示例中的queue.join()有些类似。)gather()的结果将是跨输入的结果列表:

>>> import time
>>> async def main():
...     t = asyncio.create_task(coro([3, 2, 1]))
...     t2 = asyncio.create_task(coro([10, 5, 0]))  # Python 3.7+
...     print('Start:', time.strftime('%X'))
...     a = await asyncio.gather(t, t2)
...     print('End:', time.strftime('%X'))  # Should be 10 seconds
...     print(f'Both tasks done: {all((t.done(), t2.done()))}')
...     return a
...
>>> a = asyncio.run(main())
Start: 16:20:11
End: 16:20:21
Both tasks done: True
>>> a
[[1, 2, 3], [0, 5, 10]]

你可能已经注意到gather()等待您传递它的Futures或协程的整个结果集。或者,您能够按完成顺序循环遍历asyncio.as_completed()以完成任务。该函数返回一个迭代器,在完成任务时生成任务。下面coro([3,2,1])的结果将在coro([10,5,0])完成以前可用,而gather()的状况并不是如此:

>>> async def main():
...     t = asyncio.create_task(coro([3, 2, 1]))
...     t2 = asyncio.create_task(coro([10, 5, 0]))
...     print('Start:', time.strftime('%X'))
...     for res in asyncio.as_completed((t, t2)):
...         compl = await res
...         print(f'res: {compl} completed at {time.strftime("%X")}')
...     print('End:', time.strftime('%X'))
...     print(f'Both tasks done: {all((t.done(), t2.done()))}')
...
>>> a = asyncio.run(main())
Start: 09:49:07
res: [1, 2, 3] completed at 09:49:10
res: [0, 5, 10] completed at 09:49:17
End: 09:49:17
Both tasks done: True

最后,你可能还能够看到asyncio.ensure_future()。你应该不多须要它,由于它是一个较低级别的管道API,而且很大程度上被后来引入的create_task()取代。

await的优先级

虽然它们的行为有些类似,但await关键字的优先级明显高于yield。这意味着,因为它的绑定更紧密,在不少状况下,您须要在yield from语句中使用括号,而在相似的await语句中则不须要。有关更多信息,请参见PEP 492中的await表达式示例

总结

你如今已经准备好使用async / await和它构建的库了。 如下是你已经学到的的内容概述:

  • 异步IO做为一种与语言无关的模型,经过让协程彼此间进行间接通讯来实现并发
  • Python中用于标记和定义协程的新关键字async、await的一些细节。
  • 提供用于运行和管理协程的API的Python包asyncio

附加资源

Python版本细节

Python中的异步IO发展迅速,很难跟踪何时发生了什么。下面列出了与asyncio相关的Python小版本更改和介绍:

  • 3.3: yield from表达式容许生成器委派
  • 3.4:asyncio以临时API状态引入Python标准库
  • 3.5:async和await成为Python语法的一部分,用于表示和等待协程。它们尚未成为保留关键字(您仍然能够定义名为async和await的函数或变量)。
  • 3.6:引入异步生成器和异步理解/链、推导。asyncio的API被声明为稳定的,而不是临时的。
  • 3.7:async和await成为保留关键字(它们不能用做标识符。)。它们用于替换asyncio.coroutine()装饰器。asyncio.run()被引入asyncio包,其中包括许多其余功能

若是您想要安全(而且可以使用asyncio.run()),请使用Python 3.7或更高版原本获取完整的功能集。

相关文章

如下是其余资源的精选列表:

Python文档的 What’s New 部分更详细地解释了语言变化背后的动机:

来自David Beazley的:

YouTube 视频:

相关PEPs

PEP 建立时间
PEP 342 – 经过加强型生成器的协程 2005-05
PEP 380 – 委托给子生成器的语法 2009-02
PEP 3153 – 异步IO支持 2011-05
PEP 3156 – 异步IO支持从新启动:“asyncio”模块 2012-12
PEP 492 – async和await语法的协程 2015-04
PEP 525 – 异步生成器 2016-07
PEP 530 – Asynchronous Comprehensions 2016-09

使用async/await的库

来自 aio-libs:

  • aiohttp: 异步HTTP客户端/服务器框架
  • aioredis: 异步IO Redis支持
  • aiopg: 异步IO PostgreSQL 支持
  • aiomcache: 异步IO memcached 客户端
  • aiokafka: 异步IO Kafka 客户端
  • aiozmq: 异步IO ZeroMQ 支持
  • aiojobs:用于管理后台任务的做业调度程序
  • async_lru: 用于异步IO的简单LRU缓存

来自 magicstack:

  • uvloop:超快的异步IO事件循环
  • asyncpg: (也很是快)异步IO PostgreSQL支持

来自其余:

  • trio: 更友好的“asyncio”,旨在展现一个更加简单的设计
  • aiofiles: 异步 文件 IO
  • asks: 异步类requests的http 库
  • asyncio-redis: 异步IO Redis 支持
  • aioprocessing: 将multiprocessing模块与asyncio集成在一块儿
  • umongo: 异步IO MongoDB 客户端
  • unsync: Unsynchronize asyncio
  • aiostream:相似'itertools',但异步

个人感想:

其实读完这篇文章,我相信有不少人仍旧会有困惑——异步IO底层究竟是怎么实现的?早些时候我也很困惑,要说多线程多进程咱们很好理解,由于咱们知道经常使用的现代计算机是根据时间片分时运行程序的。到了异步IO或者协程这里居然会出现一段没有CPU参与的时间。在我学习Javascript/nodejs的时候就更困惑了,web-base的javascript和backend nodejs都是单线程设计的,它的定时器操做怎么实现的?它的界面异步操做怎么实现的?后来读了《UNIX环境高级编程》才有种“恍然大悟”的感受。在学习编程语言的时候,每每认为语言自己是图灵完备的,编程语言设定的规则就是整个世界。但实际上,编程语言的图灵完备仅体如今逻辑和运算上,其余的一些设施底层不是语言自己就可以彻底解释的。咱们,至少是我本身,在学习一个语言工具的时候每每忽略了一个早就知道的现实——现代常规的编程,都是面向操做系统的编程!不管是多线程、多进程仍是异步IO自己都是操做系统提供的功能。多余web-base的javascript更是面向浏览器编程。浏览器不提供异步IO相关的功能,Web-base 的javascript自己是没办法实现的,操做系统不支持异步IO,什么语言也不行~golang的go程也不过是从系统手中接管了生成线程以后的再分配管理。正像Linux/Unix编程标准是两个的合体——ANSI C + POSIX,咱们学习的语言正对应ANSI C,但多线程、多进程、信号这些东西自己不是语言规范里面的,他们是POSIX里的,是操做系统的规范,是操做系统提供的!再进一步,为何操做系统能实现?由于硬件支持这样的实现!

相关文章
相关标签/搜索