首先要看的是关于 epoll 操做的方法,还记得前文说过的 epoll 只须要四个 api 就能彻底操做嘛? 咱们来看 PollIOLoop 的实现:linux
1 def add_handler(self, fd, handler, events): 2 fd, obj = self.split_fd(fd) 3 self._handlers[fd] = (obj, stack_context.wrap(handler)) 4 self._impl.register(fd, events | self.ERROR) 5 def update_handler(self, fd, events): 6 fd, obj = self.split_fd(fd) 7 self._impl.modify(fd, events | self.ERROR) 8 def remove_handler(self, fd): 9 fd, obj = self.split_fd(fd) 10 self._handlers.pop(fd, None) 11 self._events.pop(fd, None) 12 try: 13 self._impl.unregister(fd) 14 except Exception: 15 gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
epoll_ctl:这个三个方法分别对应 epoll_ctl 中的 add 、 modify 、 del 参数。 因此这三个方法实现了 epoll 的 epoll_ctl 。web
epoll_create:而后 epoll 的生成在前文 EPollIOLoop 的初始化中就已经完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
。 这个至关于 epoll_create 。api
epoll_wait:epoll_wait 操做则在 start()
中:event_pairs = self._impl.poll(poll_timeout)
服务器
epoll_close:而 epoll 的 close 则在 PollIOLoop 中的 close
方法内调用: self._impl.close()
完成。app
接下来看 PollIOLoop 的初始化方法中做了什么:ide
1 def initialize(self, impl, time_func=None, **kwargs): 2 super(PollIOLoop, self).initialize(**kwargs) 3 self._impl = impl # 指定 epoll 4 if hasattr(self._impl, 'fileno'): 5 set_close_exec(self._impl.fileno()) # fork 后关闭无用文件描述符 6 self.time_func = time_func or time.time # 指定获取当前时间的函数 7 self._handlers = {} # handler 的字典,储存被 epoll 监听的 handler,与打开它的文件描述符 ( file descriptor 简称 fd ) 一一对应 8 self._events = {} # event 的字典,储存 epoll 返回的活跃的 fd event pairs 9 self._callbacks = [] # 储存各个 fd 回调函数的列表 10 self._callback_lock = threading.Lock() # 指定进程锁 11 self._timeouts = [] # 将是一个最小堆结构,按照超时时间从小到大排列的 fd 的任务堆( 一般这个任务都会包含一个 callback ) 12 self._cancellations = 0 # 关于 timeout 的计数器 13 self._running = False # ioloop 是否在运行 14 self._stopped = False # ioloop 是否中止 15 self._closing = False # ioloop 是否关闭 16 self._thread_ident = None # 当前线程堆标识符 ( thread identify ) 17 self._blocking_signal_threshold = None # 系统信号, 主要用来在 epoll_wait 时判断是否会有 signal alarm 打断 epoll 18 self._timeout_counter = itertools.count() # 超时计数器 ( 暂时不是很明白具体做用,好像和前面的 _cancellations 有关系? 请大神讲讲) 19 self._waker = Waker() # 一个 waker 类,主要是对于管道 pipe 的操做,由于 ioloop 属于底层的数据操做,这里 epoll 监听的是 pipe 20 self.add_handler(self._waker.fileno(), 21 lambda fd, events: self._waker.consume(), 22 self.READ) # 将管道加入 epoll 监听,对于 web server 初始化时只须要关心 READ 事件
除了注释中的解释,还有几点补充:函数
2.Waker(): Waker 封装了对于管道 pipe 的操做:oop
1 def set_close_exec(fd): 2 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 3 fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) 4 def _set_nonblocking(fd): 5 flags = fcntl.fcntl(fd, fcntl.F_GETFL) 6 fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) 7 class Waker(interface.Waker): 8 def __init__(self): 9 r, w = os.pipe() 10 _set_nonblocking(r) 11 _set_nonblocking(w) 12 set_close_exec(r) 13 set_close_exec(w) 14 self.reader = os.fdopen(r, "rb", 0) 15 self.writer = os.fdopen(w, "wb", 0) 16 def fileno(self): 17 return self.reader.fileno() 18 def write_fileno(self): 19 return self.writer.fileno() 20 def wake(self): 21 try: 22 self.writer.write(b"x") 23 except IOError: 24 pass 25 def consume(self): 26 try: 27 while True: 28 result = self.reader.read() 29 if not result: 30 break 31 except IOError: 32 pass 33 def close(self): 34 self.reader.close() 35 self.writer.close() 36 ```
能够看到 waker 把 pipe 分为读、 写两个管道并都设置了非阻塞和 close_exec
。 注意wake(self)
方法中:self.writer.write(b"x")
直接向管道中写入随意字符从而释放管道。spa
ioloop 最核心的部分:.net
1 def start(self): 2 if self._running: # 判断是否已经运行 3 raise RuntimeError("IOLoop is already running") 4 self._setup_logging() 5 if self._stopped: 6 self._stopped = False # 设置中止为假 7 return 8 old_current = getattr(IOLoop._current, "instance", None) 9 IOLoop._current.instance = self 10 self._thread_ident = thread.get_ident() # 得到当前线程标识符 11 self._running = True # 设置运行 12 old_wakeup_fd = None 13 if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix': 14 try: 15 old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) 16 if old_wakeup_fd != -1: 17 signal.set_wakeup_fd(old_wakeup_fd) 18 old_wakeup_fd = None 19 except ValueError: 20 old_wakeup_fd = None 21 try: 22 while True: # 服务器进程正式开始,相似于其余服务器的 serve_forever 23 with self._callback_lock: # 加锁,_callbacks 作为临界区不加锁进行读写会产生脏数据 24 callbacks = self._callbacks # 读取 _callbacks 25 self._callbacks = []. # 清空 _callbacks 26 due_timeouts = [] # 用于存放这个周期内已过时( 已超时 )的任务 27 if self._timeouts: # 判断 _timeouts 里是否有数据 28 now = self.time() # 获取当前时间,用来判断 _timeouts 里的任务有没有超时 29 while self._timeouts: # _timeouts 有数据时一直循环, _timeouts 是个最小堆,第一个数据永远是最小的, 这里第一个数据永远是最接近超时或已超时的 30 if self._timeouts[0].callback is None: # 超时任务无回调 31 heapq.heappop(self._timeouts) # 直接弹出 32 self._cancellations -= 1 # 超时计数器 -1 33 elif self._timeouts[0].deadline <= now: # 判断最小的数据是否超时 34 due_timeouts.append(heapq.heappop(self._timeouts)) # 超时就加到已超时列表里。 35 else: 36 break # 由于最小堆,若是没超时就直接退出循环( 后面的数据一定未超时 ) 37 if (self._cancellations > 512 38 and self._cancellations > (len(self._timeouts) >> 1)): # 当超时计数器大于 512 而且 大于 _timeouts 长度一半( >> 为右移运算, 至关于十进制数据被除 2 )时,清零计数器,并剔除 _timeouts 中无 callbacks 的任务 39 self._cancellations = 0 40 self._timeouts = [x for x in self._timeouts 41 if x.callback is not None] 42 heapq.heapify(self._timeouts) # 进行 _timeouts 最小堆化 43 for callback in callbacks: 44 self._run_callback(callback) # 运行 callbacks 里全部的 calllback 45 for timeout in due_timeouts: 46 if timeout.callback is not None: 47 self._run_callback(timeout.callback) # 运行全部已过时任务的 callback 48 callbacks = callback = due_timeouts = timeout = None # 释放内存 49 if self._callbacks: # _callbacks 里有数据时 50 poll_timeout = 0.0 # 设置 epoll_wait 时间为0( 当即返回 ) 51 elif self._timeouts: # _timeouts 里有数据时 52 poll_timeout = self._timeouts[0].deadline - self.time() 53 # 取最小过时时间当 epoll_wait 等待时间,这样当第一个任务过时时当即返回 54 poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) 55 # 若是最小过时时间大于默认等待时间 _POLL_TIMEOUT = 3600,则用 3600,若是最小过时时间小于0 就设置为0 当即返回。 56 else: 57 poll_timeout = _POLL_TIMEOUT # 默认 3600 s 等待时间 58 if not self._running: # 检查是否有系统信号中断运行,有则中断,无则继续 59 break 60 if self._blocking_signal_threshold is not None: 61 signal.setitimer(signal.ITIMER_REAL, 0, 0) # 开始 epoll_wait 以前确保 signal alarm 都被清空( 这样在 epoll_wait 过程当中不会被 signal alarm 打断 ) 62 try: 63 event_pairs = self._impl.poll(poll_timeout) # 获取返回的活跃事件队 64 except Exception as e: 65 if errno_from_exception(e) == errno.EINTR: 66 continue 67 else: 68 raise 69 if self._blocking_signal_threshold is not None: 70 signal.setitimer(signal.ITIMER_REAL, 71 self._blocking_signal_threshold, 0) # epoll_wait 结束, 再设置 signal alarm 72 self._events.update(event_pairs) # 将活跃事件加入 _events 73 while self._events: 74 fd, events = self._events.popitem() # 循环弹出事件 75 try: 76 fd_obj, handler_func = self._handlers[fd] # 处理事件 77 handler_func(fd_obj, events) 78 except (OSError, IOError) as e: 79 if errno_from_exception(e) == errno.EPIPE: 80 pass 81 else: 82 self.handle_callback_exception(self._handlers.get(fd)) 83 except Exception: 84 self.handle_callback_exception(self._handlers.get(fd)) 85 fd_obj = handler_func = None 86 finally: 87 self._stopped = False # 确保发生异常也继续运行 88 if self._blocking_signal_threshold is not None: 89 signal.setitimer(signal.ITIMER_REAL, 0, 0) # 清空 signal alarm 90 IOLoop._current.instance = old_current 91 if old_wakeup_fd is not None: 92 signal.set_wakeup_fd(old_wakeup_fd) # 和 start 开头部分对应,可是不是很清楚做用,求老司机带带路
最后来看 stop
:
1 def stop(self): 2 self._running = False 3 self._stopped = True 4 self._waker.wake()
这个很简单,设置判断条件,而后调用 self._waker.wake()
向 pipe 写入随意字符唤醒 ioloop 事件循环(感谢 mlcyng 指正这里的错误)。 over!
噗,写了这么长,终于写完了。 通过分析,咱们能够看到, ioloop 其实是对 epoll 的封装,并加入了一些对上层事件的处理和 server 相关的底层处理。
最后,感谢你们任劳任怨看到这,文中理解有误的地方还请多多指教!