http://www.javashuo.com/article/p-ymqfsmua-bh.html 的升级版
能够知道当前卡在哪个 task，甚至具体卡在第几行。
# Watchdog for a blocked asyncio event loop.
#
# A helper thread samples which callback the loop is currently executing.
# When the same coroutine is seen on many consecutive samples, the process
# sends itself SIGTERM; the signal handler raises inside the main thread,
# which aborts the blocking coroutine.

import asyncio
import collections
import logging
import os
import queue
import signal
import threading
import time
from asyncio import events

logging.basicConfig(level=logging.DEBUG)

# Reusable monitoring helpers. The demo coroutines (`factorial`, `test`,
# `main`) are left out so a star-import only pulls in the watchdog pieces.
__all__ = ["Handle", "asyncio_monitor", "check", "handler", "run_asyncio"]

# State shared between the main thread and the monitor thread.
pid = os.getpid()  # process that receives SIGTERM when a task blocks
stop = False  # set to True by run_asyncio() to end the monitor loop
q = collections.deque(maxlen=100)  # newest-first history of sampled names


# --- monkey patch -----------------------------------------------------------
# Per the original author's note, asyncio only records `loop._current_handle`
# while the loop is in debug mode. This subclass records it unconditionally so
# the monitor thread can see what the loop is executing.
class Handle(events.Handle):
    """`events.Handle` that publishes itself on the loop while it runs."""

    def _run(self):
        self._loop._current_handle = self
        try:
            super()._run()
        finally:
            # Reset once the callback returns; otherwise an idle loop would
            # look as if the last callback were still executing.
            self._loop._current_handle = None


events.Handle = Handle
# --- end of monkey patch ----------------------------------------------------


async def factorial(name, number):
    """Demo coroutine: compute `number`! cooperatively, one step per second."""
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")


async def test():
    """Demo coroutine that blocks the loop: `time.sleep` never yields."""
    for i in range(100):
        print("sleep--", i)
        time.sleep(1)


def handler(signum, frame):
    """SIGTERM handler: raise so the code running in the main thread aborts."""
    print('Signal handler called with signal', signum)
    raise Exception("Kill the task")


signal.signal(signal.SIGTERM, handler)


async def main():
    """Run the blocking demo next to three well-behaved factorial tasks."""
    await asyncio.gather(
        test(),
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
        return_exceptions=True,
    )


def check(co_name, threshold: int = 60, records=None) -> bool:
    """Return True if the newest `threshold` samples are all `co_name`.

    Args:
        co_name: coroutine name to look for.
        threshold: number of consecutive newest samples required.
        records: newest-first iterable of samples; defaults to the
            module-level history `q`.
    """
    history = q if records is None else records
    streak = 0
    for sample in history:
        if sample != co_name:
            break
        streak += 1
        if streak >= threshold:
            return True
    return streak >= threshold


def asyncio_monitor(loop, step: int = 1, threshold: int = 10):
    """Sample `loop` every `step` seconds and SIGTERM the process on a stall.

    Runs until the module-level `stop` flag becomes true. Each sample records
    the name of the coroutine whose task step is currently executing, or None
    when no such step is running. Once `threshold` consecutive earlier samples
    name the same coroutine, SIGTERM is sent to `pid`.

    Args:
        loop: event loop to observe (read via its `_current_handle`).
        step: seconds to sleep between samples.
        threshold: consecutive identical samples that count as blocked.
    """
    while not stop:
        # Read the attribute once: the loop thread may swap it at any moment.
        handle = getattr(loop, "_current_handle", None)
        callback = getattr(handle, "_callback", None)
        co_name = None
        if callback is not None:
            # Only bound task-step callbacks expose a Task via `__self__`;
            # anything else simply yields no coroutine name.
            owner = getattr(callback, "__self__", None)
            code = getattr(getattr(owner, "_coro", None), "cr_code", None)
            if code is not None and getattr(owner, "_state", None) == "PENDING":
                co_name = code.co_name
            if co_name is not None and check(co_name, threshold):
                # Blocked for too long: signal ourselves so `handler` raises
                # inside the main thread and ends the offending task.
                if pid is not None:
                    os.kill(pid, signal.SIGTERM)
            # Human-readable description of whatever is currently running.
            info = str(getattr(callback, "__self__", callback))
            print(info)
        # A None sample (idle loop / non-task callback) breaks any streak.
        q.appendleft(co_name)
        time.sleep(step)


def run_asyncio(loop):
    """Run `main()` on `loop`, then tell the monitor thread to stop."""
    global stop
    try:
        loop.run_until_complete(main())
    finally:
        # Always release the monitor thread, even if the loop raised,
        # otherwise the non-daemon thread keeps the process alive.
        stop = True


if __name__ == '__main__':
    print(pid)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    t1 = threading.Thread(target=asyncio_monitor, args=(loop,))
    t1.start()
    try:
        run_asyncio(loop)
    finally:
        t1.join()
        loop.close()