Python3.4 采用了一个强大的框架来支持代码的并发执行: asyncio。这个框架使用事件循环来编排回调和异步任务。
事件循环位于事件循环策略的上下文中-这是 asyncio 所特有的概念。
下图是协程,事件循环和策略之间的相互做用
协程能够被认为是能够在明确标记有某种语法元素的阶段“暂停”的函数.
经过任务对象跟踪协程的状态,由相应的事件循环实例化。 事件循环跟踪当前正在运行的任务,并将 CPU 时间从空闲协程委派给待处理协议。在本章中,咱们将更多地了解事件循环接口及其生命周期。将讨论事件循环策略-以及全局 asyncio API 对它们的影响。或者和其余异步工做单元(callbacks, promises/futures, and coroutines), 不一样的事件循环,可是事件循环是区别于操做系统的。html
因为各类缘由,并发框架必须可以告诉您事件循环当前是否正在运行以及它是哪个。例如,您的代码可能必须断言只有一个特定的循环实现正在运行您的任务。所以,只有一个任务能够改变某些共享资源或确保将调度您的回调python
使用全局 asyncio.get_event_loop 和 asyncio.get_running_loop 的 api。
代码示例 1windows
import asyncio loop = asyncio.get_event_loop()
输出api
<_UnixSelectorEventLoop running=False closed=False debug=False> #windows 输出 <_WindowsSelectorEventLoop running=False closed=False debug=False>
代码示例 2promise
import asyncio try: loop = asyncio.get_running_loop() except RuntimeError: print("No loop running")
在 Python 3.7 中,有两种有效的方法来获取当前正在运行的循环实例。
咱们能够调用 asyncio.get_event_loop 或 asyncio.get_running_loop
但 asyncio.get_event_loop 内部是作了什么?大概下面几点
1.检查在调用函数时是否有循环运行
2.返回其 pid 与当前进程 pid 匹配的运行循环(若是有)
3.若是没有,获取存储在 asynci omodule 中的全局变量中的线程全局 LoopPolicy 实例。
4.若是没有设置它,则使用锁用 DefaultLoopPolicy 实例化它。(_init_event_loop_policy 方法)
5.注意,DefaultLoopPolicy 是依赖于操做系统的子类 BaseDefaultEventLoopPolicy,它提供了一个默认的循环实现。获取被调用的事件循环
6.这是有个问题:仅在主线程上实例化循环并将其分配给线程局部变量时才会使用 loop_policy.get_event_loop 方法。
若是你不在主线程上而且没有经过其余方式实例化运行循环,则会引起 RuntimeError并发
这个过程有一些问题app
若是你不在主线程上,它将引起 RuntimeError
asyncio.get_running_loop 的工做方式不一样。 若是有一个正在运行,它将始终返回当前正在运行的循环实例。 若是没有,则会引起 RuntimeError。框架
因为 asyncio 中的循环与循环策略的概念紧密耦合,所以不建议经过循环构造函数建立循环实例。
不然,咱们可能会遇到范围问题,由于全局 asyncio.get_event_loop 函数只检索本身建立的循环或经过 asyncio.set_event_loop 设置的循环。dom
import asyncio import sys loop = asyncio.new_event_loop() print(loop) # Print the loop asyncio.set_event_loop(loop) if sys.platform != "win32": watcher = asyncio.get_child_watcher() watcher.attach_loop(loop)
上面的代码怎么运行的呢
若是从主线程调用,那么 asyncio.get_event_loop 应用程序接口仅实例化该循环
下面是一个循环绑定到线程的例子异步
import asyncio import threading from functools import partial def _worker(worker, *args, **kwargs): # 循环存在于循环策略的上下文中。DefaultLoopPolicy 对每一个线程的循环进行限定, # 不容许经过 asyncio.get_event_loop 在主线程以外建立循环 # 所以,咱们必须经过 asyncio.set_event_loop(asyncio.new_event_loop())建立一个线程本地事件循环。 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(worker(*args, **kwargs)) finally: loop.close() def create_event_loop_thread(worker, *args, **kwargs): return threading.Thread(target=partial(_worker, worker), args=args, kwargs=kwargs) async def print_coro(*args, **kwargs): print(f"Inside the print coro on {threading.get_ident()}:", (args, kwargs)) def start_threads(*threads): [t.start() for t in threads if isinstance(t, threading.Thread)] def join_threads(*threads): [t.join() for t in threads if isinstance(t, threading.Thread)] def main(): workers = [create_event_loop_thread(print_coro) for i in range(10)] start_threads(*workers) join_threads(*workers) if __name__ == '__main__': main()
使用更高级的 multiprocessing 模块,咱们能够构建一个跨平台的解决方案,在流程本地事件循环中运行多个协程。
这样咱们就能够规避 GIL 强加的 CPython 限制,并利用 asyncio 来提升 I/O 密集型任务的单核 CPU 使用率。
###协程附加到进程 import asyncio import os import random import typing from multiprocessing import Process processes = [] def cleanup(): global processes while processes: proc = processes.pop() try: proc.join() except KeyboardInterrupt: # Ctrl+C 终止进程 proc.terminate() async def worker(): random_delay = random.randint(0, 3) result = await asyncio.sleep(random_delay, result=f"Working in process: {os.getpid()}") print(result) def process_main(coro_worker: typing.Callable, num_of_coroutines: int, ): """ 在单独的进程中运行多个协程的进程类。将在每一个进程中运行的函数 建议使用 asyncio.run 而不是实例化本身的事件循环。 此示例仅用于说明如何在不一样进程中实例化事件循环! :param coro_worker: :param num_of_coroutines: :return: """ loop = asyncio.new_event_loop() try: workers = [coro_worker() for _ in range(num_of_coroutines)] loop.run_until_complete(asyncio.gather(*workers, loop=loop)) except KeyboardInterrupt: print(f"Stoping {os.getpid()}") loop.stop() finally: loop.close() def main(processes, num_procs, num_coros, process_main): for _ in range(num_procs): proc = Process(target=process_main, args=(worker, num_coros)) processes.append(proc) proc.start() if __name__ == '__main__': try: main(processes, 10, 2, process_main, ) except KeyboardInterrupt: print("Ctrl+C 中止运行") finally: cleanup() print("CleanUp finished")
此示例说明如何编写使用多处理的应用程序。
若是不想费心修改循环策略和清理异步生成器以后的代码(您将在下一章中了解它们),请使用如下代码。
若是你只有一个线程和进程,而且只有一个协程须要从头至尾运行,这也很好。
import asyncio async def main(): pass asyncio.run(main())
在 Python3.6 你可使用如下方法
import asyncio async def main(): pass loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: try: # 清理任何没有彻底消耗的异步生成器。 loop.run_until_complete(loop.shutdown_asyncgens()) finally: loop.close()
若是代码可能运行在线程中,须要使用下面的方式
import asyncio import sys async def main(): pass loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) if sys.platform != "win32": # 返回当前策略的当前子监视器。 watcher = asyncio.get_child_watcher() # 给一个事件循环绑定监视器。 # 若是监视器以前已绑定另外一个事件循环,那么在绑定新循环前会先解绑原来的事件循环。 watcher.attach_loop(loop) try: loop.run_forever() finally: try: loop.run_until_complete(loop.shutdown_asyncgens()) finally: loop.close()
import asyncio async def main(loop): assert loop == asyncio.get_running_loop() print("ok") loop = asyncio.get_event_loop() loop.run_until_complete(main(loop))
或者下面这种
import asyncio async def main(): pass loop = asyncio.get_event_loop() # 经过使用 loop.create_task API,可确保协程将在特定循环上运行。 task = loop.create_task(main()) task.add_done_callback(lambda fut: loop.stop()) loop.run_forever()
import asyncio import functools async def main(loop): print("Print in main") def stop_loop(fut, *, loop): loop.call_soon_threadsafe(loop.stop) loop = asyncio.get_event_loop() tasks = [loop.create_task(main(loop)) for _ in range(10)] # 为了可以正确地中止循环,咱们须要确保已经消耗了全部任务,所以咱们经过调用 asyncio.gather 来包装它们并向其 add_done_callback,这将关闭咱们的循环。 asyncio.gather(*tasks).add_done_callback(functools.partial(stop_loop, loop=loop)) try: loop.run_forever() finally: try: loop.run_until_complete(loop.shutdown_asyncgens()) finally: loop.close()
咱们经过 loop.add_signal_handler 添加一个新的信号处理程序。添加信号处理器。它相似于信号 API, 在这种状况下,咱们决定在每一个处理程序结束时中止循环。
若是要为示例添加另外一个处理程序,只需将信号名称添加到 SIGNAL_NAMES 以及以此方式命名的相应处理程序.
import asyncio import functools import os import signal SIGNAL_NAMES = ('SIGINT', 'SIGTERM') SIGNAL_NAME_MESSAGE = " or ".join(SIGNAL_NAMES) def sigint_handler(signame, *, loop, ): print(f"Stopped loop because of {signame}") loop.stop() def sigterm_handler(signame, *, loop, ): print(f"Stopped loop because of {signame}") loop.stop() loop = asyncio.get_event_loop() for signame in SIGNAL_NAMES: loop.add_signal_handler(getattr(signal, signame), functools.partial(locals()[f"{signame.lower()}_handler"], signame, loop=loop)) print("Event loop running forever, press Ctrl+C to interrupt.") print(f"pid {os.getpid()}: send {SIGNAL_NAME_MESSAGE} to exit.") try: loop.run_forever() finally: loop.close() # optional
为何不直接使用 signal API 在循环迭代过程当中检查添加到循环中的信号处理程序呢?由于,当它关闭时,不可能向循环添加信号处理程序.另外一个好处是,当循环关闭时,信号处理程序会为您清理。
异步生成子流程并在单独的部分中有效地分割建立和状态管理是使用循环生成子流程的缘由之一。
下面的解决方案对于异步子流程 api 的大多数非交互式使用已经足够了。
经过在 Windows 系统上设置适当的事件循环策略,它具备跨平台的优势。
import asyncio import shutil import sys from typing import Tuple, Union async def invoke_command_async(*command, loop, encoding="UTF-8", decode=True) -> Tuple[ Union[str, bytes], Union[str, bytes], int]: """ Invoke a command asynchronously and return the stdout, stderr and the process return code. :param command: :param loop: :param encoding: :param decode: :return: """ if sys.platform != 'win32': # 若是不是 windows 系统,防止有线程的使用 asyncio.get_child_watcher().attach_loop(loop) process = await asyncio.create_subprocess_exec(*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, loop=loop) out, err = await process.communicate() ret_code = process.returncode if not decode: return out, err, ret_code output_decoded, err_decoded = out.decode(encoding) if out else None, \ err.decode(encoding) if err else None return output_decoded, err_decoded, ret_code async def main(loop): # shutil 返回路径 cmd 里可执行文件的路径。 out, err, ret_code = await invoke_command_async(shutil.which("ping"), "-c", "1", "8.8.8.8", loop=loop) print(out, err, ret_code) if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) loop = asyncio.get_event_loop() loop.run_until_complete(main(loop))
为了确保咱们能够在 Windows 下等待子进程的终止,咱们将轮询子进程以得到进程返回代码,该代码指示已终止的子进程。
import asyncio # Quote from https://docs.python.org/3/library/asyncio-subprocess.html: # 在从其余线程执行子进程以前,必须在主线程中实例化子监视器 # 调用主线程中的 get_child_watcher()函数来实例化子监视器 import functools import shutil import sys if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) def stop_loop(*args, loop, **kwargs): loop.stop() async def is_windows_process_alive(process, delay=0.5): """ On windows the signal API is very sparse, meaning we don't have SIGCHILD. So we just check if we have a return code on our process object. :param process: :param delay: :return: """ while process.returncode is None: await asyncio.sleep(delay) async def main(process_coro, *, loop): process = await process_coro print(process) if sys.platform != "win32": child_watcher: asyncio.AbstractChildWatcher = asyncio.get_child_watcher() # 观察者链接到循环并方便地为咱们调用 watcher.add_child_handler # 注册一个新的子处理回调函数。 child_watcher.add_child_handler(process.pid, functools.partial(stop_loop, loop=loop)) else: await is_windows_process_alive(process) loop.stop() loop = asyncio.get_event_loop() process_coro = asyncio.create_subprocess_exec(shutil.which("ping"), "-c", "1", "127.0.0.1", stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL) loop.create_task(main(process_coro, loop=loop)) loop.run_forever()