转自http://xlambda.com/gevent-tutorial/html
gevent是一个基于 libev的并发库。它为各类并发和网络相关的任务提供了整洁的API。
本指南假定读者有中级Python水平,但不要求有其它更多的知识,不期待读者有并发方面的知识。本指南的目标在于给予你须要的工具来开始使用gevent,帮助你驯服现有的并发问题,并从今开始编写异步应用程序。python
按提供贡献的时间前后顺序列出以下: Stephen Diehl Jérémy Bethmont sww Bruno Bigras David Ripton Travis Cline Boris Feld youngsterxyf Eddie Hebert Alexis Metaireau Daniel Velkovgit
同时感谢Denis Bilenko写了gevent和相应的指导以造成本指南。程序员
这是一个以MIT许可证发布的协做文档。你想添加一些内容?或看见一个排版错误? Fork一个分支发布一个request到 Github. 咱们欢迎任何贡献。github
本页也有日文版本。web
在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet所有运行在主程序操做系统进程的内部,但它们被协做式地调度。express
在任什么时候刻,只有一个协程在运行。json
这与multiprocessing
或threading
等提供真正并行构造的库是不一样的。这些库轮转使用操做系统调度的进程和线程,是真正的并行。数组
并发的核心思想在于,大的任务能够分解成一系列的子任务,后者能够被调度成同时执行或异步执行,而不是一次一个地或者同步地执行。两个子任务之间的切换也就是上下文切换。服务器
在gevent里面,上下文切换是经过yielding来完成的. 在下面的例子里,咱们有两个上下文,经过调用gevent.sleep(0)
,它们各自yield向对方。
import gevent def foo(): print('Running in foo') gevent.sleep(0) print('Explicit context switch to foo again') def bar(): print('Explicit context to bar') gevent.sleep(0) print('Implicit context switch back to bar') gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ])
Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar
下图将控制流形象化,就像在调试器中单步执行整个程序,以说明上下文切换如何发生。
当咱们在受限于网络或IO的函数中使用gevent,这些函数会被协做式的调度, gevent的真正能力会获得发挥。Gevent处理了全部的细节,来保证你的网络库会在可能的时候,隐式交出greenlet上下文的执行权。这样的一种用法是如何强大,怎么强调都不为过。或者咱们举些例子来详述。
下面例子中的select()
函数一般是一个在各类文件描述符上轮询的阻塞调用。
import time import gevent from gevent import select start = time.time() tic = lambda: 'at %1.1f seconds' % (time.time() - start) def gr1(): # Busy waits for a second, but we don't want to stick around... print('Started Polling: %s' % tic()) select.select([], [], [], 2) print('Ended Polling: %s' % tic()) def gr2(): # Busy waits for a second, but we don't want to stick around... print('Started Polling: %s' % tic()) select.select([], [], [], 2) print('Ended Polling: %s' % tic()) def gr3(): print("Hey lets do some stuff while the greenlets poll, %s" % tic()) gevent.sleep(1) gevent.joinall([ gevent.spawn(gr1), gevent.spawn(gr2), gevent.spawn(gr3), ])
Started Polling: at 0.0 seconds Started Polling: at 0.0 seconds Hey lets do some stuff while the greenlets poll, at 0.0 seconds Ended Polling: at 2.0 seconds Ended Polling: at 2.0 seconds
下面是另一个多少有点人造色彩的例子,定义一个非肯定性的(non-deterministic) 的task
函数(给定相同输入的状况下,它的输出不保证相同)。此例中执行这个函数的反作用就是,每次task在它的执行过程当中都会随机地停某些秒。
import gevent import random def task(pid): """ Some non-deterministic task """ gevent.sleep(random.randint(0,2)*0.001) print('Task %s done' % pid) def synchronous(): for i in range(1,10): task(i) def asynchronous(): threads = [gevent.spawn(task, i) for i in xrange(10)] gevent.joinall(threads) print('Synchronous:') synchronous() print('Asynchronous:') asynchronous()
Synchronous: Task 1 done Task 2 done Task 3 done Task 4 done Task 5 done Task 6 done Task 7 done Task 8 done Task 9 done Asynchronous: Task 3 done Task 7 done Task 9 done Task 2 done Task 4 done Task 1 done Task 8 done Task 6 done Task 0 done Task 5 done
上例中,在同步的部分,全部的task都同步的执行,结果当每一个task在执行时主流程被阻塞(主流程的执行暂时停住)。
程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn
。初始化的greenlet列表存放在数组threads
中,此数组被传给gevent.joinall
函数,后者阻塞当前流程,并执行全部给定的greenlet。执行流程只会在全部greenlet执行完后才会继续向下走。
要重点留意的是,异步的部分本质上是随机的,并且异步部分的总体运行时间比同步要大大减小。事实上,同步部分的最大运行时间,便是每一个task停0.002秒,结果整个队列要停0.02秒。而异步部分的最大运行时间大体为0.002秒,由于没有任何一个task会阻塞其它task的执行。
一个更常见的应用场景,如异步地向服务器取数据,取数据操做的执行时间依赖于发起取数据请求时远端服务器的负载,各个请求的执行时间会有差异。
import gevent.monkey gevent.monkey.patch_socket() import gevent import urllib2 import simplejson as json def fetch(pid): response = urllib2.urlopen('http://json-time.appspot.com/time.json') result = response.read() json_result = json.loads(result) datetime = json_result['datetime'] print('Process %s: %s' % (pid, datetime)) return json_result['datetime'] def synchronous(): for i in range(1,10): fetch(i) def asynchronous(): threads = [] for i in range(1,10): threads.append(gevent.spawn(fetch, i)) gevent.joinall(threads) print('Synchronous:') synchronous() print('Asynchronous:') asynchronous()
就像以前所提到的,greenlet具备肯定性。在相同配置相同输入的状况下,它们老是会产生相同的输出。下面就有例子,咱们在multiprocessing的pool之间执行一系列的任务,与在gevent的pool之间执行做比较。
import time def echo(i): time.sleep(0.001) return i # Non Deterministic Process Pool from multiprocessing.pool import Pool p = Pool(10) run1 = [a for a in p.imap_unordered(echo, xrange(10))] run2 = [a for a in p.imap_unordered(echo, xrange(10))] run3 = [a for a in p.imap_unordered(echo, xrange(10))] run4 = [a for a in p.imap_unordered(echo, xrange(10))] print(run1 == run2 == run3 == run4) # Deterministic Gevent Pool from gevent.pool import Pool p = Pool(10) run1 = [a for a in p.imap_unordered(echo, xrange(10))] run2 = [a for a in p.imap_unordered(echo, xrange(10))] run3 = [a for a in p.imap_unordered(echo, xrange(10))] run4 = [a for a in p.imap_unordered(echo, xrange(10))] print(run1 == run2 == run3 == run4)
False True
即便gevent一般带有肯定性,当开始与如socket或文件等外部服务交互时,不肯定性也可能溜进你的程序中。所以尽管gevent线程是一种“肯定的并发”形式,使用它仍然可能会遇到像使用POSIX线程或进程时遇到的那些问题。
涉及并发长期存在的问题就是竞争条件(race condition)。简单来讲,当两个并发线程/进程都依赖于某个共享资源同时都尝试去修改它的时候,就会出现竞争条件。这会致使资源修改的结果状态依赖于时间和执行顺序。这是个问题,咱们通常会作不少努力尝试避免竞争条件,由于它会致使整个程序行为变得不肯定。
最好的办法是始终避免全部全局的状态。全局状态和导入时(import-time)反作用老是会反咬你一口!
gevent对Greenlet初始化提供了一些封装,最经常使用的使用模板之一有
import gevent from gevent import Greenlet def foo(message, n): """ Each thread will be passed the message, and n arguments in its initialization. """ gevent.sleep(n) print(message) # Initialize a new Greenlet instance running the named function # foo thread1 = Greenlet.spawn(foo, "Hello", 1) # Wrapper for creating and running a new Greenlet from the named # function foo, with the passed arguments thread2 = gevent.spawn(foo, "I live!", 2) # Lambda expressions thread3 = gevent.spawn(lambda x: (x+1), 2) threads = [thread1, thread2, thread3] # Block until all threads complete. gevent.joinall(threads)
Hello I live!
除使用基本的Greenlet类以外,你也能够子类化Greenlet类,重载它的_run
方法。
import gevent from gevent import Greenlet class MyGreenlet(Greenlet): def __init__(self, message, n): Greenlet.__init__(self) self.message = message self.n = n def _run(self): print(self.message) gevent.sleep(self.n) g = MyGreenlet("Hi there!", 3) g.start() g.join()
Hi there!
就像任何其余成段代码,Greenlet也可能以不一样的方式运行失败。 Greenlet可能未能成功抛出异常,不能中止运行,或消耗了太多的系统资源。
一个greenlet的状态一般是一个依赖于时间的参数。在greenlet中有一些标志,让你能够监视它的线程内部状态:
started
-- Boolean, 指示此Greenlet是否已经启动ready()
-- Boolean, 指示此Greenlet是否已经中止successful()
-- Boolean, 指示此Greenlet是否已经中止并且没抛异常value
-- 任意值, 此Greenlet代码返回的值exception
-- 异常, 此Greenlet内抛出的未捕获异常import gevent def win(): return 'You win!' def fail(): raise Exception('You fail at failing.') winner = gevent.spawn(win) loser = gevent.spawn(fail) print(winner.started) # True print(loser.started) # True # Exceptions raised in the Greenlet, stay inside the Greenlet. try: gevent.joinall([winner, loser]) except Exception as e: print('This will never be reached') print(winner.value) # 'You win!' print(loser.value) # None print(winner.ready()) # True print(loser.ready()) # True print(winner.successful()) # True print(loser.successful()) # False # The exception raised in fail, will not propogate outside the # greenlet. A stack trace will be printed to stdout but it # will not unwind the stack of the parent. print(loser.exception) # It is possible though to raise the exception again outside # raise loser.exception # or with # loser.get()
True True You win! None True True True False You fail at failing.
当主程序(main program)收到一个SIGQUIT信号时,不能成功作yield操做的 Greenlet可能会令意外地挂起程序的执行。这致使了所谓的僵尸进程,它须要在Python解释器以外被kill掉。
对此,一个通用的处理模式就是在主程序中监听SIGQUIT信号,在程序退出调用gevent.shutdown
。
import gevent import signal def run_forever(): gevent.sleep(1000) if __name__ == '__main__': gevent.signal(signal.SIGQUIT, gevent.shutdown) thread = gevent.spawn(run_forever) thread.join()
超时是一种对一块代码或一个Greenlet的运行时间的约束。
import gevent from gevent import Timeout seconds = 10 timeout = Timeout(seconds) timeout.start() def wait(): gevent.sleep(10) try: gevent.spawn(wait).join() except Timeout: print('Cou