Greenlet
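The primary pattern used in gevent is the greenlet, a lightweight coroutine implementation. At any given point in time, only one greenlet is running. This differs from the parallelism constructs provided by the multiprocessing and threading libraries, which spin up processes and POSIX threads that are scheduled by the operating system and are truly parallel.
The core idea of concurrency is to break a larger task into a collection of subtasks that can run in parallel or asynchronously, rather than one at a time or synchronously. A switch between two subtasks is known as a context switch.
Context switches in gevent are very smooth. In the example program below, two contexts switch back and forth by calling gevent.sleep(0).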
    import gevent

    def foo():
        print('Running in foo')
        gevent.sleep(0)
        print('Explicit context switch to foo again')

    def bar():
        print('Explicit context to bar')
        gevent.sleep(0)
        print('Implicit context switch back to bar')

    gevent.joinall([
        gevent.spawn(foo),
        gevent.spawn(bar),
    ])

The program prints the following:

    Running in foo
    Explicit context to bar
    Explicit context switch to foo again
    Implicit context switch back to bar
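The output shows the flow of control through the program: the two functions run alternately.
The real power of gevent shows when it is used for network and IO-bound functions, which can then be scheduled cooperatively. gevent takes care of the details needed to make greenlet contexts switch whenever they have to. The following example illustrates this.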
    import time
    import gevent
    from gevent import select

    start = time.time()
    tic = lambda: 'at %1.1f seconds' % (time.time() - start)

    def gr1():
        # Waits two seconds in select(), yielding to other greenlets meanwhile
        print('Started Polling:', tic())
        select.select([], [], [], 2)
        print('Ended Polling:', tic())

    def gr2():
        # Waits two seconds in select(), yielding to other greenlets meanwhile
        print('Started Polling:', tic())
        select.select([], [], [], 2)
        print('Ended Polling:', tic())

    def gr3():
        print("Hey lets do some stuff while the greenlets poll,", tic())
        gevent.sleep(1)

    gevent.joinall([
        gevent.spawn(gr1),
        gevent.spawn(gr2),
        gevent.spawn(gr3),
    ])

In the example above, select() is normally a blocking call. The gevent.select version used here yields to the other greenlets while it waits.
The program prints the following:

    Started Polling: at 0.0 seconds
    Started Polling: at 0.0 seconds
    Hey lets do some stuff while the greenlets poll, at 0.0 seconds
    Ended Polling: at 2.0 seconds
    Ended Polling: at 2.0 seconds
The next example shows how gevent schedules the execution of individual tasks.

    import gevent
    import random

    def task(pid):
        """
        Some non-deterministic task
        """
        gevent.sleep(random.randint(0,2)*0.001)
        print('Task', pid, 'done')

    def synchronous():
        for i in range(1,10):
            task(i)

    def asynchronous():
        threads = [gevent.spawn(task, i) for i in range(10)]
        gevent.joinall(threads)

    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()

The output looks like this (shown in Python 3 print format; the asynchronous order varies from run to run):

    root@master:~# python two.py
    Synchronous:
    Task 1 done
    Task 2 done
    Task 3 done
    Task 4 done
    Task 5 done
    Task 6 done
    Task 7 done
    Task 8 done
    Task 9 done
    Asynchronous:
    Task 0 done
    Task 9 done
    Task 7 done
    Task 3 done
    Task 6 done
    Task 5 done
    Task 4 done
    Task 1 done
    Task 2 done
    Task 8 done
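In the synchronous case, the tasks run sequentially, and each one blocks the main program while it executes.
The important part of the program is gevent.spawn, which wraps the given function inside a greenlet. The initialized greenlets are stored in the list threads and passed to gevent.joinall, which blocks the current program until all of the given greenlets have finished.
In the asynchronous case, the order in which the tasks finish is effectively random, because each task sleeps for a random interval. No greenlet blocks the execution of any other.
Sometimes data has to be fetched from a server asynchronously. gevent can then process each request as the data from the server becomes available.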
    import gevent.monkey
    gevent.monkey.patch_socket()

    import gevent
    import json
    from urllib.request import urlopen

    def fetch(pid):
        # Endpoint taken from the original example; it may no longer be reachable.
        response = urlopen('http://json-time.appspot.com/time.json')
        result = response.read()
        json_result = json.loads(result)
        datetime = json_result['datetime']

        print('Process', pid, datetime)
        return json_result['datetime']

    def synchronous():
        for i in range(1,10):
            fetch(i)

    def asynchronous():
        threads = []
        for i in range(1,10):
            threads.append(gevent.spawn(fetch, i))
        gevent.joinall(threads)

    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()
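As mentioned before, greenlets are deterministic. Given the same configuration of greenlets and the same inputs, they always produce the same output. We can compare a Python multiprocessing pool with a gevent pool to show this: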
    import time

    def echo(i):
        time.sleep(0.001)
        return i

    # Non Deterministic Process Pool

    from multiprocessing.pool import Pool

    p = Pool(10)
    run1 = [a for a in p.imap_unordered(echo, range(10))]
    run2 = [a for a in p.imap_unordered(echo, range(10))]
    run3 = [a for a in p.imap_unordered(echo, range(10))]
    run4 = [a for a in p.imap_unordered(echo, range(10))]

    print( run1 == run2 == run3 == run4 )

    # Deterministic Gevent Pool

    from gevent.pool import Pool

    p = Pool(10)
    run1 = [a for a in p.imap_unordered(echo, range(10))]
    run2 = [a for a in p.imap_unordered(echo, range(10))]
    run3 = [a for a in p.imap_unordered(echo, range(10))]
    run4 = [a for a in p.imap_unordered(echo, range(10))]

    print( run1 == run2 == run3 == run4 )

Here is the output:

    False
    True
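As the example shows, running the same function through the gevent pool returns the results in the same order on every run, whereas the process pool does not.
Concurrent programming still brings problems of its own, such as race conditions. In the simplest case, a race condition occurs when two threads or processes access the same resource and both try to modify it. The final value of that resource then depends on which thread or process happened to run last. In general, race conditions should be avoided as far as possible, because they make the program's global behavior non-deterministic.
The best approach is to avoid global state whenever you can. Global state will come back to bite you again and again!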
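The point about shared state can be made concrete with a small sketch. It is an illustration only, not part of the original examples: the names `counter`, `unsafe_increment` and `run_isolated` are hypothetical. Each greenlet reads a shared value, yields with `gevent.sleep(0)`, and then writes back a stale result, so updates are lost. The second half avoids the shared variable by having each greenlet return its own value.

```python
import gevent

# Shared, global state (hypothetical example).
counter = {"value": 0}

def unsafe_increment():
    current = counter["value"]        # read the shared value
    gevent.sleep(0)                   # yield: other greenlets run before we write
    counter["value"] = current + 1    # write back a value that may be stale

gevent.joinall([gevent.spawn(unsafe_increment) for _ in range(10)])
lost_update_total = counter["value"]  # ends up below 10: updates were lost

def run_isolated(n):
    # No shared state: the greenlet simply returns its contribution.
    gevent.sleep(0)
    return 1

jobs = [gevent.spawn(run_isolated, i) for i in range(10)]
gevent.joinall(jobs)
safe_total = sum(job.value for job in jobs)

print(lost_update_total, safe_total)
```

Collecting results from `Greenlet.value` keeps every greenlet independent, which is usually easier to reason about than guarding a shared variable.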
gevent provides a few wrappers around greenlet initialization. The most common patterns are shown below:

    import gevent
    from gevent import Greenlet

    def foo(message, n):
        """
        Each thread will be passed the message, and n arguments
        in its initialization.
        """
        gevent.sleep(n)
        print(message)

    # Initialize a new Greenlet instance running the named function
    # foo
    thread1 = Greenlet.spawn(foo, "Hello", 1)

    # Wrapper for creating and running a new Greenlet from the named
    # function foo, with the passed arguments
    thread2 = gevent.spawn(foo, "I live!", 2)

    # Lambda expressions
    thread3 = gevent.spawn(lambda x: (x+1), 2)

    threads = [thread1, thread2, thread3]

    # Block until all threads complete.
    gevent.joinall(threads)
The program above uses the spawn method to create greenlets. Another way to initialize a greenlet is to subclass Greenlet and override its _run method.

    import gevent
    from gevent import Greenlet

    class MyGreenlet(Greenlet):

        def __init__(self, message, n):
            Greenlet.__init__(self)
            self.message = message
            self.n = n

        def _run(self):
            print(self.message)
            gevent.sleep(self.n)

    g = MyGreenlet("Hi there!", 3)
    g.start()
    g.join()
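Like any other piece of code, greenlets can fail in various ways. A greenlet may fail to raise an exception, fail to halt, or consume too many system resources.
The internal state of a greenlet is generally a time-dependent parameter. Greenlets carry a number of flags that let you monitor that state, such as started, ready(), successful(), value and exception.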
    import gevent

    def win():
        return 'You win!'

    def fail():
        raise Exception('You fail at failing.')

    winner = gevent.spawn(win)
    loser = gevent.spawn(fail)

    print(winner.started) # True
    print(loser.started)  # True

    # Exceptions raised in the Greenlet, stay inside the Greenlet.
    try:
        gevent.joinall([winner, loser])
    except Exception as e:
        print('This will never be reached')

    print(winner.value) # 'You win!'
    print(loser.value)  # None

    print(winner.ready()) # True
    print(loser.ready())  # True

    print(winner.successful()) # True
    print(loser.successful())  # False

    # The exception raised in fail will not propagate outside the
    # greenlet. A stack trace will be printed to stderr but it
    # will not unwind the stack of the parent.

    print(loser.exception)

    # It is possible though to raise the exception again outside
    # raise loser.exception
    # or with
    # loser.get()

This code prints the following:

    True
    True
    You win!
    None
    True
    True
    True
    False
    You fail at failing.
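A greenlet that fails to yield when the main program receives a SIGQUIT can hold up the program's exit. The result is a so-called zombie process, which then has to be cleaned up from the operating system. A common pattern is to install a SIGQUIT handler in the main program that stops the running greenlet before exit.

    import gevent
    import signal

    def run_forever():
        gevent.sleep(1000)

    if __name__ == '__main__':
        thread = gevent.spawn(run_forever)
        # gevent.shutdown is not available in current gevent releases, so the
        # handler kills the greenlet instead. gevent.signal_handler assumes a
        # recent gevent; older releases exposed it as gevent.signal.
        gevent.signal_handler(signal.SIGQUIT, thread.kill)
        thread.join()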
gevent can put a time limit on how long a piece of code is allowed to run, that is, a timeout.

    import gevent
    from gevent import Timeout

    seconds = 10

    timeout = Timeout(seconds)
    timeout.start()

    def wait():
        gevent.sleep(10)

    try:
        gevent.spawn(wait).join()
    except Timeout:
        print('Could not complete')
A timeout can also be used as a context manager in a with statement:

    import gevent
    from gevent import Timeout

    time_to_wait = 5 # seconds

    class TooLong(Exception):
        pass

    with Timeout(time_to_wait, TooLong):
        gevent.sleep(10)
gevent also provides timeout arguments for a variety of situations:

    import gevent
    from gevent import Timeout

    def wait():
        gevent.sleep(2)

    timer = Timeout(1).start()
    thread1 = gevent.spawn(wait)

    try:
        thread1.join(timeout=timer)
    except Timeout:
        print('Thread 1 timed out')

    # --

    timer = Timeout.start_new(1)
    thread2 = gevent.spawn(wait)

    try:
        thread2.get(timeout=timer)
    except Timeout:
        print('Thread 2 timed out')

    # --

    try:
        gevent.with_timeout(1, wait)
    except Timeout:
        print('Thread 3 timed out')

The output is:

    Thread 1 timed out
    Thread 2 timed out
    Thread 3 timed out
Now we come to one of the trickier corners of gevent. The following example shows how monkey.patch_socket() modifies the standard library's socket module at run time:

    import socket
    print(socket.socket)

    print("After monkey patch")
    from gevent import monkey
    monkey.patch_socket()
    print(socket.socket)

    import select
    print(select.select)
    monkey.patch_select()
    print("After monkey patch")
    print(select.select)

The output is as follows (the exact names and addresses depend on the Python and gevent versions):

    <class 'socket.socket'>
    After monkey patch
    <class 'gevent.socket.socket'>
    <built-in function select>
    After monkey patch
    <function select at 0x1924de8>
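Python's runtime allows most objects to be modified at run time, including modules, classes and functions. This is generally a bad idea, but in extreme situations, where a library needs to alter fundamental behavior of Python itself, monkey patching comes in handy. In the example above, gevent patches standard library modules that rely on blocking IO, such as socket, ssl and threading, so that they behave cooperatively instead.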
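The general mechanism can be shown with the standard library alone. The sketch below is only an illustration of run-time patching and is not how gevent implements its patches: `recording_sleep` and `slow_task` are hypothetical names. It replaces `time.sleep` with a stand-in, calls code that uses it, and then restores the original.

```python
import time

original_sleep = time.sleep
calls = []

def recording_sleep(seconds):
    """Hypothetical replacement: record the request instead of blocking."""
    calls.append(seconds)

def slow_task():
    # time.sleep is looked up when the call happens, so the task sees
    # whatever function is installed on the module at that moment.
    time.sleep(5)
    return "done"

time.sleep = recording_sleep        # apply the patch
try:
    result = slow_task()            # returns immediately
finally:
    time.sleep = original_sleep     # always restore the original

print(result, calls)
```

Because the lookup happens at call time, code that was written against the blocking API picks up the replacement without being changed, which is the property gevent's patches rely on.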
Events are a form of asynchronous communication between greenlets.

    import gevent
    from gevent.event import AsyncResult

    a = AsyncResult()

    def setter():
        """
        After 3 seconds, wake all greenlets waiting on the value of a.
        """
        gevent.sleep(3)
        a.set()

    def waiter():
        """
        After 3 seconds the get call will unblock.
        """
        a.get() # blocking
        print('I live!')

    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter),
    ])
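AsyncResult is an extension of the Event object that lets you send a value along with the wakeup call. It is sometimes called a future or a deferred, since it holds a reference to a future value that can be set at an arbitrary later time.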
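To show the value-passing side of AsyncResult, here is a minimal sketch in which one greenlet sets a value and two others receive it. The names `producer`, `consumer` and `received` are made up for this illustration.

```python
import gevent
from gevent.event import AsyncResult

result = AsyncResult()
received = []

def producer():
    gevent.sleep(0.1)
    result.set("payload")       # wakes every waiter and hands over the value

def consumer(name):
    value = result.get()        # blocks this greenlet until set() is called
    received.append((name, value))

gevent.joinall([
    gevent.spawn(producer),
    gevent.spawn(consumer, "first"),
    gevent.spawn(consumer, "second"),
])

print(received)
```

Both consumers get the same value, because the result stays stored in the AsyncResult after it has been set.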
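A queue is an ordered collection of data with the usual put/get operations, written so that it can be manipulated safely by several greenlets.
For example, if one greenlet takes an item off the queue, that same item cannot be taken by another greenlet.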
    import gevent
    from gevent.queue import Queue

    tasks = Queue()

    def worker(n):
        while not tasks.empty():
            task = tasks.get()
            print('Worker %s got task %s' % (n, task))
            gevent.sleep(0)

        print('Quitting time!')

    def boss():
        for i in range(1,25):
            tasks.put_nowait(i)

    gevent.spawn(boss).join()

    gevent.joinall([
        gevent.spawn(worker, 'steve'),
        gevent.spawn(worker, 'john'),
        gevent.spawn(worker, 'nancy'),
    ])

The output is:

    Worker steve got task 1
    Worker john got task 2
    Worker nancy got task 3
    Worker steve got task 4
    Worker nancy got task 5
    Worker john got task 6
    Worker steve got task 7
    Worker john got task 8
    Worker nancy got task 9
    Worker steve got task 10
    Worker nancy got task 11
    Worker john got task 12
    Worker steve got task 13
    Worker john got task 14
    Worker nancy got task 15
    Worker steve got task 16
    Worker nancy got task 17
    Worker john got task 18
    Worker steve got task 19
    Worker john got task 20
    Worker nancy got task 21
    Worker steve got task 22
    Worker nancy got task 23
    Worker john got task 24
    Quitting time!
    Quitting time!
    Quitting time!
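The put and get operations of a queue can also block when needed.
Both put and get have non-blocking counterparts, put_nowait and get_nowait. Instead of blocking, these raise gevent.queue.Full or gevent.queue.Empty when the operation cannot be completed.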
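The non-blocking variants can be sketched as follows. This is a small illustration with made-up item names (`job-1`, `job-2`) and a queue limited to a single slot, so that both failure modes are easy to trigger.

```python
from gevent.queue import Queue, Empty, Full

q = Queue(maxsize=1)
q.put_nowait("job-1")           # fits: the queue has one free slot

try:
    q.put_nowait("job-2")       # no room left, so this raises Full
    overflowed = False
except Full:
    overflowed = True

first = q.get_nowait()          # returns "job-1" without blocking

try:
    q.get_nowait()              # nothing left, so this raises Empty
    drained = False
except Empty:
    drained = True

print(first, overflowed, drained)
```

Handling Full and Empty explicitly is what lets a greenlet decide for itself whether to retry, drop the item, or do other work instead of waiting.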
In the example below, a boss function runs at the same time as the workers, and the queue is restricted to holding at most three items. This restriction means that put blocks until there is room in the queue. Conversely, get blocks when the queue has no items. get also takes a timeout argument, so that it raises gevent.queue.Empty once it has been blocked for longer than the given time.

    import gevent
    from gevent.queue import Queue, Empty

    tasks = Queue(maxsize=3)

    def worker(n):
        try:
            while True:
                task = tasks.get(timeout=1) # decrements queue size by 1
                print('Worker %s got task %s' % (n, task))
                gevent.sleep(0)
        except Empty:
            print('Quitting time!')

    def boss():
        """
        Boss will wait to hand out work until an individual worker is
        free since the maxsize of the task queue is 3.
        """

        for i in range(1,10):
            tasks.put(i)
        print('Assigned all work in iteration 1')

        for i in range(10,20):
            tasks.put(i)
        print('Assigned all work in iteration 2')

    gevent.joinall([
        gevent.spawn(boss),
        gevent.spawn(worker, 'steve'),
        gevent.spawn(worker, 'john'),
        gevent.spawn(worker, 'bob'),
    ])

The output of this code is:

    Worker steve got task 1
    Worker john got task 2
    Worker bob got task 3
    Worker steve got task 4
    Worker bob got task 5
    Worker john got task 6
    Assigned all work in iteration 1
    Worker steve got task 7
    Worker john got task 8
    Worker bob got task 9
    Worker steve got task 10
    Worker bob got task 11
    Worker john got task 12
    Worker steve got task 13
    Worker john got task 14
    Worker bob got task 15
    Worker steve got task 16
    Worker bob got task 17
    Worker john got task 18
    Assigned all work in iteration 2
    Worker steve got task 19
    Quitting time!
    Quitting time!
    Quitting time!
A group is a collection of greenlets that can be managed together.

    import gevent
    from gevent.pool import Group

    def talk(msg):
        for i in range(3):
            print(msg)

    g1 = gevent.spawn(talk, 'bar')
    g2 = gevent.spawn(talk, 'foo')
    g3 = gevent.spawn(talk, 'fizz')

    group = Group()
    group.add(g1)
    group.add(g2)
    group.join()

    group.add(g3)
    group.join()

This is very useful for managing a set of asynchronous tasks.
Group also provides an API for dispatching jobs to grouped greenlets and collecting their results in different ways.

    import gevent
    from gevent import getcurrent
    from gevent.pool import Group

    group = Group()

    def hello_from(n):
        print('Size of group', len(group))
        print('Hello from Greenlet %s' % id(getcurrent()))

    group.map(hello_from, range(3))

    def intensive(n):
        gevent.sleep(3 - n)
        return 'task', n

    print('Ordered')

    ogroup = Group()
    for i in ogroup.imap(intensive, range(3)):
        print(i)

    print('Unordered')

    igroup = Group()
    for i in igroup.imap_unordered(intensive, range(3)):
        print(i)

The output is:

    Size of group 3
    Hello from Greenlet 10769424
    Size of group 3
    Hello from Greenlet 10770544
    Size of group 3
    Hello from Greenlet 10772304
    Ordered
    ('task', 0)
    ('task', 1)
    ('task', 2)
    Unordered
    ('task', 2)
    ('task', 1)
    ('task', 0)
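A pool is a structure for handling a dynamic number of greenlets whose concurrency needs to be limited.
This is often needed when a large number of network or IO-bound operations have to be performed.

    import gevent
    from gevent.pool import Pool

    pool = Pool(2)

    def hello_from(n):
        print('Size of pool', len(pool))

    pool.map(hello_from, range(3))

    Size of pool 2
    Size of pool 2
    Size of pool 1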
When building gevent-driven services, it is common to center the entire service around a pool structure.
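One way such a service might be laid out is sketched below. The class `Dispatcher` and its methods are hypothetical names chosen for this illustration, and `gevent.sleep(0)` stands in for real network or IO work.

```python
import gevent
from gevent.pool import Pool

class Dispatcher:
    """Hypothetical service skeleton built around one shared pool."""

    def __init__(self, concurrency):
        self.pool = Pool(concurrency)   # caps how many handlers run at once
        self.results = []

    def handle(self, job):
        gevent.sleep(0)                 # stand-in for network or IO work
        self.results.append(job * 2)

    def submit(self, job):
        # Pool.spawn makes the caller wait while the pool is full.
        self.pool.spawn(self.handle, job)

    def shutdown(self):
        self.pool.join()                # wait for all in-flight handlers

service = Dispatcher(concurrency=2)
for job in range(5):
    service.submit(job)
service.shutdown()

print(sorted(service.results))
```

Routing every job through the same pool gives the service a single place where its concurrency limit is enforced.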
A semaphore is a low-level synchronization primitive that lets greenlets coordinate with each other and limits how many of them run concurrently. It exposes two methods, acquire and release. If the semaphore's counter drops to 0, acquire blocks until another greenlet releases its hold.

    from gevent import sleep
    from gevent.pool import Pool
    # BoundedSemaphore lives in gevent.lock in gevent 1.0 and later
    # (older releases used gevent.coros).
    from gevent.lock import BoundedSemaphore

    sem = BoundedSemaphore(2)

    def worker1(n):
        sem.acquire()
        print('Worker %i acquired semaphore' % n)
        sleep(0)
        sem.release()
        print('Worker %i released semaphore' % n)

    def worker2(n):
        with sem:
            print('Worker %i acquired semaphore' % n)
            sleep(0)
        print('Worker %i released semaphore' % n)

    pool = Pool()
    pool.map(worker1, range(0,2))
    pool.map(worker2, range(3,6))

The output of this code is:

    Worker 0 acquired semaphore
    Worker 1 acquired semaphore
    Worker 0 released semaphore
    Worker 1 released semaphore
    Worker 3 acquired semaphore
    Worker 4 acquired semaphore
    Worker 3 released semaphore
    Worker 4 released semaphore
    Worker 5 acquired semaphore
    Worker 5 released semaphore
A semaphore whose count is limited to 1 acts as a lock. It is commonly used when several greenlets access the same resource.
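A minimal sketch of the lock case follows. It assumes a gevent release where BoundedSemaphore is importable from `gevent.lock`, and the names `balance` and `deposit` are made up for the illustration.

```python
import gevent
from gevent.lock import BoundedSemaphore

lock = BoundedSemaphore(1)      # a count of 1 makes the semaphore a lock
balance = {"value": 0}          # shared resource (hypothetical)

def deposit(amount):
    with lock:                  # only one greenlet at a time gets past here
        current = balance["value"]
        gevent.sleep(0)         # yielding is safe: the others wait on the lock
        balance["value"] = current + amount

gevent.joinall([gevent.spawn(deposit, 10) for _ in range(5)])

print(balance["value"])         # prints 50
```

Without the lock, each greenlet would read the balance before any of them wrote it back, and most of the deposits would be lost.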
gevent also lets you specify data that is local to the greenlet context.

    import gevent
    from gevent.local import local

    stash = local()

    def f1():
        stash.x = 1
        print(stash.x)

    def f2():
        stash.y = 2
        print(stash.y)

        try:
            stash.x
        except AttributeError:
            print("x is not local to f2")

    g1 = gevent.spawn(f1)
    g2 = gevent.spawn(f2)

    gevent.joinall([g1, g2])

The output is:

    1
    2
    x is not local to f2
Many frameworks that integrate with gevent store HTTP session objects in gevent thread-locals. For example:

    from gevent.local import local
    from werkzeug.local import LocalProxy
    from werkzeug.wrappers import Request
    from contextlib import contextmanager

    # gevent.pywsgi is the WSGI server module in current gevent releases
    # (older releases exposed it as gevent.wsgi).
    from gevent.pywsgi import WSGIServer

    _requests = local()
    request = LocalProxy(lambda: _requests.request)

    @contextmanager
    def sessionmanager(environ):
        _requests.request = Request(environ)
        yield
        _requests.request = None

    def logic():
        return "Hello " + request.remote_addr

    def application(environ, start_response):
        status = '200 OK'

        with sessionmanager(environ):
            body = logic()

        headers = [
            ('Content-Type', 'text/html')
        ]

        start_response(status, headers)
        # WSGI response bodies must be bytes on Python 3
        return [body.encode('utf-8')]

    WSGIServer(('', 8000), application).serve_forever()
gevent 1.0 added the gevent.subprocess module, which lets greenlets wait on subprocesses cooperatively.

    import gevent
    from gevent.subprocess import Popen, PIPE

    def cron():
        while True:
            print("cron")
            gevent.sleep(0.2)

    g = gevent.spawn(cron)
    sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
    out, err = sub.communicate()
    g.kill()
    # communicate() returns bytes on Python 3
    print(out.decode().rstrip())

Output:

    cron
    cron
    cron
    cron
    cron
    Linux