引言: 本文出自David Beazley 的关于协程的PPT,如今笔者将他翻译过来。并整理成文。感谢在协程方面的专家David Beazley, 能给咱们这么深刻的协程上面的讲座。也但愿本文能给更多pythoner普及yield的更多用法,使python的这个特性可以更加多的活跃在你们的代码中。
html
http://www.dabeaz.com/coroutines/python
1. 什么是协程
2. 协程怎么用
3. 要注意什么
4. 用他们好么
复制代码
生成器 是 能够生成必定序列的 函数。 函数能够调用next()方法。web
import time
def follow(thefile):
thefile.seek(0,2) # Go to the end of the file
while True:
line = thefile.readline()
if not line:
time.sleep(0.1) # Sleep briefly
continue
yield line
复制代码
ps:unix pipe
A pipeline is a sequence of processes chained together by their standard streams
标注:unix管道
一个uinx管道是由标准流连接在一块儿的一系列流程.
复制代码
pipeline.py数据库
def grep(pattern,lines):
for line in lines:
if pattern in line:
yield line
if __name__ == '__main__':
from follow import follow
# Set up a processing pipe : tail -f | grep python
logfile = open("access-log")
loglines = follow(logfile)
pylines = grep("python",loglines)
# Pull results out of the processing pipeline
for line in pylines:
print line,
复制代码
理解pipeline.py
在pipeline中,follow函数和grep函数至关于程序链,这样就能链式处理程序。编程
grep.pyjson
def grep(pattern):
print "Looking for %s" % pattern
print "give a value in the coroutines"
while True:
line = (yield)
if pattern in line:
print line
# Example use
if __name__ == '__main__':
g = grep("python")
g.next()
g.send("Yeah, but no, but yeah, but no")
g.send("A series of tubes")
g.send("python generators rock!")
复制代码
yield最重要的问题在于yield的值是多少。安全
yield的值须要使用coroutine协程这个概念 相对于仅仅生成值,函数能够动态处理传送进去的值,而最后值经过yield返回。bash
协程的执行和生成器的执行很类似。 当你初始化一个协程,不会返回任何东西。 协程只能响应run和send函数。 协程的执行依赖run和send函数。服务器
全部的协程都须要调用.next( )函数。 调用的next( )函数将要执行到第一个yield表达式的位置。 在yield表达式的位置上,很容易去执行就能够。 协程使用next()启动。网络
由【协程启动】中咱们知道,启动一个协程须要记得调用next( )来开始协程,而这个启动器容易忘记使用。 使用修饰器包一层,来让咱们启动协程。 【之后全部的协程器都会先有@coroutine
def coroutine(func):
def start(*args, **kwargs):
cr = func(*args, **kwargs)
cr.next()
return cr
return start
@coroutine
def grep(pattern):
...
复制代码
使用close()来关闭。
grepclose.py
@coroutine
def grep(pattern):
print "Looking for %s" % pattern
try:
while True:
line = (yield)
if pattern in line:
print line,
except GeneratorExit:
print "Going away. Goodbye"
复制代码
使用GeneratorExit这个异常类型
在一个协程中,能够抛出一个异常
g.throw(RuntimeError,"You're hosed")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 4, in grep
RuntimeError: You're hosed 复制代码
异常起源于yield表达式 能够用常规方法去抓取
* 尽管有点类似,可是生成器和协程是*两个彻底不一样的概念*。
* 生成器用来产生序列。
* 协程用来处理序列。
* 很容易产生一些误解。由于协程有的时候用来对进程里面的用来产生迭代对象的生成器做微调。
复制代码
* 不能往generator里面send东西。
* 协程和迭代器的概念没有关系
* 虽然有一种用法,确实是在一个协程里面生成一些值,可是并不和迭代器有关系。
复制代码
进程管道须要一个初始的源(一个生产者)。 这个初始的源驱动整个管道。 管道源不是协程。
管道必须有个终止点。 管道终止/协程终止是进程管道的终止点。
from coroutine import coroutine
# A data source. This is not a coroutine, but it sends
# data into one (target)
import time
def follow(thefile, target):
thefile.seek(0,2) # Go to the end of the file
while True:
line = thefile.readline()
if not line:
time.sleep(0.1) # Sleep briefly
continue
target.send(line)
# A sink. A coroutine that receives data
@coroutine
def printer():
while True:
line = (yield)
print line,
# Example use
if __name__ == '__main__':
f = open("access-log")
follow(f,printer())
复制代码
分析:第一个follow函数是协程源,第二个printer函数是协程终止。协程源不是一个协程,可是须要传入一个已经初始化完毕的协程。在协程源当中,调用send()。
叫过滤器其实并不贴切,应该叫中间人Intermediate:其两端都是send()函数。
@coroutine
def filter(target): # 这个target是传递参数的对象
while True:
item = (yield) # 这里用来接收上一个send()传入的value
# Transform/filter item
# processing items
# Send it along to the next stage
target.send(item) # 像target传递参数
复制代码
分析可知,中间层须要接受上一个coroutine,也须要往下一个coroutine里面传递值。
一个管道过滤器的例子 从文章中找出具备“python”关键字的句子打印。 grep.py:
@coroutine
def grep(pattern, target): # 这个target用来接收参数
while True:
line = (yield) # 这里用来接收上一个send()传入的value
# Transform/filter item
# processing items
if pattern in line:
target.send(line)
# Send it along to the next stage
复制代码
Hook it up with follow and printer:
f = open("access-log")
follow(f, grep('python', printer()))
复制代码
grep 从中间传入follow,而后printer传入grep。
图示:
使用协程,你能够发送数据 给 多个 协程过滤器/协程终了。可是请注意,协程源只是用来传递数据的,过多的在协程源中传递数据是使人困惑而且复杂的。
一个例子
@coroutine
def broadcast(targets):
while True:
item = (yield)
for target in targets:
target.send(item)
复制代码
Hook it Up!
if __name__ == '__main__':
f = open("access-log")
follow(f,
broadcast([grep('python',printer()),
grep('ply',printer()),
grep('swig',printer())])
)
复制代码
从文章中分别打印出含有’python‘ ’ply‘ ’swig‘ 关键字的句子。使用了一个协程队列向全部printer协程 送出 接收到的数据。 图示:
或者这样Hook them up:
if __name__ == '__main__':
f = open("access-log")
p = printer()
follow(f,
broadcast([grep('python',p),
grep('ply',p),
grep('swig',p)])
)
复制代码
图示:
为何咱们用协程
协程能够用在写各类各样处理事件流的组件。
介绍一个例子【这个例子会贯穿这个第三部分始终】要求作一个实时的公交车GPS位置监控。编写程序的主要目的是处理一份文件。传统上,使用SAX进行处理。【SAX处理能够减小内存空间的使用,但SAX事件驱动的特性会让它笨重和低效】。
复制代码
咱们可使用协程分发SAX事件,好比:
import xml.sax
class EventHandler(xml.sax.ContentHandler):
def __init__(self,target):
self.target = target
def startElement(self,name,attrs):
self.target.send(('start',(name,attrs._attrs)))
def characters(self,text):
self.target.send(('text',text))
def endElement(self,name):
self.target.send(('end',name))
# example use
if __name__ == '__main__':
from coroutine import *
@coroutine
def printer():
while True:
event = (yield)
print event
xml.sax.parse("allroutes.xml",
EventHandler(printer()))
复制代码
解析:整个事件的处理如图所示
好比,把xml改为json最后从中筛选的出固定信息. buses.py
@coroutine
def buses_to_dicts(target):
while True:
event, value = (yield)
# Look for the start of a <bus> element
if event == 'start' and value[0] == 'bus':
busdict = {}
fragments = []
# Capture text of inner elements in a dict
while True:
event, value = (yield)
if event == 'start':
fragments = []
elif event == 'text':
fragments.append(value)
elif event == 'end':
if value != 'bus':
busdict[value] = "".join(fragments)
else:
target.send(busdict)
break
复制代码
协程的一个有趣的事情是,您能够将初始数据源推送到低级别的语言,而不须要重写全部处理阶段。好比,PPT 中69-73页介绍的,能够经过协程和低级别的语言进行联动,从而达成很是好的优化效果。如Expat模块或者cxmlparse模块。 ps: ElementTree具备快速的递增xml句法分析
协程有如下特色。
咱们往协程内传送数据,向线程内传送数据,也向进程内传送数据。那么,协程天然很容易和线程和分布式系统联系起来。
咱们能够经过添加一个额外的层,从而封装协程进入线程或者子进程。这描绘了几个基本的概念。
下面看一个线程的例子。 cothread.py
@coroutine
def threaded(target):
# 第一部分:
messages = Queue()
def run_target():
while True:
item = messages.get()
if item is GeneratorExit:
target.close()
return
else:
target.send(item)
Thread(target=run_target).start()
# 第二部分:
try:
while True:
item = (yield)
messages.put(item)
except GeneratorExit:
messages.put(GeneratorExit)
复制代码
例子解析:第一部分:先新建一个队列。而后定义一个永久循环的线程;这个线程能够将其中的元素拉出消息队列,而后发送到目标里面。第二部分:接受上面送来的元素,并经过队列,将他们传送进线程里面。其中用到了GeneratorExit ,使得线程能够正确的关闭。
Hook up:cothread.py
if __name__ == '__main__':
import xml.sax
from cosax import EventHandler
from buses import *
xml.sax.parse("allroutes.xml", EventHandler(
buses_to_dicts(
threaded(
filter_on_field("route", "22",
filter_on_field("direction", "North Bound",
bus_locations()))))))
复制代码
可是:添加线程让这个例子慢了50%
咱们知道,进程之间是不共享系统资源的,因此要进行两个子进程之间的通讯,咱们须要经过一个文件桥接两个协程。
import cPickle as pickle
from coroutine import *
@coroutine
def sendto(f):
try:
while True:
item = (yield)
pickle.dump(item, f)
f.flush()
except StopIteration:
f.close()
def recvfrom(f, target):
try:
while True:
item = pickle.load(f)
target.send(item)
except EOFError:
target.close()
# Example use
if __name__ == '__main__':
import xml.sax
from cosax import EventHandler
from buses import *
import subprocess
p = subprocess.Popen(['python', 'busproc.py'],
stdin=subprocess.PIPE)
xml.sax.parse("allroutes.xml",
EventHandler(
buses_to_dicts(
sendto(p.stdin))))
复制代码
程序经过sendto()和recvfrom()传递文件。
使用协程,咱们能够从一个任务的执行环境中剥离出他的实现。而且,协程就是那个实现。执行环境是你选择的线程,子进程,网络等。
须要注意的警告:
在并发编程中,一般将问题细分为“任务”。 “任务”有下面几个经典的特色: * 拥有独立的控制流。 * 拥有内在的状态。 * 能够被安排规划/挂起/恢复。 * 可与其余的任务通讯。 协程也是任务的一种。
@coroutine
def grep(pattern):
print "Looking for %s" % pattern
print "give a value in the coroutines"
while True:
line = (yield)
if pattern in line:
print line
复制代码
@coroutine
def grep(pattern):
print "Looking for %s" % pattern
print "give a value in the coroutines"
while True:
line = (yield)
if pattern in line:
print line
复制代码
@coroutine
def grep(pattern):
print "Looking for %s" % pattern
print "give a value in the coroutines"
while True:
line = (yield)
if pattern in line:
print line
复制代码
总之,一个协程知足以上全部任务(task)的特色,因此协程很是像任务。可是协程不用与任何一个线程或者子进程绑定。
当计算机运行时,电脑没有同时运行好几条指令的打算。而不管是处理器,应用程序都不懂多任务处理。因此,操做系统须要去完成多任务的调度。操做系统经过在多个任务中快速切换来实现多任务。
CPU执行的是应用程序,而不是你的操做系统,那没有被CPU执行的操做系统是怎么控制正在运行的应用程序中断的呢。
操做系统只能经过两个机制去得到对应用程序的控制:中断和陷阱。 * 中断:和硬件有关的balabala。 * 陷阱:一个软件发出的信号。 在两种情况下,CPU都会挂起正在作的,而后执行OS的代码(这个时候,OS的代码成功插入了应用程序的执行),此时,OS来切换了程序。
* 中断(Traps)使得OS的代码能够实现。
* 在程序运行遇到中断(Traps)时,OS强制在CPU上中止你的程序。
* 程序挂起,而后OS运行。
复制代码
表现以下图:
为了执行不少任务,添加一簇任务队列。
BB了这么多微嵌的内容,获得的是什么结论呢。类比任务调度,协程中yield声明能够理解为中断(Traps)。当一个生成器函数碰到了yield声明,那函数将当即挂起。而执行被传给生成器函数运行的任何代码。若是你把yield声明当作了一个中断,那么你就能够组件一个多任务执行的操做系统了。
1. 用纯python语句。
2. 不用线程。
3. 不用子进程。
4. 使用生成器和协程器。
复制代码
* 尤为在存在线程锁(GIL)的条件下,在线程间切换会变得很是重要。我要高并发!
* 不阻塞和异步I/O。我要高并发!
* 在实战中可能会遇到:服务器要同时处理上千条客户端的链接。我要高并发!
* 大量的工做 致力于实现 事件驱动 或者说 响应式模型。我要组件化!
* 综上,python构建操做系统,有利于了解如今高并发,组件化的趋势。
复制代码
定义一个任务类:任务像一个协程的壳,协程函数传入target;任务类仅仅有一个run()函数。 pyos1.py
# Step 1: Tasks
# This object encapsulates a running task.
class Task(object):
taskid = 0 # 全部task对象会共享这个值。不熟悉的朋友请补一下类的知识
def __init__(self,target):
Task.taskid += 1
self.tid = Task.taskid # Task ID
self.target = target # Target coroutine
self.sendval = None # Value to send
# Run a task until it hits the next yield statement
def run(self):
return self.target.send(self.sendval)
复制代码
任务类的执行:
if __name__ == '__main__':
# A simple generator/coroutine function
def foo():
print "Part 1"
yield
print "Part 2"
yield
t1 = Task(foo())
print "Running foo()"
t1.run()
print "Resuming foo()"
t1.run()
复制代码
在foo中,yield就像中断(Traps)同样,每次执行run(),任务就会执行到下一个yield(一个中断)。
下面是调度者类,两个属性分别是Task队列和task_id与Task类对应的map。schedule()向队列里面添加Task。new()用来初始化目标函数(协程函数),将目标函数包装在Task,进而装入Scheduler。最后mainloop会从队列里面拉出task而后执行到task的target函数的yield为止,执行完之后再把task放回队列。这样下一次会从下一个yield开始执行。 pyos2.py
from Queue import Queue
class Scheduler(object):
def __init__(self):
self.ready = Queue()
self.taskmap = {}
def new(self,target):
newtask = Task(target)
self.taskmap[newtask.tid] = newtask
self.schedule(newtask)
return newtask.tid
def schedule(self,task):
self.ready.put(task)
def mainloop(self):
while self.taskmap:
task = self.ready.get()
result = task.run()
self.schedule(task)
复制代码
下面是一个执行的例子:
# === Example ===
if __name__ == '__main__':
# Two tasks
def foo():
while True:
print "I'm foo"
yield
print "I am foo 2"
yield
def bar():
while True:
print "I'm bar"
yield
print "i am bar 2"
yield
# Run them
sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()
复制代码
执行结果,能够发现两个task之间任务是交替的,而且以yield做为中断点。每当执行撞到yield(中断点)以后,Scheduler对Tasks作从新的规划。下图是两个循环。 上述执行的结果:
若是,target函数里面不是死循环,那么上面的代码就会出错。因此咱们对Scheduler作改进。添加一个从任务队列中删除的操做,和对于StopIteration的验证。 【对scheduler作改进的缘由是任务的性质:能够被安排规划/挂起/恢复。】
class Scheduler(object):
def __init__(self):
...
def new(self,target):
...
def schedule(self,task):
...
def exit(self,task):
print "Task %d terminated" % task.tid
del self.taskmap[task.tid]
def mainloop(self):
while self.taskmap:
task = self.ready.get()
try:
result = task.run()
except StopIteration:
self.exit(task)
continue
self.schedule(task)
复制代码
在OS中,中断是应用程序请求系统服务的方式。在咱们的代码中,OS是调度者(scheduler),而中断是yield。为了请求调度者服务,任务须要带值使用yield声明。 pyos4.py
class Scheduler(object):
...
def mainloop(self):
while self.taskmap: # 1
task = self.ready.get()
try: # 2
result = task.run()
if isinstance(result, SystemCall):
result.task = task
result.sched = self
result.handle()
continue
except StopIteration:
self.exit(task)
continue
self.schedule(task) # 3
class SystemCall(object): # 4
def handle(self):
pass
复制代码
代码解析: 1. 若是taskmap里面存在task,就从ready队列里面拿任务出来,若是没有就结束mainloop。 2. 【就是传说中的系统调运部分】ready队列里面的task被拿出来之后,执行task,返回一个result对象,并初始化这个result对象。若是队列里面的task要中止迭代了(终止yield这个过程)就从队列里删除这个任务。 3. 最后再经过schedule函数把执行后的task放回队列里面。 4. 系统调用基类,以后全部的系统调用都要从这个基类继承。
这个系统调用想返回任务的id。 Task的sendval属性就像一个系统调用的返回值。当task从新运行的是后,sendval将会传入这个系统调用。 pyos4.py
...
class GetTid(SystemCall):
def handle(self):
# 把task的id传给task的返回参数:
self.task.sendval = self.task.tid
# 再把task给放入Scheduler的队列里面
self.sched.schedule(self.task)
class Task(object):
...
# Run a task until it hits the next yield statement
def run(self):
return self.target.send(self.sendval)
复制代码
进行最后的调用:
if __name__ == '__main__':
def foo():
mytid = yield GetTid()
for i in xrange(5):
print "I'm foo", mytid
yield
def bar():
mytid = yield GetTid()
for i in xrange(10):
print "I'm bar", mytid
yield
sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()
复制代码
理解这段代码的前提:(很是重要) 1. send()函数有返回值的,返回值是yield表达式右边的值。在本段代码中,result的返回值是yield GetTid()的GetTid的实例或者是yield后面的None。 2. 执行send(sendval)之后,sendval被传入了yield表达式。并赋给了mytid,返回GetTid()给ruselt。
执行顺序: 先建立一个调度者(Scheduler),而后在调度者里面添加两个协程函数:foo(), bar(),最后触发mainloop进行协程的调度执行。
系统调用原理: 系统调用是基于系统调用类实现的,如GetTid类,其目的是传出本身的tid。传出本身的tid以后,再将task放回队列。
上面咱们搞定了一个GetTid系统调用。咱们如今搞定更多的系统调用: * 建立一个新的任务。 * 杀掉一个已经存在的任务。 * 等待一个任务结束。 这些细小的相同的操做会与线程,进程配合。
1. *建立一个新的系统调用*:经过系统调用加入一个task。
复制代码
# Create a new task
class NewTask(SystemCall):
def __init__(self,target):
self.target = target
def handle(self):
tid = self.sched.new(self.target)
self.task.sendval = tid
self.sched.schedule(self.task)
复制代码
2. *杀掉一个系统调用*:经过系统调用杀掉一个task。
复制代码
class KillTask(SystemCall):
def __init__(self, tid):
self.tid = tid
def handle(self):
task = self.sched.taskmap.get(self.tid, None)
if task:
task.target.close()
self.task.sendval = True
else:
self.task.sendval = False
self.sched.schedule(self.task)
复制代码
3. 进程等待:须要大幅度改进Scheduler。
复制代码
class Scheduler(object):
def __init__(self):
...
# Tasks waiting for other tasks to exit
self.exit_waiting = {}
def new(self, target):
...
def exit(self, task):
print "Task %d terminated" % task.tid
del self.taskmap[task.tid]
# Notify other tasks waiting for exit
for task in self.exit_waiting.pop(task.tid, []):
self.schedule(task)
def waitforexit(self, task, waittid):
if waittid in self.taskmap:
self.exit_waiting.setdefault(waittid, []).append(task)
return True
else:
return False
def schedule(self, task):
...
def mainloop(self):
...
复制代码
exit_waiting 是用来暂时存放要退出task的地方。
class WaitTask(SystemCall):
def __init__(self, tid):
self.tid = tid
def handle(self):
result = self.sched.waitforexit(self.task, self.tid)
self.task.sendval = result
# If waiting for a non-existent task,
# return immediately without waiting
if not result:
self.sched.schedule(self.task)
复制代码
设计讨论: * 在任务中引用另外一个任务的惟一办法 是 使用scheduler分配给它的任务ID。 * 上述准则是一个安全的封装策略。 * 这个准则让任务保持独立,不与内核混淆在一块儿。 * 这个准则能让全部的任务都被scheduler管理的好好的。
如今已经完成了: * 多任务。 * 开启新的进程。 * 进行新任务的管理。 这些特色都很是符合一个web服务器的各类特色。下面作一个Echo Server的尝试。
from pyos6 import *
from socket import *
def handle_client(client, addr):
print "Connection from", addr
while True:
data = client.recv(65536)
if not data:
break
client.send(data)
client.close()
print "Client closed"
yield # Make the function a generator/coroutine
def server(port):
print "Server starting"
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(("", port))
sock.listen(5)
while True:
client, addr = sock.accept()
yield NewTask(handle_client(client, addr))
def alive():
while True:
print "I'm alive!"
yield
sched = Scheduler()
sched.new(alive())
sched.new(server(45000))
sched.mainloop()
复制代码
但问题是这个网络服务器是I / O阻塞的。整个python的解释器须要挂起,一直到I/O操做结束。
先额外介绍一个叫Select的模块。select模块能够用来监视一组socket连接的活跃状态。用法以下:
reading = [] # List of sockets waiting for read
writing = [] # List of sockets waiting for write
# Poll for I/O activity
r,w,e = select.select(reading,writing,[],timeout)
# r is list of sockets with incoming data
# w is list of sockets ready to accept outgoing data
# e is list of sockets with an error state
复制代码
下面实现一个非阻塞I/O的网络服务器,所用的思想就是以前所实现的Task waiting 思想。
class Scheduler(object):
def __init__(self):
...
# I/O waiting
self.read_waiting = {}
self.write_waiting = {}
...
# I/O waiting
def waitforread(self, task, fd):
self.read_waiting[fd] = task
def waitforwrite(self, task, fd):
self.write_waiting[fd] = task
def iopoll(self, timeout):
if self.read_waiting or self.write_waiting:
r, w, e = select.select(self.read_waiting,
self.write_waiting,
[], timeout)
for fd in r:
self.schedule(self.read_waiting.pop(fd))
for fd in w:
self.schedule(self.write_waiting.pop(fd))
复制代码
源码解析:__init__里面的是两个字典。用来存储阻塞的IO的任务。waitforread()和waitforwrite()将须要等待写入和等待读取的task放在dict里面。这里的iopoll():使用select()去决定使用哪一个文件描述器,而且可以不阻塞任意一个和I/O才作有关系的任务。poll这个东西也能够放在mainloop里面,可是这样会带来线性的开销增加。 详情请见:Python Select 解析 - 金角大王 - 博客园
添加新的系统调用:
# Wait for a task to exit
class WaitTask(SystemCall):
def __init__(self, tid):
self.tid = tid
def handle(self):
result = self.sched.waitforexit(self.task, self.tid)
self.task.sendval = result
# If waiting for a non-existent task,
# return immediately without waiting
if not result:
self.sched.schedule(self.task)
# Wait for reading
class ReadWait(SystemCall):
def __init__(self, f):
self.f = f
def handle(self):
fd = self.f.fileno()
self.sched.waitforread(self.task, fd)
# Wait for writing
class WriteWait(SystemCall):
def __init__(self, f):
self.f = f
def handle(self):
fd = self.f.fileno()
self.sched.waitforwrite(self.task, fd)
复制代码
更多请见开头那个链接里面的代码:pyos8.py
这样咱们就完成了一个多任务处理的OS。这个OS能够并发执行,能够建立、销毁、等待任务。任务能够进行I/O操做。而且最后咱们实现了并发服务器。
先来看一段示例代码:
def Accept(sock):
yield ReadWait(sock)
return sock.accept()
def server(port):
while True:
client,addr = Accept(sock)
yield NewTask(handle_client(client,addr))
复制代码
这种状况下,server()函数里面的有调用Accept(),可是accept函数里面的yield不起做用。这是由于yield只能在函数栈的最顶层挂起一个协程。你也不可以把yield写进库函数里面。 【这个限制是Stackless Python要解决的问题之一。
解决这个只能在函数栈顶挂起协程的解决方法。 * 有且只有一种方法,可以建立可挂起的子协程和函数。 * 可是,建立可挂起的子协程和函数须要经过咱们以前所说的Task Scheduler自己。 * 咱们必须严格遵照yield声明。 * 咱们须要使用一种 -奇淫巧技- 叫作Trampolining(蹦床)。
代码:trampoline.py
def add(x, y):
yield x + y
# A function that calls a subroutine
def main():
r = yield add(2, 2)
print r
yield
def run():
m = main()
# An example of a "trampoline"
sub = m.send(None)
result = sub.send(None)
m.send(result)
# execute:
run()
复制代码
整个控制流以下:
咱们看到,上图中左侧为统一的scheduler,若是咱们想调用一个子线程,咱们都用经过上面的scheduler进行调度。
控制过程: scheduler -> subroutine_1 -> scheduler -> subroutine_2 -> scheduler -> subroutine_1 就像蹦床(trampolining)同样,全部的子进程调度都要先返回scheduler,再进行下一步。【有点像汽车换挡。
而不是: -scheduler -> subroutine_1 -> subroutine_2 -> subroutine_1- 这种直接栈式的子协程调度是不被容许的。
有不少更加深远的话题值得咱们去讨论。其实在上面的套路里面都说了一些。 * 在task之间的通讯。 * 处理阻塞的一些操做:好比和数据库的一些连接。 * 多进程的协程和多线程的协程。 * 异常处理。
Python 的生成器比不少人想象的有用的多。生成器能够:
* 定制可迭代对象。
* 处理程序管道和数据流。【第二部分提到】
* 事物处理。【第三部分提到的和SAX结合的事务处理】
* 合做的多任务处理【第四部分提到的Task,子进程子线程合做】
复制代码
* 迭代器:要产生数据。
* 接受数据/消息:消费数据。
* 一个中断:在合做性的多任务里面。
复制代码
千万不要一个函数里面包含两个或多个以上的功能,好比函数是generator就是generator,是一个coroutine就是一个coroutin。
感谢你们阅读。我是LumiaXu。一名电子科大正在找实习的pythoner~。
更多访问原做者的网站: http://www.dabeaz.com
#python/coroutine#