事件循环和协程

前言

第一次接触异步编程这个概念是在 Python 里面,去年的时候就由于不清楚 Python 中异步编程的实现原理找了不少资料研究,但最后也没有搞得很清楚。python

后来又由于重心逐渐转移到了 Java 上,就暂时放弃了对 Python 中异步原理的探究,直到前段时间,被 JavaScript 的异步坑了一次后, 让我想起了 Python 里面的异步……ajax

而后我就在网上看到了一篇文章,看了半天后在文章的评论中发现了本身两年前的评论……编程

当时我那个心情啊,太曲折了,因而乎决定找个时间在研究一下 Python 中的异步,这即是这篇博客的由来。多线程

  • 注 1:虽然这篇博客源自探究 Python 异步编程实现原理的过程,可是,并不会包含太多 Python 中异步编程的实现原理,由于我尚未搞明白 QAQ
  • 注 2:博客中最后的实现代码存在问题,但一时间找不到解决办法,只好做为一种思路发出来,等待后续研究解决

Python 中异步编程模型的特殊性

我前后接触过了 Python、JavaScript1 和 Java 中的异步编程,其中,Python 的是最为特殊的一个,由于它是 单线程异步阻塞 的。app

一般来讲,异步编程模型下最小的执行单位是一个个任务(每每就是一个函数),而为了可以在一个任务执行完成后执行另外的操做,又会引入回调任务(函数)。 不一样的任务和回调任务的执行,每每又是经过事件循环和任务队列来完成。异步

假如将这些东西拆分开来,像 JavaScript 那样,放在不一样的线程下面处理,是很容易理解的,由于总体结构足够清晰,可是 Python 不行,由于 GIL 的缘由,若是 Python 仍是使用多线程的方式来实现异步编程的话, 并不能带来多少性能上的提高,所以,Python 异步中的事件循环、任务队列和任务的执行都在一个线程里面。async

并且,Python 中异步的实现是基于协程的,这不只使得 Python 中的异步是阻塞的,并且,最小执行单位再也不是单个任务,而是单个任务的一部分。异步编程

这就让 Python 中异步的实现变得复杂起来,原本 Python 的源码就很差读,好家伙,如今更很差读了,同时又由于异步编程每每都和异步 I/O 挂钩,刷的一下,源码中一堆和异步 I/O 相关的代码……函数

这让我明白了,想快速搞明白 Python 异步是咋回事是不可能的,毕竟,我要作的是经过阅读源码倒推做者的思路,这很难!!!oop

所以,我换了一个思路,我先本身用协程实现一个简单的事件循环,在慢慢去读 Python 的源码,总能够了吧!

这就是为啥这篇博客说是在探究 Python 异步编程的实现原理,可是标题连 Python 这个单词都没有的缘由。

协程的基本认识

Python 中的协程是经过生成器来实现的,可是,基本上全部博客将协程的时候都会说的一句话,协程不等于生成器,它们只是长得像:

def grep(pattern):
    """ >>> g = grep("python") >>> g.send(None) Looking for python >>> g.send("Yeah, but no, but yeah, but no") >>> g.send("A series of tubes") >>> g.send("python generators rock!") python generators rock! """
    print("Looking for %s" % pattern)
    while True:
        line = yield
        if pattern in line:
            print(line)
复制代码

上面这个协程不断接收来自 send 方法的输入,在通过判断后进行输出,假如把它当作生成器使用的话,那么你只能获得无数的 None 值。

本质上,在低版本的 Python 中生成器和协程没有区别,就看你怎么用,关键就在于协程 消费 值,而生成器 生成 值,固然了,高版本的 Python 对协程提供了更多的支持, 使得它们再也不同样,可是,这篇博客里面,全部的协程都经过生成器实现。

所以,咱们须要关注后面会用到的几个特性:

  1. yield 的左值会接收来自 send 方法的输入,可是协程在第一次运行时还没到达 yield 语句处,所以没法传递参数,只能经过 None 值来调用协程:

    def coroutine():
        while True:
            val = yield
            print(val)
    复制代码

    执行输出:

    In [9]: coro = coroutine()
    
    In [10]: coro.send(None)
    
    In [11]: coro.send(1)
    1
    
    In [12]: coro = coroutine()
    
    In [13]: coro.send(1)
    ---------------------------------------------------------------------------
    TypeError                                 Traceback (most recent call last)
    <ipython-input-13-e272bd1527da> in <module>()
    ----> 1 coro.send(1)
    
    TypeError: can't send non-None value to a just-started generator
    复制代码
  2. 能够经过 yield from 语句递归调用协程,效果以下:

    def coroutine():
        for i in range(3):
            val = yield
            print('coroutine %s' % val)
    
    def invoker():
        yield from coroutine()
    复制代码

    执行输出(就是会报异常):

    In [29]: coro = invoker()
    
    In [30]: coro.send(None)
    
    In [31]: coro.send(1)
    coroutine 1
    
    In [32]: coro.send(2)
    coroutine 2
    
    In [33]: coro.send(3)
    coroutine 3
    ---------------------------------------------------------------------------
    StopIteration                             Traceback (most recent call last)
    <ipython-input-33-8e657389bc11> in <module>()
    ----> 1 coro.send(3)
    
    StopIteration:
    复制代码
  3. 协程能够有返回值,保存在 StopIteration 异常中,做为 yield from 的左值时能够直接接收:

    def coroutine():
        val = yield
        return 'coroutine %s' % val
    复制代码

    执行输出:

    In [42]: coro = coroutine()
    
    In [43]: coro.send(None)
    
    In [44]: coro.send(1)
    ---------------------------------------------------------------------------
    StopIteration                             Traceback (most recent call last)
    <ipython-input-44-e272bd1527da> in <module>()
    ----> 1 coro.send(1)
    
    StopIteration: coroutine 1
    复制代码

简单事件循环的实现

原本想将 Future & TaskEventLoop 分红两节的,结果 TaskEventLoop 耦合在了一块儿,只好合在一块儿了,下面是代码:

class EventLoop:
    def __init__(self):
        self._ready = []

    def call_soon(self, task):
        self._ready.append(task)

    def run_forever(self, coro):
        root = Task(coro, self)
        while self._ready:
            task = self._ready.pop(0)
            task.step(Future())
        return root.result

class Future:
    def __init__(self):
        # 经过 result 来保存协程的返回值
        self.result = None
        # 经过 _callbacks 来保存回调函数
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        # try suppression bug
        self.result = self.result or result
        # 执行完成后将自身做为参数传递给回调函数
        for callback in self._callbacks:
            callback(self)

class Task(Future):
    # 协程类型
    coroutine = type((i for i in range(0)))

    def __init__(self, coro, loop):
        super().__init__()
        self.coro = coro
        self.loop = loop
        # 将自身加入任务队列
        self.loop.call_soon(self)

    def step(self, future):
        try:
            result = self.coro.send(future.result)
        except StopIteration as exc:
            # 触发 StopIteration 异常时说明协程已经执行结束
            self.set_result(exc.value)
        else:
            # 协程返回协程,将其转换为 Task 后将 self.step 注册为期回调函数等待唤醒
            if type(result) == self.coroutine:
                result = Task(result, self.loop)
                result.add_done_callback(self.step)
            # 协程返回任务,将 self.step 注册为回调函数等待唤醒
            elif isinstance(result, Task):
                # there is a bug
                result.add_done_callback(self.step)
                self.loop.call_soon(result)
            # 协程返回其余东西,不受理,直接将 self 再次放入任务队列
            else:
                self.loop.call_soon(self)
复制代码

一开始实现的时候是想用一个外部的事件循环来操做,不须要 Task 持有事件循环,可是实现过程当中发现那样存在一点问题,便学着 Python 中的方式将事件循环传递给 Task 操做, 但这里的实现是依然存在问题。

在只存在协程和同序返回 Task 的状况下测试没有问题,可是当存在异序返回 Task 的状况下问题就出现了,下面的测试代码即是异序返回,我经过在 set_result 中判断 result 的方式暂时抑制了该异常, 可是,这是治标不治本的方式。若是有大佬知道方案,请务必告诉我 QAQ

测试代码:

_loop = EventLoop()

def main():
    ta = Task(say_hello(), _loop)
    tb = Task(say_world(), _loop)

    b = yield tb
    a = yield ta

    return a + b

def say_world():
    print('world')
    yield
    return 'world'

def say_hello():
    print('hello')
    yield from say_other()
    return 'hello '


def say_other():
    print('other')
    yield

print(_loop.run_forever(main()))
复制代码

输出:

hello
other
world
hello world
复制代码

结语

折腾了一圈后结果仍是只能获得一份存在问题的代码,和去年的时候差很少,但比去年好的是,多少多了一点思路。

可是,仍是差得好远啊……

参考连接

Footnotes

1 ES6 中 async/await 的原理尚未怎么了解过,所以这里的异步只包括 ajax 这类异步操做