《流畅的Python》笔记。javascript
本篇主要讨论asyncio包,这个包使用事件循环驱动的协程实现并发。java
本篇主要介绍若是使用asyncio
包将上一篇中线程版的“国旗下载”程序改成协程版本,经过异步非阻塞来实现并发。python
说实话,我在读这部份内容的时候是懵逼的,书中阻塞非阻塞、同步异步的概念和我以前的理解有很大差别。以前一直觉得同步就意味着阻塞,异步就意味着非阻塞。但其实,阻塞非阻塞与同步异步并无本质的联系。编程
同步(Synchronizing)异步(Asynchronizing)是对指令而言的,也就是程序(理解成“函数”会更好一些)。以含有I/O操做的函数为例(被调用方),若是这个函数要等到I/O操做结束,获取了数据,才返回到调用方,这就叫同步(绝大部分函数都同步);反之,不等I/O执行完毕就返回到调用方,获取的数据以其余方式转给调用方,这就叫异步。api
阻塞(Blocking)非阻塞(Non-Blocking)是对进程线程而言(为了简洁,只以“线程”为例)。由于某些缘由(好比I/O),线程被挂起(被移出CPU),这就叫阻塞;反之,即便由于这些缘由,线程依然不被挂起(不被移出CPU),这就叫非阻塞。服务器
可见,这两组概念一共能够组成四种不一样状况:同步阻塞(常见),同步非阻塞(不常见),异步阻塞(不常见),异步非阻塞(常见)。微信
仍以上述I/O函数为例:网络
因为以前并无遇到代码世界中的同步非阻塞和异步阻塞这两种状况,因此我也不肯定上述这两种状况的例子是否准确,欢迎大佬留言指导。但这四种状况在现实生活中就很常见了,下面举个在某处看到的例子:session
从这四个例子能够看出,阻不阻塞是对老张而言的,在计算机中对应的就是进程线程;同步异步是对水壶而言的,在计算机中对应的就是函数。闭包
有了上述概念后,咱们接下来将使用asyncio
包,将以前下载国旗的程序改成协程版本。
以前咱们使用线程实现了并发下载数据,它是同步阻塞的,由于一到I/O操做,线程就被阻塞,而后调入新的线程。如今,咱们将实现一个异步非阻塞版本。但从上述介绍知道,异步有两种方式:回调和协程。本文并不会实现回调版本的“下载国旗”,提出回调只是为了和协程进行比较。
举个例子说明回调。在调用函数A时除了传入必要的参数外,还传入一个参数:函数B。A中有一些费时的操做,好比I/O,A在没获得结果以前就返回,而将等待结果以及进行后续处理的事情交给函数B。这个过程就是回调,函数B就称为回调函数。
这种编程方式不太符合人的思惟习惯,代码也不易于理解,状况一复杂,就极可能遇到**“回调地狱”**:多层嵌套回调。下面是一个JavaScript中使用回调的例子,它嵌套了3层:
// 代码2.1
api_call1(request1, function (response1){ // 多么痛的领悟
var request2 = step1(response1); // 第一步
api_call2(request2, function (response2){
var request3 = step2(response2); // 第二步
api_call3(request3, function (response3){
step(response3); // 第三步
})
})
})
复制代码
api_call1
、api_call2
和api_call3
都是库函数,用于异步获取结果。JavaScript中经常使用匿名函数做为回调函数。下面咱们使用Python来实现上述代码,上述三个匿名函数分别命名为stage1
,stage2
和stage3
:
# 代码2.2
def stage1(response1):
request2 = step1(response1)
api_call2(request2, stage2)
def stage2(response2):
request3 = step2(response2)
api_call3(request3, stage3)
def stage3(response3):
step3(response3)
api_call1(request1, stage1) # 代码从这里开始执行
复制代码
可见,即便用Python写,也不容易理解,这要是再多嵌套几层,不逼疯已经不错了。并且,若是要在stage2
中使用request2
,还得使用闭包,这就又变成了嵌套定义函数的状况。而且上述代码尚未考虑抛出异常的状况:在基于回调的API中,这个问题的解决办法是为每一个异步调用注册两个回调,一个用于处理操做成功时返回的结果,一个用于处理错误。能够看出,一旦涉及错误处理,回调将更可怕。
如今咱们用协程来改写上述代码:
# 代码2.3
import asyncio
@asyncio.coroutine
def three_stages(request1):
response1 = yield from api_call1(request1)
request2 = step1(response1)
response2 = yield from api_call2(request2)
request3 = step2(response2)
response3 = yield from api_call3(request3)
step3(response3)
loop = asyncio.get_event_loop()
loop.create_task(three_stages(request1))
复制代码
与前面两个版本的回调相比,这个版本的代码将3个步骤依次写在同一函数中,易于理解,这样看起来是否是也更像同步函数?若是要处理异常,只须要相应的yield from
语句处添加try/except
便可。
但也别急着把这称为“协程天堂”,由于:
yield from
语句;api_call1(request1)
那样直接调用three_stages(request1)
,必须使用事件循环(上面的loop
)来驱动协程。但无论怎样,代码读起来和写起来比回调简单多了,尤为是嵌套回调。
小技巧:读协程的代码时,为了便于理解代码的意思,能够直接将yield from
关键字忽略掉。
下面咱们开始实现协程版本的“下载国旗”。
为了将其改成协程版本,咱们不能使用以前的requests
包,由于它会阻塞线程,改成使用aiohttp
包。为了尽可能保持代码的简洁,这里不处理异常。下方是完整的代码,代码中咱们使用了新语法。如下代码的基本思路是:在一个单线程程序中使用主循环一次激活队列中的协程,各个协程向前执行几步,而后把控制权让给主循环,主循环再激活队列中的下一个协程。
# 代码2.4
import aiohttp, os, sys, time, asyncio # 代码中请勿这么写,这里只是为了减小行数
POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
BASE_URL = "http://flupy.org/data/flags"
DEST_DIR = "downloads/"
def save_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, "wb") as fp:
fp.write(img)
def show(text):
print(text, end=" ")
sys.stdout.flush()
async def get_flag(cc): # aiohttp只支持TCP和UDP请求
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
async with aiohttp.ClientSession() as session: # <1> 开启一个会话
async with session.get(url) as resp: # 发送请求
image = await resp.read() # 读取请求
return image
async def download_one(cc):
image = await get_flag(cc)
show(cc)
save_flag(image, cc.lower() + ".gif")
return cc
def download_many(cc_list):
loop = asyncio.get_event_loop() # 获取事件循环
to_do = [download_one(cc) for cc in sorted(cc_list)] # 生成协程列表
wait_coro = asyncio.wait(to_do) # 将协程包装成Task类,wait_coro并非运行结果!而是协程!
res, _ = loop.run_until_complete(wait_coro) # 驱动每一个协程运行
loop.close() # 循环结束
return len(res)
def main(download_many):
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time() - t0
msg = "\n{} flags downloaded in {:.2f}s"
print(msg.format(count, elapsed))
if __name__ == "__main__":
main(download_many)
# 结果:
VN TR FR DE IN ID RU NG CN EG BR MX PH CD IR PK ET JP BD US
20 flags downloaded in 1.27s
复制代码
解释:
①这里使用了新的语法async/await
。再Python3.5以前,若是想定义一个协程只能延用函数的定义方式def
,而后在定义体里面使用yield
或yield from
。若是想把一个函数更明确地声明为协程(或者说异步函数),还可使用asyncio
中的coroutine
装饰器,但这么作是否是挺麻烦的?从Python3.5起,能够明确**使用async
来定义协程(异步函数)**和异步生成器。使用async
则能够省略掉@asyncio.coroutine
装饰器;在用async
修饰的协程的定义体中可使用yield
关键字,但不能使用yield from
,它必须被替换为await
,即便yield from
后面只是一个普通的生成器;从由async修饰
的协程或生成器中获取数据时,必须使用await
。
②若是要使用@asyncio.coroutine
装饰器明确声明协程,那么在协程定义体内部只能使用yield from
,不能使用yield
,由于使用到yield
的地方已经在asyncio
中所有封装成了函数或者方法。最新版的@asyncio.coroutine
也能够装饰async
修饰的协程,这种状况下coroutine
不作任何事,只是原封不动的返回被装饰的协程。
③ <1>处的代码之因此改用async with
(异步上下文管理器),是由于新版asyncio
并不支持书中的旧语法yield from aiohttp.request("GET", url)
。关于async/await
,async with/async for
的相关内容将在后续文章中介绍,这里只须要知道async
对应于@asyncio.coroutine
,await
对应于yield from
便可。
④咱们将get_flag
改为了协程版本,并使用aiohttp
来实现异步请求;download_one
函数也随之变成了协程版本。
⑤download_many
只是一个普通函数,它要驱动协程运行。在这个函数中,咱们经过asyncio.get_event_loop()
建立事件循环(实质就是一个线程)来驱动协程的运行。接着生成含20个download_one
协程的协程列表to_do
,随后再调用asyncio.wait(to_do)
将这个协程列表包装成一个wait
协程,取名为wait_coro
。wait
协程会将to_do
中全部的协程包装成Task
对象(Future
的子类),再造成列表。最后,咱们经过loop.run_until_complete(wait_coro)
驱动协程wait_coro
运行。整个的驱动链是这样的:loop.run_until_complete
驱动协程wait_coro
,wait_coro
再在内部驱动20个协程。
⑥wait
协程最后会返回一个元组,第一个元素是完成的协程数,第二个是未完成的协程数,loop.run_until_complete
返回传入的协程的返回值(实际代码是Future.result())。有点绕,其实就是wait_coro
最后返回一个元组给run_until_complete
,run_until_complete
再把这个值返回给调用方。
⑦在上一篇中,咱们知道concurrent.futures
中有一个Future
,且经过它的result
方法获取最后运行的结果;在asyncio
包中,不光有Future
,还有它的子类Task
,但获取结果一般并非调用result
方法,而是经过yield from
或await
,即yield from future
获取结果。asyncio.Future
类的result
方法没有参数,不能设置超时时间;若是调用result
时future
还未运行完毕,它并不会阻塞去等待结果,而是抛出asyncio.InvalidStateError
异常。
上一篇中,咱们除了使用Executor.map()
批量处理线程以外,咱们还使用了concurrent.futures.as_completed()
挨个迭代运行完的线程返回的结果。asyncio
也实现了这个方法,咱们将使用这个函数改写上方的代码。
还有一个问题:咱们每每只关注了网络I/O请求,经常忽略本地的I/O操做。线程版本中的save_flag
函数也是会阻塞线程的,由于它操做了磁盘。但因为图片过小,速度太快,咱们感受并不明显,若是换成更高像素的图片,这种速度差别就会很明显。咱们将会以某种方式使其避免阻塞线程。下面是改写的代码:
# 代码2.5
import asyncio, os, sys, time, aiohttp
async def download_one(cc, semaphore):
async with semaphore:
image = await get_flag(cc)
loop = asyncio.get_event_loop()
loop.run_in_executor(None, save_flag, image, cc + ".gif")
return cc
async def download_coro(cc_list, concur_req):
semaphore = asyncio.Semaphore(concur_req) # 它是一个信号量,用于控制并发量
to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
to_do_iter = asyncio.as_completed(to_do)
for future in to_do_iter:
res = await future
print("Downloaded", res)
def download_many(cc_list, concur_req): # 变化不大
loop = asyncio.get_event_loop()
coro = download_coro(cc_list, concur_req)
loop.run_until_complete(coro)
loop.close()
if __name__ == "__main__":
t0 = time.time()
download_many(POP20_CC, 1000) # 第二个参数表示最大并发数
print("\nDone! Time elapsed {:.2f}s.".format(time.time() - t0))
# 结果:
Downloaded BD
Downloaded CN
-- snip --
Downloaded US
Done! Time elapsed 1.21s.
复制代码
上述代码有3个地方值得关注:
asyncio.as_completed()
以元素为协程的可迭代对象为参数,但自身并非协程,只是一个生成器。它在内部将传入的协程包装成Task
,而后返回一个生成器,产出协程的返回值。这个生成器按协程完成的顺序生成值(先完成先产出),而不是按协程在迭代器中的顺序生成值。asyncio.Semaphore
是个信号量类,内部维护这一个计数器,调用它的acquire
方法(这个方法是个协程),计数器减一;对其调用release
方法(这个方法不是协程),计数器加一;当计数器为0时,会阻塞调用这个方法的协程。save_flag
函数放到了其余线程中,loop.run_in_executor()
的第一个参数是Executor
实例,若是为None
,则使用事件循环的默认ThreadPoolExecutor
实例。余下的参数是可调用对象,以及可调用对象的位置参数。本章开篇介绍了阻塞非阻塞、同步异步的概念,而后介绍了异步的两种实现方式:回调和协程。并经过代码比较了回调和协程的实现方式。而后咱们使用asyncio
和aiohttp
两个库,将以前线程版本的下载国旗程序改成了协程版本。惋惜我也是刚接触协程不久,写的内容不必定准确,尤为是关于asyncio
的内容,这个库以前是一点都没接触过。后面我会专门研究Python中的协程,以及asyncio
的实现,争取把这部份内容完全搞懂。
迎你们关注个人微信公众号"代码港" & 我的网站 www.vpointer.net ~