#Asynchronous NetWorking#ios
做者:MetalBug 时间:2015-02-28 出处:http://my.oschina.net/u/247728/blog 声明:版权全部,侵犯必究
tornado.ioloop
— Main event looptornado.iostream
— Convenient wrappers for non-blocking sockets##1.ioloop##后端
IOLoop
是一个非阻塞的IO事件循环。 典型的应用使用一个IOLoop
对象,一般经过IOLoop.instance()
得到。 使用如下三个函数往IOLoop
中注册事件,回调或者定时器,最后使用IOLoop.start()
便可。数据结构
IOLoop.add_handler(fd, handler, events)
多线程
增长一个IO事件,当事件发生时,hanlder(fd,events)会被调用app
IOLoop.add_callback(callback, *args, **kwargs)
socket
增长一个回调函数,该回调函数将会在下一次IO迭代中被执行。函数
IOLoop.add_timeout(self, deadline, callback)
tornado
增长一个定时器,该定时器会在到达dealline时被执行。oop
如下是个简单的例子,利用IOLoop
实现了一个TCP Server操作系统
import errno import functools import ioloop import socket def connection_ready(sock, fd, events): while True: try: connection, address = sock.accept() except socket.error, e: if e[0] not in (errno.EWOULDBLOCK, errno.EAGAIN): raise return connection.setblocking(0) handle_connection(connection, address) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setblocking(0) sock.bind(("", port)) sock.listen(128) io_loop = ioloop.IOLoop.instance() callback = functools.partial(connection_ready, sock) io_loop.add_handler(sock.fileno(), callback, io_loop.READ) io_loop.start()
内部实现中IOLoop
根据不一样的操做系统使用不一样的IO复用机制。
Linux
-------epoll
:使用的是level-triggered
触发方式。FreeBSD
---kQuene
other
-----select
###1.1内部实现-数据结构###
self._impl = impl or _poll() self._handlers = {} self._events = {} self._callbacks = set() self._timeouts = [] self._running = False self._stopped = False self._blocking_log_threshold = None
_impl
表示后端使用的IO复用机制,能够本身指定或者采用系统默认
_handlers
维护了fd与其对应handler关系
_events
维护了fd与其对应event关系
_callback
维护全部在下一个IO迭代中会被调用的回调函数
_timeouts
是一个有序列表,根据deadline排序,保存了未到期的全部定时器
_running
和_stopped
是用于表示IOLoop是否start
_blocking_log_threshold
表示最大阻塞时间
###1.2内部实现-主要函数### IOLoop.start()
实现了事件循环,内部实现为一个巨大的while
循环,在每次迭代中,会一次检查如下事件:
_callbacks
_timeouts
poll
返回的_events
一旦事件就绪,就出发对应的回调函数,这个循环会一直持续到IOLoop.stop()
的调用,即_stopped
被置为True
。
IOLoop.start()
流程图:
###1.3内部实现-实现细节###
因为
IOLoop
平时阻塞在poll
调用中,为了让IOLoop
可以当即执行callback
函数,须要设法唤醒它。这里采用的是pipe
,IOLoop
始终监视该管道的readable事件,在须要唤醒的时候,往管道中写入一个字节,这样IOLoop
技能从IO复用(poll)中返回。
初始化pipe if os.name != 'nt': r, w = os.pipe() self._set_nonblocking(r) self._set_nonblocking(w) self._set_close_exec(r) self._set_close_exec(w) self._waker_reader = os.fdopen(r, "r", 0) self._waker_writer = os.fdopen(w, "w", 0)
唤醒
IOLoop
def _wake(self): try: self._waker_writer.write("x") except IOError: pass 在add_callback
时,须要唤醒IOLoop
从而使其当即执行callback
def add_callback(self, callback): self._callbacks.add(callback) self._wake()
由于callback函数可以对
_callbacks
进行修改(add, remove)等,因此用一个局部变量存储当前_callbacks
,对该局部变量进行操做。
对于
_callbacks
的执行,并无反复执行callback直到_callbacks
为空,这里这样作应该是为了防止IOLoop
陷入死循环,没法处理IO时间,并且也设置_blocking_log_threshold
,经过singer
的timer
来防止IOLoop
卡死。 若是_callbacks
不能执行完,这里会将poll_timeout
设置为0,即为当即返回,为的的在下次IO迭代中可以当即执行_callbacks
。
callbacks = list(self._callbacks) for callback in callbacks: if callback in self._callbacks: self._callbacks.remove(callback) self._run_callback(callback) if self._callbacks: poll_timeout = 0.0
对于
_timeouts
,采用列表存储,而且按照deadline从小到大排序,这样才每一个IO迭代中,只须要从头开始遍历列表获得比deadline小于当前时间的事件并执行便可。
#_Timeout的cmp函数 def __cmp__(self, other): return cmp((self.deadline, id(self.callback)), (other.deadline, id(other.callback)))
#往IOLoop中添加timeout,保持有序 def add_timeout(self, deadline, callback): timeout = _Timeout(deadline, callback) bisect.insort(self._timeouts, timeout) return timeout
##2.iostream## iostream
对非阻塞式的 socket 的简单封装,以方便经常使用读写操做。
###2.1内部实现-数据结构###
IOStream
内部维护了一个read_buffer和write_buffer,将维护的socket注册到IOLoop
上,利用IOLoop
管理读写事件。
def __init__(self, socket, io_loop=None, max_buffer_size=104857600, read_chunk_size=4096): ## self._read_buffer = "" self._write_buffer = "" self.io_loop = io_loop or ioloop.IOLoop.instance() self.io_loop.add_handler( self.socket.fileno(), self._handle_events, self._state) ##
###2.2内部实现-主要函数###
IOStream._handl_events()
根据对应events
的类型,调用不一样的callback
def _handle_events(self, fd, events): if events & self.io_loop.READ: self._handle_read() if events & self.io_loop.WRITE: self._handle_write() if events & self.io_loop.ERROR: self.close() return state = self.io_loop.ERROR if self._read_delimiter or self._read_bytes: state |= self.io_loop.READ if self._write_buffer: state |= self.io_loop.WRITE if state != self._state: self._state = state self.io_loop.update_handler(self.socket.fileno(), self._state)
#总结 在Tornado1.0版本中,IOLoop
只考虑在单线程下的实现,对于多线程的处理并无考虑,其函数并无考虑跨线程调用对关键数据的保护。
例如对于_callbacks
,暴露给全部的线程,单多线程状况下,可能会出现callback None的状况。 在Tornado4.1中,对于多线程的状况有了考虑,具体的见后序博文。
在IOStream
中,主要涉及到的是一个buffer的设计,内部使用了chunk,一个简易的块进行加快读写。这个的设计没有什么出彩之处,对于buffer的设计能够看看别的库是怎么设计的(TODO)。