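Tornado handles high concurrency well because its web server implements, from the ground up, a complete single-threaded asynchronous architecture based on epoll. The built-in servers of other Python web frameworks are mostly simple WSGI-based servers that do not implement their own low-level event handling. tornado.ioloop is the lowest layer of the tornado web server.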
Before looking at ioloop, we need some background that will make it easier to understand.
ioloop is built on epoll, so what is epoll? epoll is the Linux kernel's improved version of poll, designed to handle large numbers of file descriptors.
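So what is poll? Recall how a socket server works. After it accepts a connection, the two sides start communicating, but the server has no idea whether the client has finished sending its data. At this point there are two options. The first is to block on this connection and wait until its data arrives. The second is to check the connections in turn, over and over, to see whether any of them has data to read or write.

The first option does solve the problem. However, a thread or process can then handle only one socket at a time, and every other connection is blocked. That is clearly impractical for a single process.

The second option is better. Several connections are checked in turn within a fixed period for data to read or write, so it looks as if we can now handle many connections. This is the approach poll and select take. It seems to solve the problem, but each scan takes longer as the number of connections grows. Most of a server's sockets are idle at any given moment, so most of that scanning time is wasted. epoll was created to fix this. Its concept is similar to poll, but instead of scanning every socket, each wait returns only the sockets that actually have activity. With a large number of connections, this saves a great deal of time.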
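To make the cost of the second option concrete, here is a minimal sketch of a poll/select-style scan. It assumes a Unix system, where socket.socketpair() returns two connected Unix-domain sockets. The 100 socket pairs stand in for client connections, and scan_once is a hypothetical helper, not part of tornado. Only one connection has data, yet the scan still checks all of them.

```python
import socket

def scan_once(conns):
    """Poll/select-style scan: try a non-blocking read on every connection.

    Returns (data from connections that had something, number of wasted checks).
    """
    ready, wasted = {}, 0
    for i, conn in enumerate(conns):
        try:
            data = conn.recv(1024)
        except BlockingIOError:
            wasted += 1          # idle connection: this check bought us nothing
            continue
        ready[i] = data
    return ready, wasted

# 100 simulated client connections, each a connected socket pair (Unix assumed).
pairs = [socket.socketpair() for _ in range(100)]
server_side = [server for server, _client in pairs]
for conn in server_side:
    conn.setblocking(False)

pairs[42][1].sendall(b"hello")    # only one client actually sends something
ready, wasted = scan_once(server_side)
print(ready, wasted)              # {42: b'hello'} 99

for server, client in pairs:
    server.close()
    client.close()
```

The 99 wasted checks are the overhead epoll avoids: it hands back only the ready descriptors.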
Working with epoll is actually simple: four APIs are all you need.
The first creates an epoll descriptor (that is, it creates an epoll instance).
The second operates on the events registered with the epoll instance. The available operations are:
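Operation | Meaning |
---|---|
EPOLL_CTL_ADD | Register a new file descriptor and the events to watch |
EPOLL_CTL_DEL | Remove a file descriptor from the epoll instance |
EPOLL_CTL_MOD | Change the events watched for a registered file descriptor |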
There are several event flags, but we only need to care about three of them:
Macro | Meaning |
---|---|
EPOLLIN | Data is available to read |
EPOLLOUT | The send buffer has room, so data can be written |
EPOLLERR | An error occurred |
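The third starts epoll working. It takes a timeout parameter with three behaviours:

- A positive value blocks for at most timeout. Python's select.epoll.poll() takes seconds; the underlying C call takes milliseconds.
- 0 returns immediately.
- -1 waits indefinitely.

If any connection becomes active during the wait, it returns the list of active file descriptors (here, socket file descriptors) together with their events.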
The fourth closes the epoll instance.
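The four operations above map directly onto Python's standard library: select.epoll() creates the instance, register/modify/unregister correspond to EPOLL_CTL_ADD/MOD/DEL, poll(timeout) waits, and close() releases it. Below is a minimal sketch, assuming Linux (select.epoll does not exist elsewhere). A socket pair stands in for a real client connection, and epoll_demo is a hypothetical helper.

```python
import select
import socket

def epoll_demo():
    """Walk through the four epoll operations using select.epoll (Linux only)."""
    server, client = socket.socketpair()
    ep = select.epoll()                                  # 1. create an epoll instance
    try:
        fd = server.fileno()
        ep.register(fd, select.EPOLLIN)                  # 2. EPOLL_CTL_ADD: watch for readable data
        idle = ep.poll(0)                                # 3. timeout 0: return immediately
        client.sendall(b"ping")
        ready = ep.poll(1.0)                             #    wait up to 1 second for activity
        ep.modify(fd, select.EPOLLIN | select.EPOLLOUT)  # 2. EPOLL_CTL_MOD: also watch writability
        both = ep.poll(0)
        ep.unregister(fd)                                # 2. EPOLL_CTL_DEL: stop watching
        # poll() returns (fd, event_mask) pairs; keep only the masks for readability
        return idle, [mask for _fd, mask in ready], [mask for _fd, mask in both]
    finally:
        ep.close()                                       # 4. close the epoll instance
        server.close()
        client.close()

idle, ready, both = epoll_demo()
print(idle, ready, both == [select.EPOLLIN | select.EPOLLOUT])  # [] [1] True
```

Before any data is sent, poll(0) returns nothing. After the client writes, the socket is reported as readable. Once writability is also watched, both bits come back, because an almost-empty send buffer is always writable.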
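Now that we understand epoll, we can look at ioloop. If you still have questions about epoll, see these two references: "What is the principle behind epoll?" and Baidu Baike: epoll.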
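Many beginners wonder what the last line used to run a tornado server, tornado.ioloop.IOLoop.current().start(), actually does. Instead of explaining its purpose up front, let's look at what happens behind this line of code.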
First, here is the ioloop code:
```python
from __future__ import absolute_import, division, print_function, with_statement

import datetime
import errno
import functools
import heapq  # min-heap
import itertools
import logging
import numbers
import os
import select
import sys
import threading
import time
import traceback
import math

from tornado.concurrent import TracebackFuture, is_future
from tornado.log import app_log, gen_log
from tornado.platform.auto import set_close_exec, Waker
from tornado import stack_context
from tornado.util import PY3, Configurable, errno_from_exception, timedelta_to_seconds

try:
    import signal
except ImportError:
    signal = None

if PY3:
    import _thread as thread
else:
    import thread

_POLL_TIMEOUT = 3600.0


class TimeoutError(Exception):
    pass


class IOLoop(Configurable):
    _EPOLLIN = 0x001
    _EPOLLPRI = 0x002
    _EPOLLOUT = 0x004
    _EPOLLERR = 0x008
    _EPOLLHUP = 0x010
    _EPOLLRDHUP = 0x2000
    _EPOLLONESHOT = (1 << 30)
    _EPOLLET = (1 << 31)

    # Our events map exactly to the epoll events
    NONE = 0
    READ = _EPOLLIN
    WRITE = _EPOLLOUT
    ERROR = _EPOLLERR | _EPOLLHUP

    # Global lock for creating global IOLoop instance
    _instance_lock = threading.Lock()

    _current = threading.local()

    @staticmethod
    def instance():
        if not hasattr(IOLoop, "_instance"):
            with IOLoop._instance_lock:
                if not hasattr(IOLoop, "_instance"):
                    # New instance after double check
                    IOLoop._instance = IOLoop()
        return IOLoop._instance

    @staticmethod
    def initialized():
        """Returns true if the singleton instance has been created."""
        return hasattr(IOLoop, "_instance")

    def install(self):
        assert not IOLoop.initialized()
        IOLoop._instance = self

    @staticmethod
    def clear_instance():
        """Clear the global `IOLoop` instance.

        .. versionadded:: 4.0
        """
        if hasattr(IOLoop, "_instance"):
            del IOLoop._instance

    @staticmethod
    def current(instance=True):
        current = getattr(IOLoop._current, "instance", None)
        if current is None and instance:
            return IOLoop.instance()
        return current

    def make_current(self):
        IOLoop._current.instance = self

    @staticmethod
    def clear_current():
        IOLoop._current.instance = None

    @classmethod
    def configurable_base(cls):
        return IOLoop

    @classmethod
    def configurable_default(cls):
        if hasattr(select, "epoll"):
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop
        if hasattr(select, "kqueue"):
            # Python 2.6+ on BSD or Mac
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop

    def initialize(self, make_current=None):
        if make_current is None:
            if IOLoop.current(instance=False) is None:
                self.make_current()
        elif make_current:
            if IOLoop.current(instance=False) is not None:
                raise RuntimeError("current IOLoop already exists")
            self.make_current()

    def close(self, all_fds=False):
        raise NotImplementedError()

    def add_handler(self, fd, handler, events):
        raise NotImplementedError()

    def update_handler(self, fd, events):
        raise NotImplementedError()

    def remove_handler(self, fd):
        raise NotImplementedError()

    def set_blocking_signal_threshold(self, seconds, action):
        raise NotImplementedError()

    def set_blocking_log_threshold(self, seconds):
        self.set_blocking_signal_threshold(seconds, self.log_stack)

    def log_stack(self, signal, frame):
        gen_log.warning('IOLoop blocked for %f seconds in\n%s',
                        self._blocking_signal_threshold,
                        ''.join(traceback.format_stack(frame)))

    def start(self):
        raise NotImplementedError()

    def _setup_logging(self):
        if not any([logging.getLogger().handlers,
                    logging.getLogger('tornado').handlers,
                    logging.getLogger('tornado.application').handlers]):
            logging.basicConfig()

    def stop(self):
        raise NotImplementedError()

    def run_sync(self, func, timeout=None):
        future_cell = [None]

        def run():
            try:
                result = func()
                if result is not None:
                    from tornado.gen import convert_yielded
                    result = convert_yielded(result)
            except Exception:
                future_cell[0] = TracebackFuture()
                future_cell[0].set_exc_info(sys.exc_info())
            else:
                if is_future(result):
                    future_cell[0] = result
                else:
                    future_cell[0] = TracebackFuture()
                    future_cell[0].set_result(result)
            self.add_future(future_cell[0], lambda future: self.stop())
        self.add_callback(run)
        if timeout is not None:
            timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
        self.start()
        if timeout is not None:
            self.remove_timeout(timeout_handle)
        if not future_cell[0].done():
            raise TimeoutError('Operation timed out after %s seconds' % timeout)
        return future_cell[0].result()

    def time(self):
        return time.time()
```
The IOLoop class starts by declaring constants for the epoll event flags. As mentioned earlier, we only need to care about EPOLLIN, EPOLLOUT and EPOLLERR.
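Because the flags are single bits, IOLoop combines them with bitwise OR. For example, add_handler() in PollIOLoop always registers events | self.ERROR. The sketch below copies four of the constants from the class above and decodes a mask into IOLoop-style names. The describe helper is hypothetical. The final check only runs where select.epoll exists, on the assumption that the hard-coded values equal Linux's.

```python
import select

# Constants copied from the IOLoop class above (Linux epoll values).
_EPOLLIN, _EPOLLOUT = 0x001, 0x004
_EPOLLERR, _EPOLLHUP = 0x008, 0x010

READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP

def describe(events):
    """Hypothetical helper: turn an event bitmask into IOLoop-style names."""
    names = []
    if events & READ:
        names.append("READ")
    if events & WRITE:
        names.append("WRITE")
    if events & ERROR:
        names.append("ERROR")
    return names

# add_handler() always registers `events | ERROR`, so errors are reported even if unasked.
registered = READ | ERROR
print(hex(registered), describe(registered))   # 0x19 ['READ', 'ERROR']

if hasattr(select, "epoll"):
    # On Linux the hard-coded values match the ones the select module exposes.
    assert (READ, WRITE) == (select.EPOLLIN, select.EPOLLOUT)
    assert ERROR == select.EPOLLERR | select.EPOLLHUP
```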
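The class has many methods, which can feel overwhelming. We only need to focus on the ones that implement IOLoop's core functionality; the rest are easy to follow once the core is clear. So next we concentrate on the core code.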
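instance, initialized, install, clear_instance, current, make_current, clear_current: you don't need to worry about the details of these methods. For now, just remember their roles:

- instance, initialized, install and clear_instance manage a single global IOLoop, created lazily with double-checked locking.
- current, make_current and clear_current track a per-thread "current" IOLoop through threading.local, falling back to the global instance.

Together they make sure that code everywhere ends up using the same IOLoop.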
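You may be wondering why IOLoop has no __init__. The reason is that IOLoop's object creation goes through an overridden __new__, which calls initialize as the initialization method instead. That is why there is no __init__ here. But there is no __new__ method anywhere in the ioloop code either, so what is going on? Let's set that question aside for now.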
Next, let's look at this initialization method:
```python
def initialize(self, make_current=None):
    if make_current is None:
        # Become the current IOLoop only if this thread has none yet
        if IOLoop.current(instance=False) is None:
            self.make_current()
    elif make_current:
        # Explicitly asked to become current: refuse to replace an existing one
        if IOLoop.current(instance=False) is not None:
            raise RuntimeError("current IOLoop already exists")
        self.make_current()

def make_current(self):
    IOLoop._current.instance = self
```
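What? All it does is check whether a current IOLoop already exists and, if appropriate, call self.make_current(). And make_current() merely registers this instance as the current IOLoop. So where did the actual initialization go?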
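Then look at key methods such as start(), stop() and close(): they all just raise NotImplementedError. Nothing is implemented at all?! That is completely different from the source analyses you find online. IOLoop's inheritance explains it. The older tornado.ioloop inherited from object and implemented everything itself, whereas the current version inherits from Configurable. IOLoop has evidently become a base class that only defines an interface. So let's look at the Configurable code next: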
```python
class Configurable(object):
    __impl_class = None
    __impl_kwargs = None

    def __new__(cls, *args, **kwargs):
        base = cls.configurable_base()
        init_kwargs = {}
        if cls is base:
            impl = cls.configured_class()
            if base.__impl_kwargs:
                init_kwargs.update(base.__impl_kwargs)
        else:
            impl = cls
        init_kwargs.update(kwargs)
        instance = super(Configurable, cls).__new__(impl)
        # initialize vs __init__ chosen for compatibility with AsyncHTTPClient
        # singleton magic.  If we get rid of that we can switch to __init__
        # here too.
        instance.initialize(*args, **init_kwargs)
        return instance

    @classmethod
    def configurable_base(cls):
        """Returns the base class of a configurable hierarchy.

        This will normally return the class in which it is defined.
        (which is *not* necessarily the same as the cls classmethod parameter).
        """
        raise NotImplementedError()

    @classmethod
    def configurable_default(cls):
        """Returns the implementation class to be used if none is configured."""
        raise NotImplementedError()

    def initialize(self):
        """Initialize a `Configurable` subclass instance.

        Configurable classes should use `initialize` instead of ``__init__``.

        .. versionchanged:: 4.2
           Now accepts positional arguments in addition to keyword arguments.
        """

    @classmethod
    def configure(cls, impl, **kwargs):
        """Sets the class to use when the base class is instantiated.

        Keyword arguments will be saved and added to the arguments passed
        to the constructor.  This can be used to set global defaults for
        some parameters.
        """
        base = cls.configurable_base()
        if isinstance(impl, (unicode_type, bytes)):
            impl = import_object(impl)
        if impl is not None and not issubclass(impl, cls):
            raise ValueError("Invalid subclass of %s" % cls)
        base.__impl_class = impl
        base.__impl_kwargs = kwargs

    @classmethod
    def configured_class(cls):
        """Returns the currently configured class."""
        base = cls.configurable_base()
        if cls.__impl_class is None:
            base.__impl_class = cls.configurable_default()
        return base.__impl_class

    @classmethod
    def _save_configuration(cls):
        base = cls.configurable_base()
        return (base.__impl_class, base.__impl_kwargs)

    @classmethod
    def _restore_configuration(cls, saved):
        base = cls.configurable_base()
        base.__impl_class = saved[0]
        base.__impl_kwargs = saved[1]
```
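Here is the __new__ we were looking for! Note this line: impl = cls.configured_class(). Here impl is the concrete implementation class that will actually be instantiated: on Linux, EPollIOLoop, which wraps epoll. It comes from configured_class(). When no implementation has been configured, that method runs base.__impl_class = cls.configurable_default(), which calls configurable_default(). Configurable's own configurable_default() is: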
```python
@classmethod
def configurable_default(cls):
    """Returns the implementation class to be used if none is configured."""
    raise NotImplementedError()
```
This is clearly just an interface as well, so let's go back to ioloop's configurable_default():
```python
@classmethod
def configurable_default(cls):
    if hasattr(select, "epoll"):
        from tornado.platform.epoll import EPollIOLoop
        return EPollIOLoop
    if hasattr(select, "kqueue"):
        # Python 2.6+ on BSD or Mac
        from tornado.platform.kqueue import KQueueIOLoop
        return KQueueIOLoop
    from tornado.platform.select import SelectIOLoop
    return SelectIOLoop
```
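So this is a factory method that returns a different IOLoop subclass depending on the operating system. It returns an epoll-based loop on Linux, a kqueue-based loop on macOS/BSD, and a plain select-based loop everywhere else. kqueue is essentially the BSD-family counterpart of epoll: the same idea, implemented by a different system.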
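The whole trick (IOLoop() returning an EPollIOLoop, and initialize() standing in for __init__) can be reproduced in a few lines. Below is a hypothetical, trimmed-down imitation of Configurable. MiniConfigurable, Loop, EPollLoop, KQueueLoop and SelectLoop are illustrative names, not tornado classes. It drops the saved keyword arguments and the subclass check from configure(), and uses a single-underscore attribute to avoid name mangling.

```python
import select

class MiniConfigurable(object):
    """Hypothetical, trimmed-down imitation of tornado.util.Configurable."""
    _impl_class = None  # single underscore to avoid name mangling in this sketch

    def __new__(cls, *args, **kwargs):
        base = cls.configurable_base()
        # Instantiating the base class picks the configured (or default) subclass;
        # instantiating a subclass directly uses that subclass.
        impl = cls.configured_class() if cls is base else cls
        instance = super(MiniConfigurable, cls).__new__(impl)
        instance.initialize(*args, **kwargs)   # plays the role of __init__
        return instance

    @classmethod
    def configurable_base(cls):
        raise NotImplementedError()

    @classmethod
    def configurable_default(cls):
        raise NotImplementedError()

    @classmethod
    def configure(cls, impl):
        cls.configurable_base()._impl_class = impl

    @classmethod
    def configured_class(cls):
        base = cls.configurable_base()
        if base._impl_class is None:
            base._impl_class = cls.configurable_default()
        return base._impl_class

    def initialize(self):
        pass


class Loop(MiniConfigurable):
    @classmethod
    def configurable_base(cls):
        return Loop

    @classmethod
    def configurable_default(cls):
        # Same decision order as IOLoop.configurable_default()
        if hasattr(select, "epoll"):
            return EPollLoop
        if hasattr(select, "kqueue"):
            return KQueueLoop
        return SelectLoop

    def initialize(self, backend="none"):
        self.backend = backend


class EPollLoop(Loop):
    def initialize(self):
        super(EPollLoop, self).initialize(backend="epoll")


class KQueueLoop(Loop):
    def initialize(self):
        super(KQueueLoop, self).initialize(backend="kqueue")


class SelectLoop(Loop):
    def initialize(self):
        super(SelectLoop, self).initialize(backend="select")


default_loop = Loop()            # on Linux: an EPollLoop, although we asked for Loop
Loop.configure(SelectLoop)       # override the choice, like IOLoop.configure(...)
forced_loop = Loop()
print(type(default_loop).__name__, forced_loop.backend)
```

Because the returned object is an instance of Loop and no class defines __init__, Python's follow-up call to object.__init__ is harmless. All real setup happens in initialize().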
The trail now leads to tornado.platform.epoll.EPollIOLoop, so let's look at EPollIOLoop:
```python
import select

from tornado.ioloop import PollIOLoop


class EPollIOLoop(PollIOLoop):
    def initialize(self, **kwargs):
        super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
```
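EPollIOLoop inherits everything from PollIOLoop (note: PollIOLoop, not IOLoop) and merely passes an epoll object as impl during initialization. So when we instantiate IOLoop, what actually gets initialized is this PollIOLoop subclass. Everything we really need to read and understand is here: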
```python
class PollIOLoop(IOLoop):
    """Base class for IOLoops built around a select-like function.

    For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
    (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
    `tornado.platform.select.SelectIOLoop` (all platforms).
    """
    def initialize(self, impl, time_func=None, **kwargs):
        super(PollIOLoop, self).initialize(**kwargs)
        self._impl = impl
        if hasattr(self._impl, 'fileno'):
            set_close_exec(self._impl.fileno())
        self.time_func = time_func or time.time
        self._handlers = {}
        self._events = {}
        self._callbacks = []
        self._callback_lock = threading.Lock()
        self._timeouts = []
        self._cancellations = 0
        self._running = False
        self._stopped = False
        self._closing = False
        self._thread_ident = None
        self._blocking_signal_threshold = None
        self._timeout_counter = itertools.count()

        # Create a pipe that we send bogus data to when we want to wake
        # the I/O loop when it is idle
        self._waker = Waker()
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)

    def close(self, all_fds=False):
        with self._callback_lock:
            self._closing = True
        self.remove_handler(self._waker.fileno())
        if all_fds:
            for fd, handler in self._handlers.values():
                self.close_fd(fd)
        self._waker.close()
        self._impl.close()
        self._callbacks = None
        self._timeouts = None

    def add_handler(self, fd, handler, events):
        fd, obj = self.split_fd(fd)
        self._handlers[fd] = (obj, stack_context.wrap(handler))
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        fd, obj = self.split_fd(fd)
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        fd, obj = self.split_fd(fd)
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except Exception:
            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

    def set_blocking_signal_threshold(self, seconds, action):
        if not hasattr(signal, "setitimer"):
            gen_log.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)

    def start(self):
        ...  # setup elided in this excerpt (it defines old_current and old_wakeup_fd used below)
        try:
            while True:
                # Prevent IO event starvation by delaying new callbacks
                # to the next iteration of the event loop.
                with self._callback_lock:
                    callbacks = self._callbacks
                    self._callbacks = []

                # Add any timeouts that have come due to the callback list.
                # Do not run anything until we have determined which ones
                # are ready, so timeouts that call add_timeout cannot
                # schedule anything in this iteration.
                due_timeouts = []
                if self._timeouts:
                    now = self.time()
                    while self._timeouts:
                        if self._timeouts[0].callback is None:
                            # The timeout was cancelled.  Note that the
                            # cancellation check is repeated below for timeouts
                            # that are cancelled by another timeout or callback.
                            heapq.heappop(self._timeouts)
                            self._cancellations -= 1
                        elif self._timeouts[0].deadline <= now:
                            due_timeouts.append(heapq.heappop(self._timeouts))
                        else:
                            break
                    if (self._cancellations > 512
                            and self._cancellations > (len(self._timeouts) >> 1)):
                        # Clean up the timeout queue when it gets large and it's
                        # more than half cancellations.
                        self._cancellations = 0
                        self._timeouts = [x for x in self._timeouts
                                          if x.callback is not None]
                        heapq.heapify(self._timeouts)

                for callback in callbacks:
                    self._run_callback(callback)
                for timeout in due_timeouts:
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)
                # Closures may be holding on to a lot of memory, so allow
                # them to be freed before we go into our poll wait.
                callbacks = callback = due_timeouts = timeout = None

                if self._callbacks:
                    # If any callbacks or timeouts called add_callback,
                    # we don't want to wait in poll() before we run them.
                    poll_timeout = 0.0
                elif self._timeouts:
                    # If there are any timeouts, schedule the first one.
                    # Use self.time() instead of 'now' to account for time
                    # spent running callbacks.
                    poll_timeout = self._timeouts[0].deadline - self.time()
                    poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
                else:
                    # No timeouts and no callbacks, so use the default.
                    poll_timeout = _POLL_TIMEOUT

                if not self._running:
                    break

                if self._blocking_signal_threshold is not None:
                    # clear alarm so it doesn't fire while poll is waiting for
                    # events.
                    signal.setitimer(signal.ITIMER_REAL, 0, 0)

                try:
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception as e:
                    # Depending on python version and IOLoop implementation,
                    # different exception types may be thrown and there are
                    # two ways EINTR might be signaled:
                    # * e.errno == errno.EINTR
                    # * e.args is like (errno.EINTR, 'Interrupted system call')
                    if errno_from_exception(e) == errno.EINTR:
                        continue
                    else:
                        raise

                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0)

                # Pop one fd at a time from the set of pending fds and run
                # its handler. Since that handler may perform actions on
                # other file descriptors, there may be reentrant calls to
                # this IOLoop that update self._events
                self._events.update(event_pairs)
                while self._events:
                    fd, events = self._events.popitem()
                    try:
                        fd_obj, handler_func = self._handlers[fd]
                        handler_func(fd_obj, events)
                    except (OSError, IOError) as e:
                        if errno_from_exception(e) == errno.EPIPE:
                            # Happens when the client closes the connection
                            pass
                        else:
                            self.handle_callback_exception(self._handlers.get(fd))
                    except Exception:
                        self.handle_callback_exception(self._handlers.get(fd))
                fd_obj = handler_func = None

        finally:
            # reset the stopped flag so another start/stop pair can be issued
            self._stopped = False
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
            IOLoop._current.instance = old_current
            if old_wakeup_fd is not None:
                signal.set_wakeup_fd(old_wakeup_fd)

    def stop(self):
        self._running = False
        self._stopped = True
        self._waker.wake()

    def time(self):
        return self.time_func()

    def call_at(self, deadline, callback, *args, **kwargs):
        timeout = _Timeout(
            deadline,
            functools.partial(stack_context.wrap(callback), *args, **kwargs),
            self)
        heapq.heappush(self._timeouts, timeout)
        return timeout

    def remove_timeout(self, timeout):
        # Removing from a heap is complicated, so just leave the defunct
        # timeout object in the queue (see discussion in
        # http://docs.python.org/library/heapq.html).
        # If this turns out to be a problem, we could add a garbage
        # collection pass whenever there are too many dead timeouts.
        timeout.callback = None
        self._cancellations += 1

    def add_callback(self, callback, *args, **kwargs):
        with self._callback_lock:
            if self._closing:
                raise RuntimeError("IOLoop is closing")
            list_empty = not self._callbacks
            self._callbacks.append(functools.partial(
                stack_context.wrap(callback), *args, **kwargs))
            if list_empty and thread.get_ident() != self._thread_ident:
                # If we're in the IOLoop's thread, we know it's not currently
                # polling.  If we're not, and we added the first callback to an
                # empty list, we may need to wake it up (it may wake up on its
                # own, but an occasional extra wake is harmless).  Waking
                # up a polling IOLoop is relatively expensive, so we try to
                # avoid it when we can.
                self._waker.wake()

    def add_callback_from_signal(self, callback, *args, **kwargs):
        with stack_context.NullContext():
            if thread.get_ident() != self._thread_ident:
                # if the signal is handled on another thread, we can add
                # it normally (modulo the NullContext)
                self.add_callback(callback, *args, **kwargs)
            else:
                # If we're on the IOLoop's thread, we cannot use
                # the regular add_callback because it may deadlock on
                # _callback_lock.  Blindly insert into self._callbacks.
                # This is safe because the GIL makes list.append atomic.
                # One subtlety is that if the signal interrupted the
                # _callback_lock block in IOLoop.start, we may modify
                # either the old or new version of self._callbacks,
                # but either way will work.
                self._callbacks.append(functools.partial(
                    stack_context.wrap(callback), *args, **kwargs))
```
Sure enough, PollIOLoop inherits from IOLoop and implements all of its interfaces. Now we can finally get to the heart of the matter.
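Before reading start() line by line, it can help to see its skeleton in isolation. Below is a heavily trimmed sketch, assuming Linux (select.epoll). MiniPollLoop and call_later are hypothetical names. The sketch keeps the same order of steps as PollIOLoop.start():

1. Swap out the callback list.
2. Pop due timeouts from a min-heap.
3. Run the callbacks and due timeouts.
4. Choose the poll timeout.
5. Poll, then dispatch each ready fd to its handler.

It leaves out locking, the Waker pipe, stack_context, lazy timeout cancellation and error handling. It uses time.monotonic() instead of tornado's time.time() so that clock adjustments cannot disturb deadlines.

```python
import heapq
import itertools
import select
import socket
import time

class MiniPollLoop(object):
    """Hypothetical, heavily trimmed loop mirroring the structure of PollIOLoop.start()."""
    READ, WRITE = select.EPOLLIN, select.EPOLLOUT
    ERROR = select.EPOLLERR | select.EPOLLHUP
    MAX_POLL = 3600.0                      # same role as _POLL_TIMEOUT

    def __init__(self):
        self._impl = select.epoll()
        self._handlers = {}                # fd -> handler(fd, events)
        self._callbacks = []
        self._timeouts = []                # min-heap of (deadline, tie_breaker, callback)
        self._counter = itertools.count()
        self._running = False

    def add_handler(self, fd, handler, events):
        self._handlers[fd] = handler
        self._impl.register(fd, events | self.ERROR)

    def remove_handler(self, fd):
        self._handlers.pop(fd, None)
        self._impl.unregister(fd)

    def add_callback(self, callback):
        self._callbacks.append(callback)

    def call_later(self, delay, callback):
        deadline = time.monotonic() + delay
        heapq.heappush(self._timeouts, (deadline, next(self._counter), callback))

    def stop(self):
        self._running = False

    def start(self):
        self._running = True
        while True:
            # Swap the list so callbacks added now wait for the next iteration.
            callbacks, self._callbacks = self._callbacks, []
            due, now = [], time.monotonic()
            while self._timeouts and self._timeouts[0][0] <= now:
                due.append(heapq.heappop(self._timeouts)[2])
            for cb in callbacks + due:
                cb()
            # Don't block if new callbacks are queued; otherwise sleep until the next deadline.
            if self._callbacks:
                poll_timeout = 0.0
            elif self._timeouts:
                poll_timeout = max(0.0, min(self._timeouts[0][0] - time.monotonic(), self.MAX_POLL))
            else:
                poll_timeout = self.MAX_POLL
            if not self._running:
                break
            for fd, events in self._impl.poll(poll_timeout):
                handler = self._handlers.get(fd)
                if handler is not None:
                    handler(fd, events)

    def close(self):
        self._impl.close()


loop = MiniPollLoop()
a, b = socket.socketpair()
log = []

def on_readable(fd, events):
    log.append(a.recv(1024))
    loop.remove_handler(fd)
    loop.stop()

loop.add_handler(a.fileno(), on_readable, MiniPollLoop.READ)
loop.add_callback(lambda: log.append("callback"))
loop.call_later(0.05, lambda: b.sendall(b"data"))
loop.start()
loop.close()
a.close()
b.close()
print(log)   # ['callback', b'data']
```

As in tornado, stop() only flips a flag. The loop notices it after the current round of callbacks, just before it would poll again.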