这一篇主要想跟你们分享一下 Gevent 实现的基础逻辑,也是有同窗对这个很感兴趣,因此贴出来跟你们一块儿分享一下。python
咱们知道 Gevent 是基于 Greenlet 实现的,greenlet 有的时候也被叫作微线程或者协程。其实 Greenlet 自己很是简单,其自身实现的功能也很是直接。区别于常规的编程思路——顺序执行、调用进栈、返回出栈—— Greenlet 提供了一种在不一样的调用栈之间自由跳跃的功能。从一个简单的例子来看一下吧(摘自官方文档):git
from greenlet import greenlet def test1(): print 12 gr2.switch() print 34 def test2(): print 56 gr1.switch() print 78 gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
这里,每个 greenlet
就是一个调用栈——您能够把他想象成一个线程,只不过真正的线程能够并行执行,而同一时刻只能有一个 greenlet
在执行(同一线程里)。正如例子中最后三句话,咱们建立了 gr1
和 gr2
两个不一样的调用栈空间,入口函数分别是 test1
和 test2
;这最后一句 gr1.switch()
得多解释一点。github
由于除了 gr1
和 gr2
,咱们还有一个栈空间,也就是全部 Python 程序都得有的默认的栈空间——咱们暂且称之为 main
,而这一句 gr1.switch()
偏偏实现了从 main
到 gr1
的跳跃,也就是从当前的栈跳到指定的栈。这时,就犹如常规调用 test1()
同样,gr1.switch()
的调用暂时不会返回结果,程序会跳转到 test1
继续执行;只不过区别于普通函数调用时 test1()
会向当前栈压栈,而 gr1.switch()
则会将当前栈存档,替换成 gr1
的栈。如图所示:算法
对于这种栈的切换,咱们有时也称之为执行权的转移,或者说 main
交出了执行权,同时 gr1
得到了执行权。Greenlet 在底层是用汇编实现的这样的切换:把当前的栈(main
)相关的寄存器啊什么的保存到内存里,而后把本来保存在内存里的 gr1
的相关信息恢复到寄存器里。这种操做速度很是快,比操做系统对多进程调度的上下文切换还要快。代码在这里,有兴趣的同窗能够一块儿研究一下(其中 switch_x32_unix.h
是我写的哈哈)。数据库
回到前面的例子,最后一句 gr1.switch()
调用将执行点跳到了 gr1
的第一句,因而输出了 12
。随后顺序执行到 gr2.switch()
,继而跳转到 gr2
的第一句,因而输出了 56
。接着又是 gr1.switch()
,跳回到 gr1
,从以前跳出的地方继续——对 gr1
而言就是 gr2.switch()
的调用返回告终果 None
,而后输出 34
。编程
这个时候 test1
执行到头了,gr1
的栈里面空了。Greenlet 设计了 parent greenlet 的概念,就是说,当一个 greenlet
的入口函数执行完以后,会自动切换回其 parent。默认状况下,greenlet
的 parent 就是建立该 greenlet
时所在的那个栈,前面的例子中,gr1
和 gr2
都是在 main
里被建立的,因此他们俩的 parent 都是 main
。因此当 gr1
结束的时候,会回到 main
的最后一句,接着 main
结束了,因此整个程序也就结束了——78
历来没有被执行到过。另外,greenlet
的 parent 也能够手工设置。segmentfault
简单来看,greenlet 只是为 Python 语言增长了建立多条执行序列的功能,并且多条执行序列之间的切换还必须得手动显式调用 switch()
才行;这些都跟异步 I/O 没有必然关系。后端
gevent.sleep
接着来看 Gevent。最简单的一个 Gevent 示例就是这样的了:服务器
import gevent gevent.sleep(1)
貌似很是简单的一个 sleep
,却包含了 Gevent 的关键结构,让咱们仔细看一下 sleep
的实现吧。代码在 gevent/hub.py
:多线程
def sleep(seconds=0): hub = get_hub() loop = hub.loop hub.wait(loop.timer(seconds))
这里我把一些当前用不着的代码作了一些清理,只留下了三句关键的代码,其中就有 Gevent 的两个关键的部件——hub
和 loop
。loop
是 Gevent 的核心部件,也就是主循环核心,默认是用 Cython 写的 libev 的包装(因此性能杠杠滴),稍后会在详细提到它。hub
则是一个 greenlet,里面跑着 loop
。
hub
是一个单例,从 get_hub()
的源码就能够看出来:
import _thread _threadlocal = _thread._local() def get_hub(*args, **kwargs): global _threadlocal try: return _threadlocal.hub except AttributeError: hubtype = get_hub_class() hub = _threadlocal.hub = hubtype(*args, **kwargs) return hub
因此第一次执行 get_hub()
的时候,就会建立一个 hub
实例:
class Hub(greenlet): loop_class = config('gevent.core.loop', 'GEVENT_LOOP') def __init__(self): greenlet.__init__(self) loop_class = _import(self.loop_class) self.loop = loop_class()
一样这是一段精简了的代码,反映了一个 hub
的关键属性——loop
。loop
实例随着 hub
实例的建立而建立,默认的 loop
就是 gevent/core.ppyx
里的 class loop
,也能够经过环境变量 GEVENT_LOOP
来自定义。
值得注意的是,截止到 hub = get_hub()
和 loop = hub.loop
,咱们都只是建立了 hub
和 loop
,并无真正开始跑咱们的主循环。稍安勿躁,第三句就要开始了。
loop
有一堆接口,对应着底层 libev 的各个功能,详见此处。咱们这里用到的是 timer(seconds)
,该函数返回的是一个 watcher
对象,对应着底层 libev 的 watcher 概念。咱们大概能猜到,这个 watcher
对象会在几秒钟以后作一些什么事情,可是具体怎么作,让咱们一块儿看看 hub.wait()
的实现吧。
def wait(self, watcher): waiter = Waiter() watcher.start(waiter.switch) waiter.get()
代码也不长,不过能看到 watcher
的接口 watcher.start(method)
,也就是说,当给定的几秒钟过了以后,会调用这里给的函数,也就是 waiter.switch
。让咱们再看一下这里用到的 Waiter
,都是在同一个文件 hub.py
里面:
from greenlet import getcurrent class Waiter(object): def __init__(self): self.hub = get_hub() self.greenlet = None def switch(self): assert getcurrent() is self.hub self.greenlet.switch() def get(self): assert self.greenlet is None self.greenlet = getcurrent() try: self.hub.switch() finally: self.greenlet = None
这里一样删掉了大量干扰因素。根据前面 wait()
的定义,咱们会先建立一个 waiter
,而后调用其 get()
,随后几秒钟以后 loop
会调用其 switch()
。一个个看。
get()
一上来会保证本身不会被同时调用到(assert
),接着就去获取了当前的 greenlet
,也就是调用 get()
时所处的栈,一直往前找,找到 sleep(1)
,因此 getcurrent()
的结果是 main
。Waiter
随后将 main
保存在了 self.greenlet
引用中。
下面的一句话是重中之重了,self.hub.switch()
!由无论任何上下文中,直接往 hub
里跳。因为这是第一次跳进 hub
里,因此此时 loop
就开始运转了。
正巧,咱们以前已经经过 loop.timer(1)
和 watcher.start(waiter.switch)
,在 loop
里注册了说,1
秒钟以后去调用 waiter.switch
,loop
一旦跑起来就会严格执行以前注册的命令。因此呢,一秒钟以后,咱们在 hub
的栈中,调用到了 Waiter.switch()
。
在 switch()
里,程序一上来就要验证当前上下文必须得是 hub
,翻阅一下前面的代码,这个是必然的。最后,跳到 self.greenlet
!还记得它被设置成什么了吗?——main
。因而乎,咱们就回到了最初的代码里,gevent.sleep(1)
在通过了 1
秒钟的等待以后终于返回了。
回头看一下这个过程,其实也很简单的:当咱们须要等待一个事件发生时——好比须要等待 1
秒钟的计时器事件,咱们就把当前的执行栈跟这个事件作一个绑定(watcher.start(waiter.switch)
),而后把执行权交给 hub
;hub
则会在事件发生后,根据注册的记录尽快回到原来的断点继续执行。
hub
一旦拿到执行权,就能够作不少事情了,好比切换到别的 greenlet
去执行一些其余的任务,直到这些 greenlet
又主动把执行权交回给 hub
。宏观的来看,就是这样的:一个 hub
,好多个其余的任务 greenlet
(其中没准就包括 main
),hub
负责总调度,去依次调用各个任务 greenlet
;任务 greenlet
则在执行至下一次断点时,主动切换回 hub
。这样一来,许多个任务 greenlet
就能够看似并行地同步运行了,这种任务调度方式叫作协做式的任务调度(cooperative scheduling)。
举个例子:
import gevent def beep(interval): while True: print("Beep %s" % interval) gevent.sleep(interval) for i in range(10): gevent.spawn(beep, i) beep(20)
例子里咱们总共建立了 10
个 greenlet
,每个都会按照不一样频率输出“蜂鸣”;最后一句的 beep(20)
又让 main greenlet
也不断地蜂鸣。算上 hub
,这个例子一共会有 12
个不一样的 greenlet
在协做式地运行。
Gevent 最主要的功能固然是异步 I/O 了。其实,I/O 跟前面 sleep
的例子没什么本质的区别,只不过 sleep
用的 watcher
是 timer
,而 I/O 用到的 watcher
是 io
。好比说 wait_read(fileno)
是这样的:
def wait_read(fileno): hub = get_hub() io = hub.loop.io(fileno, 1) return hub.wait(io)
没什么太大区别吧,原理其实都是同样的。基于这个,咱们就能够搞异步 socket 了。socket 的接口较为复杂,这里提取一些标志性的代码一块儿读一下吧:
class socket(object): def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0): self._sock = _realsocket(family, type, proto) # 建立底层的 socket self._sock.setblocking(0) # 将其设置为非阻塞的 fileno = self._sock.fileno() # 得到其文件描述符 self.hub = get_hub() # 本身留一份 hub 的引用,省的每次再现取 io = self.hub.loop.io # 快捷方式 self._read_event = io(fileno, 1) # socket 的读取事件 self._write_event = io(fileno, 2) # socket 的写入事件 def _wait(self, watcher): assert watcher.callback is None # 一个 socket 只能被一个 greenlet 用 self.hub.wait(watcher) # 见以前的例子,等待一个事件发生 def recv(self, *args): sock = self._sock while True: try: return sock.recv(*args) # 异步接收,要么当即成功,要么当即失败 except error as ex: if ex.args[0] != EWOULDBLOCK: # 若是失败的话,除了是异步等待的状况, raise # 其余状况都报错 self._wait(self._read_event) # 等待 socket 有数据可读
最后提一点关于 libev 的东西,由于有同窗也问到 Gevent 底层的调度方式。简单来讲,libev 是依赖操做系统底层的异步 I/O 接口实现的,Linux 用的是 epoll,FreeBSD 则是 kqueue。Python 代码里,socket 会建立一堆 io
watcher,对应底层则是将一堆文件描述符添加到一个——好比—— epoll 的句柄里。当切换到 hub
以后,libev 会调用底层的 epoll_wait
来等待这些 socket 中可能出现的事件。一旦有事件产生(多是一次出现好多个事件),libev 就会按照优先级依次调用每一个事件的回调函数。注意,epoll_wait
是有超时的,因此一些没法以文件描述符的形式存在的事件也能够有机会被触发。关于 libev 网上还有不少资料,有兴趣你们能够自行查阅。
Gevent 不是银弹,不能无限制地建立 greenlet。正如多线程编程同样,用 gevent 写服务器也应该建立一个“微线程池”,超过池子大小的 spawn
应该被阻塞而且开始排队。只有这样,才能保证同时运行的 greenlet 数量不至于多到显著增长异步等待的恢复时间,从而保证每一个任务的响应速度。其实,当池子的大小增长到必定程度以后,CPU 使用量的增速会放缓甚至变为 0,这时继续增长池子大小只能致使回调函数开始排队,不能真正增长吞吐量。正确的作法是增长硬件或者优化代码(提升算法效率、减小无谓调用等)。
关于 pool 的大小,我以为是能够算出来的:
一、在压力较小、pool 资源充足的状况下,测得单个请求平均处理总时间,记做 Ta
二、根据系统需求,估计一下能接受的最慢的请求处理时间,记做 Tm
三、设 Ta
中有 Ts
的时间,执行权是不属于当前处理中的 greenlet 的,好比正在进行异步的数据库访问或是调用远端 API 等后端访问
四、在常规压力下,经过测量后端访问请求处理的平均时间,根据代码实际调用状况测算出 Ts
五、pool 的大小 = (Tm / (Ta - Ts)) * 150%
,这里的 150%
是个 buffer 值,拍脑门拍出来的
好比理想状况下平均每一个请求处理须要 20ms
,其中平均有 15ms
是花在数据库访问上(假设数据库性能较为稳定,可以线性 scale)。若是最大能容忍的请求处理时间是 500ms
的话,那池子大小应该设置成 (500 / (20 - 15)) * 150% = 150
,也就意味着单进程最大并发量是 150
。
从这个算法也能够看出,花在 Python 端的 CPU 时间越少,系统并发量就越高,而花在后端访问上的时间长短对并发影响不是很大——固然了,依然得假设数据库等后端能够线性 scale。
下面是我以前在 Amazon EC2 m1.small 机器上的部分测试结果,对比了同步多进程和 Gevent 在处理包含异步 PostgreSQL 和 Redis 访问的请求时的性能:
Log Format (per actor) handling time for 500 requests / Time receiving 500 responses - time per handling / time per request - raw handling rate / request per second
8 actors, 128 testers: 798 rps on client
1230.08 ms / 5649.88 ms - 2.46 ms / 11.30 ms - 406.48 rps / 88.50 rps 1707.71 ms / 5938.53 ms - 3.42 ms / 11.88 ms - 292.79 rps / 84.20 rps 2219.12 ms / 6324.48 ms - 4.44 ms / 12.65 ms - 225.31 rps / 79.06 rps 1446.94 ms / 5491.89 ms - 2.89 ms / 10.98 ms - 345.56 rps / 91.04 rps 1064.61 ms / 5189.07 ms - 2.13 ms / 10.38 ms - 469.66 rps / 96.36 rps 2099.23 ms / 5844.37 ms - 4.20 ms / 11.69 ms - 238.18 rps / 85.55 rps
1 async actor with 8 concurrency limit, 128 testers: 1031 rps on client
3995.44 ms / 560.62 ms - 7.99 ms / 1.12 ms - 125.14 rps / 891.87 rps 4369.57 ms / 575.34 ms - 8.74 ms / 1.15 ms - 114.43 rps / 869.06 rps 4388.47 ms / 590.63 ms - 8.78 ms / 1.18 ms - 113.93 rps / 846.55 rps 4439.61 ms / 579.39 ms - 8.88 ms / 1.16 ms - 112.62 rps / 862.97 rps 3866.82 ms / 574.92 ms - 7.73 ms / 1.15 ms - 129.31 rps / 869.69 rps
1 async actor with no concurrency limit, 128 testers: 987 rps on client
38191.16 ms / 551.76 ms - 76.38 ms / 1.10 ms - 13.09 rps / 906.20 rps 34354.80 ms / 564.43 ms - 68.71 ms / 1.13 ms - 14.55 rps / 885.84 rps 40397.18 ms / 543.23 ms - 80.79 ms / 1.09 ms - 12.38 rps / 920.42 rps 45406.02 ms / 490.45 ms - 90.81 ms / 0.98 ms - 11.01 rps / 1019.48 rps 37106.92 ms / 581.95 ms - 74.21 ms / 1.16 ms - 13.47 rps / 859.18 rps
能看出来,一样是 8
的并发限制,同步比异步处理快两三倍(可是 load balance 拉低了同步的优点),吞吐量上虽比不上异步,但也不差。在去掉并发限制以后,吞吐量变化不大,但处理时间翻了 10 倍(由于大量 callback 开始排队,没法及时被调用到),且不稳定。