大部分基于asyncio的程序都是须要长期运行、基于网络的应用,处理这种应用的正确开启与关闭存在惊人的复杂性。html
开启相对来讲更简单点,常规作法是建立一个task,而后调用loop.run_forever(),就如第三章QuickStart中的例子同样。python
一个例外是当启动监听服务器时须要通过两个阶段:shell
run_until_complete()
来初始化并启动服务器自己;loop.run_forever()
来调用main函数。一般启动是很简单的,碰到上述例外状况,查看官方示例。编程
关闭就要复杂得多,以前讲过run_forever()
调用会阻塞主线程,当执行关闭时,会解除阻塞并执行后续代码,此时就须要:安全
run_until_complete()
来等待执行完毕。在这以后关闭才算完成,初学者在写异步代码时老是极力摆脱的一些错误信息好比task还未等待就被关闭了,主要缘由就是遗失了上述步骤中的一个或多个,用个例子来讲明。bash
import asyncio async def f(delay): await asyncio.sleep(delay) loop = asyncio.get_event_loop() t1 = loop.create_task(f(1)) # 任务1执行1秒 t2 = loop.create_task(f(2)) # 任务2执行2秒 loop.run_until_complete(t1) # 只有任务1被执行完成 loop.close()
λ python3 taskwaring.py Task was destroyed but it is pending! task: <Task pending coro=<f() running at taskwaring.py:4> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0312D6D0>()]>>
这个错误是说有些任务在loop关闭时还没完成,这也就是为何规范的关闭过程要将全部的task收集到一个task中,取消它们而后在loop关闭以前等待取消完成。服务器
再多看些比QuickStart代码更细节的例子,此次用官方文档中的echo服务器代码做为服务器,经过客户端代码来深刻学习。网络
from asyncio import ( get_event_loop, start_server, CancelledError, StreamReader, StreamWriter, Task, gather ) async def echo(reader: StreamReader, writer: StreamWriter): # 1 print('New connection.') try: while True: # 2 data: bytes = await reader.readlines() # 3 if data in [b'', b'quit']: break writer.write(data.upper()) # 4 await writer.drain() print('Leaving Connection.') except CancelledError: # 5 writer.write_eof() print('Cancelled') finally: writer.close() loop = get_event_loop() coro = start_server(echo, '127.0.0.1', 8888, loop=loop) # 6 server = loop.run_until_complete(coro) # 7 try: loop.run_forever() # 8 except KeyboardInterrupt: print('Shutting Down!') server.close() # 9 loop.run_until_complete(server.wait_closed()) # 10 tasks = Task.all_tasks() # 11 group = gather(*tasks, return_exceptions=True) # 12 group.cancel() loop.run_until_complete(group) # 13 loop.close()
这个协程用于为每一个创建的链接建立一个协程,使用了Stream的API;app
为了保持链接,用死循环获取消息;异步
从服务器获取信息;
将消息的字符所有大写返回;
此到处理退出,进行环境退出的清理工做;
这里是程序开始的地方,服务器须要单独循行,start_server方法返回一个corountine,必须在run_until_complete中执行;
运行coroutine来启动TCP服务器;
如今才开始程序的监听部分,为链接到服务器的每一个TCP生成一个coroutine来执行echo例程函数,惟一能打断loop的只能是KeyboardInterrupt异常;
程序运行到这里的话,关闭操做已经开始,从如今开始要让服务器中止接受新的链接,第一步是调用server.close();
第二步是调用server.wait_closed()来关闭那些仍在等待链接创建的socket,仍处于活跃状态的链接不会受影响;
开始关闭task,先收集当前全部等待状态的task;
将task汇集到一个group中,而后调用cancel方法,此处的return_exceptions参数后面讲;
运行group这个协程。
要注意的一点是,若是在一个coroutine内部捕捉了一个CancelledError,要注意在异常捕捉代码中不要建立任何coroutine,all_tasks()
没法感知在run_until_complete()
运行阶段建立的任何新任务。
return_exceptions=True
参数是干什么的?gather()
方法有个默认参数是return_exceptions=False,经过默认设置来关闭异常处理是有问题的,很难直接解释清楚,能够经过一系列事实来讲明:
1. run_until_complete()
方法执行Future对象,在关闭期间,执行由gather()
方法返回的Future对象;
2. 若是这个Future对象抛出了一个异常,那么这个异常会继续向上抛出,致使loop中止;
3. 若是run_until_complete()
被用来执行一个group Future对象,任何group内子任务未处理而抛出的异常都会被向上抛出,也包含CancelledError;
4. 若是一部分子任务处理了CancelledError异常,另外一部分未处理,则未处理的那部分的异常也会致使loop中止,这意味着loop在全部tasks完成前就中止了;
5. 在关闭loop时,不但愿上述特性被触发,只是想要全部在group中的task尽快执行结束,也不理会某些task是否抛出异常;
6. 使用gather(*, return_exceptions=True)
可让group将子任务中的异常看成返回值处理,所以不会影响run_until_complete()
的执行。
关于捕获异常不合人意的一点就是某些异常在group内被处理了而没有被抛出,这对经过结果查找异常、写logging形成了困难。
import asyncio async def f(delay): await asyncio.sleep(1/delay) # 传入值是0就很恶心了 return delay loop = asyncio.get_event_loop() for i in range(10): loop.create_task(f(i)) pending = asyncio.Task.all_tasks() group = asyncio.gather(*pending, return_exceptions=True) results = loop.run_until_complete(group) print(f'Results: {results}') loop.close()
不设置参数的话就会致使异常被向上抛出,而后loop中止并致使其余task没法完成。安全退出是网络编程最难的问题之一,这对asyncio也是同样的。
在上一个例子中演示了如何经过KeyboardInterrupt
来退出loop,这个异常有效地结束了run_forever()
的阻塞,并容许后续代码得以执行。
KeyboardInterrupt
异常等同于SIGINT
信号,在网络服务中最经常使用的中止信号实际上是SIGTERM
,而且也是在UNIX shell环境中使用kill
指令发出的默认信号。
在UNIX系统中kill
指令其实就是发送信号给进程,不加参数地调用就会发送TERM
信号使进程安全退出或被忽视掉,一般这不是个好办法,由于若是进程没有退出,kill
就会发送KILL信号来强制退出,这会致使你的程序没法可控地结束。
asyncio原生支持处理进程信号,但处理通常信号的复杂度过高(不是针对asyncio),本文不会深刻讲解,只会挑一些常见信号来举例。先看下例:
# shell_signal01.py import asyncio async def main(): # 这里是应用的主体部分,简单的用一个死循环来表示程序运行 while True: print('<Your app is running>') await asyncio.sleep(1) if __name__ == "__main__": loop = asyncio.get_event_loop() loop.create_task(main()) # 这里与前几个例子同样,将coroutine添加到loop中 try: loop.run_forever() except KeyboardInterrupt: # 在本例中,只有Ctrl-C会终止loop,而后像前例中进行善后工做 print('<Got signal: SIGINT, shutting down.>') tasks = asyncio.Task.all_tasks() group = asyncio.gather(*tasks, return_exceptions=True) group.cancel() loop.run_until_complete(group) loop.close()
这些很简单,下面思考一些复杂的功能:
1. 产品须要将SIGINT和SIGTERM都看成中止信号;
2. 须要在应用的main()
中处理CancelledError
,而且处理异常的代码也须要一小段时间来运行(例若有一堆网络链接须要关闭);
3. 应用屡次接收中止信号不会出现异常,在接收到一次中止信号后,后续的信号都不做处理。
asyncio提供了足够粒度的API来处理这些场景。
# shell_signal02.py import asyncio from signal import SIGINT, SIGTERM # 从标准库中导入信号值 async def main(): try: while True: print('<Your app is running>') await asyncio.sleep(1) except asyncio.CancelledError: # 1 for i in range(3): print('<Your app is shtting down...>') await asyncio.sleep(1) def handler(sig): # 2 loop.stop() # 3 print(f'Got signal: {sig}, shtting down.') loop.remove_signal_handler(SIGTERM) # 4 loop.add_signal_handler(SIGINT, lambda: None) # 5 if __name__ == "__main__": loop = asyncio.get_event_loop() for sig in (SIGINT, SIGTERM): # 6 loop.add_signal_handler(sig, handler, sig) loop.create_task(main()) loop.run_forever() tasks = asyncio.Task.all_tasks() group = asyncio.gather(*tasks, return_exceptions=True) group.cancel() loop.run_until_complete(group) loop.close()
如今在coroutine内部处理中止业务,在调用group.cancel()时收到取消信号,在处理关闭loop的run_until_complete阶段,main将继续运行一段时间;
这是收到信号后的回调函数,它经过add_signal_handler()修改了loop的配置;
在回调函数开始执行时,首先要中止loop,这使得关闭业务代码开始执行;
此时已经开始中止代码业务,所以移除SIGTERM来忽视后续的中止信号,不然会使中止代码业务也被终止;
原理与上面相似,但SIGINT不能简单地remove,由于KeyboardInterrupt默认是SIGINT信号的handler,须要将SIGINT的handler置空;
在这里配置信号的回调函数,都指向handler,所以配置了SIGINT的handler,会覆盖掉默认的KeyboardInterrupt。
在QuickStart中有一段代码使用了阻塞的sleep()
调用,当时说明了一个状况即若是该阻塞调用耗时比loop的执行耗时长时会发生什么,如今来讨论,先放结论,若是不进行人工干预将会获得一系列errors。
import time import asyncio async def main(): print(f'{time.ctime()} Hello!') await asyncio.sleep(1.0) print(f'{time.ctime()} Goodbye!') loop.stop() def blocking(): time.sleep(1.5) print(f"{time.ctime()} Hello from a thread!") loop = asyncio.get_event_loop() loop.create_task(main()) loop.run_in_executor(None, blocking) loop.run_forever() tasks = asyncio.Task.all_tasks(loop=loop) group = asyncio.gather(*tasks, return_exceptions=True) loop.run_until_complete(group) loop.close()
λ python3 quickstart.py Sun Sep 30 14:11:57 2018 Hello! Sun Sep 30 14:11:58 2018 Goodbye! Sun Sep 30 14:11:59 2018 Hello from a thread! exception calling callback for <Future at 0x36cff70 state=finished returned NoneType> Traceback (most recent call last): ... raise RuntimeError('Event loop is closed') RuntimeError: Event loop is closed
来看下背后发生了什么,run_in_executor()
返回的是Future而不是Task,这说明它不能被asyncio.Task.all_tasks()
感知,因此后续的run_until_complete()
也就不会等待这个Future执行完毕。
有三个解决思路,都通过了不一样程度的权衡,下面逐个过一遍,从不一样视角观察事件loop的内涵,思考在程序中相互调用的全部coroutine、线程、子进程的生命周期管理。
第一个思路,将executor放到coroutine中并以此创建一个task。
# OPTION-A import time import asyncio async def main(): print(f'{time.ctime()} Hello!') await asyncio.sleep(1.0) print(f'{time.ctime()} Goodbye!') loop.stop() def blocking(): time.sleep(2.0) print(f"{time.ctime()} Hello from a thread!") async def run_blocking(): # 1 await loop.run_in_executor(None, blocking) loop = asyncio.get_event_loop() loop.create_task(main()) loop.create_task(run_blocking()) # 2 loop.run_forever() tasks = asyncio.Task.all_tasks(loop=loop) group = asyncio.gather(*tasks, return_exceptions=False) loop.run_until_complete(group) loop.close()
这个想法是run_in_executor返回的Future而不是task,虽然没法用all_tasks()捕获,但能够用await等待一个Future,因此用一个新的coroutine来await在executor中的阻塞调用,这个新的coroutine将被做为task添加到loop;
就像运行main同样将这个coroutine添加到loop中。
上述代码看起来不错,除了不能执行任务取消。能够发现代码中少了group.cancel()
,假若加回来又会获得Event loop is closed
错误,甚至不能在run_blocking()
中处理CancelledError以便从新await Future,不管作什么该task都会被取消,但executor会将其内部的sleep执行完。
第二个思路,收集还没有完成的task,仅取消它们,但在调用run_until_complete()
以前要将run_in_executor()
生成的Future添加进去。
# OPTION-B import time import asyncio async def main(): print(f'{time.ctime()} Hello!') await asyncio.sleep(1.0) print(f'{time.ctime()} Goodbye!') loop.stop() def blocking(): time.sleep(2.0) print(f"{time.ctime()} Hello from a thread!") loop = asyncio.get_event_loop() loop.create_task(main()) future = loop.run_in_executor(None, blocking) # 1 loop.run_forever() tasks = asyncio.Task.all_tasks(loop=loop) # 2 group_tasks = asyncio.gather(*tasks, return_exceptions=True) group_tasks.cancel() # 取消tasks group = asyncio.gather(group_task, future) # 3 loop.run_until_complete(group) loop.close()
记录返回的Future;
此处loop已中止,先得到全部task,注意这里面没有executor的Future;
建立了一个新的group来合并tasks和Future,在这种状况下executor也能正常退出,而tasks仍然经过正常的cancel来取消。
这个解决办法在关闭时比较友好,但仍然有缺陷。一般来讲,在整个程序中经过某种方式收集全部的executor返回的Future对象,而后与tasks合并,而后等待执行完成,这十分不方便,虽然有效,但还有更好的解决办法。
# OPTION-C import time import asyncio from concurrent.futures import ThreadPoolExecutor as Executor async def main(): print(f'{time.ctime()} Hello!') await asyncio.sleep(1.0) print(f'{time.ctime()} Goodbye!') loop.stop() def blocking(): time.sleep(2.0) print(f"{time.ctime()} Hello from a thread!") loop = asyncio.get_event_loop() executor = Executor() # 1 loop.set_default_executor(executor) # 2 loop.create_task(main()) future = loop.run_in_executor(None, blocking) # 3 loop.run_forever() tasks = asyncio.Task.all_tasks(loop=loop) group = asyncio.gather(*tasks, return_exceptions=True) group.cancel() loop.run_until_complete(group) executor.shutdown(wait=True) # 4 loop.close()
创建本身的executor实例;
将其设定为loop的默认executor;
像之前同样;
明确地在loop关闭前等待executor的全部Future执行完,这能够避免"Event loop is closed"这样的错误信息,能这样作是由于得到了使用executor的权限,而asyncio默认的executor没有开放相应的接口调用。
如今能够在任何地方调用run_in_executor()
,而且程序能够优雅地退出了。