Python学习之路37-使用asyncio包处理并发

《流畅的Python》笔记。javascript

本篇主要讨论asyncio包,这个包使用事件循环驱动的协程实现并发。java

1. 前言

本篇主要介绍若是使用asyncio包将上一篇中线程版的“国旗下载”程序改成协程版本,经过异步非阻塞来实现并发。python

说实话,我在读这部份内容的时候是懵逼的,书中阻塞非阻塞、同步异步的概念和我以前的理解有很大差别。以前一直觉得同步就意味着阻塞,异步就意味着非阻塞。但其实,阻塞非阻塞与同步异步并无本质的联系。编程

同步(Synchronizing)异步(Asynchronizing)是对指令而言的,也就是程序(理解成“函数”会更好一些)。以含有I/O操做的函数为例(被调用方),若是这个函数要等到I/O操做结束,获取了数据,才返回到调用方,这就叫同步(绝大部分函数都同步);反之,不等I/O执行完毕就返回到调用方,获取的数据以其余方式转给调用方,这就叫异步。api

阻塞(Blocking)非阻塞(Non-Blocking)是对进程线程而言(为了简洁,只以“线程”为例)。由于某些缘由(好比I/O),线程被挂起(被移出CPU),这就叫阻塞;反之,即便由于这些缘由,线程依然不被挂起(不被移出CPU),这就叫非阻塞。服务器

可见,这两组概念一共能够组成四种不一样状况:同步阻塞(常见),同步非阻塞(不常见),异步阻塞(不常见),异步非阻塞(常见)。微信

仍以上述I/O函数为例:网络

  • 若是这个函数的I/O请求已发出,只是单纯地在等服务器发回数据,线程也只是单纯地在等这个函数返回结果,CPU将会把这个线程挂起,这就叫作同步阻塞
  • 若是这个函数中调用的是一个执行复杂计算的子函数,此时,函数依然在等结果没有返回,但线程并非没有运行,不会被CPU挂起,这就叫作同步非阻塞(“CPU以轮询的方式查看I/O是否结束”更能说明这种状况,但这已经是很古老的方式了);
  • 若是这个函数在I/O请求没获得结果以前就返回了,但线程依然在等这个结果(在函数体以外等待使用这个数据),这就叫异步阻塞
  • 若是这个函数在没获得结果以前返回了,线程继续执行其余函数,这就叫作异步非阻塞。更具体一点,这种状况对应的是使用回调实现异步非阻塞的状况;而Python中还有一种状况,也是本篇要讲的,就是使用协程实现异步非阻塞:协程在获得结果前依然不返回,但线程并无等待,而是去执行其余协程。协程看起来就像同步同样。

因为以前并无遇到代码世界中的同步非阻塞异步阻塞这两种状况,因此我也不肯定上述这两种状况的例子是否准确,欢迎大佬留言指导。但这四种状况在现实生活中就很常见了,下面举个在某处看到的例子:session

  • 老张把一普通水壶接上水放火上,眼睛直勾勾盯着等水开,不干其余事,这叫同步阻塞
  • 老张依然用一普通水壶烧水,但把水壶放火上后去客厅看电视,时不时回来看水烧好了没有,这叫同步非阻塞
  • 老张用一能响的水壶烧水,没盯着看,但也没干其余事,只是在那儿发愣。水烧好后,壶可劲儿的响,老张一惊,取走水壶,这叫异步阻塞
  • 老张用一能响的水壶烧水,把壶放火上后去客厅看电视,等壶响了再去拿壶,这叫异步非阻塞

从这四个例子能够看出,阻不阻塞是对老张而言的,在计算机中对应的就是进程线程;同步异步是对水壶而言的,在计算机中对应的就是函数。闭包

有了上述概念后,咱们接下来将使用asyncio包,将以前下载国旗的程序改成协程版本。

2. 异步

以前咱们使用线程实现了并发下载数据,它是同步阻塞的,由于一到I/O操做,线程就被阻塞,而后调入新的线程。如今,咱们将实现一个异步非阻塞版本。但从上述介绍知道,异步有两种方式:回调和协程。本文并不会实现回调版本的“下载国旗”,提出回调只是为了和协程进行比较。

2.1 回调

举个例子说明回调。在调用函数A时除了传入必要的参数外,还传入一个参数:函数B。A中有一些费时的操做,好比I/O,A在没获得结果以前就返回,而将等待结果以及进行后续处理的事情交给函数B。这个过程就是回调,函数B就称为回调函数

这种编程方式不太符合人的思惟习惯,代码也不易于理解,状况一复杂,就极可能遇到**“回调地狱”**:多层嵌套回调。下面是一个JavaScript中使用回调的例子,它嵌套了3层:

// 代码2.1
api_call1(request1, function (response1){  // 多么痛的领悟
    var request2 = step1(response1);  // 第一步
    api_call2(request2, function (response2){
        var request3 = step2(response2);  // 第二步
        api_call3(request3, function (response3){
            step(response3);  // 第三步
        })
    })
})
复制代码

api_call1api_call2api_call3都是库函数,用于异步获取结果。JavaScript中经常使用匿名函数做为回调函数。下面咱们使用Python来实现上述代码,上述三个匿名函数分别命名为stage1stage2stage3

# 代码2.2
def stage1(response1):
    request2 = step1(response1)
    api_call2(request2, stage2)

def stage2(response2):
    request3 = step2(response2)
    api_call3(request3, stage3)

def stage3(response3):
    step3(response3)

api_call1(request1, stage1)  # 代码从这里开始执行
复制代码

可见,即便用Python写,也不容易理解,这要是再多嵌套几层,不逼疯已经不错了。并且,若是要在stage2中使用request2,还得使用闭包,这就又变成了嵌套定义函数的状况。而且上述代码尚未考虑抛出异常的状况:在基于回调的API中,这个问题的解决办法是为每一个异步调用注册两个回调,一个用于处理操做成功时返回的结果,一个用于处理错误。能够看出,一旦涉及错误处理,回调将更可怕。

2.2 协程

如今咱们用协程来改写上述代码:

# 代码2.3
import asyncio

@asyncio.coroutine
def three_stages(request1):
    response1 = yield from api_call1(request1)
    request2 = step1(response1)
    response2 = yield from api_call2(request2)
    request3 = step2(response2)
    response3 = yield from api_call3(request3)
    step3(response3)

loop = asyncio.get_event_loop()
loop.create_task(three_stages(request1))
复制代码

与前面两个版本的回调相比,这个版本的代码将3个步骤依次写在同一函数中,易于理解,这样看起来是否是也更像同步函数?若是要处理异常,只须要相应的yield from语句处添加try/except便可。

但也别急着把这称为“协程天堂”,由于:

  • 不能使用常规函数,必须使用协程,并且要习惯yield from语句;
  • 不能直接调用协程。即,不能像直接调用api_call1(request1)那样直接调用three_stages(request1),必须使用事件循环(上面的loop)来驱动协程。

但无论怎样,代码读起来和写起来比回调简单多了,尤为是嵌套回调。

小技巧:读协程的代码时,为了便于理解代码的意思,能够直接将yield from关键字忽略掉。

2.3 下载国旗批量版

下面咱们开始实现协程版本的“下载国旗”。

为了将其改成协程版本,咱们不能使用以前的requests包,由于它会阻塞线程,改成使用aiohttp包。为了尽可能保持代码的简洁,这里不处理异常。下方是完整的代码,代码中咱们使用了新语法。如下代码的基本思路是:在一个单线程程序中使用主循环一次激活队列中的协程,各个协程向前执行几步,而后把控制权让给主循环,主循环再激活队列中的下一个协程

# 代码2.4
import aiohttp, os, sys, time, asyncio   # 代码中请勿这么写,这里只是为了减小行数

POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
BASE_URL = "http://flupy.org/data/flags"
DEST_DIR = "downloads/"

def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, "wb") as fp: 
        fp.write(img)

def show(text):
    print(text, end=" ")
    sys.stdout.flush()

async def get_flag(cc):   # aiohttp只支持TCP和UDP请求
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    async with aiohttp.ClientSession() as session: # <1> 开启一个会话
        async with session.get(url) as resp:   # 发送请求
            image = await resp.read()   # 读取请求
    return image

async def download_one(cc):
    image = await get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + ".gif")
    return cc

def download_many(cc_list):
    loop = asyncio.get_event_loop()   # 获取事件循环
    to_do = [download_one(cc) for cc in sorted(cc_list)]  # 生成协程列表
    wait_coro = asyncio.wait(to_do)   # 将协程包装成Task类,wait_coro并非运行结果!而是协程!
    res, _ = loop.run_until_complete(wait_coro) # 驱动每一个协程运行
    loop.close()   # 循环结束
    return len(res)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = "\n{} flags downloaded in {:.2f}s"
    print(msg.format(count, elapsed))

if __name__ == "__main__":
    main(download_many)

# 结果:
VN TR FR DE IN ID RU NG CN EG BR MX PH CD IR PK ET JP BD US 
20 flags downloaded in 1.27s
复制代码

解释:

①这里使用了新的语法async/await。再Python3.5以前,若是想定义一个协程只能延用函数的定义方式def,而后在定义体里面使用yieldyield from。若是想把一个函数更明确地声明为协程(或者说异步函数),还可使用asyncio中的coroutine装饰器,但这么作是否是挺麻烦的?从Python3.5起,能够明确**使用async来定义协程(异步函数)**和异步生成器。使用async则能够省略掉@asyncio.coroutine装饰器;在用async修饰的协程的定义体中可使用yield关键字,但不能使用yield from,它必须被替换为await,即便yield from后面只是一个普通的生成器;从由async修饰的协程或生成器中获取数据时,必须使用await

②若是要使用@asyncio.coroutine装饰器明确声明协程,那么在协程定义体内部只能使用yield from,不能使用yield,由于使用到yield的地方已经在asyncio中所有封装成了函数或者方法。最新版的@asyncio.coroutine也能够装饰async修饰的协程,这种状况下coroutine不作任何事,只是原封不动的返回被装饰的协程。

③ <1>处的代码之因此改用async with(异步上下文管理器),是由于新版asyncio并不支持书中的旧语法yield from aiohttp.request("GET", url)。关于async/awaitasync with/async for的相关内容将在后续文章中介绍,这里只须要知道async对应于@asyncio.coroutineawait对应于yield from便可。

④咱们将get_flag改为了协程版本,并使用aiohttp来实现异步请求;download_one函数也随之变成了协程版本。

download_many只是一个普通函数,它要驱动协程运行。在这个函数中,咱们经过asyncio.get_event_loop()建立事件循环(实质就是一个线程)来驱动协程的运行。接着生成含20个download_one协程的协程列表to_do,随后再调用asyncio.wait(to_do)将这个协程列表包装成一个wait协程,取名为wait_corowait协程会将to_do中全部的协程包装成Task对象(Future的子类),再造成列表。最后,咱们经过loop.run_until_complete(wait_coro)驱动协程wait_coro运行。整个的驱动链是这样的:loop.run_until_complete驱动协程wait_corowait_coro再在内部驱动20个协程。

wait协程最后会返回一个元组,第一个元素是完成的协程数,第二个是未完成的协程数loop.run_until_complete返回传入的协程的返回值(实际代码是Future.result())。有点绕,其实就是wait_coro最后返回一个元组给run_until_completerun_until_complete再把这个值返回给调用方。

⑦在上一篇中,咱们知道concurrent.futures中有一个Future,且经过它的result方法获取最后运行的结果;在asyncio包中,不光有Future,还有它的子类Task,但获取结果一般并非调用result方法,而是经过yield fromawait,即yield from future获取结果。asyncio.Future类的result方法没有参数,不能设置超时时间;若是调用resultfuture还未运行完毕,它并不会阻塞去等待结果,而是抛出asyncio.InvalidStateError异常。

2.4 下载国旗改进版

上一篇中,咱们除了使用Executor.map()批量处理线程以外,咱们还使用了concurrent.futures.as_completed()挨个迭代运行完的线程返回的结果。asyncio也实现了这个方法,咱们将使用这个函数改写上方的代码。

还有一个问题:咱们每每只关注了网络I/O请求,经常忽略本地的I/O操做。线程版本中的save_flag函数也是会阻塞线程的,由于它操做了磁盘。但因为图片过小,速度太快,咱们感受并不明显,若是换成更高像素的图片,这种速度差别就会很明显。咱们将会以某种方式使其避免阻塞线程。下面是改写的代码:

# 代码2.5
import asyncio, os, sys, time, aiohttp

async def download_one(cc, semaphore):
    async with semaphore:
        image = await get_flag(cc)
    loop = asyncio.get_event_loop()
    loop.run_in_executor(None, save_flag, image, cc + ".gif")
    return cc

async def download_coro(cc_list, concur_req):
    semaphore = asyncio.Semaphore(concur_req)  # 它是一个信号量,用于控制并发量
    to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
    to_do_iter = asyncio.as_completed(to_do)
    for future in to_do_iter:
        res = await future
        print("Downloaded", res)

def download_many(cc_list, concur_req):  # 变化不大
    loop = asyncio.get_event_loop()
    coro = download_coro(cc_list, concur_req)
    loop.run_until_complete(coro)
    loop.close()

if __name__ == "__main__":
    t0 = time.time()
    download_many(POP20_CC, 1000)  # 第二个参数表示最大并发数
    print("\nDone! Time elapsed {:.2f}s.".format(time.time() - t0))

# 结果:
Downloaded BD
Downloaded CN
-- snip --
Downloaded US

Done! Time elapsed 1.21s.
复制代码

上述代码有3个地方值得关注:

  • asyncio.as_completed()元素为协程的可迭代对象为参数,但自身并非协程,只是一个生成器。它在内部将传入的协程包装成Task,而后返回一个生成器,产出协程的返回值。这个生成器按协程完成的顺序生成值(先完成先产出),而不是按协程在迭代器中的顺序生成值。
  • asyncio.Semaphore是个信号量类,内部维护这一个计数器,调用它的acquire方法(这个方法是个协程),计数器减一;对其调用release方法(这个方法不是协程),计数器加一;当计数器为0时,会阻塞调用这个方法的协程。
  • 咱们将save_flag函数放到了其余线程中,loop.run_in_executor()的第一个参数是Executor实例,若是为None,则使用事件循环的默认ThreadPoolExecutor实例。余下的参数是可调用对象,以及可调用对象的位置参数。

3. 总结

本章开篇介绍了阻塞非阻塞、同步异步的概念,而后介绍了异步的两种实现方式:回调和协程。并经过代码比较了回调和协程的实现方式。而后咱们使用asyncioaiohttp两个库,将以前线程版本的下载国旗程序改成了协程版本。惋惜我也是刚接触协程不久,写的内容不必定准确,尤为是关于asyncio的内容,这个库以前是一点都没接触过。后面我会专门研究Python中的协程,以及asyncio的实现,争取把这部份内容完全搞懂。


迎你们关注个人微信公众号"代码港" & 我的网站 www.vpointer.net ~

相关文章
相关标签/搜索