经过了解异步设计的由来,来深刻理解异步事件机制。html
代码地址node
为了深刻理解异步的概念,就必须先了解异步设计的由来。算法
显然易见的是,同步的概念随着咱们学习第一个输出Hello World的程序,就已经深刻人心。编程
然而咱们也很容易忘记一个事实:一个现代编程语言(如Python)作了很是多的工做,来指导和约束你如何去构建你本身的一个程序。segmentfault
def f(): print("in f()") def g(): print("in g()") f() g()
你知道in g()
必定输出在in f()
以后,即函数f完成前函数g不会执行。这即为同步。在现代编程语言的帮助下,这一切显得很是的天然,从而也让咱们能够将咱们的程序分解成
松散耦合的函数:一个函数并不须要关心谁调用了它,它甚至能够没有返回值,只是完成一些操做。api
固然关于这些是怎么具体实现的就不探究了,然而随着一个程序的功能的增长,同步设计的开发理念并不足以实现一些复杂的功能。数组
写一个程序每隔3秒打印“Hello World”,同时等待用户命令行的输入。用户每输入一个天然数n,就计算并打印斐波那契函数的值F(n),以后继续等待下一个输入
因为等待用户输入是一个阻塞的操做,若是按照同步的设计理念:若是用户未输入,则意味着接下来的函数并不会执行,天然没有办法作到一边输出“Hello World”,
一边等待用户输入。为了让程序能解决这样一个问题,就必须引入并发机制,即让程序可以同时作不少事,线程是其中一种。
具体代码在example/hello_threads.py
中。
from threading import Thread from time import sleep from time import time from fib import timed_fib def print_hello(): while True: print("{} - Hello world!".format(int(time()))) sleep(3) def read_and_process_input(): while True: n = int(input()) print('fib({}) = {}'.format(n, timed_fib(n))) def main(): # Second thread will print the hello message. Starting as a daemon means # the thread will not prevent the process from exiting. t = Thread(target=print_hello) t.daemon = True t.start() # Main thread will read and process input read_and_process_input() if __name__ == '__main__': main()
对于以前那样的问题,引入线程机制就能够解决这种简单的并发问题。而对于线程咱们应该有一个简单的认知:
线程是由操做系统的调度器来调度的, 调度器统一负责管理调度进程中的线程。
而随着现实问题的复杂化,如10K问题。
在Nginx没有流行起来的时候,常被提到一个词 10K(并发1W)。在互联网的早期,网速很慢、用户群很小需求也只是简单的页面浏览,
因此最初的服务器设计者们使用基于进程/线程模型,也就是一个TCP链接就是分配一个进程(线程)。谁都没有想到如今Web 2.0时候用户群里和复杂的页面交互问题,
而如今即时通讯和实在实时互动已经很广泛了。那么你设想若是每个用户都和服务器保持一个(甚至多个)TCP链接才能进行实时的数据交互,别说BAT这种量级的网站,
就是豆瓣这种比较小的网站,同时的并发链接也要过亿了。进程是操做系统最昂贵的资源,一台机器没法建立不少进程。若是要建立10K个进程,那么操做系统是没法承受的。
就算咱们不讨论随着服务器规模大幅上升带来复杂度几何级数上升的问题,采用分布式系统,只是维持1亿用户在线须要10万台服务器,成本巨大,也只有FLAG、BAT这样公司才有财力购买如此多的服务器。
而一样存在一些缘由,让咱们避免考虑多线程的方式:
为了解决这一问题,出现了「用同一进程/线程来同时处理若干链接」的思路,也就是I/O多路复用。
以Linux操做系统为例,Linux操做系统给出了三种监听文件描述符的机制,具体实现可参考:
select: 每一个链接对应一个描述符(socket),循环处理各个链接,先查下它的状态,ready了就进行处理,不ready就不进行处理。可是缺点不少:
poll: 本质上和select没有区别,可是因为它是基于链表来存储的,没有最大链接数的限制。缺点是:
epoll: 它使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。epoll支持水平触发和边缘触发,最大的特色在于「边缘触发」,它只告诉进程哪些刚刚变为就绪态,而且只会通知一次。使用epoll的优势不少:
综上所述,经过epoll的机制,给现代高级语言提供了高并发、高性能解决方案的基础。而一样FreeBSD推出了kqueue,Windows推出了IOCP,Solaris推出了/dev/poll。
而在Python3.4中新增了selectors模块,用于封装各个操做系统所提供的I/O多路复用的接口。
那么以前一样的问题,咱们能够经过I/O多路复用的机制实现并发。
写一个程序每隔3秒打印“Hello World”,同时等待用户命令行的输入。用户每输入一个天然数n,就计算并打印斐波那契函数的值F(n),以后继续等待下一个输入
经过最基础的轮询机制(poll),轮询标准输入(stdin)是否变为可读的状态,从而当标准输入能被读取时,去执行计算Fibonacci数列。而后判断时间是否过去三秒钟,从而是否输出"Hello World!".
具体代码在example/hello_selectors_poll.py
中。
注意:在Windows中并不是一切都是文件,因此该实例代码没法在Windows平台下运行。
import selectors import sys from time import time from fib import timed_fib def process_input(stream): text = stream.readline() n = int(text.strip()) print('fib({}) = {}'.format(n, timed_fib(n))) def print_hello(): print("{} - Hello world!".format(int(time()))) def main(): selector = selectors.DefaultSelector() # Register the selector to poll for "read" readiness on stdin selector.register(sys.stdin, selectors.EVENT_READ) last_hello = 0 # Setting to 0 means the timer will start right away while True: # Wait at most 100 milliseconds for input to be available for event, mask in selector.select(0.1): process_input(event.fileobj) if time() - last_hello > 3: last_hello = time() print_hello() if __name__ == '__main__': main()
从上面解决问题的设计方案演化过程,从同步到并发,从线程到I/O多路复用。能够看出根本思路去须要程序自己高效去阻塞,
让CPU可以执行核心任务。意味着将数据包处理,内存管理,处理器调度等任务从内核态切换到应用态,操做系统只处理控制层,
数据层彻底交给应用程序在应用态中处理。极大程度的减小了程序在应用态和内核态之间切换的开销,让高性能、高并发成为了可能。
经过以前的探究,不难发现一个同步的程序也能经过操做系统的接口实现“并发”,而这种“并发”的行为便可称之为异步。
以前经过I/O复用的所提供的解决方案,进一步抽象,便可抽象出最基本的框架事件循环(Event Loop),而其中最容易理解的实现,
则是回调(Callback).
经过对事件自己的抽象,以及其对应的处理函数(handler),能够实现以下算法:
维护一个按时间排序的事件列表,最近须要运行的定时器在最前面。这样的话每次只须要从头检查是否有超时的事件并执行它们。
bisect.insort使得维护这个列表更加容易,它会帮你在合适的位置插入新的定时器事件组。
具体代码在example/hello_event_loop_callback.py
中。
注意:在Windows中并不是一切都是文件,因此该实例代码没法在Windows平台下运行。
from bisect import insort from fib import timed_fib from time import time import selectors import sys class EventLoop(object): """ Implements a callback based single-threaded event loop as a simple demonstration. """ def __init__(self, *tasks): self._running = False self._stdin_handlers = [] self._timers = [] self._selector = selectors.DefaultSelector() self._selector.register(sys.stdin, selectors.EVENT_READ) def run_forever(self): self._running = True while self._running: # First check for available IO input for key, mask in self._selector.select(0): line = key.fileobj.readline().strip() for callback in self._stdin_handlers: callback(line) # Handle timer events while self._timers and self._timers[0][0] < time(): handler = self._timers[0][1] del self._timers[0] handler() def add_stdin_handler(self, callback): self._stdin_handlers.append(callback) def add_timer(self, wait_time, callback): insort(self._timers, (time() + wait_time, callback)) def stop(self): self._running = False def main(): loop = EventLoop() def on_stdin_input(line): if line == 'exit': loop.stop() return n = int(line) print("fib({}) = {}".format(n, timed_fib(n))) def print_hello(): print("{} - Hello world!".format(int(time()))) loop.add_timer(3, print_hello) def f(x): def g(): print(x) return g loop.add_stdin_handler(on_stdin_input) loop.add_timer(0, print_hello) loop.run_forever() if __name__ == '__main__': main()