[toc]python
In [1]: import asyncio

In [2]: async def f(i):
   ...:     await asyncio.sleep(i)  # suspend this task until the sleep completes
   ...:     print(i)
   ...:

In [3]: async def func():
   ...:     tasks = []
   ...:     for i in range(10):
   ...:         await asyncio.sleep(0)  # suspend once so other scheduled tasks get a turn
   ...:         print('建立协程参数',i)
   ...:         tasks.append(asyncio.create_task(f(i)))  # wrap f(i) in a Task; its first step is scheduled immediately
   ...:     _ = [await t for t in tasks]  # wait for every task to finish
   ...:

In [4]: asyncio.run(func())
建立协程参数 0
建立协程参数 1
0
建立协程参数 2
建立协程参数 3
建立协程参数 4
建立协程参数 5
建立协程参数 6
建立协程参数 7
建立协程参数 8
建立协程参数 9
1
2
3
4
5
6
7
8
9
源码就从 `asyncio.run()` 看起:
# asyncio.runners.py def run(main, *, debug=False): """Run a coroutine. ... """ if events._get_running_loop() is not None: # 检查当前是否已经有 loop 实例,有则报异常 raise RuntimeError( "asyncio.run() cannot be called from a running event loop") if not coroutines.iscoroutine(main): # 检查 main 是不是协程对象, 不是则报异常 raise ValueError("a coroutine was expected, got {!r}".format(main)) loop = events.new_event_loop() # 建立事件循环实例 try: events.set_event_loop(loop) # 绑定事件循环实例 loop.set_debug(debug) # 设置 debug 模式 return loop.run_until_complete(main) # 开始在事件循环中执行main函数,直到main函数运行结束 finally: try: _cancel_all_tasks(loop) loop.run_until_complete(loop.shutdown_asyncgens()) finally: events.set_event_loop(None) loop.close()
上面这个过程就是创建 loop,然后在 loop 中执行 main 协程。下面看一下 `events.new_event_loop()` 是如何创建事件循环实例的:
# asyncio.events.py def new_event_loop(): """Equivalent to calling get_event_loop_policy().new_event_loop().""" return get_event_loop_policy().new_event_loop() # 调用 new_event_loop() 方法返回一个 loop 实例 def get_event_loop_policy(): """Get the current event loop policy.""" if _event_loop_policy is None: _init_event_loop_policy() # 为空就初始化 loop return _event_loop_policy # _event_loop_policy 是事件循环策略,全局变量,初始为None def _init_event_loop_policy(): # 初始化全局变量 _event_loop_policy global _event_loop_policy with _lock: # 加锁 if _event_loop_policy is None: # pragma: no branch from . import DefaultEventLoopPolicy _event_loop_policy = DefaultEventLoopPolicy() # 使用默认的DefaultEventLoopPolicy初始化并获取实例
loop 实例就是由 `DefaultEventLoopPolicy()` 实例的 `new_event_loop()` 方法创建的。`DefaultEventLoopPolicy` 类在 Linux 和 Windows 中是不同的。我的电脑是 Linux,那就看 Linux 下的 `DefaultEventLoopPolicy`,即 `DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy`:
# asyncio.unix_events.py class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): """UNIX event loop policy with a watcher for child processes.""" _loop_factory = _UnixSelectorEventLoop # loop 工厂 ... ... # asyncio.events.py class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy): _loop_factory = None class _Local(threading.local): # 保存一个全局变量,只有当前线程可以访问 _loop = None _set_called = False def __init__(self): self._local = self._Local() def get_event_loop(self): # 获取当前线程的 loop ,没有则建立 """Get the event loop. This may be None or an instance of EventLoop. """ if (self._local._loop is None and not self._local._set_called and isinstance(threading.current_thread(), threading._MainThread)): self.set_event_loop(self.new_event_loop()) if self._local._loop is None: raise RuntimeError('There is no current event loop in thread %r.' % threading.current_thread().name) return self._local._loop def set_event_loop(self, loop): # 设置当前线程的 loop 并标记已经建立 """Set the event loop.""" self._local._set_called = True assert loop is None or isinstance(loop, AbstractEventLoop) self._local._loop = loop def new_event_loop(self): # 建立 loop """Create a new event loop. You must call set_event_loop() to make this the current event loop. """ return self._loop_factory() # loop 由 _loop_factory 工厂建立 即 _UnixSelectorEventLoop
到此 loop 就是 `_UnixSelectorEventLoop` 的实例。有了 loop,下一步就是 `run_until_complete`。`run_until_complete` 方法并不在 `_UnixSelectorEventLoop` 类中定义,查看其继承链:

`unix_events._UnixSelectorEventLoop` ---> `selector_events.BaseSelectorEventLoop` ---> `base_events.BaseEventLoop` ---> `events.AbstractEventLoop`

在 `BaseEventLoop` 中找到该方法:
# asyncio.base_events.py class BaseEventLoop(events.AbstractEventLoop): ... ... def run_until_complete(self, future): """Run until the Future is done. If the argument is a coroutine, it is wrapped in a Task. WARNING: It would be disastrous to call run_until_complete() with the same coroutine twice -- it would wrap it in two different Tasks and that can't be good. Return the Future's result, or raise its exception. """ self._check_closed() # 检查当前 loop 循环是否关闭,已关报异常 new_task = not futures.isfuture(future) # 判断传进来的 future 是不是 future或task类型 future = tasks.ensure_future(future, loop=self) # 若是 future 是协程coroutine 那么调用loop.create_task() 建立task 并将task 加入loop,而后返回task if new_task: # An exception is raised if the future didn't complete, so there # is no need to log the "destroy pending task" message future._log_destroy_pending = False future.add_done_callback(_run_until_complete_cb) # 添加 task 执行完时的回调函数, task 执行完 中止 loop try: self.run_forever() # 开始循环运行 loop 中的事件 except: if new_task and future.done() and not future.cancelled(): # 任务已经完成而且没有被取消 # The coroutine raised a BaseException. Consume the exception # to not log a warning, the caller doesn't have access to the # local task. future.exception() # 触发任务执行中的异常 raise finally: future.remove_done_callback(_run_until_complete_cb) # 移除任务的回调函数 if not future.done(): # 任务没有完成报错 raise RuntimeError('Event loop stopped before Future completed.') return future.result() # 返回任务执行结果
接下来看 task 是如何通过 `tasks.ensure_future(future, loop=self)` 创建的:
# asyncio.tasks.py def ensure_future(coro_or_future, *, loop=None): """Wrap a coroutine or an awaitable in a future. If the argument is a Future, it is returned directly. """ if coroutines.iscoroutine(coro_or_future): # 若是 coro_or_future 是协程函数 if loop is None: loop = events.get_event_loop() task = loop.create_task(coro_or_future) # 调用loop 的 create_task 方法 if task._source_traceback: del task._source_traceback[-1] return task elif futures.isfuture(coro_or_future): # 若是是 future 直接返回 if loop is not None and loop is not futures._get_loop(coro_or_future): raise ValueError('loop argument must agree with Future') return coro_or_future elif inspect.isawaitable(coro_or_future): # 若是是 可等待对象即 协程对象 使用 _wrap_awaitable 封装一下再调用本函数 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop) else: raise TypeError('An asyncio.Future, a coroutine or an awaitable is ' 'required') @coroutine def _wrap_awaitable(awaitable): # 被该函数包装后变为 协程函数 """Helper for asyncio.ensure_future(). Wraps awaitable (an object with __await__) into a coroutine that will later be wrapped in a Task by ensure_future(). """ return (yield from awaitable.__await__())
接下来看 `loop.create_task(coro_or_future)` 是如何创建 task 的,该方法定义在 `BaseEventLoop` 中:
# asyncio.base_events def create_task(self, coro): """Schedule a coroutine object. Return a task object. """ self._check_closed() if self._task_factory is None: # 若是task 工厂为空(task工厂能够设置) task = tasks.Task(coro, loop=self) # 调用tasks.Task 来建立task if task._source_traceback: del task._source_traceback[-1] else: # 若是_task_factory不是空的话使用 这个工厂函数建立 task = self._task_factory(self, coro) return task
然后看一下 `Task` 类是如何初始化的:
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.
    # A Task is itself a Future: __step completes it (via super()) with the
    # coroutine's return value or exception.
    """A coroutine wrapped in a Future."""

    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        # Deprecated alias: warns, then delegates to the module-level current_task(loop).
        warnings.warn("Task.current_task() is deprecated, "
                      "use asyncio.current_task() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        return current_task(loop)

    def __init__(self, coro, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro  # the coroutine object this task drives
        # Snapshot the current contextvars context; every scheduled __step
        # and __wakeup of this task runs inside it.
        self._context = contextvars.copy_context()

        # Schedule the first __step on the loop's ready queue (run on a later loop iteration).
        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)  # record this task (helper not shown here)

    def set_result(self, result):
        # A Task's outcome is set only by __step through super(); outside callers are refused.
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        # Same rule as set_result: only __step may complete the task.
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        ...
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        ...
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        ...
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            # The task is waiting on a future: try to cancel that future first.
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        # Flag the request; the next __step throws CancelledError into the coroutine.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        # Advance the coroutine by one step, then decide how it resumes next.
        if self.done():
            # Stepping a finished task is an internal error.
            raise futures.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            # A cancel() is pending: make sure a CancelledError is thrown in.
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro  # the coroutine being driven
        self._fut_waiter = None

        _enter_task(self._loop, self)  # mark this task as the loop's currently running task
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)  # resume the coroutine until its next yield
            else:
                # Raise `exc` inside the coroutine at its suspension point;
                # the coroutine may handle it.
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned; its return value is in exc.value.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(futures.CancelledError())
            else:
                super().set_result(exc.value)  # complete the task (as a Future) with the return value
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            # The coroutine raised: store the exception on the task.
            super().set_exception(exc)
        except BaseException as exc:
            # Store non-Exception errors too, then propagate them to the caller.
            super().set_exception(exc)
            raise
        else:
            # `blocking` stays None unless `result` has an
            # `_asyncio_future_blocking` attribute (set by Future.__await__).
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    # The future belongs to a different loop: throw an
                    # error into the coroutine on the next step.
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    # The coroutine is waiting on `result`.
                    if result is self:
                        # A task awaiting itself would never finish: report it to the coroutine.
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False  # reset the flag; this yield has been handled
                        # Resume this task (via __wakeup) once `result` is done,
                        # in this task's context.
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        # Remember what we wait on; while set, the task is
                        # suspended and cancel() forwards to it.
                        self._fut_waiter = result
                        if self._must_cancel:
                            # A cancellation arrived meanwhile: cancel the awaited future as well.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    # Flag is False: the future was yielded with a bare `yield`
                    # instead of `yield from`/`await` (which set it to True).
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)  # this task is no longer the loop's running task
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        # Done-callback registered by __step on the future this task awaits.
        try:
            future.result()  # re-raises the awaited future's exception, if any
        except Exception as exc:
            # This may also be a cancellation.
            self.__step(exc)  # throw the exception into the coroutine
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()  # the awaited future is done: resume the coroutine
        self = None  # Needed to break cycles when an exception occurs.
⇑ps:`_asyncio_future_blocking` 标志会在 future 的 `__await__` 中被置为 True,而 `__await__` 是通过 `yield from` 被代理执行的(例如上面 `_wrap_awaitable` 函数中的 `yield from awaitable.__await__()`):
# asyncio.future.Future() def __await__(self): if not self.done(): self._asyncio_future_blocking = True yield self # This tells Task to wait for completion. if not self.done(): raise RuntimeError("await wasn't used with future") return self.result() # May raise too.
接下来看一下 loop 如何循环执行其中的task:
# base_events.BaseEventLoop
def call_soon(self, callback, *args, context=None):
    """Arrange for a callback to be called as soon as possible.

    This operates as a FIFO queue: callbacks are called in the
    order in which they are registered.  Each callback will be
    called exactly once.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_soon')
    handle = self._call_soon(callback, args, context)  # wrap the callback in a Handle and queue it
    if handle._source_traceback:
        del handle._source_traceback[-1]
    return handle  # the queued Handle


def _call_soon(self, callback, args, context):
    # Create a Handle and append it to the ready queue.
    handle = events.Handle(callback, args, self, context)
    if handle._source_traceback:
        del handle._source_traceback[-1]
    self._ready.append(handle)  # _run_once pops and runs handles from self._ready
    return handle

...

def run_forever(self):
    """Run until stop() is called."""
    self._check_closed()
    if self.is_running():
        raise RuntimeError('This event loop is already running')
    if events._get_running_loop() is not None:
        raise RuntimeError(
            'Cannot run the event loop while another loop is running')
    self._set_coroutine_origin_tracking(self._debug)
    self._thread_id = threading.get_ident()  # remember which thread runs this loop

    # Save the interpreter's current async-generator hooks and install this
    # loop's own; the originals are restored in the finally block below.
    old_agen_hooks = sys.get_asyncgen_hooks()
    sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                           finalizer=self._asyncgen_finalizer_hook)
    try:
        events._set_running_loop(self)  # register this loop as the running loop
        while True:
            self._run_once()  # one full iteration of the event loop
            if self._stopping:
                break
    finally:
        # Undo everything set up above, even if a callback raised.
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_origin_tracking(False)
        sys.set_asyncgen_hooks(*old_agen_hooks)

...

def _run_once(self):
    """Run one full iteration of the event loop.

    This calls all currently ready callbacks, polls for I/O,
    schedules the resulting callbacks, and finally schedules
    'call_later' callbacks.
    """

    sched_count = len(self._scheduled)  # number of timer handles in the heap
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Remove delayed calls that were cancelled if their number
        # is too high
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)

        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    timeout = None
    if self._ready or self._stopping:
        # Callbacks are already waiting (or we are stopping): don't block in select().
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        when = self._scheduled[0]._when
        timeout = max(0, when - self.time())

    if self._debug and timeout != 0:
        # Debug mode: time the select() call and log slow or eventful polls.
        t0 = self.time()
        event_list = self._selector.select(timeout)
        dt = self.time() - t0
        if dt >= 1.0:
            level = logging.INFO
        else:
            level = logging.DEBUG
        nevent = len(event_list)
        if timeout is None:
            logger.log(level, 'poll took %.3f ms: %s events',
                       dt * 1e3, nevent)
        elif nevent:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: %s events',
                       timeout * 1e3, dt * 1e3, nevent)
        elif dt >= 1.0:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: timeout',
                       timeout * 1e3, dt * 1e3)
    else:
        # Poll the selector for I/O events, waiting at most `timeout`
        # (the concrete selector class is not shown here).
        event_list = self._selector.select(timeout)
    # _process_events is not shown here; presumably it queues the handles of
    # ready I/O callbacks onto self._ready — confirm in selector_events.py.
    self._process_events(event_list)

    # Handle 'later' callbacks that are ready.
    end_time = self.time() + self._clock_resolution  # timers due within one clock tick count as due
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            # The earliest timer is not due yet, so none of the others are either.
            break
        handle = heapq.heappop(self._scheduled)  # pop the due timer handle
        handle._scheduled = False  # no longer in the timer heap
        self._ready.append(handle)  # due timers run in this iteration's ready pass

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:  # skip handles cancelled since they were queued
            continue
        if self._debug:
            try:
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                if dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            finally:
                self._current_handle = None
        else:
            handle._run()  # run the queued callback, e.g. a Task's __step
    handle = None  # Needed to break cycles when an exception occurs.
看完了源码,开始一步步解析例子中的代码是如何切换协程与执行协程的:

`await` + 可等待对象,其实就是创建了一个 Task,然后加入到 loop。比如 `asyncio.sleep` 就是一个使用 `async def` 定义的函数。结合上面的源码可以知道:在 `async def` 定义的协程中,`await` 的作用相当于用 `yield from` 代理该对象的 `__await__()`,所以 `await asyncio.sleep(n)` 可以写为:
# Generator-based equivalent of `await asyncio.sleep(n)`; usable only inside a
# generator (e.g. an @asyncio.coroutine function), not inside `async def`.
yield from asyncio.sleep(n).__await__()  # the `.__await__()` call can be omitted
但是 `yield from` 不能写在 `async def` 中,那么可以使用 `@asyncio.coroutine` 装饰的基于生成器的协程函数(而非 `async def` 定义的原生协程),上面的例子可以写成:
In [2]: @asyncio.coroutine
   ...: def f(i):
   ...:     # NOTE(review): @asyncio.coroutine is deprecated since Python 3.8 and
   ...:     # removed in 3.11 — this example needs an older interpreter; confirm target version.
   ...:     yield from asyncio.sleep(i).__await__()  # generator-based form of `await asyncio.sleep(i)`
   ...:     print(i)
   ...:

In [3]: @asyncio.coroutine
   ...: def func():
   ...:     tasks = []
   ...:     for i in range(10):
   ...:         yield from asyncio.sleep(0)  # suspend once so other scheduled tasks get a turn
   ...:         print('建立协程参数',i)
   ...:         tasks.append(asyncio.create_task(f(i)))  # schedule f(i) as a Task
   ...:     for t in tasks:
   ...:         yield from t  # wait for each task in turn
   ...:

In [4]: asyncio.run(func())
协程对象和其他可等待对象是不同的:在 `ensure_future` 函数中,如果传入的是协程对象(`coroutines.iscoroutine` 为真),就直接调用 `loop.create_task` 创建 Task 对象;如果是 Future,则直接返回;如果是其他可等待对象(实现了 `__await__` 的对象),则先被 `_wrap_awaitable` 封装成协程,再创建 Task 对象。
await 转换成 yield from 更容易分析,开始一步一步解析例子:
- `loop=[]`:创建 `func()` 的 Task,记为 `m`,并执行 `__step` --> `send(None)`,返回第一个 `result`;
- `loop=[m]`:第一个 `result` 是由 `asyncio.sleep(0)` 创建的延时 Task(Future 类型),记为 --> `sm`,然后把它的 `__step` 加入到 loop,在下次循环时被执行,sleep 0 秒;
- `loop=[sm]`:`sm` 执行完,`func()` 接着上次挂起的位置往下执行,打印 `建立协程参数 0`,然后创建 task(`f(0)`,此时这个 task 的 `__step` 已经直接加入到 loop),记为 ---> `f1`。接着进入下一次 for 循环,返回第二个 result,即 sleep future --> `sm`;
- `loop=[f1, sm]`:执行 `f1`,遇到 sleep --> `sf1`,挂起。切换到 `sm`,接着上次挂起的位置往下执行,打印 `建立协程参数 1`,创建协程 --> `f2`,下一次 for 循环 sleep --> `sm`;
- `loop=[sf1, f2, sm]`:`sf1` 完成后,`f(0)` 函数接着上次挂起的地方执行,打印 `0`;切换到 `f2`,sleep --> `sf2` 挂起;切换到 `sm`,接着上次挂起的位置往下执行,打印 `建立协程参数 2`,创建协程 --> `f3`,下一次 for 循环 sleep --> `sm`;
- `loop=[sf2, f3, sm]`:……