深刻理解异步事件机制

前言

经过了解异步设计的由来,来深刻理解异步事件机制。html

代码地址node

什么是异步

为了深刻理解异步的概念,就必须先了解异步设计的由来。算法

同步

显然易见的是,同步的概念随着咱们学习第一个输出Hello World的程序,就已经深刻人心。编程

然而咱们也很容易忘记一个事实:一个现代编程语言(如Python)作了很是多的工做,来指导和约束你如何去构建你本身的一个程序。segmentfault

def f():
    print("in f()")
def g():
    print("in g()")
f()
g()

你知道in g()必定输出在in f()以后,即函数f完成前函数g不会执行。这即为同步。在现代编程语言的帮助下,这一切显得很是的天然,从而也让咱们能够将咱们的程序分解成
松散耦合的函数:一个函数并不须要关心谁调用了它,它甚至能够没有返回值,只是完成一些操做。api

固然关于这些是怎么具体实现的就不探究了,然而随着一个程序的功能的增长,同步设计的开发理念并不足以实现一些复杂的功能。数组

并发

写一个程序每隔3秒打印“Hello World”,同时等待用户命令行的输入。用户每输入一个天然数n,就计算并打印斐波那契函数的值F(n),以后继续等待下一个输入

因为等待用户输入是一个阻塞的操做,若是按照同步的设计理念:若是用户未输入,则意味着接下来的函数并不会执行,天然没有办法作到一边输出“Hello World”,
一边等待用户输入。为了让程序能解决这样一个问题,就必须引入并发机制,即让程序可以同时作不少事,线程是其中一种。

线程

具体代码在example/hello_threads.py中。

from threading import Thread
from time import sleep
from time import time
from fib import timed_fib
def print_hello():
    while True:
        print("{} - Hello world!".format(int(time())))
        sleep(3)
def read_and_process_input():
    while True:
        n = int(input())
        print('fib({}) = {}'.format(n, timed_fib(n)))
def main():
    # Second thread will print the hello message. Starting as a daemon means
    # the thread will not prevent the process from exiting.
    t = Thread(target=print_hello)
    t.daemon = True
    t.start()
    # Main thread will read and process input
    read_and_process_input()
if __name__ == '__main__':
    main()

对于以前那样的问题,引入线程机制就能够解决这种简单的并发问题。而对于线程咱们应该有一个简单的认知:

  • 一个线程能够理解为指令的序列和CPU执行的上下文的集合。
  • 一个同步的程序即进程,有且只会在一个线程中运行,因此当线程被阻塞,也就意味着整个进程被阻塞
  • 一个进程能够有多个线程,同一个进程中的线程共享了进程的一些资源,好比说内存,地址空间,文件描述符等。
  • 线程是由操做系统的调度器来调度的, 调度器统一负责管理调度进程中的线程。

    • 系统的调度器决定何时会把当前线程挂起,并把CPU的控制器交个另外一个线程。这个过程称之为称上下文切换,包括对于当前线程上下文的保存、对目标线程上下文的加载。
    • 上下文切换会对性能产生影响,由于它自己也须要CPU的周期来执行

I/O多路复用

而随着现实问题的复杂化,如10K问题。

在Nginx没有流行起来的时候,常被提到一个词 10K(并发1W)。在互联网的早期,网速很慢、用户群很小需求也只是简单的页面浏览,
因此最初的服务器设计者们使用基于进程/线程模型,也就是一个TCP链接就是分配一个进程(线程)。谁都没有想到如今Web 2.0时候用户群里和复杂的页面交互问题,
而如今即时通讯和实在实时互动已经很广泛了。那么你设想若是每个用户都和服务器保持一个(甚至多个)TCP链接才能进行实时的数据交互,别说BAT这种量级的网站,
就是豆瓣这种比较小的网站,同时的并发链接也要过亿了。进程是操做系统最昂贵的资源,一台机器没法建立不少进程。若是要建立10K个进程,那么操做系统是没法承受的。
就算咱们不讨论随着服务器规模大幅上升带来复杂度几何级数上升的问题,采用分布式系统,只是维持1亿用户在线须要10万台服务器,成本巨大,也只有FLAG、BAT这样公司才有财力购买如此多的服务器。

而一样存在一些缘由,让咱们避免考虑多线程的方式:

  • 线程在计算和资源消耗的角度来讲是比较昂贵的。
  • 线程并发所带来的问题,好比由于共享的内存空间而带来的死锁和竞态条件。这些又会致使更加复杂的代码,在编写代码的时候须要时不时地注意一些线程安全的问题。

为了解决这一问题,出现了「用同一进程/线程来同时处理若干链接」的思路,也就是I/O多路复用。

以Linux操做系统为例,Linux操做系统给出了三种监听文件描述符的机制,具体实现可参考

  • select: 每一个链接对应一个描述符(socket),循环处理各个链接,先查下它的状态,ready了就进行处理,不ready就不进行处理。可是缺点不少:

    • 每次调用select,都须要把fd集合从用户态拷贝到内核态,这个开销在fd不少时会很大
    • 同时每次调用select都须要在内核遍历传递进来的全部fd,这个开销在fd不少时也很大
    • select支持的文件描述符数量过小了,默认是1024
  • poll: 本质上和select没有区别,可是因为它是基于链表来存储的,没有最大链接数的限制。缺点是:

    • 大量的的数组被总体复制于用户态和内核地址空间之间,而无论这样的复制是否是有意义。
    • poll的特色是「水平触发(只要有数据能够读,无论怎样都会通知)」,若是报告后没有被处理,那么下次poll时会再次报告它。
  • epoll: 它使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。epoll支持水平触发和边缘触发,最大的特色在于「边缘触发」,它只告诉进程哪些刚刚变为就绪态,而且只会通知一次。使用epoll的优势不少:

    • 没有最大并发链接的限制,能打开的fd的上限远大于1024(1G的内存上能监听约10万个端口)
    • 效率提高,不是轮询的方式,不会随着fd数目的增长效率降低
    • 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减小复制开销

综上所述,经过epoll的机制,给现代高级语言提供了高并发、高性能解决方案的基础。而一样FreeBSD推出了kqueue,Windows推出了IOCP,Solaris推出了/dev/poll。

而在Python3.4中新增了selectors模块,用于封装各个操做系统所提供的I/O多路复用的接口。
那么以前一样的问题,咱们能够经过I/O多路复用的机制实现并发。

写一个程序每隔3秒打印“Hello World”,同时等待用户命令行的输入。用户每输入一个天然数n,就计算并打印斐波那契函数的值F(n),以后继续等待下一个输入

经过最基础的轮询机制(poll),轮询标准输入(stdin)是否变为可读的状态,从而当标准输入能被读取时,去执行计算Fibonacci数列。而后判断时间是否过去三秒钟,从而是否输出"Hello World!".
具体代码在example/hello_selectors_poll.py中。

注意:在Windows中并不是一切都是文件,因此该实例代码没法在Windows平台下运行。

import selectors
import sys
from time import time
from fib import timed_fib
def process_input(stream):
    text = stream.readline()
    n = int(text.strip())
    print('fib({}) = {}'.format(n, timed_fib(n)))
def print_hello():
    print("{} - Hello world!".format(int(time())))
def main():
    selector = selectors.DefaultSelector()
    # Register the selector to poll for "read" readiness on stdin
    selector.register(sys.stdin, selectors.EVENT_READ)
    last_hello = 0  # Setting to 0 means the timer will start right away
    while True:
        # Wait at most 100 milliseconds for input to be available
        for event, mask in selector.select(0.1):
            process_input(event.fileobj)
        if time() - last_hello > 3:
            last_hello = time()
            print_hello()
if __name__ == '__main__':
    main()

从上面解决问题的设计方案演化过程,从同步到并发,从线程到I/O多路复用。能够看出根本思路去须要程序自己高效去阻塞,
让CPU可以执行核心任务。意味着将数据包处理,内存管理,处理器调度等任务从内核态切换到应用态,操做系统只处理控制层,
数据层彻底交给应用程序在应用态中处理。极大程度的减小了程序在应用态和内核态之间切换的开销,让高性能、高并发成为了可能。

异步

经过以前的探究,不难发现一个同步的程序也能经过操做系统的接口实现“并发”,而这种“并发”的行为便可称之为异步

以前经过I/O复用的所提供的解决方案,进一步抽象,便可抽象出最基本的框架事件循环(Event Loop),而其中最容易理解的实现,
则是回调(Callback).

回调

经过对事件自己的抽象,以及其对应的处理函数(handler),能够实现以下算法:

维护一个按时间排序的事件列表,最近须要运行的定时器在最前面。这样的话每次只须要从头检查是否有超时的事件并执行它们。

bisect.insort使得维护这个列表更加容易,它会帮你在合适的位置插入新的定时器事件组。
具体代码在example/hello_event_loop_callback.py中。

注意:在Windows中并不是一切都是文件,因此该实例代码没法在Windows平台下运行。

from bisect import insort
from fib import timed_fib
from time import time
import selectors
import sys
class EventLoop(object):
    """
    Implements a callback based single-threaded event loop as a simple
    demonstration.
    """
    def __init__(self, *tasks):
        self._running = False
        self._stdin_handlers = []
        self._timers = []
        self._selector = selectors.DefaultSelector()
        self._selector.register(sys.stdin, selectors.EVENT_READ)
    def run_forever(self):
        self._running = True
        while self._running:
            # First check for available IO input
            for key, mask in self._selector.select(0):
                line = key.fileobj.readline().strip()
                for callback in self._stdin_handlers:
                    callback(line)
            # Handle timer events
            while self._timers and self._timers[0][0] < time():
                handler = self._timers[0][1]
                del self._timers[0]
                handler()
    def add_stdin_handler(self, callback):
        self._stdin_handlers.append(callback)
    def add_timer(self, wait_time, callback):
        insort(self._timers, (time() + wait_time, callback))
    def stop(self):
        self._running = False
def main():
    loop = EventLoop()
    def on_stdin_input(line):
        if line == 'exit':
            loop.stop()
            return
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))
    def print_hello():
        print("{} - Hello world!".format(int(time())))
        loop.add_timer(3, print_hello)
    def f(x):
        def g():
            print(x)
        return g
    loop.add_stdin_handler(on_stdin_input)
    loop.add_timer(0, print_hello)
    loop.run_forever()
if __name__ == '__main__':
    main()

参考文献

相关文章
相关标签/搜索