深刻Asyncio(十一)优雅地开始与结束

Startup and Shutdown Graceful

大部分基于asyncio的程序都是须要长期运行、基于网络的应用,处理这种应用的正确开启与关闭存在惊人的复杂性。html

开启相对来讲更简单点,常规作法是建立一个task,而后调用loop.run_forever(),就如第三章QuickStart中的例子同样。python

一个例外是当启动监听服务器时须要通过两个阶段:shell

  1. 为服务器的启动建立一个coroutine,而后调用run_until_complete()来初始化并启动服务器自己;
  2. 经过调用loop.run_forever()来调用main函数。

一般启动是很简单的,碰到上述例外状况,查看官方示例编程

关闭就要复杂得多,以前讲过run_forever()调用会阻塞主线程,当执行关闭时,会解除阻塞并执行后续代码,此时就须要:安全

  1. 收集全部还没有完成的task对象;
  2. 将他们汇集到一个group任务中;
  3. 取消group任务(须要捕捉CancelledError);
  4. 经过run_until_complete()来等待执行完毕。

在这以后关闭才算完成,初学者在写异步代码时老是极力摆脱的一些错误信息好比task还未等待就被关闭了,主要缘由就是遗失了上述步骤中的一个或多个,用个例子来讲明。bash

import asyncio

async def f(delay):
    await asyncio.sleep(delay)

loop = asyncio.get_event_loop()
t1 = loop.create_task(f(1))    # 任务1执行1秒
t2 = loop.create_task(f(2))    # 任务2执行2秒
loop.run_until_complete(t1)    # 只有任务1被执行完成
loop.close()
λ python3 taskwaring.py
Task was destroyed but it is pending!
task: <Task pending coro=<f() running at taskwaring.py:4> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0312D6D0>()]>>

这个错误是说有些任务在loop关闭时还没完成,这也就是为何规范的关闭过程要将全部的task收集到一个task中,取消它们而后在loop关闭以前等待取消完成。服务器

再多看些比QuickStart代码更细节的例子,此次用官方文档中的echo服务器代码做为服务器,经过客户端代码来深刻学习。网络

from asyncio import (
    get_event_loop,
    start_server,
    CancelledError,
    StreamReader,
    StreamWriter,
    Task,
    gather
    )

async def echo(reader: StreamReader, writer: StreamWriter):    # 1
    print('New connection.')
    try:
        while True:    # 2
            data: bytes = await reader.readlines()  # 3
            if data in [b'', b'quit']:
                break
            writer.write(data.upper())  # 4
            await writer.drain()
        print('Leaving Connection.')
    except CancelledError:  # 5
        writer.write_eof()
        print('Cancelled')
    finally:
        writer.close()

loop = get_event_loop()
coro = start_server(echo, '127.0.0.1', 8888, loop=loop)    # 6
server = loop.run_until_complete(coro)  # 7

try:
    loop.run_forever()  # 8
except KeyboardInterrupt:
    print('Shutting Down!')

server.close()  # 9
loop.run_until_complete(server.wait_closed())   # 10

tasks = Task.all_tasks()    # 11
group = gather(*tasks, return_exceptions=True)  # 12
group.cancel()
loop.run_until_complete(group)  # 13
loop.close()
  1. 这个协程用于为每一个创建的链接建立一个协程,使用了Stream的API;app

  2. 为了保持链接,用死循环获取消息;异步

  3. 从服务器获取信息;

  4. 将消息的字符所有大写返回;

  5. 此到处理退出,进行环境退出的清理工做;

  6. 这里是程序开始的地方,服务器须要单独循行,start_server方法返回一个corountine,必须在run_until_complete中执行;

  7. 运行coroutine来启动TCP服务器;

  8. 如今才开始程序的监听部分,为链接到服务器的每一个TCP生成一个coroutine来执行echo例程函数,惟一能打断loop的只能是KeyboardInterrupt异常;

  9. 程序运行到这里的话,关闭操做已经开始,从如今开始要让服务器中止接受新的链接,第一步是调用server.close();

  10. 第二步是调用server.wait_closed()来关闭那些仍在等待链接创建的socket,仍处于活跃状态的链接不会受影响;

  11. 开始关闭task,先收集当前全部等待状态的task;

  12. 将task汇集到一个group中,而后调用cancel方法,此处的return_exceptions参数后面讲;

  13. 运行group这个协程。


要注意的一点是,若是在一个coroutine内部捕捉了一个CancelledError,要注意在异常捕捉代码中不要建立任何coroutine,all_tasks()没法感知在run_until_complete()运行阶段建立的任何新任务。

return_exceptions=True参数是干什么的?

gather()方法有个默认参数是return_exceptions=False,经过默认设置来关闭异常处理是有问题的,很难直接解释清楚,能够经过一系列事实来讲明:
1. run_until_complete()方法执行Future对象,在关闭期间,执行由gather()方法返回的Future对象;
2. 若是这个Future对象抛出了一个异常,那么这个异常会继续向上抛出,致使loop中止;
3. 若是run_until_complete()被用来执行一个group Future对象,任何group内子任务未处理而抛出的异常都会被向上抛出,也包含CancelledError;
4. 若是一部分子任务处理了CancelledError异常,另外一部分未处理,则未处理的那部分的异常也会致使loop中止,这意味着loop在全部tasks完成前就中止了;
5. 在关闭loop时,不但愿上述特性被触发,只是想要全部在group中的task尽快执行结束,也不理会某些task是否抛出异常;
6. 使用gather(*, return_exceptions=True)可让group将子任务中的异常看成返回值处理,所以不会影响run_until_complete()的执行。

关于捕获异常不合人意的一点就是某些异常在group内被处理了而没有被抛出,这对经过结果查找异常、写logging形成了困难。

import asyncio

async def f(delay):
    await asyncio.sleep(1/delay)    # 传入值是0就很恶心了
    return delay

loop = asyncio.get_event_loop()
for i in range(10):
    loop.create_task(f(i))
pending = asyncio.Task.all_tasks()
group = asyncio.gather(*pending, return_exceptions=True)
results = loop.run_until_complete(group)
print(f'Results: {results}')
loop.close()

不设置参数的话就会致使异常被向上抛出,而后loop中止并致使其余task没法完成。安全退出是网络编程最难的问题之一,这对asyncio也是同样的。

Signals

在上一个例子中演示了如何经过KeyboardInterrupt来退出loop,这个异常有效地结束了run_forever()的阻塞,并容许后续代码得以执行。

KeyboardInterrupt异常等同于SIGINT信号,在网络服务中最经常使用的中止信号实际上是SIGTERM,而且也是在UNIX shell环境中使用kill指令发出的默认信号。

在UNIX系统中kill指令其实就是发送信号给进程,不加参数地调用就会发送TERM信号使进程安全退出或被忽视掉,一般这不是个好办法,由于若是进程没有退出,kill就会发送KILL信号来强制退出,这会致使你的程序没法可控地结束。

asyncio原生支持处理进程信号,但处理通常信号的复杂度过高(不是针对asyncio),本文不会深刻讲解,只会挑一些常见信号来举例。先看下例:

# shell_signal01.py
import asyncio

async def main():   # 这里是应用的主体部分,简单的用一个死循环来表示程序运行
    while True:
        print('<Your app is running>')
        await asyncio.sleep(1)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.create_task(main())    # 这里与前几个例子同样,将coroutine添加到loop中
    try:
        loop.run_forever()
    except KeyboardInterrupt:   # 在本例中,只有Ctrl-C会终止loop,而后像前例中进行善后工做
        print('<Got signal: SIGINT, shutting down.>')
    tasks = asyncio.Task.all_tasks()
    group = asyncio.gather(*tasks, return_exceptions=True)
    group.cancel()
    loop.run_until_complete(group)
    loop.close()

这些很简单,下面思考一些复杂的功能:
1. 产品须要将SIGINT和SIGTERM都看成中止信号;
2. 须要在应用的main()中处理CancelledError,而且处理异常的代码也须要一小段时间来运行(例若有一堆网络链接须要关闭);
3. 应用屡次接收中止信号不会出现异常,在接收到一次中止信号后,后续的信号都不做处理。

asyncio提供了足够粒度的API来处理这些场景。

# shell_signal02.py
import asyncio
from signal import SIGINT, SIGTERM    # 从标准库中导入信号值

async def main():
    try:
        while True:
            print('<Your app is running>')
            await asyncio.sleep(1)
    except asyncio.CancelledError:  # 1
        for i in range(3):
            print('<Your app is shtting down...>')
            await asyncio.sleep(1)

def handler(sig):   # 2
    loop.stop()    # 3
    print(f'Got signal: {sig}, shtting down.')
    loop.remove_signal_handler(SIGTERM)    # 4
    loop.add_signal_handler(SIGINT, lambda: None)   # 5


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    for sig in (SIGINT, SIGTERM):   # 6
        loop.add_signal_handler(sig, handler, sig)
    loop.create_task(main())
    loop.run_forever()
    tasks = asyncio.Task.all_tasks()
    group = asyncio.gather(*tasks, return_exceptions=True)
    group.cancel()
    loop.run_until_complete(group)
    loop.close()
  1. 如今在coroutine内部处理中止业务,在调用group.cancel()时收到取消信号,在处理关闭loop的run_until_complete阶段,main将继续运行一段时间;

  2. 这是收到信号后的回调函数,它经过add_signal_handler()修改了loop的配置;

  3. 在回调函数开始执行时,首先要中止loop,这使得关闭业务代码开始执行;

  4. 此时已经开始中止代码业务,所以移除SIGTERM来忽视后续的中止信号,不然会使中止代码业务也被终止;

  5. 原理与上面相似,但SIGINT不能简单地remove,由于KeyboardInterrupt默认是SIGINT信号的handler,须要将SIGINT的handler置空;

  6. 在这里配置信号的回调函数,都指向handler,所以配置了SIGINT的handler,会覆盖掉默认的KeyboardInterrupt。


在关闭过程当中等待Executor执行

在QuickStart中有一段代码使用了阻塞的sleep()调用,当时说明了一个状况即若是该阻塞调用耗时比loop的执行耗时长时会发生什么,如今来讨论,先放结论,若是不进行人工干预将会获得一系列errors。

import time
import asyncio

async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()


def blocking():
    time.sleep(1.5)
    print(f"{time.ctime()} Hello from a thread!")


loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_in_executor(None, blocking)
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
loop.close()
λ python3 quickstart.py
Sun Sep 30 14:11:57 2018 Hello!
Sun Sep 30 14:11:58 2018 Goodbye!
Sun Sep 30 14:11:59 2018 Hello from a thread!
exception calling callback for <Future at 0x36cff70 state=finished returned NoneType>
Traceback (most recent call last):
    ...
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

来看下背后发生了什么,run_in_executor()返回的是Future而不是Task,这说明它不能被asyncio.Task.all_tasks()感知,因此后续的run_until_complete()也就不会等待这个Future执行完毕。

有三个解决思路,都通过了不一样程度的权衡,下面逐个过一遍,从不一样视角观察事件loop的内涵,思考在程序中相互调用的全部coroutine、线程、子进程的生命周期管理。

第一个思路,将executor放到coroutine中并以此创建一个task。

# OPTION-A
import time
import asyncio

async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

def blocking():
    time.sleep(2.0)
    print(f"{time.ctime()} Hello from a thread!")

async def run_blocking():  # 1
    await loop.run_in_executor(None, blocking)

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.create_task(run_blocking())  # 2
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=False)
loop.run_until_complete(group)
loop.close()
  1. 这个想法是run_in_executor返回的Future而不是task,虽然没法用all_tasks()捕获,但能够用await等待一个Future,因此用一个新的coroutine来await在executor中的阻塞调用,这个新的coroutine将被做为task添加到loop;

  2. 就像运行main同样将这个coroutine添加到loop中。


上述代码看起来不错,除了不能执行任务取消。能够发现代码中少了group.cancel(),假若加回来又会获得Event loop is closed错误,甚至不能在run_blocking()中处理CancelledError以便从新await Future,不管作什么该task都会被取消,但executor会将其内部的sleep执行完。

第二个思路,收集还没有完成的task,仅取消它们,但在调用run_until_complete()以前要将run_in_executor()生成的Future添加进去。

# OPTION-B
import time
import asyncio

async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

def blocking():
    time.sleep(2.0)
    print(f"{time.ctime()} Hello from a thread!")

loop = asyncio.get_event_loop()
loop.create_task(main())
future = loop.run_in_executor(None, blocking)   # 1
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)   # 2
group_tasks = asyncio.gather(*tasks, return_exceptions=True)
group_tasks.cancel()    # 取消tasks
group = asyncio.gather(group_task, future)  # 3
loop.run_until_complete(group)
loop.close()
  1. 记录返回的Future;

  2. 此处loop已中止,先得到全部task,注意这里面没有executor的Future;

  3. 建立了一个新的group来合并tasks和Future,在这种状况下executor也能正常退出,而tasks仍然经过正常的cancel来取消。


这个解决办法在关闭时比较友好,但仍然有缺陷。一般来讲,在整个程序中经过某种方式收集全部的executor返回的Future对象,而后与tasks合并,而后等待执行完成,这十分不方便,虽然有效,但还有更好的解决办法。

# OPTION-C
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor as Executor

async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

def blocking():
    time.sleep(2.0)
    print(f"{time.ctime()} Hello from a thread!")

loop = asyncio.get_event_loop()
executor = Executor()   # 1
loop.set_default_executor(executor)    # 2
loop.create_task(main())
future = loop.run_in_executor(None, blocking)   # 3
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=True)
group.cancel()
loop.run_until_complete(group)
executor.shutdown(wait=True)    # 4
loop.close()
  1. 创建本身的executor实例;

  2. 将其设定为loop的默认executor;

  3. 像之前同样;

  4. 明确地在loop关闭前等待executor的全部Future执行完,这能够避免"Event loop is closed"这样的错误信息,能这样作是由于得到了使用executor的权限,而asyncio默认的executor没有开放相应的接口调用。

如今能够在任何地方调用run_in_executor(),而且程序能够优雅地退出了。

相关文章
相关标签/搜索