asyncio 是如何利用事件循环来监控和处理 IO 事件的?先看源代码:
# asyncio.streams.py
async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Connect to ``host:port`` and return a ``(reader, writer)`` pair.

    This is a convenience wrapper around ``loop.create_connection()``.
    When *loop* is not given, the current event loop is used. *limit* is
    forwarded to the ``StreamReader``; any extra keyword arguments are
    forwarded to ``create_connection()``.
    """
    loop = events.get_event_loop() if loop is None else loop

    # The reader holds the receive buffer; the protocol wraps the reader
    # so that the transport can feed received data into it.
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)

    def protocol_factory():
        # create_connection() wants a callable that returns the protocol.
        return protocol

    # Establish the TCP connection; only the transport is needed here.
    transport, _unused_protocol = await loop.create_connection(
        protocol_factory, host, port, **kwds)

    # The writer sends data through the transport created above.
    return reader, StreamWriter(transport, protocol, reader, loop)
loop.create_connection 方法:
# asyncio.base_events.py
async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0,
        proto=0, flags=0, sock=None,
        local_addr=None, server_hostname=None,
        ssl_handshake_timeout=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM. protocol_factory must be
    a callable returning a protocol instance.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    (transport, protocol) pair.

    Either host/port or an existing *sock* must be given, but not both.

    Raises:
        ValueError: on inconsistent arguments (ssl-only options without
            ssl, host/port together with sock, neither of them, or a
            sock that is not SOCK_STREAM).
        OSError: when address resolution returns nothing or every
            connection attempt fails.
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if ssl_handshake_timeout is not None and not ssl:
        # The handshake timeout may only be used together with ssl.
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if host is not None or port is not None:
        if sock is not None:
            # host/port and an existing socket are mutually exclusive.
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        # Resolve (host, port); each resulting entry is unpacked below as
        # (family, type, proto, cname, address).
        infos = await self._ensure_resolved(
            (host, port), family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        if local_addr is not None:
            # A local (ip, port) was given: resolve it the same way.
            laddr_infos = await self._ensure_resolved(
                local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto,
                flags=flags, loop=self)
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')

        # Try each resolved address in turn; the first successful connect
        # wins, failures are collected in `exceptions`.
        exceptions = []
        for family, type, proto, cname, address in infos:
            try:
                # Create the socket and switch it to non-blocking mode.
                sock = socket.socket(family=family, type=type, proto=proto)
                sock.setblocking(False)
                if local_addr is not None:
                    # Bind to the first local address that works (this
                    # only binds the socket; it does not listen).
                    for _, _, _, _, laddr in laddr_infos:
                        try:
                            sock.bind(laddr)
                            break
                        except OSError as exc:
                            msg = (
                                f'error while attempting to bind on '
                                f'address {laddr!r}: '
                                f'{exc.strerror.lower()}'
                            )
                            exc = OSError(exc.errno, msg)
                            exceptions.append(exc)
                    else:
                        # No local address could be bound: drop this
                        # socket and move on to the next remote address.
                        sock.close()
                        sock = None
                        continue
                if self._debug:
                    logger.debug("connect %r to %r", sock, address)
                # Await the loop's sock_connect() for this address.
                await self.sock_connect(sock, address)
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                # Any other error: close the socket, then re-raise.
                if sock is not None:
                    sock.close()
                raise
            else:
                # Connected successfully; stop trying further addresses.
                break
        else:
            # The loop finished without a successful connect.
            if len(exceptions) == 1:
                raise exceptions[0]
            else:
                # If they all have the same str(), raise one.
                model = str(exceptions[0])
                if all(str(exc) == model for exc in exceptions):
                    raise exceptions[0]
                # Raise a combined exception so the user can see all
                # the various error messages.
                raise OSError('Multiple exceptions: {}'.format(
                    ', '.join(str(exc) for exc in exceptions)))

    else:
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if sock.type != socket.SOCK_STREAM:
            # We allow AF_INET, AF_INET6, AF_UNIX as long as they
            # are SOCK_STREAM.
            # We support passing AF_UNIX sockets even though we have
            # a dedicated API for that: create_unix_connection.
            # Disallowing AF_UNIX in this method, breaks backwards
            # compatibility.
            raise ValueError(
                f'A Stream Socket was expected, got {sock!r}')

    # Build the transport/protocol pair on top of the connected socket.
    # NOTE(review): per the article author, the selector transport's
    # constructor schedules protocol.connection_made(transport) through
    # loop.call_soon(), which is how the protocol (the one produced by
    # protocol_factory) learns about its transport -- that code is not
    # visible here; confirm against _SelectorSocketTransport.
    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    # Hand the transport and the protocol object back to the caller.
    return transport, protocol
然后简单看一下 StreamWriter 类:
class StreamWriter:
    """Write side of a stream (excerpt).

    Forwards outgoing data to the transport and offers coroutines for
    waiting on flow control (``drain``) and on closing (``wait_closed``).
    """
    ...  # other members are omitted in this excerpt

    def write(self, data):
        """Pass *data* to the transport's ``write()`` method."""
        transport = self._transport
        transport.write(data)

    async def wait_closed(self):
        """Wait on the protocol's ``_closed`` awaitable."""
        closed = self._protocol._closed
        await closed

    async def drain(self):
        """Wait until it is appropriate to resume writing.

        Typical usage::

            w.write(data)
            await w.drain()

        Re-raises the exception stored on the reader, if any.
        """
        reader = self._reader
        if reader is not None:
            pending_error = reader.exception()
            if pending_error is not None:
                raise pending_error
        if self._transport.is_closing():
            # Yield to the event loop once while the transport is closing.
            await sleep(0, loop=self._loop)
        # Delegate the actual waiting to the protocol's drain helper.
        await self._protocol._drain_helper()
writer 只有 wait_closed 和 drain 两个协程方法。write 最终调用的是通道(transport)的 write 方法,相关的事件循环与通道代码如下:
# asyncio.selector_events.BaseSelectorEventLoop def _add_writer(self, fd, callback, *args): self._check_closed() handle = events.Handle(callback, args, self, None) try: key = self._selector.get_key(fd) except KeyError: self._selector.register(fd, selectors.EVENT_WRITE, (None, handle)) else: mask, (reader, writer) = key.events, key.data self._selector.modify(fd, mask | selectors.EVENT_WRITE, (reader, handle)) if writer is not None: writer.cancel() # asyncio.selector_events._SelectorSocketTransport def write(self, data): ... # 省略的都是一些检测 if not self._buffer: # 若是缓存是空,直接尝试经过socket 发送数据 # Optimization: try to send now. try: n = self._sock.send(data) except (BlockingIOError, InterruptedError): pass except Exception as exc: self._fatal_error(exc, 'Fatal write error on socket transport') return else: data = data[n:] if not data: return # 若是直接经过 socket 发送了就结束 # Not all was written; register write handler. # 直接发送失败 self._loop._add_writer(self._sock_fd, self._write_ready) # 建立写处理器,修改 监听已注册的 fd 状态或注册 fd 写事件 # Add it to the buffer. self._buffer.extend(data) # 若是有缓存 将数据存入缓存 self._maybe_pause_protocol() # 若是缓存大小 达到 64M (默认) 就设置 protocol 写暂停 def _write_ready(self): # 有可写的 fd 后的回调函数 assert self._buffer, 'Data should not be empty' if self._conn_lost: return try: n = self._sock.send(self._buffer) # 发送缓存中的数据 except (BlockingIOError, InterruptedError): pass except Exception as exc: self._loop._remove_writer(self._sock_fd) self._buffer.clear() self._fatal_error(exc, 'Fatal write error on socket transport') if self._empty_waiter is not None: self._empty_waiter.set_exception(exc) else: if n: del self._buffer[:n] self._maybe_resume_protocol() # May append to buffer. # 解除写暂停状态 if not self._buffer: self._loop._remove_writer(self._sock_fd) if self._empty_waiter is not None: self._empty_waiter.set_result(None) if self._closing: self._call_connection_lost(None) elif self._eof: self._sock.shutdown(socket.SHUT_WR)
如果调用 StreamWriter.write 方法(即通道的 write):
首先尝试直接通过 socket 发送;
若未能全部发送,则把剩余数据放入缓冲区,创建事件处理器,然后注册(或更新)selector 所监听的 fd(selector 发现此 fd 可写时,就把该事件处理器加入本轮事件循环)。若缓冲区大小超过高水位(默认 64 KiB),则调用 FlowControlMixin.pause_writing() 进入写暂停状态;
此时调用 writer 的 drain 方法会创建一个 future 并等待它完成;当缓冲区中的数据量降到低水位以下后,会调用 FlowControlMixin.resume_writing() 来解除暂停状态并给 future 设置结果,
此时就可以再次向缓冲区写入数据。
然后再看一下 StreamReader 类:
class StreamReader:
    """Read side of a stream (excerpt).

    Received bytes are pushed into an internal buffer via ``feed_data()``;
    reading coroutines wait on ``_wait_for_data()`` while the buffer is
    empty.
    """
    ...  # other members are omitted in this excerpt

    def set_transport(self, transport):
        """Attach the transport this reader receives its data from."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def feed_data(self, data):
        """Append *data* to the buffer and wake up a waiting reader.

        Per the article, the transport hands socket data to
        ``protocol.data_received``, which forwards it here.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)  # store in the buffer
        self._wakeup_waiter()      # wake the coroutine waiting for data

        # Ask the transport to pause reading once the buffer holds more
        # than twice the configured limit.
        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.

        Raises:
            RuntimeError: if another coroutine is already waiting.
        """
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            # Resume reading on the transport so that data can arrive.
            self._transport.resume_reading()

        # Park on a fresh future; it is cleared again once we are woken up
        # (or cancelled).
        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    async def read(self, n=-1):
        """Read up to *n* bytes (excerpt: only the waiting step is shown)."""
        ...  # (argument handling omitted in this excerpt)

        if not self._buffer and not self._eof:
            # Nothing buffered and no EOF yet: wait for incoming data.
            await self._wait_for_data('read')

        ...  # (extracting the result from the buffer omitted)
# asyncio.selector_events.BaseSelectorEventLoop def _add_reader(self, fd, callback, *args): self._check_closed() handle = events.Handle(callback, args, self, None) # 建立事件处理器,回调函数是从socket 或 缓存中读取数据而后再存储到 reader 的缓存中 try: key = self._selector.get_key(fd) except KeyError: self._selector.register(fd, selectors.EVENT_READ, (handle, None)) # 注册 selector 监听的 fd 并传递事件处理器, (注册读??) else: mask, (reader, writer) = key.events, key.data self._selector.modify(fd, mask | selectors.EVENT_READ, (handle, writer)) # 修改 selector 监听的fd 并传递事件处理器, (注册写??) if reader is not None: reader.cancel() # selector_events._SelectorSocketTransport def resume_reading(self): if self._closing or not self._paused: return self._paused = False # 暂停状态改成 False self._add_reader(self._sock_fd, self._read_ready) # 注册或修改 fd 监听事件 if self._loop.get_debug(): logger.debug("%r resumes reading", self)
以 read(10) 为例(readline 同理),读取 10 字节内容:如果当前缓冲区为空并且还没有遇到 EOF,await self._wait_for_data('read') 会创建一个 future 并等待;
如果当前处于读暂停状态,则先解除读暂停状态,并调用通道的 resume_reading 方法(_SelectorSocketTransport.resume_reading),
用事件处理器(回调函数有两种,这里只说一种:_SelectorSocketTransport._read_ready__data_received,它接收数据并存储到 reader 的缓冲区中,然后唤醒 _wait_for_data 创建的 future)来注册(或更新)selector 监控的 fd;
若 selector 监控到该 fd 可读,就把该 fd 绑定的读事件处理器加入当前事件循环中。