#Coroutineshtml
做者:MetalBug 时间:2015-03-17 出处:http://my.oschina.net/u/247728/blog 声明:版权全部,侵犯必究
tornado.gen
— Simplify asynchronous code tornado.concurrent
— Work with threads and futures安全
##1.Future Future
用于保存异步任务的结果。 在同步应用中,一般用Future
存储来自其余线程异步任务的结果。 可是在Tornado中,Future
并非线程安全的,这是处于效率的考虑,由于Tornado模型的使用的是 one loop per thread
。数据结构
###1.1内部实现-实现细节 在Future
中,使用了_TracebackLogger
用于记录exception。 在设置了exception(即异步任务抛异常),添加_TracebackLogger
。 当调用了Funture.result()
,Future.exception()
时会将_TracebackLogger
清除。由于调用者已经知道了出现异常。 只有在没有调用以上函数时,Future
被析构时_TracebackLogger
会将错误信息记录。这样防止异步任务中出现异常却由于没被调用致使异常不被意识到。app
Future.set_exec_info
中,添加_TracebackLogger
异步
def set_exc_info(self, exc_info): ## if not _GC_CYCLE_FINALIZERS: self._tb_logger = _TracebackLogger(exc_info)
在__del__
中,记录exceptionasync
if _GC_CYCLE_FINALIZERS: def __del__(self): if not self._log_traceback: # set_exception() was not called, or result() or exception() # has consumed the exception return tb = traceback.format_exception(*self._exc_info) app_log.error('Future %r exception was never retrieved: %s', self, ''.join(tb).rstrip())
##2.coroutine coroutine
是一个修饰符,使用coroutine
,能够将异步的操做以一个Generator
实现,而不用写一系列回调函数。函数
示例:tornado
一般状况下异步操做须要写成回调函数:oop
class AsyncHandler(RequestHandler): @asynchronous def get(self): http_client = AsyncHTTPClient() http_client.fetch("http://example.com", callback=self.on_fetch) def on_fetch(self, response): do_something_with_response(response) self.render("template.html")
使用coroutine
,能够写成一个Generator
形式,而不用写成多个回调。fetch
class GenAsyncHandler(RequestHandler): @gen.coroutine def get(self): http_client = AsyncHTTPClient() response = yield http_client.fetch("http://example.com") do_something_with_response(response) self.render("template.html")
在用coroutine
修饰的Generator
中,在遇到异步函数时,使用yield
,这里须要注意的是在Tornado中,异步函数返回的大部分是Future
(还能够是dict
,list
可用convert_yield
转换),这样yield
返回的的Future.result()
。
coroutine
内部经过_make_coroutine_wrapper
和Runner
实现。 流程图以下:
###1._make_coroutine_wrapper
####内部实现-主要函数 _make_coroutine_wrapper
能够认为是一个Inline的Runner
。 由于不能保证每一个被修饰的func都是Generator
,都会yield,因此将获取第一个yield
分离开来减小了初始化Runner
的开销。
其主要作了如下工做:
调用func,获得result
若是result是Generator
,调用Generator.next
获得第一个yield
,调用Runner
处理以后工做
def _make_coroutine_wrapper(func, replace_callback): @functools.wraps(func) def wrapper(*args, **kwargs): future = TracebackFuture() ### try: result = func(*args, **kwargs) ### else: if isinstance(result, types.GeneratorType): try: ##用于检查栈的一致性,由于yield可以在StackContext中返回 orig_stack_contexts = stack_context._state.contexts yielded = next(result) ### ##generator已经执行完毕,或者经过yield返回一个Return(Exception) except (StopIteration, Return) as e: future.set_result(getattr(e, 'value', None)) except Exception: future.set_exc_info(sys.exc_info()) ### else: ##调用Runner处理后续工做 Runner(result, future, yielded) try: return future finally: future = None future.set_result(result) return future return wrapper
####内部实现-实现细节
_make_coroutine_wrapper
在返回Future
时,使用了try...finally
。 这里对内存进行了优化,若是在next(result)
时抛出异常,那么Future.set_exc_info
会被调用,这时候_TracebackLogger
记录当前栈内容(使用traceback),也包含了future
自己所在栈,这样出现了循环引用,将函数内即本地的future
置为None
,可以避免循环,从而提升GC回收效率。
# Subtle memory optimization: if next() raised an exception, # the future's exc_info contains a traceback which # includes this stack frame. This creates a cycle, # which will be collected at the next full GC but has # been shown to greatly increase memory usage of # benchmarks (relative to the refcount-based scheme # used in the absence of cycles). We can avoid the # cycle by clearing the local variable after we return it. try: return future finally: future = None
###2.Runner
Runner
处理Generator
,将执行结果以Future
返回。
其工做流程以下:
yield
返回的是Future
,得Runner
到Future
的result,调用Generator.send
将result返回到Generator
中Generator.next
获得下一个yield
,回到1####内部实现-数据结构
self.gen
处理的Generator
self.result_future
用于存储result的Future
self.future
当前yield的异步函数的Future
####内部实现-主要函数
在初始化中,能够看到主要涉及到的是handle_yield
和run
函数。
def __init__(self, gen, result_future, first_yielded): ### if self.handle_yield(first_yielded): self.run()
handle_yield 处理当前yield的异步函数。 若是异步函数已经完成(即self.future.done()
),那么返回Ture
。 不然利用IOLoop.add_future
将self.run
注册到IOLoop
中,因此当self.future
完成时,调用self.run
。
if not self.future.done() or self.future is moment: self.io_loop.add_future( self.future, lambda f: self.run()) return False return True
run 开始和重启Generator
,持续执行到下一个yield
。当Generator
已经完成,返回设置好结果的Future
。
def run(self): ### try: self.running = True while True: future = self.future ### try: ### try: value = future.result() ### else: ###将self.future.result()发送到generator中,重启generator yielded = self.gen.send(value) ### ###generator结束,设置self.result_future而后返回 except (StopIteration, Return) as e: self.finished = True self.future = _null_future if self.pending_callbacks and not self.had_exception: raise LeakedCallbackError( "finished without waiting for callbacks %r" % self.pending_callbacks) self.result_future.set_result(getattr(e, 'value', None)) self.result_future = None self._deactivate_stack_context() return ### if not self.handle_yield(yielded): return finally: self.running = False