The code in this article comes from: snarky.ca/how-the-hec…
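The previous article covered the history of Python's async and await keywords and pointed out that async and await are an API, not an implementation. Quite a few event loops are built on async/await, including asyncio and curio. Under the hood, asyncio is built on future objects, while curio is built on tuples.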
In this article, we'll use a min-heap to implement a simple event loop.
Heaps are arrays for which a[k] <= a[2k+1] and a[k] <= a[2k+2] for all k, counting elements from 0. For the sake of comparison, non-existing elements are considered to be infinite. The interesting property of a heap is that a[0] is always its smallest element. (From the source code of heapq, a built-in Python module.)
Simply put, a heap is a Python list with a special property: a[k] <= a[2*k+1] and a[k] <= a[2*k+2] for every k, so the first element is always the smallest.
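As you have probably noticed, this is really a binary tree stored in a list: the element at index k is a parent node, and its two children sit at indices 2*k+1 and 2*k+2.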
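Here is a quick way to see the invariant in action. The sketch below calls heapq.heapify() on a list and then checks a[k] <= a[2*k+1] and a[k] <= a[2*k+2] for every k. The helper is_heap is our own illustration and is not part of heapq.

```python
import heapq


def is_heap(a):
    """Check a[k] <= a[2*k+1] and a[k] <= a[2*k+2] for every k (hypothetical helper)."""
    n = len(a)
    for k in range(n):
        for child in (2 * k + 1, 2 * k + 2):
            # Children past the end of the list count as +infinity, so they never violate the rule.
            if child < n and a[k] > a[child]:
                return False
    return True


data = [5, 3, 8, 1, 9, 2]
heapq.heapify(data)        # rearranges the list in place into a heap
print(data[0])             # 1: the root of the tree is the smallest element
print(is_heap(data))       # True
print(is_heap([3, 1, 2]))  # False: the root 3 is larger than its child 1
```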
The heapq module mainly provides the following APIs (this usage summary comes from the module's docstring):

```python
heap = []  # creates an empty heap
heappush(heap, item)  # pushes a new item on the heap
item = heappop(heap)  # pops the smallest item from the heap
item = heap[0]  # smallest item on the heap without popping it
heapify(x)  # transforms list into a heap, in-place, in linear time
item = heapreplace(heap, item)  # pops and returns smallest item, and adds new item; the heap size is unchanged
```
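This is a minimal sketch that exercises these calls on a small list of integers. Note that heapreplace() pops before it pushes, so it can return an item larger than the one it inserts.

```python
import heapq

heap = []
for item in [7, 2, 9, 4]:
    heapq.heappush(heap, item)      # the heap invariant holds after every push

smallest = heap[0]                  # peek without popping: 2
first = heapq.heappop(heap)         # pops 2
second = heapq.heappop(heap)        # pops 4; 7 and 9 remain

# heapreplace: pop the current smallest (7) first, then push 1; size stays 2
replaced = heapq.heapreplace(heap, 1)
print(smallest, first, second, replaced, sorted(heap))  # 2 2 4 7 [1, 9]
```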
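Let's also review how a generator's send() works, since it can be a little hard to grasp. When you call

next_value = generator.send(value)

three things happen:
1. the generator resumes execution at the yield where it was paused;
2. value becomes the result of that yield expression;
3. the generator keeps running until it reaches the next yield, and the value yielded there is returned as next_value.

Take a look at this example: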
```python
>>> def double_inputs():
...     while True:
...         x = yield
...         yield x * 2
...
>>> gen = double_inputs()
>>> next(gen)       # run up to the first yield
>>> gen.send(10)    # goes into 'x' variable
20
>>> next(gen)       # run up to the next yield
>>> gen.send(6)     # goes into 'x' again
12
>>> next(gen)       # run up to the next yield
>>> gen.send(94.3)  # goes into 'x' again
188.6
```
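Here is what happens when gen.send(10) executes:
1. 10 is assigned to the x in x = yield;
2. x * 2 evaluates to 20. The generator then hits another yield, pauses again, and hands back the value of x * 2, which is why this statement outputs 20.

next(g) is equivalent to g.send(None). It is commonly used to run a generator up to its next yield and pause there.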
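Two more details of send() matter for the event loop later in this article. First, a freshly created generator must start with None, either via next() or send(None). Second, when a generator returns, its return value travels out on StopIteration.value. Here is a small sketch; returns_value is a hypothetical helper.

```python
def double_inputs():
    while True:
        x = yield
        yield x * 2


# 1. next(g) and g.send(None) both run a fresh generator to its first yield.
g1, g2 = double_inputs(), double_inputs()
print(next(g1), g2.send(None))        # None None

# 2. A just-started generator only accepts None; anything else raises TypeError.
g3 = double_inputs()
try:
    g3.send(10)
except TypeError as exc:
    print("TypeError:", exc)


# 3. A generator's return value is carried out on StopIteration.value.
def returns_value():
    received = yield "paused"
    return received * 10


gen = returns_value()
first = next(gen)                     # runs to the yield and receives "paused"
try:
    gen.send(4)                       # resumes, returns 40, which raises StopIteration
except StopIteration as exc:
    result = exc.value
print(first, result)                  # paused 40
```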
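The event loop we're going to build is very simple. Let's go through its core pieces one at a time.

The first piece is the Task class below. You can think of it as the counterpart of asyncio.Task/curio.Task.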
```python
class Task:
    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until
```
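Two special methods are defined here, __eq__ and __lt__. They let Task objects be compared with == and <. Since heapq maintains a min-heap, the "smallest" item always comes first. Task instances are compared by their waiting_until attribute, which is the point in time at which they should next resume.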
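The docstring of Task in the complete code explains why a class is used instead of plain (wait_until, coro) tuples. The sketch below shows the difference. job is a hypothetical do-nothing coroutine, and the fixed timestamp is arbitrary.

```python
import datetime
import heapq


class Task:
    """Same ordering rule as above: only waiting_until is compared."""

    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until


async def job():
    pass


same_time = datetime.datetime(2024, 1, 1, 12, 0, 0)
c1, c2 = job(), job()

# Plain tuples: equal timestamps force Python to compare the coroutines, which fails.
try:
    _ = (same_time, c1) < (same_time, c2)
    tuple_ok = True
except TypeError:
    tuple_ok = False

# Task objects never look past waiting_until, so ties are harmless.
heap = []
heapq.heappush(heap, Task(same_time, c1))
heapq.heappush(heap, Task(same_time, c2))
print(tuple_ok, len(heap))            # False 2

for c in (c1, c2):
    c.close()                         # never started; close to avoid "never awaited" warnings
```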
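So at any given moment, the min-heap holds the tasks ordered by resumption time. Suppose, for example, that Task A is due to resume in 0 seconds. Its resumption time (wait_until) is the "smallest", so it is popped and run first. Task B then takes its place as the "smallest" element.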
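To make that scenario concrete, this sketch pushes three tasks onto a heap out of order and then pops them back. As a simplification, plain string labels stand in for coroutine objects; only waiting_until is compared.

```python
import datetime
import heapq


class Task:
    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until


now = datetime.datetime.now()
heap = []
# Push in a scrambled order; offsets are seconds until each task resumes.
for label, offset in [("C", 2), ("A", 0), ("B", 1)]:
    heapq.heappush(heap, Task(now + datetime.timedelta(seconds=offset), label))

print(heap[0].coro)                   # A: the earliest resumption time sits at heap[0]
order = [heapq.heappop(heap).coro for _ in range(len(heap))]
print(order)                          # ['A', 'B', 'C']
```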
```python
import datetime
import types


@types.coroutine
def sleep(seconds):
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    # Pause here and hand wait_until to whoever drives the coroutine;
    # the driver later sends back the actual current time.
    actual = yield wait_until
    return actual - now


async def countdown(label, length, *, delay=0):
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await sleep(delay)
    print(label, 'starting after waiting', delta)
    while length:
        print(label, 'T-minus', length)
        waited = await sleep(1)
        length -= 1
    print(label, 'lift-off!')
```
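countdown() waits delay seconds and then runs a countdown that lasts length seconds. Here is a quick walkthrough of the code:

One thing to be clear about: countdown() returns a coroutine object, and nothing in it actually runs until you await it. You can also drive it manually with its send() method, but coroutine objects created by async def do not support next() directly.

The statement delta = await sleep(delay) enters the coroutine sleep() and pauses at its first yield. To resume it, something has to "send stuff back" (see the previous article), that is, call send() on this generator. As we'll see later, this is in fact the event loop's job.

In addition, each task should first resume delay seconds after it starts. So the event loop should call send() delay seconds after the program starts.

The while loop that follows repeats this run-then-pause cycle, pausing for one second per iteration, until length seconds have passed and the task is finished.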
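Because nothing runs until someone calls send(), we can play the role of the event loop by hand. This sketch repeats sleep() and countdown() from above so that it runs on its own, then drives a single countdown with send(). As our own simplification, it never actually waits. It sends back exactly the time each sleep() asked for, so "A starting after waiting 0:00:01" is printed instantly.

```python
import datetime
import types


@types.coroutine
def sleep(seconds):
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    actual = yield wait_until
    return actual - now


async def countdown(label, length, *, delay=0):
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await sleep(delay)
    print(label, 'starting after waiting', delta)
    while length:
        print(label, 'T-minus', length)
        waited = await sleep(1)
        length -= 1
    print(label, 'lift-off!')


coro = countdown('A', 2, delay=1)
wake_times = [coro.send(None)]        # runs up to the first yield inside sleep()
try:
    while True:
        # Pretend we slept exactly until the requested time, then resume.
        wake_times.append(coro.send(wake_times[-1]))
except StopIteration:
    pass                              # countdown() returned: no more yields
print(len(wake_times))                # 3: one for the delay, one per tick
```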
```python
import datetime
import heapq
import time

# Task and countdown() are the definitions shown above.


class SleepingLoop:
    def __init__(self, *coros):
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        # Start every coroutine and record when it first wants to resume.
        for coro in self._new:
            wait_for = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_for, coro))
        while self._waiting:
            now = datetime.datetime.now()
            # Pop the task with the earliest resumption time.
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                # It's time to resume the coroutine.
                wait_until = task.coro.send(now)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                # The coroutine is done.
                pass


def main():
    """Start the event loop, counting down 3 separate launches.

    This is what a user would typically write.
    """
    loop = SleepingLoop(
        countdown('A', 5, delay=0),
        countdown('B', 3, delay=2),
        countdown('C', 4, delay=1),
    )
    start = datetime.datetime.now()
    loop.run_until_complete()
    print('Total elapsed time is', datetime.datetime.now() - start)


if __name__ == '__main__':
    main()
```
That's all the code there is. Pretty simple, right? Let's go through it, starting with the first loop:

```python
for coro in self._new:
    wait_for = coro.send(None)
    heapq.heappush(self._waiting, Task(wait_for, coro))
```
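wait_for = coro.send(None) is the first send() call on each of these coroutine objects. As described earlier, this runs each coroutine until it pauses at actual = yield wait_until inside sleep. The value of wait_until comes back out as wait_for, which is the time at which the task should first resume. Each coroutine is then wrapped in a Task object and pushed onto the min-heap.

Next comes a while loop. Each iteration pops the "smallest" element from the min-heap, i.e. the task whose next resumption time is the earliest. If that time hasn't arrived yet, the loop calls the blocking time.sleep(). Blocking is fine here because this event loop is so simple. The popped task has the earliest resumption time of all, so we know for certain that no other task needs to resume in the meantime.

Then the loop calls send(now) on the task's coroutine, passing in the current time, which becomes actual inside sleep(). If that doesn't raise StopIteration, the task taken off the heap hasn't finished yet. The loop therefore pushes a new Task with the updated resumption time back onto the min-heap.

So when does StopIteration get raised? When the while loop in the countdown() coroutine ends, i.e. when there are no more yields left.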
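The scheduling logic can also be watched with a simulated clock, in a variant of our own that is not from the source. Instead of calling time.sleep(), the loop simply jumps a counter forward to the popped task's resume time, so the interleaving of the three countdowns shows up instantly and deterministically. Several names here are hypothetical: vsleep() yields a relative number of virtual seconds rather than a datetime, and VirtualLoop is not SleepingLoop. Heap entries are (time, seq, coro) tuples with a unique sequence number, an alternative to the Task class that avoids the tuple problem described in Task's docstring.

```python
import heapq
import itertools
import types


@types.coroutine
def vsleep(delay):
    # Yield a relative delay in virtual seconds; the loop sends back the virtual time.
    now = yield delay
    return now


async def countdown(label, length, *, delay=0, log):
    now = await vsleep(delay)
    while length:
        log.append((now, label, length))
        now = await vsleep(1)
        length -= 1
    log.append((now, label, 'lift-off'))


class VirtualLoop:
    """Min-heap scheduler driven by a fake clock; nothing really waits."""

    def __init__(self, *coros):
        self._new = coros
        self._waiting = []             # entries: (resume_time, seq, coro)
        self._seq = itertools.count()  # unique tiebreaker, so coroutines are never compared
        self.clock = 0

    def _schedule(self, coro, delay):
        heapq.heappush(self._waiting, (self.clock + delay, next(self._seq), coro))

    def run_until_complete(self):
        for coro in self._new:
            self._schedule(coro, coro.send(None))
        while self._waiting:
            resume_at, _, coro = heapq.heappop(self._waiting)
            self.clock = resume_at     # "sleep" by jumping the clock forward
            try:
                self._schedule(coro, coro.send(self.clock))
            except StopIteration:
                pass                   # this coroutine has finished


log = []
loop = VirtualLoop(
    countdown('A', 5, delay=0, log=log),
    countdown('B', 3, delay=2, log=log),
    countdown('C', 4, delay=1, log=log),
)
loop.run_until_complete()
print('virtual time elapsed:', loop.clock)   # 5
for entry in log:
    print(entry)
```

All three launches lift off at virtual time 5, which matches the roughly five seconds of real time the original loop needs for the same three countdowns.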
The complete code is as follows:

```python
import datetime
import heapq
import types
import time


class Task:
    """Represent how long a coroutine should wait before starting again.

    Comparison operators are implemented for use by heapq. Two-item tuples
    unfortunately don't work because when the datetime.datetime instances
    are equal, comparison falls to the coroutine and they don't implement
    comparison methods, triggering an exception.

    Think of this as being like asyncio.Task/curio.Task.
    """

    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until


class SleepingLoop:
    """An event loop focused on delaying execution of coroutines.

    Think of this as being like asyncio.BaseEventLoop/curio.Kernel.
    """

    def __init__(self, *coros):
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        # Start all the coroutines.
        for coro in self._new:
            wait_for = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_for, coro))
        # Keep running until there is no more work to do.
        while self._waiting:
            now = datetime.datetime.now()
            # Get the coroutine with the soonest resumption time.
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                # We're ahead of schedule; wait until it's time to resume.
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                # It's time to resume the coroutine.
                wait_until = task.coro.send(now)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                # The coroutine is done.
                pass


@types.coroutine
def sleep(seconds):
    """Pause a coroutine for the specified number of seconds.

    Think of this as being like asyncio.sleep()/curio.sleep().
    """
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    # Make all coroutines on the call stack pause; the need to use `yield`
    # necessitates this be generator-based and not an async-based coroutine.
    actual = yield wait_until
    # Resume the execution stack, sending back how long we actually waited.
    return actual - now


async def countdown(label, length, *, delay=0):
    """Countdown a launch for `length` seconds, waiting `delay` seconds.

    This is what a user would typically write.
    """
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await sleep(delay)
    print(label, 'starting after waiting', delta)
    while length:
        print(label, 'T-minus', length)
        waited = await sleep(1)
        length -= 1
    print(label, 'lift-off!')


def main():
    """Start the event loop, counting down 3 separate launches.

    This is what a user would typically write.
    """
    loop = SleepingLoop(
        countdown('A', 5, delay=0),
        countdown('B', 3, delay=2),
        countdown('C', 4, delay=1),
    )
    start = datetime.datetime.now()
    loop.run_until_complete()
    print('Total elapsed time is', datetime.datetime.now() - start)


if __name__ == '__main__':
    main()
```
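Finally, let's map the pieces of this example onto asyncio:
- The Task class corresponds to asyncio.Task. Our Task decides when to resume based on waiting_until. asyncio.Task is a future object (a subclass of asyncio.Future that wraps a coroutine); when asyncio's event loop sees that a future the coroutine is waiting on has changed state, it runs the corresponding logic to resume the coroutine.
- The sleep() function corresponds to asyncio.sleep(). Neither one blocks; awaiting it only suspends the current coroutine.
- SleepingLoop corresponds to asyncio.BaseEventLoop. SleepingLoop uses a min-heap, while asyncio.BaseEventLoop is more complex, building on future objects, the selectors module, and more. It does also use a heapq-based min-heap internally, for callbacks scheduled with call_later() and call_at().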
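For comparison, here is a sketch of the same three launches written against asyncio itself. asyncio.sleep() takes the place of our sleep(), and asyncio.run()/asyncio.gather() take the place of SleepingLoop. The tick parameter and the log list are our own additions so that the demo runs quickly and its result can be checked; with tick=1.0 it uses the original timings.

```python
import asyncio
import time


async def countdown(label, length, *, delay=0, tick=1.0, log):
    # tick is our own addition: the length of one "second", so the demo can run fast.
    print(label, 'waiting', delay, 'seconds before starting countdown')
    await asyncio.sleep(delay * tick)
    while length:
        print(label, 'T-minus', length)
        log.append((label, length))
        await asyncio.sleep(tick)
        length -= 1
    print(label, 'lift-off!')
    log.append((label, 'lift-off'))


async def main(tick=1.0):
    log = []
    # gather() wraps each coroutine in an asyncio Task and waits for all of them.
    await asyncio.gather(
        countdown('A', 5, delay=0, tick=tick, log=log),
        countdown('B', 3, delay=2, tick=tick, log=log),
        countdown('C', 4, delay=1, tick=tick, log=log),
    )
    return log


start = time.perf_counter()
log = asyncio.run(main(tick=0.05))
elapsed = time.perf_counter() - start
print('Total elapsed time is', round(elapsed, 2), 'seconds')
```

The user-facing countdown() barely changes. All the scheduling work that SleepingLoop did by hand is handled by asyncio's event loop.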