一块儿读 Gevent 源码

这一篇主要想跟你们分享一下 Gevent 实现的基础逻辑,也是有同窗对这个很感兴趣,因此贴出来跟你们一块儿分享一下。python

Greenlet

咱们知道 Gevent 是基于 Greenlet 实现的,greenlet 有的时候也被叫作微线程或者协程。其实 Greenlet 自己很是简单,其自身实现的功能也很是直接。区别于常规的编程思路——顺序执行、调用进栈、返回出栈—— Greenlet 提供了一种在不一样的调用栈之间自由跳跃的功能。从一个简单的例子来看一下吧(摘自官方文档):git

from greenlet import greenlet

def test1():
    print 12
    gr2.switch()
    print 34

def test2():
    print 56
    gr1.switch()
    print 78

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

这里,每个 greenlet 就是一个调用栈——您能够把他想象成一个线程,只不过真正的线程能够并行执行,而同一时刻只能有一个 greenlet 在执行(同一线程里)。正如例子中最后三句话,咱们建立了 gr1gr2 两个不一样的调用栈空间,入口函数分别是 test1test2;这最后一句 gr1.switch() 得多解释一点。github

由于除了 gr1gr2,咱们还有一个栈空间,也就是全部 Python 程序都得有的默认的栈空间——咱们暂且称之为 main,而这一句 gr1.switch() 偏偏实现了从 maingr1 的跳跃,也就是从当前的栈跳到指定的栈。这时,就犹如常规调用 test1() 同样,gr1.switch() 的调用暂时不会返回结果,程序会跳转到 test1 继续执行;只不过区别于普通函数调用时 test1() 会向当前栈压栈,而 gr1.switch() 则会将当前栈存档,替换成 gr1 的栈。如图所示:算法

请输入图片描述

对于这种栈的切换,咱们有时也称之为执行权的转移,或者说 main 交出了执行权,同时 gr1 得到了执行权。Greenlet 在底层是用汇编实现的这样的切换:把当前的栈(main)相关的寄存器啊什么的保存到内存里,而后把本来保存在内存里的 gr1 的相关信息恢复到寄存器里。这种操做速度很是快,比操做系统对多进程调度的上下文切换还要快。代码在这里,有兴趣的同窗能够一块儿研究一下(其中 switch_x32_unix.h 是我写的哈哈)。数据库

回到前面的例子,最后一句 gr1.switch() 调用将执行点跳到了 gr1 的第一句,因而输出了 12。随后顺序执行到 gr2.switch(),继而跳转到 gr2 的第一句,因而输出了 56。接着又是 gr1.switch(),跳回到 gr1,从以前跳出的地方继续——对 gr1 而言就是 gr2.switch() 的调用返回告终果 None,而后输出 34编程

这个时候 test1 执行到头了,gr1 的栈里面空了。Greenlet 设计了 parent greenlet 的概念,就是说,当一个 greenlet 的入口函数执行完以后,会自动切换回其 parent。默认状况下,greenlet 的 parent 就是建立该 greenlet 时所在的那个栈,前面的例子中,gr1gr2 都是在 main 里被建立的,因此他们俩的 parent 都是 main。因此当 gr1 结束的时候,会回到 main 的最后一句,接着 main 结束了,因此整个程序也就结束了——78 历来没有被执行到过。另外,greenlet 的 parent 也能够手工设置。segmentfault

简单来看,greenlet 只是为 Python 语言增长了建立多条执行序列的功能,并且多条执行序列之间的切换还必须得手动显式调用 switch() 才行;这些都跟异步 I/O 没有必然关系。后端

gevent.sleep

接着来看 Gevent。最简单的一个 Gevent 示例就是这样的了:服务器

import gevent
gevent.sleep(1)

貌似很是简单的一个 sleep,却包含了 Gevent 的关键结构,让咱们仔细看一下 sleep 的实现吧。代码在 gevent/hub.py多线程

def sleep(seconds=0):
    hub = get_hub()
    loop = hub.loop
    hub.wait(loop.timer(seconds))

这里我把一些当前用不着的代码作了一些清理,只留下了三句关键的代码,其中就有 Gevent 的两个关键的部件——hublooploop 是 Gevent 的核心部件,也就是主循环核心,默认是用 Cython 写的 libev 的包装(因此性能杠杠滴),稍后会在详细提到它。hub 则是一个 greenlet,里面跑着 loop

hub 是一个单例,从 get_hub() 的源码就能够看出来:

import _thread
_threadlocal = _thread._local()

def get_hub(*args, **kwargs):
    global _threadlocal
    try:
        return _threadlocal.hub
    except AttributeError:
        hubtype = get_hub_class()
        hub = _threadlocal.hub = hubtype(*args, **kwargs)
        return hub

因此第一次执行 get_hub() 的时候,就会建立一个 hub 实例:

class Hub(greenlet):
    loop_class = config('gevent.core.loop', 'GEVENT_LOOP')

    def __init__(self):
        greenlet.__init__(self)
        loop_class = _import(self.loop_class)
        self.loop = loop_class()

一样这是一段精简了的代码,反映了一个 hub 的关键属性——looploop 实例随着 hub 实例的建立而建立,默认的 loop 就是 gevent/core.ppyx 里的 class loop,也能够经过环境变量 GEVENT_LOOP 来自定义。

值得注意的是,截止到 hub = get_hub()loop = hub.loop,咱们都只是建立了 hubloop,并无真正开始跑咱们的主循环。稍安勿躁,第三句就要开始了。

loop 有一堆接口,对应着底层 libev 的各个功能,详见此处。咱们这里用到的是 timer(seconds),该函数返回的是一个 watcher 对象,对应着底层 libev 的 watcher 概念。咱们大概能猜到,这个 watcher 对象会在几秒钟以后作一些什么事情,可是具体怎么作,让咱们一块儿看看 hub.wait() 的实现吧。

def wait(self, watcher):
        waiter = Waiter()
        watcher.start(waiter.switch)
        waiter.get()

代码也不长,不过能看到 watcher 的接口 watcher.start(method),也就是说,当给定的几秒钟过了以后,会调用这里给的函数,也就是 waiter.switch。让咱们再看一下这里用到的 Waiter,都是在同一个文件 hub.py 里面:

from greenlet import getcurrent

class Waiter(object):
    def __init__(self):
        self.hub = get_hub()
        self.greenlet = None

    def switch(self):
        assert getcurrent() is self.hub
        self.greenlet.switch()

    def get(self):
        assert self.greenlet is None
        self.greenlet = getcurrent()
        try:
            self.hub.switch()
        finally:
            self.greenlet = None

这里一样删掉了大量干扰因素。根据前面 wait() 的定义,咱们会先建立一个 waiter,而后调用其 get(),随后几秒钟以后 loop 会调用其 switch()。一个个看。

get() 一上来会保证本身不会被同时调用到(assert),接着就去获取了当前的 greenlet,也就是调用 get() 时所处的栈,一直往前找,找到 sleep(1),因此 getcurrent() 的结果是 mainWaiter 随后将 main 保存在了 self.greenlet 引用中。

下面的一句话是重中之重了,self.hub.switch()!由无论任何上下文中,直接往 hub 里跳。因为这是第一次跳进 hub 里,因此此时 loop 就开始运转了。

正巧,咱们以前已经经过 loop.timer(1)watcher.start(waiter.switch),在 loop 里注册了说,1 秒钟以后去调用 waiter.switchloop 一旦跑起来就会严格执行以前注册的命令。因此呢,一秒钟以后,咱们在 hub 的栈中,调用到了 Waiter.switch()

switch() 里,程序一上来就要验证当前上下文必须得是 hub,翻阅一下前面的代码,这个是必然的。最后,跳到 self.greenlet!还记得它被设置成什么了吗?——main。因而乎,咱们就回到了最初的代码里,gevent.sleep(1) 在通过了 1 秒钟的等待以后终于返回了。

回头看一下这个过程,其实也很简单的:当咱们须要等待一个事件发生时——好比须要等待 1 秒钟的计时器事件,咱们就把当前的执行栈跟这个事件作一个绑定(watcher.start(waiter.switch)),而后把执行权交给 hubhub 则会在事件发生后,根据注册的记录尽快回到原来的断点继续执行。

异步

hub 一旦拿到执行权,就能够作不少事情了,好比切换到别的 greenlet 去执行一些其余的任务,直到这些 greenlet 又主动把执行权交回给 hub。宏观的来看,就是这样的:一个 hub,好多个其余的任务 greenlet(其中没准就包括 main),hub 负责总调度,去依次调用各个任务 greenlet;任务 greenlet 则在执行至下一次断点时,主动切换回 hub。这样一来,许多个任务 greenlet 就能够看似并行地同步运行了,这种任务调度方式叫作协做式的任务调度(cooperative scheduling)。

举个例子:

import gevent

def beep(interval):
    while True:
        print("Beep %s" % interval)
        gevent.sleep(interval)

for i in range(10):
    gevent.spawn(beep, i)

beep(20)

例子里咱们总共建立了 10greenlet,每个都会按照不一样频率输出“蜂鸣”;最后一句的 beep(20) 又让 main greenlet 也不断地蜂鸣。算上 hub,这个例子一共会有 12 个不一样的 greenlet 在协做式地运行。

I/O

Gevent 最主要的功能固然是异步 I/O 了。其实,I/O 跟前面 sleep 的例子没什么本质的区别,只不过 sleep 用的 watchertimer,而 I/O 用到的 watcherio。好比说 wait_read(fileno) 是这样的:

def wait_read(fileno):
    hub = get_hub()
    io = hub.loop.io(fileno, 1)
    return hub.wait(io)

没什么太大区别吧,原理其实都是同样的。基于这个,咱们就能够搞异步 socket 了。socket 的接口较为复杂,这里提取一些标志性的代码一块儿读一下吧:

class socket(object):
    def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0):
        self._sock = _realsocket(family, type, proto)  # 建立底层的 socket
        self._sock.setblocking(0)  # 将其设置为非阻塞的
        fileno = self._sock.fileno()  # 得到其文件描述符
        self.hub = get_hub()  # 本身留一份 hub 的引用,省的每次再现取
        io = self.hub.loop.io  # 快捷方式
        self._read_event = io(fileno, 1)  # socket 的读取事件
        self._write_event = io(fileno, 2)  # socket 的写入事件

    def _wait(self, watcher):
        assert watcher.callback is None  # 一个 socket 只能被一个 greenlet 用
        self.hub.wait(watcher)  # 见以前的例子,等待一个事件发生

    def recv(self, *args):
        sock = self._sock
        while True:
            try:
                return sock.recv(*args)  # 异步接收,要么当即成功,要么当即失败
            except error as ex:
                if ex.args[0] != EWOULDBLOCK:  # 若是失败的话,除了是异步等待的状况,
                    raise  # 其余状况都报错
            self._wait(self._read_event)  # 等待 socket 有数据可读

libev

最后提一点关于 libev 的东西,由于有同窗也问到 Gevent 底层的调度方式。简单来讲,libev 是依赖操做系统底层的异步 I/O 接口实现的,Linux 用的是 epoll,FreeBSD 则是 kqueue。Python 代码里,socket 会建立一堆 io watcher,对应底层则是将一堆文件描述符添加到一个——好比—— epoll 的句柄里。当切换到 hub 以后,libev 会调用底层的 epoll_wait 来等待这些 socket 中可能出现的事件。一旦有事件产生(多是一次出现好多个事件),libev 就会按照优先级依次调用每一个事件的回调函数。注意,epoll_wait 是有超时的,因此一些没法以文件描述符的形式存在的事件也能够有机会被触发。关于 libev 网上还有不少资料,有兴趣你们能够自行查阅。

Gevent 的性能调优

Gevent 不是银弹,不能无限制地建立 greenlet。正如多线程编程同样,用 gevent 写服务器也应该建立一个“微线程池”,超过池子大小的 spawn 应该被阻塞而且开始排队。只有这样,才能保证同时运行的 greenlet 数量不至于多到显著增长异步等待的恢复时间,从而保证每一个任务的响应速度。其实,当池子的大小增长到必定程度以后,CPU 使用量的增速会放缓甚至变为 0,这时继续增长池子大小只能致使回调函数开始排队,不能真正增长吞吐量。正确的作法是增长硬件或者优化代码(提升算法效率、减小无谓调用等)。

关于 pool 的大小,我以为是能够算出来的:

一、在压力较小、pool 资源充足的状况下,测得单个请求平均处理总时间,记做 Ta
二、根据系统需求,估计一下能接受的最慢的请求处理时间,记做 Tm
三、设 Ta 中有 Ts 的时间,执行权是不属于当前处理中的 greenlet 的,好比正在进行异步的数据库访问或是调用远端 API 等后端访问
四、在常规压力下,经过测量后端访问请求处理的平均时间,根据代码实际调用状况测算出 Ts
五、pool 的大小 = (Tm / (Ta - Ts)) * 150%,这里的 150% 是个 buffer 值,拍脑门拍出来的

好比理想状况下平均每一个请求处理须要 20ms,其中平均有 15ms 是花在数据库访问上(假设数据库性能较为稳定,可以线性 scale)。若是最大能容忍的请求处理时间是 500ms 的话,那池子大小应该设置成 (500 / (20 - 15)) * 150% = 150,也就意味着单进程最大并发量是 150

从这个算法也能够看出,花在 Python 端的 CPU 时间越少,系统并发量就越高,而花在后端访问上的时间长短对并发影响不是很大——固然了,依然得假设数据库等后端能够线性 scale。

下面是我以前在 Amazon EC2 m1.small 机器上的部分测试结果,对比了同步多进程和 Gevent 在处理包含异步 PostgreSQL 和 Redis 访问的请求时的性能:

Log Format (per actor)
handling time for 500 requests / Time receiving 500 responses - time per handling / time per request - raw handling rate / request per second

8 actors, 128 testers: 798 rps on client

1230.08 ms / 5649.88 ms - 2.46 ms / 11.30 ms - 406.48 rps / 88.50 rps
1707.71 ms / 5938.53 ms - 3.42 ms / 11.88 ms - 292.79 rps / 84.20 rps
2219.12 ms / 6324.48 ms - 4.44 ms / 12.65 ms - 225.31 rps / 79.06 rps
1446.94 ms / 5491.89 ms - 2.89 ms / 10.98 ms - 345.56 rps / 91.04 rps
1064.61 ms / 5189.07 ms - 2.13 ms / 10.38 ms - 469.66 rps / 96.36 rps
2099.23 ms / 5844.37 ms - 4.20 ms / 11.69 ms - 238.18 rps / 85.55 rps

1 async actor with 8 concurrency limit, 128 testers: 1031 rps on client

3995.44 ms / 560.62 ms - 7.99 ms / 1.12 ms - 125.14 rps / 891.87 rps
4369.57 ms / 575.34 ms - 8.74 ms / 1.15 ms - 114.43 rps / 869.06 rps
4388.47 ms / 590.63 ms - 8.78 ms / 1.18 ms - 113.93 rps / 846.55 rps
4439.61 ms / 579.39 ms - 8.88 ms / 1.16 ms - 112.62 rps / 862.97 rps
3866.82 ms / 574.92 ms - 7.73 ms / 1.15 ms - 129.31 rps / 869.69 rps

1 async actor with no concurrency limit, 128 testers: 987 rps on client

38191.16 ms / 551.76 ms - 76.38 ms / 1.10 ms - 13.09 rps / 906.20 rps
34354.80 ms / 564.43 ms - 68.71 ms / 1.13 ms - 14.55 rps / 885.84 rps
40397.18 ms / 543.23 ms - 80.79 ms / 1.09 ms - 12.38 rps / 920.42 rps
45406.02 ms / 490.45 ms - 90.81 ms / 0.98 ms - 11.01 rps / 1019.48 rps
37106.92 ms / 581.95 ms - 74.21 ms / 1.16 ms - 13.47 rps / 859.18 rps

能看出来,一样是 8 的并发限制,同步比异步处理快两三倍(可是 load balance 拉低了同步的优点),吞吐量上虽比不上异步,但也不差。在去掉并发限制以后,吞吐量变化不大,但处理时间翻了 10 倍(由于大量 callback 开始排队,没法及时被调用到),且不稳定。

相关文章
相关标签/搜索