Python gevent module

Greenlets

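    The construct gevent relies on most is the greenlet, a lightweight coroutine implementation. At any given moment, only one greenlet is running. This is different from the real parallelism offered by the multiprocessing and threading libraries, which actually switch between processes and POSIX threads. Those are scheduled by the operating system and are truly parallel (although in CPython the GIL still allows only one thread to execute Python bytecode at a time).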

Synchronous and Asynchronous Execution

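    The core idea of concurrency is to break a larger task into a collection of subtasks. Those subtasks are then scheduled to run simultaneously or asynchronously, instead of one at a time (synchronously). A switch between two subtasks is called a context switch.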

    Context switching in gevent is smooth. In the example program below, two contexts switch back and forth by calling gevent.sleep(0).

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

    The output of this program is:

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

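    The output shows that the two functions run alternately. Each call to gevent.sleep(0) yields control, and the other greenlet resumes where it left off.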
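The sketch below shows the same idea, assuming that only explicit yields matter here. The helper run() and the worker names are illustrative and not part of gevent. It records the execution order of two greenlets, once with and once without a gevent.sleep(0) between steps.

```python
import gevent

def run(yield_between_steps):
    order = []

    def worker(name):
        for step in range(2):
            order.append((name, step))
            if yield_between_steps:
                gevent.sleep(0)  # explicit context switch back to the hub

    gevent.joinall([gevent.spawn(worker, 'a'), gevent.spawn(worker, 'b')])
    return order

# Without yielding, each greenlet runs to completion before the next starts:
print(run(yield_between_steps=False))  # [('a', 0), ('a', 1), ('b', 0), ('b', 1)]
# With gevent.sleep(0) between steps, the greenlets interleave:
print(run(yield_between_steps=True))   # [('a', 0), ('b', 0), ('a', 1), ('b', 1)]
```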

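    The real power of gevent shows when it is used for network- and IO-bound functions, which can be scheduled cooperatively. gevent takes care of the details so that greenlets switch context whenever they would otherwise block. Here is an example: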

import time
import gevent
from gevent import select

start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)

def gr1():
    # Polls for 2 seconds, but other greenlets keep running meanwhile
    print('Started Polling: ', tic())
    select.select([], [], [], 2)
    print('Ended Polling: ', tic())

def gr2():
    # Polls for 2 seconds, but other greenlets keep running meanwhile
    print('Started Polling: ', tic())
    select.select([], [], [], 2)
    print('Ended Polling: ', tic())

def gr3():
    print("Hey lets do some stuff while the greenlets poll, at", tic())
    gevent.sleep(1)

gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
])

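In the example above, select() would normally be a blocking call. The select module imported from gevent yields to the other greenlets while it waits. As a result, gr3 runs immediately, and the two 2-second polls end together at about 2 seconds instead of one after the other.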

The output of the program is:

Started Polling:  at 0.0 seconds
Started Polling:  at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at at 0.0 seconds
Ended Polling:  at 2.0 seconds
Ended Polling:  at 2.0 seconds

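The next example shows how gevent schedules individual tasks. task() sleeps for a random, non-deterministic amount of time and is run first synchronously, then asynchronously: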

import gevent
import random

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(random.randint(0,2)*0.001)
    print('Task', pid, 'done')

def synchronous():
    for i in range(1,10):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()

The output is as follows (the asynchronous order is random and changes from run to run):

Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:
Task 0 done
Task 9 done
Task 7 done
Task 3 done
Task 6 done
Task 5 done
Task 4 done
Task 1 done
Task 2 done
Task 8 done

In the synchronous case, the tasks run one after another in order, and each task blocks the main program while it runs.

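The important job of gevent.spawn is to wrap a function in a greenlet. The resulting greenlets are stored in the threads list, which is passed to gevent.joinall. That function blocks the current program until all of the greenlets have finished.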
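Each Greenlet object keeps its own return value, so results can still be collected in spawn order after joinall returns, even when the tasks finish in random order. In this sketch, task() is a hypothetical variant of the function above that returns a value instead of printing.

```python
import random
import gevent

def task(pid):
    # Random delay, so the completion order is unpredictable
    gevent.sleep(random.randint(0, 2) * 0.001)
    return pid * 10

threads = [gevent.spawn(task, i) for i in range(10)]
gevent.joinall(threads)

# Greenlet.value holds each task's return value, so the list comes back
# in spawn order even though the tasks finished in random order.
results = [t.value for t in threads]
print(results)  # [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
```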

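In the asynchronous case, the tasks finish in an essentially random order. Because no greenlet blocks the execution of the others, the total run time is also much shorter. Each task pauses for at most 0.002 seconds, so the 9 synchronous tasks can take up to about 0.018 seconds, while the asynchronous run takes roughly 0.002 seconds.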

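Sometimes data has to be fetched from a server asynchronously. gevent can handle such requests concurrently, switching to another greenlet while each one waits for the server's response. (The json-time.appspot.com service used below may no longer be online; substitute any endpoint that returns JSON with a datetime field.)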

import gevent.monkey
gevent.monkey.patch_socket()

import gevent
import json
from urllib.request import urlopen

def fetch(pid):
    response = urlopen('http://json-time.appspot.com/time.json')
    result = response.read()
    json_result = json.loads(result)
    datetime = json_result['datetime']

    print('Process', pid, datetime)
    return json_result['datetime']

def synchronous():
    for i in range(1,10):
        fetch(i)

def asynchronous():
    threads = []
    for i in range(1,10):
        threads.append(gevent.spawn(fetch, i))
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()

Determinism

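Greenlets are deterministic: given the same configuration of greenlets and the same inputs, they always produce the same output. To illustrate this, we can compare a Python multiprocessing pool with a gevent pool: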

import time

def echo(i):
    time.sleep(0.001)
    return i

# The __main__ guard is needed for multiprocessing on platforms
# that start worker processes with "spawn" (e.g. Windows, macOS)
if __name__ == '__main__':
    # Non Deterministic Process Pool
    from multiprocessing.pool import Pool

    p = Pool(10)
    run1 = [a for a in p.imap_unordered(echo, range(10))]
    run2 = [a for a in p.imap_unordered(echo, range(10))]
    run3 = [a for a in p.imap_unordered(echo, range(10))]
    run4 = [a for a in p.imap_unordered(echo, range(10))]

    print( run1 == run2 == run3 == run4 )

    # Deterministic Gevent Pool
    from gevent.pool import Pool

    p = Pool(10)
    run1 = [a for a in p.imap_unordered(echo, range(10))]
    run2 = [a for a in p.imap_unordered(echo, range(10))]
    run3 = [a for a in p.imap_unordered(echo, range(10))]
    run4 = [a for a in p.imap_unordered(echo, range(10))]

    print( run1 == run2 == run3 == run4 )

The output is:

False
True

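As the output shows, running the same function several times through the gevent pool gives the same result every time. The order of the process pool's results, by contrast, can change from run to run.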

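Even though greenlets are deterministic, concurrent programs can still run into problems such as race conditions. In the simplest case, a race condition occurs when two threads or processes access the same resource and both try to modify it. The final value of the resource then depends on which of them ran last. In general, race conditions should be avoided as much as possible, because they make the program's global behavior non-deterministic.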

The best approach is to avoid global state whenever possible. Global state will always come back to bite you!
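Here is a concrete race. Greenlets only switch at yield points, but a single yield between reading and writing a shared global is enough to lose updates. This is a minimal sketch; counter and increment() are illustrative names.

```python
import gevent

counter = 0

def increment():
    global counter
    current = counter      # read the shared global
    gevent.sleep(0)        # yield: the other greenlets run before the write below
    counter = current + 1  # write back a value that may now be stale

gevent.joinall([gevent.spawn(increment) for _ in range(10)])
print(counter)  # fewer than 10: some updates were lost
```

Passing data through return values, queues or greenlet-local storage instead of a shared global avoids this class of bug.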

Spawning Greenlets

gevent provides a few wrappers for initializing greenlets. Some of the most common patterns are:

import gevent
from gevent import Greenlet

def foo(message, n):
    """
    Each thread will be passed the message, and n arguments
    in its initialization.
    """
    gevent.sleep(n)
    print(message)

# Initialize a new Greenlet instance running the named function
# foo
thread1 = Greenlet.spawn(foo, "Hello", 1)

# Wrapper for creating and running a new Greenlet from the named
# function foo, with the passed arguments
thread2 = gevent.spawn(foo, "I live!", 2)

# Lambda expressions
thread3 = gevent.spawn(lambda x: (x+1), 2)

threads = [thread1, thread2, thread3]

# Block until all threads complete.
gevent.joinall(threads)
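There are two further ways to create greenlets, sketched here with a hypothetical add() function. The first constructs a Greenlet directly and calls start(). The second uses gevent.spawn_later(), which schedules the function to start after a delay. In both cases get() blocks until the result is available.

```python
import gevent
from gevent import Greenlet

def add(a, b):
    return a + b

g1 = Greenlet(add, 1, 2)   # created, but not scheduled yet
g1.start()                 # now scheduled to run

g2 = gevent.spawn_later(0.1, add, 3, 4)  # scheduled to start after ~0.1 s

# get() blocks until each greenlet has finished and returns its value
print(g1.get(), g2.get())  # 3 7
```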

The program above uses spawn to create greenlets. Another way to create a greenlet is to subclass Greenlet and override its _run method:

import gevent
from gevent import Greenlet

class MyGreenlet(Greenlet):

    def __init__(self, message, n):
        Greenlet.__init__(self)
        self.message = message
        self.n = n

    def _run(self):
        print(self.message)
        gevent.sleep(self.n)

g = MyGreenlet("Hi there!", 3)
g.start()
g.join()

Greenlet State

Like any other code, greenlets can fail in various ways. A greenlet may fail to raise an exception, fail to stop, or consume too many system resources.

The internal state of a greenlet is generally a time-dependent parameter. Greenlets provide several flags that let you monitor their state:

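  • started -- Boolean, whether the greenlet has been started
  • ready() -- Boolean, whether the greenlet has halted
  • successful() -- Boolean, whether the greenlet has halted without raising an exception
  • value -- the value returned by the greenlet
  • exception -- the uncaught exception raised inside the greenlet
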
import gevent

def win():
    return 'You win!'

def fail():
    raise Exception('You fail at failing.')

winner = gevent.spawn(win)
loser = gevent.spawn(fail)

print(winner.started) # True
print(loser.started)  # True

# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:
    gevent.joinall([winner, loser])
except Exception as e:
    print('This will never be reached')

print(winner.value) # 'You win!'
print(loser.value)  # None

print(winner.ready()) # True
print(loser.ready())  # True

print(winner.successful()) # True
print(loser.successful())  # False

# The exception raised in fail will not propagate outside the
# greenlet. A stack trace will be printed to stderr, but it
# will not unwind the stack of the parent.

print(loser.exception)

# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()

The output of this code is:

True
True
You win!
None
True
True
True
False
You fail at failing.
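The comments above mention re-raising the exception with loser.get(). The sketch below does that, and also uses Greenlet.link_exception(), which registers a callback that runs only when the greenlet dies with an exception. The ValueError and the failures list are illustrative.

```python
import gevent

def fail():
    raise ValueError('You fail at failing.')

failures = []

loser = gevent.spawn(fail)
# The callback runs only if the greenlet dies with an exception
# (gevent also prints the traceback to stderr).
loser.link_exception(lambda g: failures.append(g.exception))

gevent.joinall([loser])
gevent.sleep(0)  # give the hub a chance to finish running link callbacks

caught = None
try:
    loser.get()  # get() re-raises the exception stored in the greenlet
except ValueError as e:
    caught = e

print('callback saw:', failures)
print('get() raised:', caught)
```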

Program Shutdown

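If greenlets fail to yield when the main program receives a SIGQUIT, the program may stay alive longer than expected. This can leave so-called "zombie processes" that have to be killed from outside the Python interpreter. A common pattern is to listen for SIGQUIT in the main program and stop the greenlets before exiting.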

import gevent
import signal

def run_forever():
    gevent.sleep(1000)

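if __name__ == '__main__':
    thread = gevent.spawn(run_forever)
    # On SIGQUIT, kill the greenlet so that join() returns and the
    # program exits (older gevent releases named this gevent.signal)
    gevent.signal_handler(signal.SIGQUIT, gevent.kill, thread)
    thread.join()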
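Greenlets can also be stopped explicitly before the program exits, without involving signals. The sketch below uses gevent.killall(), which raises GreenletExit inside each greenlet and, by default, waits until all of them have exited. The three run_forever greenlets are illustrative.

```python
import gevent

def run_forever():
    gevent.sleep(1000)

threads = [gevent.spawn(run_forever) for _ in range(3)]
gevent.sleep(0)  # let the greenlets start and block in sleep()

# Raise GreenletExit in every greenlet and block until they are all dead
gevent.killall(threads)
print(all(t.dead for t in threads))  # True
```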

Timeouts

Timeouts are constraints on how long a block of code or a greenlet is allowed to run.

import gevent
from gevent import Timeout

seconds = 2

timeout = Timeout(seconds)
timeout.start()  # raises Timeout in this (main) greenlet when it expires

def wait():
    gevent.sleep(10)  # deliberately longer than the timeout

try:
    gevent.spawn(wait).join()
except Timeout:
    print('Could not complete')

Timeouts can also be used as a context manager in a with statement:

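import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
    pass

try:
    # Raise TooLong instead of Timeout if the block exceeds time_to_wait
    with Timeout(time_to_wait, TooLong):
        gevent.sleep(10)
except TooLong:
    print('Took too long')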
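You may want a timed-out block to be abandoned silently rather than raise an exception. For that, Timeout accepts False as its exception argument. A minimal sketch, where the finished flag is illustrative:

```python
import gevent
from gevent import Timeout

finished = False
with Timeout(0.1, False):  # False: leave the block silently on timeout
    gevent.sleep(1)
    finished = True        # never reached

print(finished)  # False
```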

gevent also accepts timeout arguments in a variety of greenlet and data-structure calls:

import gevent
from gevent import Timeout

def wait():
    gevent.sleep(2)

thread1 = gevent.spawn(wait)

# join() simply returns once the timeout expires; it does not raise
thread1.join(timeout=1)
if not thread1.ready():
    print('Thread 1 timed out')

# --

timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)

try:
    thread2.get(timeout=timer)
except Timeout:
    print('Thread 2 timed out')

# --

try:
    gevent.with_timeout(1, wait)
except Timeout:
    print('Thread 3 timed out')

The output is:

Thread 1 timed out
Thread 2 timed out
Thread 3 timed out
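gevent.with_timeout() can also return a default instead of raising, through its timeout_value keyword argument. A short sketch with a wait() function like the one above; the 'done' and 'gave up' strings are illustrative.

```python
import gevent

def wait():
    gevent.sleep(2)
    return 'done'

# Returns timeout_value instead of raising Timeout when time runs out
result = gevent.with_timeout(0.1, wait, timeout_value='gave up')
print(result)  # gave up
```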

Monkeypatching

This is one of the trickier corners of gevent. The example below shows how monkey.patch_socket() and monkey.patch_select() modify standard-library modules at runtime:

import socket
print( socket.socket )

print("After monkey patch")
from gevent import monkey
monkey.patch_socket()
print( socket.socket )

import select
print( select.select )
monkey.patch_select()
print("After monkey patch")
print( select.select )

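The output (recorded with an older Python/gevent release; exact class names and the memory address will differ on other versions):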

<class 'socket.socket'>
After monkey patch
<class 'gevent.socket.socket'>
<built-in function select>
After monkey patch
<function select at 0x1924de8>

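Python allows most objects to be modified at runtime, including modules, classes and even functions. This is generally a bad idea. In extreme situations, however, where a library needs to change Python's fundamental behavior, monkey patching comes in handy. In the example above, gevent replaces the blocking implementations in standard-library modules such as socket, ssl, threading and select with cooperative versions.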
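Besides socket and select, gevent.monkey provides other patch functions such as patch_time(), which makes time.sleep() cooperative. This sketch assumes a fresh interpreter in which nothing else has been patched yet. Three greenlets each call the ordinary time.sleep(0.2), yet together they finish in roughly 0.2 seconds instead of 0.6.

```python
from gevent import monkey
monkey.patch_time()  # patch before other code relies on time.sleep

import time
import gevent

def blocking_wait():
    time.sleep(0.2)  # now yields to other greenlets instead of blocking

start = time.perf_counter()
gevent.joinall([gevent.spawn(blocking_wait) for _ in range(3)])
elapsed = time.perf_counter() - start
print('elapsed: %.2f s' % elapsed)
```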

 

Events

Events are a form of asynchronous communication between greenlets.

 

import gevent
from gevent.event import AsyncResult

a = AsyncResult()

def setter():
    """
    After 3 seconds, set the result, waking all greenlets waiting
    on a.
    """
    gevent.sleep(3)
    a.set()

def waiter():
    """
    After 3 seconds the get call will unblock.
    """
    a.get() # blocking
    print('I live!')

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])

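AsyncResult is an extension of the event object that lets you send a value along with the wake-up call. This is sometimes called a future or a deferred: it holds a reference to a future value that can be set at an arbitrary point in time.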
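Here is a slightly fuller sketch of AsyncResult. set() delivers an actual value to the waiting greenlet, and set_exception() delivers a failure that get() re-raises. The producer/consumer names and the value 42 are illustrative.

```python
import gevent
from gevent.event import AsyncResult

result = AsyncResult()

def producer():
    gevent.sleep(0.1)
    result.set(42)  # wakes every greenlet blocked in result.get()

def consumer():
    return result.get()  # blocks until set() or set_exception() is called

gevent.spawn(producer)
c = gevent.spawn(consumer)
print(c.get())  # 42

# A failure can be delivered instead of a value
failed = AsyncResult()
failed.set_exception(ValueError('no value'))
reraised = None
try:
    failed.get()
except ValueError as e:
    reraised = e
print('get() re-raised:', reraised)
```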

Queues

 

A queue is an ordered collection of data with the usual put/get operations, implemented so that it can be manipulated safely from multiple greenlets.

For example, if one greenlet takes an item off the queue, that item cannot also be taken by another greenlet running at the same time.

 

import gevent
from gevent.queue import Queue

tasks = Queue()

def worker(n):
    while not tasks.empty():
        task = tasks.get()
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(0)

    print('Quitting time!')

def boss():
    for i in range(1,25):
        tasks.put_nowait(i)

gevent.spawn(boss).join()

gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])

 

The output is:

Worker steve got task 1
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker nancy got task 5
Worker john got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Worker steve got task 10
Worker nancy got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker nancy got task 15
Worker steve got task 16
Worker nancy got task 17
Worker john got task 18
Worker steve got task 19
Worker john got task 20
Worker nancy got task 21
Worker steve got task 22
Worker nancy got task 23
Worker john got task 24
Quitting time!
Quitting time!
Quitting time!

Queue put and get operations can also block when needed.

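Both put and get have non-blocking counterparts, put_nowait and get_nowait. Instead of blocking, they raise gevent.queue.Full or gevent.queue.Empty if the operation cannot complete immediately.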
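A short sketch of the non-blocking variants on a queue limited to one item. Instead of blocking, they raise gevent.queue.Full and gevent.queue.Empty; the events list just records what happened.

```python
from gevent.queue import Queue, Full, Empty

q = Queue(maxsize=1)
events = []

q.put_nowait('a')
try:
    q.put_nowait('b')  # no room left: raises Full instead of blocking
except Full:
    events.append('full')

events.append(q.get_nowait())  # 'a'
try:
    q.get_nowait()  # nothing left: raises Empty instead of blocking
except Empty:
    events.append('empty')

print(events)  # ['full', 'a', 'empty']
```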

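In the example below, the boss runs concurrently with the workers, and the queue may hold at most three items. This limit means put blocks until there is space in the queue. Conversely, get blocks when the queue is empty. It also takes a timeout argument, so it raises gevent.queue.Empty if no item arrives within that time (here, 1 second).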

import gevent
from gevent.queue import Queue, Empty

tasks = Queue(maxsize=3)

def worker(n):
    try:
        while True:
            task = tasks.get(timeout=1) # decrements queue size by 1
            print('Worker %s got task %s' % (n, task))
            gevent.sleep(0)
    except Empty:
        print('Quitting time!')

def boss():
    """
    Boss will wait to hand out work until an individual worker is
    free since the maxsize of the task queue is 3.
    """

    for i in range(1,10):
        tasks.put(i)
    print('Assigned all work in iteration 1')

    for i in range(10,20):
        tasks.put(i)
    print('Assigned all work in iteration 2')

gevent.joinall([
    gevent.spawn(boss),
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'bob'),
])

The output of the code is:

Worker steve got task 1
Worker john got task 2
Worker bob got task 3
Worker steve got task 4
Worker bob got task 5
Worker john got task 6
Assigned all work in iteration 1
Worker steve got task 7
Worker john got task 8
Worker bob got task 9
Worker steve got task 10
Worker bob got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker bob got task 15
Worker steve got task 16
Worker bob got task 17
Worker john got task 18
Assigned all work in iteration 2
Worker steve got task 19
Quitting time!
Quitting time!
Quitting time!

Groups and Pools

A group is a collection of running greenlets that can be managed and scheduled together.

import gevent
from gevent.pool import Group

def talk(msg):
    for i in range(3):
        print(msg)

g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

group = Group()
group.add(g1)
group.add(g2)
group.join()

group.add(g3)
group.join()

This is very useful for managing groups of asynchronous tasks.
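Greenlets can also be spawned directly into a group with Group.spawn(). Finished greenlets are discarded from the group automatically. A sketch with a hypothetical square() task:

```python
import gevent
from gevent.pool import Group

def square(n):
    gevent.sleep(0.01)
    return n * n

group = Group()
greenlets = [group.spawn(square, n) for n in range(4)]
group.join()  # wait until every greenlet in the group has finished

print([g.value for g in greenlets])  # [0, 1, 4, 9]
print(len(group))  # 0: finished greenlets were removed from the group
```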

Group also provides an API for dispatching jobs to the grouped greenlets and collecting their results in various ways.

import gevent
from gevent import getcurrent
from gevent.pool import Group

group = Group()

def hello_from(n):
    print('Size of group', len(group))
    print('Hello from Greenlet %s' % id(getcurrent()))

group.map(hello_from, range(3))

def intensive(n):
    gevent.sleep(3 - n)
    return 'task', n

print('Ordered')

ogroup = Group()
for i in ogroup.imap(intensive, range(3)):
    print(i)

print('Unordered')

igroup = Group()
for i in igroup.imap_unordered(intensive, range(3)):
    print(i)

The output is:

Size of group 3
Hello from Greenlet 10769424
Size of group 3
Hello from Greenlet 10770544
Size of group 3
Hello from Greenlet 10772304
Ordered
('task', 0)
('task', 1)
('task', 2)
Unordered
('task', 2)
('task', 1)
('task', 0)

A pool is a structure designed for handling a dynamic number of greenlets whose concurrency needs to be limited.

This is often desirable when there are many network or IO-bound tasks to run in parallel.

import gevent
from gevent.pool import Pool

pool = Pool(2)

def hello_from(n):
    print('Size of pool', len(pool))

pool.map(hello_from, range(3))

The output is:

Size of pool 2
Size of pool 2
Size of pool 1

When building gevent-driven services, it is common to structure the entire service around a pool.
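To see the limit in action, this sketch tracks how many jobs are running at once inside a Pool(2). The active and peak counters and job() are illustrative. pool.map() returns the results in input order.

```python
import gevent
from gevent.pool import Pool

pool = Pool(2)
active = 0
peak = 0

def job(n):
    global active, peak
    active += 1
    peak = max(peak, active)  # highest number of jobs running at once
    gevent.sleep(0.01)
    active -= 1
    return n * n

results = pool.map(job, range(6))
print('peak concurrency:', peak)
print(results)  # [0, 1, 4, 9, 16, 25]
```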

Locks and Semaphores

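A semaphore is a low-level synchronization primitive that lets greenlets coordinate and limit concurrent access or execution. A semaphore exposes two methods, acquire and release. If the semaphore's counter drops to 0, acquire blocks until another greenlet releases the semaphore.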

from gevent import sleep
from gevent.pool import Pool
from gevent.lock import BoundedSemaphore

sem = BoundedSemaphore(2)

def worker1(n):
    sem.acquire()
    print('Worker %i acquired semaphore' % n)
    sleep(0)
    sem.release()
    print('Worker %i released semaphore' % n)

def worker2(n):
    with sem:
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    print('Worker %i released semaphore' % n)

pool = Pool()
pool.map(worker1, range(0,2))
pool.map(worker2, range(3,6))

The output of the code is:

Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
Worker 3 acquired semaphore
Worker 4 acquired semaphore
Worker 3 released semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore

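A semaphore with a bound of 1 is known as a lock. It gives exclusive execution to one greenlet at a time and is often used when several greenlets access the same resource.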
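Applied to a read-modify-write on shared state, a lock keeps the yield in the middle from losing updates. This minimal sketch uses gevent.lock.Semaphore(1) as the lock; counter and increment() are illustrative names.

```python
import gevent
from gevent.lock import Semaphore

lock = Semaphore(1)  # a semaphore with a bound of 1 acts as a lock
counter = 0

def increment():
    global counter
    with lock:                 # only one greenlet at a time past this point
        current = counter
        gevent.sleep(0)        # other greenlets run, but wait on the lock
        counter = current + 1

gevent.joinall([gevent.spawn(increment) for _ in range(10)])
print(counter)  # 10
```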

Thread Locals

gevent also lets you specify data that is local to a greenlet context.

import gevent
from gevent.local import local

stash = local()

def f1():
    stash.x = 1
    print(stash.x)

def f2():
    stash.y = 2
    print(stash.y)

    try:
        stash.x
    except AttributeError:
        print("x is not local to f2")

g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)

gevent.joinall([g1, g2])

The output is:

1
2
x is not local to f2
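The sketch below shows that the same attribute of one local() object holds a different value in each greenlet, even when the greenlets interleave. The worker names and the seen dict are illustrative.

```python
import gevent
from gevent.local import local

stash = local()
seen = {}

def worker(name):
    stash.name = name
    gevent.sleep(0)           # let the other greenlet run and set its own value
    seen[name] = stash.name   # still this greenlet's own value

gevent.joinall([gevent.spawn(worker, 'a'), gevent.spawn(worker, 'b')])
print(seen)  # {'a': 'a', 'b': 'b'}
```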

Many web frameworks that integrate with gevent store HTTP session objects in gevent thread locals. For example:

from werkzeug.local import LocalProxy
from werkzeug.wrappers import Request
from contextlib import contextmanager

from gevent.local import local
from gevent.pywsgi import WSGIServer

_requests = local()
request = LocalProxy(lambda: _requests.request)

@contextmanager
def sessionmanager(environ):
    _requests.request = Request(environ)
    yield
    _requests.request = None

def logic():
    return "Hello " + request.remote_addr

def application(environ, start_response):
    status = '200 OK'

    with sessionmanager(environ):
        body = logic()

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body.encode('utf-8')]  # WSGI response bodies must be bytes

WSGIServer(('', 8000), application).serve_forever()

Subprocesses

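As of gevent 1.0, the gevent.subprocess module has been added. It lets subprocesses run cooperatively, so other greenlets keep running while a child process executes.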

import gevent
from gevent.subprocess import Popen, PIPE

def cron():
    while True:
        print("cron")
        gevent.sleep(0.2)

g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.rstrip().decode())

The output:

cron
cron
cron
cron
cron
Linux
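Here is a variant of the example above using gevent.subprocess.check_output(). It assumes the child is another Python interpreter (sys.executable), so it does not depend on sleep or uname being available. The ticker greenlet keeps running while the child process works; ticker and ticks are illustrative names.

```python
import sys
import gevent
from gevent.subprocess import check_output

ticks = []

def ticker():
    while True:
        ticks.append(1)
        gevent.sleep(0.05)

g = gevent.spawn(ticker)
# While we wait for the child's output, the hub keeps running the ticker
out = check_output([sys.executable, '-c',
                    'import time; time.sleep(0.3); print("hi")'])
g.kill()
print(out.decode().strip(), 'ticks:', len(ticks))
```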