Python 协程的本质?原来也不过如此

Python编程学习圈 3月5日算法

这两天由于一点我的缘由写了点很久没碰的 Python ,其中涉及到「协程」编程,上次搞的时候,它仍是 Web 框架 tornado 特有的 feature,如今已经有 asyncawait 关键字支持了。思考了一下其实现,回顾了下这些年的演变,以为还有点意思。编程

都是单线程,为何原来低效率的代码用了  async await  加一些异步库就变得效率高了?

若是在作基于 Python 的网络或者 Web 开发时,对于这个问题曾感到疑惑,这篇文章尝试给一个答案。api

0x00 开始以前

首先,本文不是带你浏览源代码,而后对照原始代码给你讲 Python 标准的实现。相反,咱们会从实际问题出发,思考解决问题的方案,一步步体会解决方案的演进路径,最重要的,但愿能在过程当中得到知识系统性提高。⚠️ 本文仅是提供了一个独立的思考方向,并未遵循历史和现有实际具体的实现细节。其次,阅读这篇文章须要你对 Python 比较熟悉,至少了解 Python 中的生成器 generator 的概念。缓存

0x01 IO 多路复用

这是性能的关键。但咱们这里只解释概念,其实现细节不是重点,这对咱们理解 Python 的协程已经足够了,如已足够了解,前进到 0x02首先,你要知道全部的网络服务程序都是一个巨大的死循环,你的业务逻辑都在这个循环的某个时刻被调用:服务器

def handler(request):
    # 处理请求
    pass

# 你的 handler 运行在 while 循环中
while True:
    # 获取一个新请求
    request = accept()
    # 根据路由映射获取到用户写的业务逻辑函数
    handler = get_handler(request)
    # 运行用户的handler,处理请求
    handler(request)

设想你的 Web 服务的某个 handler,在接收到请求后须要一个 API 调用才能响应结果。对于最传统的网络应用,你的 API 请求发出去后在等待响应,此时程序中止运行,甚至新的请求也得在响应结束后才进得来。若是你依赖的 API 请求网络丢包严重,响应特别慢呢?那应用的吞吐量将很是低。不少传统 Web 服务器使用多线程技术解决这个问题:把 handler 的运行放到其余线程上,每一个线程处理一个请求,本线程阻塞不影响新请求进入。这能必定程度上解决问题,但对于并发比较大的系统,过多线程调度会带来很大的性能开销。IO 多路复用能够作到不使用线程解决问题,它是由操做系统内核提供的功能,能够说专门为这类场景而生。简单来说,你的程序遇到网络IO时,告诉操做系统帮你盯着,同时操做系统提供给你一个方法,让你能够随时获取到有哪些 IO 操做已经完成。就像这样:网络

# 操做系统的IO复用示例伪代码
# 向操做系统IO注册本身关注的IO操做的id和类型
io_register(io_id, io_type)
io_register(io_id, io_type)

# 获取完成的IO操做
events = io_get_finished()

for (io_id, io_type) in events:
    if io_type == READ:
        data = read_data(io_id) 
    elif io_type == WRITE:
        write_data(io_id,data)

把 IO 复用逻辑融合到咱们的服务器中,大概会像这样:数据结构

call_backs = {}

def handler(req):
    # do jobs here
    io_register(io_id, io_type)
    def call_back(result):
        # 使用返回的result完成剩余工做...
    call_backs[io_id] = call_back

# 新的循环
while True
    # 获取已经完成的io事件
    events = io_get_finished()
    for (io_id, io_type) in events:
        if io_type == READ: # 读取
            data = read(io_id) 
            call_back = call_backs[io_id]
            call_back(data)
        else:
            # 其余类型io事件的处理
            pass

    # 获取一个新请求
    request = accept()
    # 根据路由映射获取到用户写的业务逻辑函数
    handler = get_handler(request)
    # 运行用户的handler,处理请求
    handler(request)

咱们的 handler 对于 IO 操做,注册了回调就马上返回,同时每次迭代都会对已完成的 IO 执行回调,网络请求再也不阻塞整个服务器。上面的伪代码仅便于理解,具体实现细节更复杂。并且就链接受新请求也是在从操做系统获得监听端口的 IO 事件后进行的。咱们若是把循环部分还有 call_backs 字典拆分到单独模块,就能获得一个 EventLoop,也就是 Python 标准库 asyncio 包中提供的 ioloop多线程

0x02 用生成器消除 callback

着重看下咱们业务中常常写的 handler 函数,在有独立的 ioloop 后,它如今变成相似这样:并发

def handler(request):
    # 业务逻辑代码...
    # 须要执行一次 API 请求
    def call_back(result):
        # 使用 API 返回的result完成剩余工做
        print(result)
    # 没有io_call这个方法,这里只是示意,表示注册一个IO操做
    asyncio.get_event_loop().io_call(api, call_back)

到这里,性能问题已经解决了:咱们再也不须要多线程就能源源不断接受新请求,并且不用care依赖的 API 响应有多慢。可是咱们也引入了一个新问题,原来流畅的业务逻辑代码如今被拆成了两部分,请求 API 以前的代码还正常,请求 API 以后的代码只能写在回调函数里面了。这里咱们业务逻辑只有一个 API 调用,若是有多个 API ,再加上对 Redis 或者 MySQL 的调用(它们本质也是网络请求),整个逻辑会被拆分的更散,这对业务开发是一笔负担。对于有匿名函数的一些语言(没错就是JavaScript),还可能会引起所谓的「回调地狱」。接下来咱们想办法解决这个问题。咱们很容易会想到:若是函数在运行到网络 IO 操做处后可以暂停,完成后又能在断点处唤醒就行了。若是你对 Python 的「生成器」熟悉,你应该会发现,它刚好具备这个功能:app

def example():
    value = yield 2
    print("get", value)
    return value

g = example()
# 启动生成器,咱们会获得 2
got = g.send(None)
print(got)  # 2

try:
    # 再次启动 会显示 "get 4", 就是咱们传入的值
    got = g.send(got*2)
except StopIteration as e:
    # 生成器运行完成,将会print(4),e.value 是生成器return的值
    print(e.value)

函数中有 yield 关键字,调用函数将会获得一个生成器,生成器一个关键的方法 send() 能够跟生成器交互。g.send(None) 会运行生成器内代码直到遇到 yield,并返回其后的对象,也就是 2,生成器代码就停在这里了,直到咱们再次执行 g.send(got*2),会把 2*2 也就是 4 赋值给yield 前面的变量 value,而后继续运行生成器代码。yield 在这里就像一扇门,能够把一件东西从这里送出去,也能够把另外一件东西拿进来。若是 send 让生成器运行到下一个 yield 前就结束了,send 调用会引起一个特殊的异常StopIteration,这个异常自带一个属性 value,为生成器 return 的值。若是咱们把咱们的 handler 用 yield 关键字转换成一个生成器,运行它来把 IO 操做的具体内容返回,IO 完成后的回调函数中把 IO 结果放回并恢复生成器运行,那就解决了业务代码不流畅的问题了:

def handler(request):
    # 业务逻辑代码...
    # 须要执行一次 API 请求,直接把 IO 请求信息yield出去
    result = yield io_info
    # 使用 API 返回的result完成剩余工做
    print(result)

# 这个函数注册到ioloop中,用来当有新请求的时候回调
def on_request(request):
    # 根据路由映射获取到用户写的业务逻辑函数
    handler = get_handler(request)
    g = handler(request)
    # 首次启动得到io_info
    io_info = g.send(None)

    # io完成回调函数
    def call_back(result):
        # 从新启动生成器
        g.send(result)

    asyncio.get_event_loop().io_call(io_info, call_back)

上面的例子,用户写的 handler 代码已经不会被打散到 callback 中,on_request 函数使用 callback 和 ioloop 交互,但它会被实如今 Web 框架中,对用户不可见。上面代码足以给咱们提供用生成器消灭的 callback 的启发,但局限性有两点:

  1. 业务逻辑中仅发起一次网络 IO,但实际中每每更多
  2. 业务逻辑没有调用其余异步函数(协程),但实际中咱们每每会调用其余协程
0x03 解决完整调用链

咱们来看一个更复杂的例子:其中 request 执行真正的 IO,func1func2 仅调用。显然咱们的代码只能写成这样:

def func1():
    ret = yield request("http://test.com/foo")
    ret = yield func2(ret)
    return ret

def func2(data):
    result = yield request("http://test.com/"+data)
    return result

def request(url):
    # 这里模拟返回一个io操做,包含io操做的全部信息,这里用字符串简化代替
    result = yield "iojob of %s" % url
    return result

对于 request,咱们把 IO 操做经过 yield 暴露给框架。对于 func1 和 func2,调用 request 显然也要加 yield 关键字,不然 request 调用返回一个生成器后不会暂停,继续执行后续逻辑显然会出错。这基本就是咱们在没有 yield fromaysncawait 时代,在 tornado 框架中写异步代码的样子。要运行整个调用栈,大概流程以下:

  1. 调用 func1() 获得生成器
  2. 调用 send(None) 启动它获得会获得 request("http://test.com/foo") 的结果,仍是生成器对象
  3. send(None) 启动由 request() 产生的生成器,会获得 IO 操做,由框架注册到 ioloop并指定回调
  4. IO 完成后的回调函数内唤醒 request 生成器,生成器会走到 return 语句结束
  5. 捕获异常获得 request 生成器的返回值,将上一层 func1 唤醒,同时又获得 func2()生成器
  6. 继续执行...

对算法和数据结构熟悉的朋友遇到这种前进后退的遍历逻辑,能够递归也能够用栈,由于递归使用生成器还作不到,咱们可使用栈,其实这就是「调用栈」一词的由来。借助栈,咱们能够把整个调用链上串联的全部生成器对表现为一个生成器,对其不断 send就能不断获得全部 IO 操做信息并推进调用链前进,实现方法以下:

  1. 第一个生成器入栈
  2. 调用 send,若是获得生成器就入栈并进入下一轮迭代
  3. 遇到到 IO 请求 yield 出来,让框架注册到 ioloop
  4. IO 操做完成后被唤醒,缓存结果并出栈,进入下一轮迭代,目的让上层函数使用 IO 结果恢复运行
  5. 若是一个生成器运行完毕,也须要和4同样让上层函数恢复运行

若是实现出来,代码不长但信息量比较大。它把整个调用链对外变成一个生成器,对其调用 send,就能整个调用链中的 IO,完成这些 IO,继续推进调用链内的逻辑执行,直到总体逻辑结束:

def wrapper(gen):
    # 第一层调用 入栈
    stack = Stack()
    stack.push(gen)

    # 开始逐层调用
    while True:
        # 获取栈顶元素
        item = stack.peak()

        result = None
        # 生成器
        if isgenerator(item):
            try:
                # 尝试获取下层调用并入栈
                child = item.send(result)
                stack.push(child)
                # result 使用事后就还原为None
                result = None
                # 入栈后直接进入下次循环,继续向下探索
                continue
            except StopIteration as e:
                # 若是本身运行结束了,就暂存result,下一步让本身出栈
                result = e.value
        else:  # IO 操做
            # 遇到了 IO 操做,yield 出去,IO 完成后会被用 IO 结果唤醒并暂存到 result
            result = yield item

        # 走到这里则本层已经执行完毕,出栈,下次迭代将是调用链上一层
        stack.pop()
        # 没有上一层的话,那整个调用链都执行完成了,return        
        if stack.empty():
            print("finished")
            return result

这多是最复杂的部分,若是看起来吃力的话,其实只要明白,对于上面示例中的调用链,它能够实现的效果以下就行了:

w = wrapper(func1())
# 将会获得 "iojob of http://test.com/foo"
w.send(None)
# 上个iojob foo 完成后的结果"bar"传入,继续运行,获得 "iojob of http://test.com/bar"
w.send("bar")
# 上个iojob bar 完成后的结构"barz"传入,继续运行,结束。
w.send("barz")

有了这部分之后框架再加上配套的代码:

# 维护一个就绪列表,存放全部完成的IO事件,格式为(wrapper,result) 
ready = []

def on_request(request):
    handler = get_handler(request)
    # 使用 wrapper 包装后,能够只经过 send 处理 IO 了
    g = wrapper(func1())
    # 把开始状态直接视为结果为None的就绪状态
    ready.append((g, None))

# 让ioloop每轮循环都执行此函数,用来处理的就绪的IO
def process_ready(self):
    def call_back(g, result):
        ready.append((g, result)) 

    # 遍历全部已经就绪生成器,将其向下推动
    for g, result in self.ready:  
        # 用result唤醒生成器,并获得下一个io操做
        io_job = g.send(result)
        # 注册io操做 完成后把生成器加入就绪列表,等待下一轮处理
        asyncio.get_event_loop().io_call(
            io_job, lambda result: ready.append((g, result)

这里核心思想是维护一个就绪列表,ioloop 每轮迭代都来扫一遍,推进就绪的状态的生成器向下运行,并把新的 IO 操做注册,IO 完成后再次加入就绪,通过几轮 ioloop 的迭代一个 handler 最终会被执行完成。至此,咱们使用生成器写法写业务逻辑已经能够正常运行。

0x04 提升扩展性

若是到这里能读懂, Python 的协程原理基本就明白了。咱们已经实现了一个微型的协程框架,标准库的实现细节跟这里看起来大不同,但具体的思想是一致的。咱们的协程框架有一个限制,咱们只能把 IO 操做异步化,虽然在网络编程和 Web 编程的世界里,阻塞的基本只有 IO 操做,但也有一些例外,好比我想让当前操做 sleep 几秒,用 time.sleep() 又会让整个线程阻塞住,就须要特殊实现。再好比,能够把一些 CPU 密集的操做经过多线程异步化,让另外一个线程通知事件已经完成后再执行后续。因此,协程最好能与网络解耦开,让等待网络IO只是其中一种场景,提升扩展性。Python 官方的解决方案是让用户本身处理阻塞代码,至因而向 ioloop 来注册 IO 事件仍是开一个线程彻底由你本身,并提供了一个标准「占位符」Future,表示他的结果等到将来才会有,其部分原型以下:

class Future
    # 设置结果
    def set_result(result):
 pass
    # 获取结果
    def result():  pass
    #  表示这个future对象是否是已经被设置过结果了
    def done(): pass
    # 设置在他被设置结果时应该执行的回调函数,能够设置多个
    def add_done_callback(callback):  pass

咱们的稍加改动就能支持 Future,让扩展性变得更强。对于用户代码的中的网络请求函数 request

# 如今 request 函数,不是生成器,它返回future
def request(url):
    # future 理解为占位符
    fut = Future()

    def callback(result):
        # 当网络IO完成回调的时候给占位符赋值
        fut.set_result(result)
    asyncio.get_event_loop().io_call(url, callback)

    # 返回占位符
    return future

如今,request 再也不是一个生成器,而是直接返回 future而对于位于框架中处理就绪列表的函数:

def process_ready(self):
    def callback(fut):
        # future被设置结果会被放入就绪列表
        ready.append((g, fut.result()))

    # 遍历全部已经就绪生成器,将其向下推动
    for g, result in self.ready:  
        # 用result唤醒生成器,获得的再也不是io操做,而是future
        fut = g.send(result)
        # future被设置结果的时候会调用callback
        fut.add_done_callback(callback)
0x05 发展和变革

许多年前用 tornado 的时候,大概只有一个 yield 关键字可用,协程要想实现,就是这么个思路,甚至 yield 关键字和 return 关键字不能一个函数里面出现,你要想在生成器运行完后返回一个值,须要手动 raise 一个异常,虽然效果跟如今 return 同样,但写起来仍是很别扭,不优雅。后来有了 yield from 表达式。它能够作什么?通俗地说,它就是作了上面那个生成器 wrapper 所作的事:经过栈实现调用链遍历的 ,它是 wrapper 逻辑的语法糖。有了它,同一个例子你能够这么写:

def func1():
    # 注意 yield from
    ret = yield from request("http://test.com/foo")
    # 注意 yield from
    ret = yield from func2(ret) 
    return ret

def func2(data):
    # 注意 yield from
    result = yield from request("http://test.com/"+data)
    return result

# 如今 request 函数,不是生成器,它返回future
def request(url):
    # 同上基于future实现的request

而后你就再也不须要那个烧脑的 wrapper 函数了:

g = func1()
# 返回第一个请求的 future 
g.send(None)
# 继续运行,自动进入func2 并获得第它里面的那个future
g.send("bar")
# 继续运行,完成调用链剩余逻辑,抛出StopIteration异常
g.send("barz")

yield from 直接打通了整个调用链,已是很大的进步了,可是用来异步编程看着仍是别扭,其余语言都有专门的协程 asyncawait 关键字了,直到再后来的版本把这些内容用专用的 asyncawait 关键字包装,才成为今天比较优雅的样子。

0x06 总结和比较

总的来讲, Python 的原生的协程从两方面实现:

  1. 基于 IO 多路复用技术,让整个应用在 IO 上非阻塞,实现高效率
  2. 经过生成器让分散的 callback 代码变成同步代码,减小业务编写困难

有生成器这种对象的语言,其 IO 协程实现大抵如此,JavaScript 协程的演进基本如出一辙,关键字相同,Future 类比 Promise 本质相同。可是对于以协程出名的 Go 的协程实现跟这个就不一样了,并不显式地基于生成器。若是类比的话,能够 Python 的 gevent 算做一类,都是本身实现 runtime,并 patch 掉系统调用接入本身的 runtime,本身来调度协程,gevent 专一于网络相关,基于网络 IO 调度,比较简单,而 Go 实现了完善的多核支持,调度更加复杂和完善,并且创造了基于 channel 新编程范式。

相关文章
相关标签/搜索