Python 开源异步并发框架的将来

呵呵,这个标题有点大,其实只是想从零开始介绍一下异步的基础,以及 Python 开源异步并发框架的发展和互操做性。html

另外,这是我在 OSTC 2014 作的一个同题演讲,幻灯片在这里,欢迎拍砖。python

开源

Python 是开源的,介绍的这几个框架 TwistedTornadoGeventtulip 也都是开源的,最后这个演讲是在开源大会弄的,因此标题里确定少不了开源。另外,个人 gevent3 项目也是开源的——貌似很多同窗被我起的极品名字给搞混了,特别说明一下,gevent3 虽然有跟 Gevent 同样的接口外貌,但底层倒是 tulip 驱动的(考虑把名字改回 gulip 之类的);请区别于未来会支持 Python 3 的 Gevent 1.1。git

非阻塞

先上一段代码。请原谅我用 Python 代码充当伪代码了,但 Python 的语法实在是太简单了,忍不住啊。程序员

import socket
s = socket.socket()
s.connect(('www.google.com', 80))
print("We are connected to %s:%d" % s.getpeername())

这是很简单的一个客户端 TCP 链接程序。假如网络情况不是很好,执行这段程序时,咱们颇有可能要等个几秒钟,才能看到 We are connected 的输出字样。github

对于这样的代码,咱们就能够说程序阻塞在了 connect() 的调用上;而这样的函数咱们叫作阻塞式的。web

那么非阻塞呢?仍是看一段代码。redis

import socket
s = socket.socket()
s.setblocking(0)

try:
    s.connect(('www.google.com', 80))
except socket.error as e:
    print(str(e))
    i = 0
    while True:
        try:
            print("We are connected to %s:%d" % s.getpeername())
            break
        except:
            print("Let's do some math while waiting: %d" % i)
            i += 1
else:
    print("We are connected to %s:%d" % s.getpeername())

这一下代码就多了——可是并不复杂。express

首先看一开始的变化,多了一句 s.setblocking(0)。这是说,将这个 socket 对象变成非阻塞式的。这样一来,接下来的许多本应阻塞的调用将不会阻塞。django

好比 connect()。非阻塞的 connect() 调用将会当即结束,而无论这个 TCP 链接是否真正创建了——若是 TCP 链接尚未完成握手,那么 connect() 会抛出一个异常说“开始连了,别着急一下子就好”;不然(应该没有不然)就会“正常”地走 try...else 的路线。

抓到这个异常以后呢,咱们就能够充分利用这段本来要阻塞的时间,在链接彻底创建以前作一些有意义的事情——好比数数。我这里网络条件还凑合,通常状况下数到一万多的时候就能跟 Google 连上了。

异步

能够看得出来,阻塞和非阻塞是说函数调用的,调用了以后要等到底层完事儿了以后才能继续的叫作阻塞;调用了以后,要么当即返回,要么当即抛异常,这就是非阻塞。

而与之形影不离的一对儿概念——同步和异步——则说的是一段程序的执行处理方式。通常状况下,阻塞式的调用均可以叫作同步,但非阻塞式的调用不必定是异步的。怎么讲呢,咱们仍是来看几个例子。

while server.running:
    request = server.receive()
    response = handle(request)
    server.send(response)

这片代码片断示意的是同步的处理方式。能够看得出来,接收请求、处理请求、发送响应依次执行,前一个任务完成了才会作下一个;最外面还有一个 while 循环,使之不断地收请求发响应,且是发送完上一个响应以后才会接收下一个请求。请注意,咱们并无看到 receive() 等函数的实现细节,他们在底层能够是阻塞的,也能够是非阻塞的,这都不会影响咱们看到的这片代码片断是同步的。

那么异步的代码看上去是什么样的呢?请容许我用 Twisted 风格的代码来展现,由于异步的代码太“扭曲”了:

while server.running:
    deferred = server.receive()
    deferred.addCallback(on_request)

def on_request(request):
    deferred = handle(request)
    deferred.addCallback(on_response)

def on_response(response):
    server.send(response)

让我来大概地解释一下。为了实现异步,这里的 receive()handle() 都必须是非阻塞的。在 Twisted 中非阻塞的函数会当即返回一个 Deferred 对象,经过给 Deferred 对象添加回调函数,咱们能够实如今这件事情真正完成以后,执行回调函数中定义的接下来要作的事儿。

看到扭曲的程度了吧。先接收一个请求——等等,你不必定当即就能接收到。好吧,等到接收到了的时候(on_request),咱们把这个请求送去处理,而后——等等,处理不必定立刻能完成。那好吧,等处处理完成以后(on_response),咱们再把这个响应发送回去。说实话,我没忍心写,其实发送也不会当即完成……

虽然上面这段代码示例有些过份,仍有一些能够变得更简洁的地方,可是这对于大型项目中异步代码的描述并不失真。难道用所谓的异步框架写代码都会是这么扭曲么?

前面咱们说的异步只是异步编码——从编写代码的方式上来判断。而一般说的异步框架,每每还会展示给用户一些同步的接口(后面还会提到),在框架内部,这些接口也都是用非阻塞的异步代码来实现的。对于这样的框架,咱们仍然叫他们异步框架——总不能叫非阻塞框架,或是同步框架吧。

另外,异步编码也不必定就非要扭曲人性,仍是有不少项目能够简洁明了地编写异步代码的,只不过对于程序员的要求会比编写同步代码稍高一些罢了。

并发与并行

好了,让咱们先把纠结的异步放下,来看看另外两个容易混淆的概念。

估计您已经从视频里听了我办港澳通行证的惨痛经历了,这里就不重复了,但仍然用这个例子来解释一下并发和并行的概念吧。

并行的概念着重于处理端,也就是办理通行证的工做人员。有 5 个窗口开放,就意味着同一时间能够有 5 个业务能够获得并行的处理。对于计算机来讲,并行势必要有多颗处理器,真正从物理上能够并行地处理多个任务;单 CPU 用多线程实现的叫作时分复用——也许超线程除外。

相对于并行着重于处理端,并发的概念则是关于请求端,也就是关于用户的。当咱们谈及朝阳区出入境办证大厅的并发量的时候,咱们是在说该大厅在某一时刻能容纳的前来办证的人数,最大并发量说白了就是大厅里能站下多少人——包括正在办的和排队的。

包括排队的?那往大厅外面使劲儿排呗,这并发量岂不是无限大了?

与并发一块儿的还有很重要的一个概念,就是处理时间。若是一味追求并发量,势必会致使处理时间的大幅上升,大量请求多半时间在排队,这样并不能算是一个高效的系统设计。因此在系统资源到达瓶颈的时候,也许限制并发量,拒绝一些请求也许是一个明智的选择。

并发并非不关心处理端,只不过多核并行或者单核时分复用都能实现并发,并且在实践中这两种实现方法每每会同时使用。多核并行实现的并发,其任务调度主要由操做系统完成,咱们接下来着重关心一下单线程并发的任务调度问题。

事件驱动的单线程并发

只有一个线程,用阻塞调用是确定没法实现并发的——除非把每次仅服务一个客户叫作“并发量为 1 的并发”。因此,咱们必然会用到非阻塞调用。

请回忆一下前面咱们演示非阻塞调用的那个例子,咱们在等待链接创建的过程当中,作了一些其余的有意义的事情,一旦链接创建成功,咱们会接着以前作一些关于链接的事情——输出对方的地址。如今咱们试着扩展这个例子,实现并发链接——咱们同时启动 100 个 TCP 链接,任何一个链接成功了就当即输出对方地址。一开始咱们能够这么写:

import socket
sockets = {}
for i in range(100):
    s = socket.socket()
    sockets[s.fileno()] = s
    s.setblocking(0)
    try:
        s.connect(('www.google.com', 80))
    except:
        pass

咱们将这 100 个 socket 对象按照他们的文件描述符保存在了一个叫作 sockets 的字典里,而且一一调用了非阻塞的 connect() 函数。

但是,接下来怎么写呢?难道要重复调用每个 socket 对象的 getpeername() 函数,直到他们都正确返回了为止?CPU 消耗太大了吧。

操做系统给咱们提供了一些接口,专门用于这类问题的:select 及其升级版 epoll(Linux) 和 kqueue(*BSD 和 Mac OS X),他们一般也被统称为 select 函数。select 是一种阻塞调用,专门用于从一些文件描述符中,选出那些有新事件到达的描述符,其中事件包括可读、可写和出错。换句话讲呢,就是监视给出的 socket,任何一个有动静了就当即返回有动静的描述符。

好比前面这个例子里,咱们但愿在任何一个链接成功创建的时候,输出该链接的目的地址。因而接下来就能够这么写:

import select
while sockets:
    fds = select.select([], list(sockets.keys()), [])[1]
    for fd in fds:
        s = sockets.pop(fd)
        print("%d connected to %s:%d" % ((fd,) + s.getpeername()))

也就是说,每次循环,咱们都会从剩余的链接中,选出一些可写的 socket 对象——那意味着链接已经成功创建了,而后将他们的目标地址输出出来。

这就是一个很简单的事件驱动的异步并发了,虽然咱们只是建立了 100 个 TCP 链接,但咱们并发了,是事件驱动的了,并且咱们异步地调用了后续的操做——输出目的地址。

异步并发不过如此,而已。

框架

只用 socketselect 来写一个异步 web 服务器也行,只不过会出一两条人命而已。虽然是开玩笑,可是咱们多数状况下仍是会选择使用一些现有的框架。

何谓框架呢,其实就是把上一小节的例子代码给拆开,一部分是仅包含 www.google.comprint() 的所谓用户代码,另外一部分就是全部剩下的叫作框架的东西。好比这样:

import socket

sockets = {}
for i in range(   ):
    s = 
    sockets[s.fileno()] = s
    s.setblocking(0)
    try:
        s.       (              )
    except:
        pass

import select
while sockets:
    fds = select.select([], list(sockets.keys()), [])[ ]
    for fd in fds:
        s = sockets.pop(fd)
             (            s.          )

固然这段代码并非一个框架,由于它根本没法运行。可是咱们能够经过它看到一个异步框架应该有的东西:

  1. 用于建立与框架契合的、非阻塞的 I/O 对象的接口
  2. 有一个主循环,用户能够启动它
  3. 用户能够在关心的事件发生时,执行本身的代码

回调函数和 Tornado

让咱们以 Tornado 为例,来看一下最基本的异步框架是怎么用的——虽然 Tornado 并不只限于此。

sock = socket.socket()
sock.setblocking(0)
sock.bind((“”, 80))
sock.listen(128)

def on_conn(fd, events):
    conn, address = sock.accept()
    conn.send(b’Hello’)

io_loop = ioloop.IOLoop.instance()
io_loop.add_handler(sock.fileno(), on_conn, io_loop.READ)
io_loop.start()

这是一个简单的服务器程序,它会向每个连进来的客户端发送一句问候。其中 add_handler() 的调用就是——我认为—— Tornado 的经典用法,也就是注册回调函数。当有链接进来的时候,Tornado 就会根据要求来调用 on_conn(),后者随即会与客户端链接并送上问候。

Twisted 和封装……和回调函数

Twisted 里是各类封装,经过 Transport 将 socket 对象封装的更隐蔽,经过 Protocol 来实现用户协议的封装,像这样:

from twisted.internet import protocol, reactor

class Echo(protocol.Protocol):
    def dataReceived(self, data):
        self.transport.write(data)

class EchoFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return Echo()

reactor.listenTCP(1234, EchoFactory())
reactor.run()

对于回调函数,Twisted 则发明了著名的 Deferred 用以实现事件源与回调函数的分离,其实本质上没有区别,只是在写法上略有不一样,这里就很少说了。

同步地异步

正如前面提到的,异步的编码方式——不管是 Tornado 的回调函数,仍是 TwistedDeferred——想要用的出彩,须要程序员有相对较高的心理素质和职业修养。那若是能正常地、用同步的方式来编写异步执行的代码呢?

借助 Python 的 generator 功能TwistedTornado 纷纷提供了这样的功能。好比下面这一段 Twisted 的代码(请关注开头的修饰器和代码中的 yield):

@defer.inlineCallbacks
def main(endpoint, username="alice", password=“secret”):
    endpoint = endpoints.clientFromString(reactor, strport)
    factory = protocol.Factory()
    factory.protocol = imap4.IMAP4Client
    try:
        client = yield endpoint.connect(factory)
        yield client.login(username, password)
        yield client.select('INBOX')
        info = yield client.fetchEnvelope(imap4.MessageSet(1))
        print 'First message subject:', info[1]['ENVELOPE'][1]
    except:
        print "IMAP4 client interaction failed"
        failure.Failure().printTraceback()
task.react(main, sys.argv[1:])

这里的第一个 yield 中,endpoint.connect() 返回的是一个 Deferred 对象,其回调函数的参数才是前面的 client 对象。经过 yieldinlineCallbacks 修饰器的配合,咱们就把回调函数和 main 函数揉在了一块儿,后面那三个 yield 也是如此,这样的代码看上去是同步的,执行的底层实则是异步的。Tornado 也有相似的用法,这里就很少说了。

神奇的 yield!在这里到底发生了什么事情呢?我管它叫作异步切换,具体的代码能够看 inlineCallbacks 的实现。简单来讲呢,yield 以前,connect() 在主循环里注册了一个关于链接创立的事件监听,而后经过 yield 把事件的处理权交给了 inlineCallbacks,同时将当前函数的执行状态挂起(yield 的功能,能够把栈保存下来),切换到 inlineCallbaks 里继续执行,而 inlineCallbacks 则会返回至主循环,继续执行别的异步任务,直至前述事件发生且主循环排到了该事件,主循环会调用 inlineCallbacks 里的回调函数,后者会将以前挂起的执行状态恢复,这样 client 就被赋上了正确的值。

总的来看,在 yield 的时候,当前执行流程会被暂停以等待事件,别的执行流程会插进来执行,直至事件发生后,当前执行流程才有可能恢复执行。这很是相似于操做系统里面的任务调度,因此我管它叫作异步切换,只不过这种切换是主动进行的,而不是操做系统强制的。因此,若是你不 yield 交出执行权,别的执行流程永远没有办法被执行到,这也是单线程异步并发的一个须要注意的点。另外,单线程异步并发须要有足够的异步切换才能作到近似公平的排程,因此很是适合 I/O 密集型的运算,而 CPU 密集型的运算在这里每每会遇到比较严重的问题。

隐式的异步切换

在写单线程异步代码的时候,切记不要混合调用底层会阻塞的代码,由于那样会阻塞整个线程,致使全部并发的处理时间增长,最终会致使严重的性能问题。若是有一些阻塞的、同步的遗留代码,那该如何是好呢?答案是:把它们统一改为非阻塞的,或者使用多线程/多进程来处理。但是,若是要改为非阻塞的形式,那得加多少 yield 呀!

不要紧,还有隐式的异步切换呢。一般咱们把这种须要显式地写 yield 的代码叫作显式的异步切换,与之相对的就是隐式的异步切换。好比下面这段代码,我说它有隐式的异步切换,您信吗?

import socket
s = socket.socket()
s.connect(('www.google.com', 80))
print("We are connected to %s:%d" % s.getpeername())

这不就是文章一开头的那个例子嘛。别急,若是在最前面加这么两句,状况就彻底不同了:

from gevent import monkey
monkey.patch_all()

Gevent 就是隐式的异步切换的表明。经过所谓的 monkey patch,Gevent 把系统库里的 socket 等模块,替换成了 Gevent 本身提供的相应的非阻塞模块。这样,上面的代码就变成(底层)异步的了。考虑到 monkey patch 的侵入性,您也能够考虑直接使用 Gevent 提供的模块,好比这样:

from gevent import socket

Gevent 这样的隐式的异步切换有个好处很明显,就是能够很容易地将阻塞式的遗留代码迁移到 Gevent 上来,而不须要额外修改大量代码,这对于须要异步并发支持的许多大型现有项目来讲,无疑是为数很少的几个选择之一——好比说 Django

可是,有很多人也认为,隐式的异步切换的代价太大——倒不是说它的性能有多差,而是这种写法把异步切换隐藏的太深了,不知道何时就切换到别的地方去执行了。这样带来的直接问题就是——跟常规共享状态的多线程编程同样——咱们很难保证在一段程序的执行过程当中,某些本地状态不会被别的代码修改,再加上状态同步的代价,隐式的异步切换并不被特别看好。若是非得要用,记得尽可能少共享状态,多用队列来实现信息传递,而后当心编码,仔细检查。

绿色的 Gevent

Gevent 之因此能实现隐式的异步切换,主要归功于 GreenletGreenletStackless Python 的一个分项目,用于在标准 CPython 中实现微线程(也称协程、绿色线程)。

Python 中的 Greenlet 跟常规线程相似,也是会在独立的空间中执行一段代码,也有本身独立的栈空间。不一样的是:

  1. Greenlet 并不启动任何操做系统的线程,是绿色产品
  2. Greenlet 任务之间的调度须要每一个微线程里的代码本身显式地实现

用官方的一个例子演示一下这两个特色吧:

from greenlet import greenlet

def test1():
    print 12
    gr2.switch()
    print 34

def test2():
    print 56
    gr1.switch()
    print 78

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

这个例子里一共有三个微线程,分别是 main(也就是最外层默认的主微线程,自动建立的)、gr1gr2。程序一直顺序执行,直至最后一句 gr1.switch(),由 main 微线程切换至 gr1gr1 输出 12 以后,又切换至 gr2;接着 gr2 输出 56 后,又切换回 gr1 以前的切出点,继续输出 34;这是 gr1 结束了,系统会自动切换回 gr1 的父微线程——也就是 main 的最后一句 switch() 返回,至此整个程序结束。注意,78 并无机会被输出。

Gevent 的主循环叫作 Hub,跑在一个单独的 greenlet 里。用户的程序从 main greenlet 开始执行,直至第一个异步切换。此时,Gevent 会把当前微线程——也就是 main ——与异步事件作一个关联,而后切换到 HubHub 因而开始运转,当某些事件发生时,Gevent 就会切换到相应关联的 greenlet 来执行,直至他们结束返回 Hub,或者主动切换回 Hub。好比 main 等待的事件发生了,Hub 就会切到 main 上执行——固然,若是这时 main 结束了,就不会像其余 greenlet 同样再返回 Hub 了。

因此,greenlet 和 generator、Deferred 同样,其实都是用来实现回调封装的一些工具,因此前面提到过的一些异步并发的注意事项,Gevent 也都适用。

互操做性存在的问题

多种框架的存在,说好听了是百花齐放各显神通、竞争才有发展,说难听了就是碎片化、选择恐惧症和维护代价巨大。好比说,一样是一个 Python 的 PostgreSQL 链接适配程序,有支持 Twistedtxpostgres,有支持 Tornadomomoko,还有 Gevent 须要的 psycogreen——有啥话咱不能一气儿说完呢?若是上游的 psycopg 更新了,这么多的适配器,是否是得要跟着更新哪。

再一个问题就是遗留代码。若是一个项目一直在用 Twisted,有一天老板拿着张光盘说给我把这个弄上去,打开一看全都是 .pyc 文件,木有源代码——直接调用会有以前提到的阻塞主线程的问题,扔到线程池里作又不甘心。若是能在 Twisted 里用 Gevent 就行了(如今确实能够,不过会替换 Twisted 的一部分)。

将来

asyncio 这个项目其实叫作 tulip,主要开发也都在那里,由于要进 Python 标准库了,因此才几经周折选了 asyncio 这么一个名字。asyncio 是 Python 做者的一个新项目,要求至少是 Python 3.3(手动安装),Python 3.4 里它就已是标准库的一部分了。

之因此要求 Python 3.3,是由于 asyncio 的微线程依赖于 Python 3.3 的新语法:yield from。区别于 yieldyield from co 实现了相似于这样的功能:

for x in co:
    yield x

这里说“相似”,是由于实际状况要比这复杂不少,但意思是同样的:将内层迭代器的元素无缝地合并到外层的迭代器里。有了这个,asyncio 就能够很容易地作微线程的嵌套了——也就是在一个微线程里面等待另外一个结束返回结果。

asyncio 做为又一个异步并发框架,与其余现有框架差异并不大:主循环相似于 Twisted 的 reactor,Future 对回调函数进行封装相似于 Deferred,可选的微线程相似于 inlineCallbacks,基于 yield from 的显式的异步切换相似于 yield,这里就很少介绍了,总的来看很是像 Twisted。可是呢,它能进入标准库,仍是有缘由的。

互操做性

asyncio 做为参考实现,与其规格文档 PEP 3156 是一块儿作出来的,蟒爹在作的过程当中尤为关注了互操做性。

好比 asyncio 的主循环就是能够任意替换的,任何知足 asyncio 主循环接口要求的核心均可以被安装上去。为了作到这一点,PEP 3156 定义了严格的主循环接口,将 asyncio 的框架代码部分与主循环核心彻底分离。这样一来,许多现有框架加个壳就能够支持 asyncio 了——不用改现有代码,写一个现有主循环接口到 asyncio 主循环接口的适配层,替换掉 asyncio 自带的主循环,这样 asyncio 的代码就能够跑在现有框架上面了。

另外一个方向也是行得通的。PEP 3156 一样定义了丰富而清晰的用户接口,咱们可使用这些接口来实现一个现有框架的主循环替代品,这样就能够在不替换 asyncio 已有主循环的前提下,将别的框架的代码嫁接到 asyncio 上来。好比说个人 gevent3 就是这么一个例子,我将 Gevent 中原有的 libev 代码删掉,用 asyncio 实现了一份 Gevent Hub,这样,gevent 的代码就能够跑在 asyncio 框架上了。更使人兴奋的是,若是 asyncio 使用的主循环核心又刚好是好比说 Twisted,那么原先分别依赖 GeventTwisted 的代码,如今就能够跑在一块儿了,甚至互相调用也是能够的。

好比下面一段示例代码就演示了三个框架的融合:

import asyncio
import gevent  ## gevent3
import redis
from gevent import socket
from redis import connection
from twisted.web import server, resource
from twisted.internet import reactor


asyncio.set_event_loop(some_twisted_wrapper)


class GreenInetConnection(connection.Connection):
    def _connect(self):
        #noinspection PyUnresolvedReferences
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.socket_timeout)
        sock.connect((self.host, self.port))
        return sock


class HelloResource(resource.Resource):
    isLeaf = True

    def render_GET(self, request):
        gevent.spawn(self.green_GET, request)
        return server.NOT_DONE_YET

    def green_GET(self, request):
        r = redis.StrictRedis(
            connection_pool=connection.ConnectionPool(
                connection_class=GreenInetConnection))
        numberRequests = r.incr("numberRequests")
        request.setHeader("content-type", "text/plain")
        request.write("I am request #" + str(numberRequests) + "\n")
        request.finish()


reactor.listenTCP(8080, server.Site(HelloResource()))
asyncio.run_forever()

代码演示了一个简单的 Twisted web 服务器,使用 Gevent 来处理逻辑,asyncio 则起到了牵线搭桥的做用。

虽然目前这段代码还不能运行,可是我相信在不久的未来,这种程度的互操做性终将实现。

更新:gevent3 项目已更名为 tulipcore(连接仍然有效),第一个 alpha 版本已经发布至 pypi.python.org。

相关文章
相关标签/搜索