Async Programming 101: Writing an Event Loop

The code in this article comes from: snarky.ca/how-the-hec…python

A quick recap

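The previous article covered the history of Python's async and await keywords and noted that async and await are an API, not an implementation. Quite a few event loops are built on top of async/await, including asyncio and curio. Under the hood, asyncio is based on future objects, while curio is based on tuples.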

In this article we'll implement a simple event loop using a min-heap.

The heapq module

Heaps are arrays for which a[k] <= a[2k+1] and a[k] <= a[2k+2] for all k, counting elements from 0. For the sake of comparison, non-existing elements are considered to be infinite. The interesting property of a heap is that a[0] is always its smallest element. (From the source code of Python's built-in heapq module)

Put simply, a heap is a Python list with a special property: a[k] <= a[2*k+1] and a[k] <= a[2*k+2] for every k, so the first element is always the smallest.

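As you have probably noticed, this is a binary tree stored in a list: the children of the element at index k sit at indexes 2*k+1 and 2*k+2.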

The heapq module mainly offers the following APIs:

Usage:

heap = []            # creates an empty heap
heappush(heap, item) # pushes a new item on the heap
item = heappop(heap) # pops the smallest item from the heap
item = heap[0]       # smallest item on the heap without popping it
heapify(x)           # transforms list into a heap, in-place, in linear time
item = heapreplace(heap, item) # pops and returns smallest item, and adds new item; the heap size is unchanged
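  • Create an empty heap: heap = []
  • Push an item onto the heap: heappush(heap, item)
  • Pop the smallest item off the heap: item = heappop(heap)
  • Read the smallest item without removing it: item = heap[0]
  • Turn a list into a heap, in place: heapify(x)
  • Pop the smallest item and push a new one in a single step: item = heapreplace(heap, item)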
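
The calls listed above can be tried out in a few lines. This is a minimal sketch; the list of numbers and the variable names (delays, smallest, drained) are made up for illustration.

```python
import heapq

# Build a heap from an unordered list, in place.
delays = [5, 1, 4, 2, 3]
heapq.heapify(delays)

# The smallest element always sits at index 0.
smallest = delays[0]

# Push a new element; heapq keeps the heap property intact.
heapq.heappush(delays, 0)

# Pop everything: items come out in ascending order.
drained = [heapq.heappop(delays) for _ in range(len(delays))]

print(smallest)  # 1
print(drained)   # [0, 1, 2, 3, 4, 5]
```

Popping repeatedly yields the items in sorted order, which is exactly the behaviour an event loop needs to find the task that is due soonest.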

The generator send() method

One more recap, since this part can be a little hard to grasp.

next_value = generator.send(value)

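Three things happen:

  • The generator resumes execution.
  • value becomes the value of the yield expression the generator is currently paused at.
  • The value produced by the generator's next yield expression is returned as next_value.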

Take a look at this example:

>>> def double_inputs():
...     while True:
...         x = yield
...         yield x * 2
...
>>> gen = double_inputs()
>>> next(gen)       # run up to the first yield
>>> gen.send(10)    # goes into 'x' variable
20
>>> next(gen)       # run up to the next yield
>>> gen.send(6)     # goes into 'x' again
12
>>> next(gen)       # run up to the next yield
>>> gen.send(94.3)  # goes into 'x' again
188.6

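Here is what happens when gen.send(10) runs:

  • The generator resumes.
  • 10 is assigned to x in x = yield.
  • x * 2 evaluates to 20. The generator then hits yield again, pauses again, and hands x * 2 back as the return value of send(), which is why the statement prints 20.

next(g) is equivalent to g.send(None). It is commonly used to advance a generator to its next yield and leave it paused there.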
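
To make the three steps concrete, here is a small sketch. The running_total generator is a hypothetical example, not part of the original code; it also shows that a generator's return value travels out on StopIteration.value, which matters once an event loop starts driving coroutines.

```python
def running_total():
    """Yield the running sum of every value sent in."""
    total = 0
    while True:
        value = yield total   # pause here; send() supplies `value`
        if value is None:     # a plain next() sends None: treat it as "stop"
            return total      # surfaces as StopIteration.value
        total += value


gen = running_total()
first = gen.send(None)        # same as next(gen): run to the first yield
second = gen.send(10)         # 10 goes into `value`, next yield gives 10
third = gen.send(5)           # 5 goes into `value`, next yield gives 15

try:
    next(gen)                 # sends None, so the generator returns
except StopIteration as stop:
    final = stop.value

print(first, second, third, final)  # 0 10 15 15
```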

Designing the event loop

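The event loop we are going to implement is simple. Its core features are:

  • It handles many delayed tasks.
  • The task with the earliest scheduled time runs first.
  • If an earlier task takes a long time to finish, it does not block the tasks behind it (in other words, the tasks run concurrently, taking turns whenever one of them pauses).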

Code

The Task class

You can think of this as the counterpart of asyncio.Task/curio.Task:

class Task:
    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until

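Two special methods are defined here, __eq__ and __lt__, so that Task instances can be compared with == and <. Because we are using heapq's min-heap, the "smallest" item comes first. Task instances are ordered by their waiting_until (the point in time at which they should next resume).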

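So at some moment the min-heap might hold Task A at the top with Task B right behind it. Task A is due to resume in 0 seconds, so its resume time (wait_until) is the "smallest" and it is popped and run first. Task B then takes its place as the "smallest" element.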
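
The ordering can be checked with a short sketch. To keep it small, it makes two assumptions that differ from the real loop: plain integers stand in for datetime values, and string labels stand in for coroutine objects.

```python
import heapq


class Task:
    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until


heap = []
# (resume time, label): labels are placeholders for coroutine objects.
for wait_until, label in [(2, 'B'), (0, 'A'), (1, 'C')]:
    heapq.heappush(heap, Task(wait_until, label))

order = []
while heap:
    # heappop always returns the Task with the smallest waiting_until.
    order.append(heapq.heappop(heap).coro)

print(order)  # ['A', 'C', 'B']
```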

The tasks that actually run

@types.coroutine
def sleep(seconds):
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    actual = yield wait_until
    return actual - now

async def countdown(label, length, *, delay=0):
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await sleep(delay)
    print(label, 'starting after waiting', delta)
    while length:
        print(label, 'T-minus', length)
        waited = await sleep(1)
        length -= 1
    print(label, 'lift-off!')

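After delay seconds, this runs a task that lasts length seconds. A brief walkthrough of the code:

One thing needs to be clear: countdown() returns a coroutine object. Unless you await it (or call send() on it), nothing actually executes. Note that next() does not work on a coroutine object, so send(None) is the way to start one by hand.

The statement delta = await sleep(delay) enters the sleep() coroutine and pauses at its first yield. To resume it, something has to "send stuff back" (see the previous article), that is, call send() on this generator. As we will see later, that is the event loop's job.

Also, each task should first resume delay seconds in, so the event loop should call send() once delay seconds have passed since the program started.

The while loop that follows repeats the run-and-pause cycle until length seconds have gone by, at which point the task is finished.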
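
The following sketch drives a coroutine by hand, doing what the event loop will do later. sleep() is the function from the listing above; greet() is a hypothetical coroutine added for illustration, and instead of really waiting, the code simply sends back the requested wake-up time.

```python
import datetime
import types


@types.coroutine
def sleep(seconds):
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    actual = yield wait_until
    return actual - now


async def greet(delay):
    # Hypothetical coroutine: waits once and reports how long it waited.
    waited = await sleep(delay)
    return waited


coro = greet(2)              # nothing has run yet
wake_at = coro.send(None)    # runs until `yield wait_until` inside sleep()

try:
    # Pretend the requested time has arrived and resume the coroutine.
    coro.send(wake_at)
except StopIteration as stop:
    result = stop.value      # the return value of greet()

print(type(wake_at).__name__)  # datetime
print(result)                  # 0:00:02
```

The first send(None) returns whatever sleep() yielded, and the second send() delivers the value that becomes actual. An event loop is essentially this pair of calls repeated for many coroutines, with real waiting in between.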

The event loop code

class SleepingLoop:
    def __init__(self, *coros):
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        for coro in self._new:
            wait_for = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_for, coro))
        while self._waiting:
            now = datetime.datetime.now()
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                # It's time to resume the coroutine.
                wait_until = task.coro.send(now)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                # The coroutine is done.
                pass

def main():
    """Start the event loop, counting down 3 separate launches.

    This is what a user would typically write.
    """
    loop = SleepingLoop(
        countdown('A', 5, delay=0),
        countdown('B', 3, delay=2),
        countdown('C', 4, delay=1)
    )
    start = datetime.datetime.now()
    loop.run_until_complete()
    print('Total elapsed time is', datetime.datetime.now() - start)

if __name__ == '__main__':
    main()

That is all the code there is. Simple, isn't it? Let's go through it:

for coro in self._new:
    wait_for = coro.send(None)
    heapq.heappush(self._waiting, Task(wait_for, coro))

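wait_for = coro.send(None) is the first send() call on each coroutine object. As described earlier, this step runs until it pauses at actual = yield wait_until inside sleep(). The value of wait_until is handed out as wait_for, which is the time at which the task should first start running. The Task objects are then pushed onto the min-heap.

Next comes a while loop. Each iteration pops the "smallest" element off the min-heap, i.e. the task whose next resume time is nearest. If that time has not arrived yet, the loop calls the blocking time.sleep(). (Blocking is fine here: this event loop is so simple that we can be sure no other task needs to resume in the meantime.)

The loop then calls send() on the coroutine. If no StopIteration is raised, a new Task is pushed onto the min-heap. In other words, a task that was popped but has not finished gets its next resume time updated and goes back into the heap.

So when does StopIteration get raised? When the while loop inside the countdown() coroutine ends, that is, when there are no more yields left.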
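
To watch the scheduling order without waiting for real time to pass, the loop can be run against a simulated clock. The sketch below is a variation, not the article's loop, and rests on several assumptions: sleep() yields a relative delay in seconds instead of a datetime, the clock jumps straight to the next wake-up time instead of calling time.sleep(), the heap stores (time, sequence number, coroutine) tuples instead of Task objects, and countdown() appends to a log list instead of printing. run_virtual, seq and log are made-up names.

```python
import heapq
import types


@types.coroutine
def sleep(seconds):
    # Yield a relative delay; the loop turns it into a wake-up time.
    yield seconds


async def countdown(label, length, log, *, delay=0):
    await sleep(delay)
    while length:
        log.append((label, length))
        await sleep(1)
        length -= 1
    log.append((label, 'lift-off'))


def run_virtual(*coros):
    """Run coroutines on a simulated clock and return the final time."""
    clock = 0
    waiting = []
    for seq, coro in enumerate(coros):
        # seq breaks ties so that coroutine objects are never compared.
        heapq.heappush(waiting, (clock + coro.send(None), seq, coro))
    while waiting:
        # Jump the clock to the soonest wake-up time.
        clock, seq, coro = heapq.heappop(waiting)
        try:
            delay = coro.send(None)
            heapq.heappush(waiting, (clock + delay, seq, coro))
        except StopIteration:
            pass  # The coroutine is done.
    return clock


log = []
total = run_virtual(countdown('A', 2, log),
                    countdown('B', 1, log, delay=1))
print(total)  # 2
print(log)
# [('A', 2), ('A', 1), ('B', 1), ('A', 'lift-off'), ('B', 'lift-off')]
```

The two countdowns interleave, and the total simulated time is 2 seconds, not the 4 seconds it would take to run them one after the other.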

The final code

import datetime
import heapq
import types
import time


class Task:
    """Represent how long a coroutine should wait before starting again.

    Comparison operators are implemented for use by heapq. Two-item
    tuples unfortunately don't work because when the datetime.datetime
    instances are equal, comparison falls to the coroutine and they don't
    implement comparison methods, triggering an exception.

    Think of this as being like asyncio.Task/curio.Task.
    """

    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until


class SleepingLoop:
    """An event loop focused on delaying execution of coroutines.

    Think of this as being like asyncio.BaseEventLoop/curio.Kernel.
    """

    def __init__(self, *coros):
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        # Start all the coroutines.
        for coro in self._new:
            wait_for = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_for, coro))
        # Keep running until there is no more work to do.
        while self._waiting:
            now = datetime.datetime.now()
            # Get the coroutine with the soonest resumption time.
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                # We're ahead of schedule; wait until it's time to resume.
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                # It's time to resume the coroutine.
                wait_until = task.coro.send(now)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                # The coroutine is done.
                pass


@types.coroutine
def sleep(seconds):
    """Pause a coroutine for the specified number of seconds.

    Think of this as being like asyncio.sleep()/curio.sleep().
    """
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    # Make all coroutines on the call stack pause; the need to use `yield`
    # necessitates this be generator-based and not an async-based coroutine.
    actual = yield wait_until
    # Resume the execution stack, sending back how long we actually waited.
    return actual - now


async def countdown(label, length, *, delay=0):
    """Countdown a launch for `length` seconds, waiting `delay` seconds.

    This is what a user would typically write.
    """
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await sleep(delay)
    print(label, 'starting after waiting', delta)
    while length:
        print(label, 'T-minus', length)
        waited = await sleep(1)
        length -= 1
    print(label, 'lift-off!')


def main():
    """Start the event loop, counting down 3 separate launches.

    This is what a user would typically write.
    """
    loop = SleepingLoop(
        countdown('A', 5, delay=0),
        countdown('B', 3, delay=2),
        countdown('C', 4, delay=1)
    )
    start = datetime.datetime.now()
    loop.run_until_complete()
    print('Total elapsed time is', datetime.datetime.now() - start)



if __name__ == '__main__':
    main()

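Summary

Let's map the pieces of this example onto asyncio:

  • The Task class corresponds to asyncio.Task. The Task in this article uses waiting_until to decide when to resume. asyncio.Task is a future object: when asyncio's event loop detects that the future's state has changed, it runs the corresponding logic.
  • The sleep() function corresponds to asyncio.sleep(). It does not block.
  • SleepingLoop corresponds to asyncio.BaseEventLoop. SleepingLoop uses a min-heap, whereas asyncio.BaseEventLoop is more complex and builds on future objects, the selectors module, and more.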
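
For comparison, here is roughly how the same countdown could look on top of asyncio itself. This is a sketch rather than a drop-in replacement: the delays are shortened to hundredths of a second so that it finishes quickly, and the log list replaces the print() calls.

```python
import asyncio


async def countdown(label, length, log, *, delay=0):
    await asyncio.sleep(delay)       # asyncio's counterpart of sleep()
    while length:
        log.append((label, length))
        await asyncio.sleep(0.01)
        length -= 1
    log.append((label, 'lift-off'))


async def main(log):
    # gather() wraps each coroutine in an asyncio.Task and runs them
    # concurrently on asyncio's own event loop.
    await asyncio.gather(
        countdown('A', 2, log),
        countdown('B', 1, log, delay=0.005),
    )


log = []
asyncio.run(main(log))   # plays the role of SleepingLoop.run_until_complete()
print(log)
```

The countdown() coroutine is almost unchanged. Only the sleep function and the loop that drives it are swapped out, which illustrates the point that async and await are an API and not an implementation.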
