Tornado源码分析系列之一: 化异步为'同步'的Future和gen.coroutine

转自:http://blog.nathon.wang/2015/06/24/tornado-source-insight-01-gen/html

用Tornado也有一段时间,Tornado的文档仍是比较匮乏的,可是幸亏其代码短小精悍,颇有可读性,遇到问题时老是习惯深刻到其源码中。
这对于提高本身的Python水平和对于网络及HTTP的协议的理解也颇有帮助。本文是Tornado源码系列的第一篇文章,网上关于Tornado源码分
析的文章也很多,大可能是从Event loop入手,分析Event loop的工做原理,以及在其上如何构建TCPServer和HTTPServer。因此我就不想拾前
人的牙慧再去写一遍,固然这些内容我后续会涉及到,可是作为开篇第一章,我想从更加独特的角度来分析Tornado,这里就说说Tornado的gen
和concurrent两个模块, 这个话题网上彷佛还很少,呵呵。node

设计从需求出发,要考证一段的代码为何写成这样而不是那样, 首先要看代码解决了什么需求。 看下代码中的例子先:网络

1
2
3
4
5
6
7
8
9
class AsyncHandler(RequestHandler):
@asynchronous
def get(self):
http_client = AsyncHTTPClient()
http_client.fetch('http://example.com', callback=self.on_fetch)

def on_fetch(self, response):
do_something_with_response(response)
self.render('template.html')

通过gen.coroutine修饰以后上面的这段代码能够改成app

1
2
3
4
5
6
7
class GenAsyncHandler(RequestHandler):
@gen.coroutine
def get(self):
http_client = AsyncHTTPClient()
response = yield http_client.fetch('http://example.com')
do_something_with_response(response)
self.render('template.html')

初识这段代码以为好神奇,其实gen.coroutine只不过是将一个基于callback的典型的异步调用适配成基于yield的伪同步,说是伪同步是由于代码流程上类
似同步,可是实际倒是异步的。这样作有几个好处:
1。控制流跟同步相似,咱们知道callback里去作控制流仍是比较恶心的,就算nodejs里的async这样的模块,可是分支多起来也很是很差写。(爽)
2。能够共享变量,没有了callback,全部的本地变量在同一个做用域中。 (爽爽)
3。能够并行执行,yield能够抛出list或dict,并行执行其中的异步流程。(爽爽爽。。。此处省略一万个爽)异步

神奇的gen.coroutine装饰器是怎么作到这一切的?让我首先买个关子,不是进入到gen里面分析coroutine和Runner这两核心的方法(类),而是首先分析一些这
些方法(类)中用到的一些技术, 而后再回到coroutine装饰器和Runner类中。async

首先要理解的是generator是如何经过yield与外界进行通讯的。函数

1
2
3
4
5
6
7
8
9
def test():
print ('step 1.......')
res = yield 10
print ('step 2.......', res) (3)

gen = test()
gen.send(None) #next(gen) (1)
data = gen.send(20) (2)
print ('yield out .....', data)

步骤1启动了generator,步骤2向generator内部发送数据,并经过yield向generator外部抛出结果10, 最后的执行结果是tornado

1
2
3
step 1.......
step 2....... 20
yield out ..... 10

而后让我再说说Future,Future是对异步调用结果的封装。一个callback型的异步调用的执行结果不只包括调用的返回,还包括调用得到返回以后须要执行的回调,因此才须要将
异步调用的结果封装一下,做为一个异步调用执行结果的占位符。Future类基本能够这么写oop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Future(object):
def __init__(self):
self._callback = []
self._result = None
self._done = False

def set_callback(self, cb):
self._callback.append(cb)

def _run_callback(self):
for cb in self._callback:
cb()

def set_result(self, result)
self._done = True
self._result = result
self._run_callback()

def is_ready(self):
return self._done is True

固然这只是个简约版的,详细能够参看concurrent.Future。fetch

最后再来讲说另外一个重要的函数Task, 这个函数的主要做用是将一个callback型的异步调用适配成一个返回Future的异步调用,而这个做为异步调用结果的Future会在原来的那个callback被时解析出来

1
2
3
4
5
6
7
8
9
def Task(func, *args, **kwargs):
future = Future
def set_result(result):
if future.done():
return
future.set_result(result)

func(*args, callback=_argument_adapter(set_result), **kwargs)
return future

这里忽略了一些与本文无关的部分。能够看到Task里面构造了一个callback,_argument_adapter是将callback的参数进行适配,将不定参数适配成一个参数也就是result, 最后经过
future.set_result(result)将result赋值给future,这样future就被解析出来。 那么问题来了,AsyncHTTPClient并无通过Task的适配,而是直接返回一个Future。这个Future是在
何时解析的呢?进httpclient.py来看下AsyncHTTPClient是如何解析Future的,这是httpclient.py中的fetch函数,也就是咱们实际发起http请求的那个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def fetch(self, request, callback=None, raise_error=True, **kwargs):
.....
future = TracebackFuture()
if callback is not None:
callback = stack_context.wrap(callback)

def handle_future(future):
exc = future.exception()
if isinstance(exc, HTTPError) and exc.response is not None:
response = exc.response
elif exc is not None:
response = HTTPResponse(
request, 599, error=exc,
request_time=time.time()-request.start_time
)
else:
response = future.result()
self.io_loop.add_callback(callback, response)
future.add_done_callback(handle_future)

def handle_response(response):
if raise_error and response.error:
future.set_exception(response.error)
else:
future.set_result(response)
self.fetch_impl(request, handle_response)
return future

fetch中定义一个表明fetch异步调用执行结果的future,若是调用时传入了callback,并非直接将callback传给fetch_impl,而是首先给future设置一个名为handle_future解析完成后的回调,这个handle_future
中经过add_callback把实际传进来的callback加入到IOLoop中让IOLoop规划其调用。而传入到fetch_impl中的callback 则换成被了handle_response这个函数,
fetch_impl最后会在当收到response的时候调用handle_response回调(这个有兴趣能够看下,若是之后有写httpserver相关的分析可能会再分析), handle_response会解析出表明执行结果的future。对没有设置callback的调用,future解析结束整个流程也就结束了。而对于设置了callback的调用,future完成以后会调用handle_future 。
画个简图来描述一下调用过程
fetch->fetch_impl->HTTP请求直到有response或出错,若是有response回调handle_response->future.set_result(response)(future有值了)->若是fetch带了callback则handle_future->ioloop中调用callback
至此能够看到AsyncHTTPClient是如何把一个callback型的异步调用转换成一个返回future的异步调用,而这个future会在handle_response调用时被解析获得返回的response。

好了,差很少该深刻gen.coroutine这个装饰器以及其最终实现Runner类。其实看完上面的内容gen.coroutine和Runner的做用也呼之欲出,其主要功能就是拿到yield出的异步调用返回的future,看这个
future是否已经完成,若是完成就把结果再send到generator中,若是没有完成就要为future设置一个完成时回调,这个回调的主要做用就是启动Runner(也就是调用run方法)。至于future啥时候完成,这个
gen.coroutine和Runner可无论,你必须设计一个AsyncHTTPClient中fetch那样的返回Future的异步调用或者用Task封装一下你的带有callback的异步调用。下面是节选gen.coroutine装饰器中主要方法
_make_coroutine_wrapper的代码的主要部分

1
2
3
4
5
6
7
8
9

try:
yielded = next(result)
except (StopIteration, Return) as e:
future.set_result(getattr(e, 'value', None))
except Exception:
future.set_exc_info(sys.exc_info())
else:
Runner(result, future, yielded)

result就是被装饰的函数返回的generator,next启动这个generator, 若是generator抛出StopIteration和Return两个异常,表示generator已经解析出结果,将这个结果设置给最后coroutine返回的
future。若是有其余异常表示generator执行过程当中发生了异常,将异常设置到future中。排除这两种状况,表示generator尚未执行完毕,调用Runner执行generator。Runner的参数result就是还没
有运行完毕generator, future是表明coroutine执行结果的那个future, 而yielded是func返回的future(或者YieldPoint,我们只考虑future的状况)。再深刻到Runner中,主要有两个函数handle_yield
和run,handle_yield主要是肯定generator返回的yielded是不是一个执行完成的yielded(对于yielded是future的状况来讲就是future.is_ready() == True),若是没有执行完成则须要设置future完成时
执行run方法,也就是future.add_done_callback(future, lambda f:self.run())并返回False也就是不执行立刻run, 不然返回True并当即执行run方法,由于这时候已经有异步调用的结果了。
run方法拿到yielded的执行结果,并传入到generator中。这样generator内部就能经过yield拿到异步调用的执行结果了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
 def handle_yield(self, yielded):
#处理YieldPoint忽略掉,可是原理跟Future是同样的
try:
self.future = convert_yielded(yielded)
except BadYieldError:
self.future = TracebackFuture()
self.future.set_exc_info(sys.exc_info())

if not self.future.done() or self.future is moment:
self.io_loop.add_future(
self.future, lambda f: self.run())
return False
return True

def run(self):
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if not future.done(): #执行run时generator返回的那个future必须已经有结果,不然就不必传回到generator中了
return
self.future = None
try:
orig_stack_contexts = stack_context._state.contexts
exc_info = None

try:
value = future.result()
except Exception:
self.had_exception = True
exc_info = sys.exc_info()

if exc_info is not None:
yielded = self.gen.throw(*exc_info)
exc_info = None
else:
yielded = self.gen.send(value)

if stack_context._state.contexts is not orig_stack_contexts:
self.gen.throw(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
#generator执行完毕并成功的处理
except Exception:
#generator执行过程当中异常的处理
if not self.handle_yield(yielded):
#这里generator尚未执行完毕,yielded是generator迭代过一次以后返回的新yielded。若是yieled尚未被解析出结果就经过handle_yield给yieled设置完成时的重启run的回调,
#不然yielded已经有结果,就再次运行run,因此run中才会有一个循环
return
finally:
self.running = False

分析完毕,没看懂的同窗能够在读两遍代码,主要仍是要抓住coroutine装饰器只不过是将callback型调用转换成generator型伪同步调用的一个适配器这个关键点,阅读起代码来就明白多了。期待下篇吧,准备写stack_context异步调用中的异常捕获问题

相关文章
相关标签/搜索