from tornado.concurrent import Future def async_call_method(fun, *args, **kwargs): future = Future() // 定义一个闭包 finish def finish(): try: result = fun(*args, **kwargs) if future._callbacks: IOLoop.current().add_callback(future.set_result, result) else: future.set_result(result) except: if future._callbacks: IOLoop.current().add_callback(future.set_exc_info, sys.exc_info()) else: future.set_exc_info(sys.exc_info()) child_gr = greenlet.greenlet(finish) child_gr.switch() return future
Future 是一种用于并发编程的模式,首次引入是在 python 3.2 的 concurrent.futures 模块。python
Future 对象是一个对于异步返回结果的占位符。mysql
一个 Future 对象包含了一次异步操做的结果。在同步编程中,Futures 被用于等待从一个线程池或进程池里返回的结果;在 tornado 中,future 一般被用在 IOLoop.add_future 或者在一个 gen.coroutine 函数中 yielding 它们。ios
tornado.concurrent.Future 和 concurrent.futures.Future 类似,可是其不是线程安全的(所以,在单线程事件循环应用在速度更快)git
通过一番搜索,查询到 async_call_method()
这个函数来自于 github.com/snower/TorMySQL.github
通过对该项目代码的仔细阅读,我发现了它是如何实现了 mysql 的异步操做。sql
... def connect(self): # 入口函数 # 设置 future 占位符 future = Future() # 定义回调函数 def on_connected(connection_future): if connection_future._exc_info is None: future.set_result(self) else: future.set_exc_info(connection_future.exc_info()) self._connection = Connection(defer_connect = True, *self._args, **self._kwargs) self._connection.set_close_callback(self.connection_close_callback) # 用 greenlet 包装 self._connection.connect 并返回 future # 要使 async_call_method 包装后的函数有非阻塞的特性,必须达成如下要求 # 1. 函数能够访问 父greenlet # 2. 函数中全部 IO 操做均支持非阻塞(好比: 非阻塞由 socket 的 non-blocking 特性支持) # 3. 函数中执行 IO 操做后当即将运行权交还给主函数(父greenlet, 如:ioloop 时间循环)(greenlet.switch) # 4. 函数中全部 IO 操做均返回 Future # 5. Future.callback 运行后当即将运行权(greenlet.switch)返回给当前函数(greenlet.current),完成当前函数的剩余部分 connection_future = async_call_method(self._connection.connect) # 当 connection_future 状态为 finished, 调用 on_connected() # finished => 调用 connection_future.set_result() IOLoop.current().add_future(connection_future, on_connected) return future ...
... # IOStream 基于 tornado.iostream.IOStream sock = IOStream(sock) sock.set_close_callback(self.stream_close_callback) # getcurrent() 返回包装了当前函数的 greenlet child_gr = greenlet.getcurrent() # main 是指 父greenlet(主函数, 时间循环?) main = child_gr.parent assert main is not None, "Execut must be running in child greenlet" ... def connected(future): if self._loop_connect_timeout: self._loop.remove_timeout(self._loop_connect_timeout) self._loop_connect_timeout = None if future._exc_info is not None: child_gr.throw(future.exception()) else: self._sock = sock # 将运行权交还给当前 greenlet child_gr.switch() # IOStream.connect 是 no-blocking 的 socket 操做 future = sock.connect(address) # 给 sock.connect 操做添加回调函数 self._loop.add_future(future, connected) # 而后把运行权交还给 父greenlet # 直到链接成功,connected() 中会将运行权交还给 当前greenlet main.switch() ...
要使 async_call_method 包装后的函数有非阻塞的特性,必须达成如下要求编程
函数能够访问 父greenlet安全
函数中全部 IO 操做均支持非阻塞(好比: 非阻塞由 socket 的 non-blocking 特性支持)闭包
函数中执行 IO 操做后当即将运行权交还给主函数(父greenlet, 如:ioloop 时间循环)(greenlet.switch)并发
函数中全部 IO 操做均返回 Future
Future.callback 运行后当即将运行权(greenlet.switch)返回给当前函数(greenlet.current),完成当前函数的剩余部分
async_call_method 包装后的函数要实现非阻塞,最终仍是依赖于 socket 的非阻塞
=> socket.setblocking(False)
。
github.com/snower/TorMySQL
中于 mysql
的交互所有经过 IOStream()
的如下方法实现:
* def _handle_events(self, fd, events): # ioloop 在事件发生时调用 _handle_events * def _handle_connect(self): * def _handle_read(self): # 当事件为读取事件时,读取数据到 buffer, 而后 future.set_result(data) * def _handle_write(self): # 当事件为写事件时,读取数据到 buffer, 而后 future.set_result(data) * def read(self, num_bytes): * def write(self, data):
经过对上述方法进行 设置 future 占位符
,并基于 non-blocking socket 实现上述方法的非阻塞。