tornado 源码自带了丰富的 demo ,这篇文章主要分析 demo 中的聊天室应用: chatdemo css
首先看 chatdemo 的目录结构:html
├── chatdemo.py ├── static │ ├── chat.css │ └── chat.js └── templates ├── index.html ├── message.html └── room.html
很是简单,基本没有分层,三个模版一个 js 一个 css ,还有一个最重要的 chatdemo.pypython
本文的重点是弄清楚 chatdemo.py 的运行流程,因此对于此项目的其余文件,包括模版及 chat.js 的实现都不会分析,只要知道 chat.js 的工做流程相信对于理解 chatdemo.py 没有任何问题git
此 demo 主要基于长轮询。 获取新消息的原理:github
在 chat.js 中有一个定时器会定时执行 update 操做web
当没有新消息时 tornado 会一直 hold 住 chat.js 发来的 update 请求ajax
当有新消息时 tornado 将包含新消息的数据返回给全部 hold 的 update 请求shell
此时 chat.js 收到 update 回复后更新返回数据在聊天室中,同时再进行一次 update 请求, 而后又从 1. 开始执行。json
发送新消息的原理:缓存
输入消息, 点击 post 按钮, chat.js 获取表单后用 ajax 方式发送请求 new
tornado 收到请求 new ,返回消息自己, 同时通知全部 hold 住的 update 请求 ( 这里也包括发送 new 请求的 chat.js 所发送的 update 请求 ) 返回新消息
全部在线的 chat.js 收到 update 请求回复,更新返回信息到聊天室,同时再进行一次 update 请求。
清楚了以上流程,咱们直接来看 chatdemo.py :
def main(): parse_command_line() app = tornado.web.Application( [ (r"/", MainHandler), (r"/a/message/new", MessageNewHandler), (r"/a/message/updates", MessageUpdatesHandler), ], cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__", template_path=os.path.join(os.path.dirname(__file__), "templates"), static_path=os.path.join(os.path.dirname(__file__), "static"), xsrf_cookies=True, debug=options.debug, ) app.listen(options.port) tornado.ioloop.IOLoop.current().start() if __name__ == "__main__": main()
main 函数主要用做初始化应用、监听端口以及启动 tornado server 。
咱们看路由:
主页对应 MainHandler
new 请求对应 MessageNewHandler
updates 请求对应 MessageUpdatesHandler
下面来看 MainHandler :
# Making this a non-singleton is left as an exercise for the reader. global_message_buffer = MessageBuffer() class MainHandler(tornado.web.RequestHandler): def get(self): self.render("index.html", messages=global_message_buffer.cache)
只有一行代码,就是渲染并返回 index.html,渲染的附加信息就是 global_message_buffer 的全部缓存消息。 global_message_buffer 是 MessageBuffer 的一个实例。 咱们先不关心 MessageBuffer 内部是什么,如今咱们只要记住它主要是用来储存聊天消息和链接到此聊天室的人的信息的类。 其中 MessageBuffer().cache 就是保存聊天室全部聊天消息的结构。
而后来看 MessageNewHandler :
class MessageNewHandler(tornado.web.RequestHandler): def post(self): message = { "id": str(uuid.uuid4()), "body": self.get_argument("body"), } # to_basestring is necessary for Python 3's json encoder, # which doesn't accept byte strings. message["html"] = tornado.escape.to_basestring( self.render_string("message.html", message=message)) if self.get_argument("next", None): self.redirect(self.get_argument("next")) else: self.write(message) global_message_buffer.new_messages([message])
一样很简单,从 post 信息里获取发来的新消息 ( body ) ,而后给消息分配一个惟一的 uuid,接着把这段消息渲染成一段 html ,而后 self.write(message)
返回这段 html, 同时给 global_message_buffer ( MessageBuffer 的实例 ) 添加这条新信息。 这里其实我更倾向于返回 json 之类的数据,这样更加直观和规范,可能写 demo 的人考虑到读者对 json 之类的协议可能不熟悉故而选择了返回渲染好的 html 直接让 chat.js append 到 index.html 里。
接着来看 MessageUpdatesHandler :
class MessageUpdatesHandler(tornado.web.RequestHandler): @gen.coroutine def post(self): cursor = self.get_argument("cursor", None) # Save the future returned by wait_for_messages so we can cancel # it in wait_for_messages self.future = global_message_buffer.wait_for_messages(cursor=cursor) messages = yield self.future if self.request.connection.stream.closed(): return self.write(dict(messages=messages)) def on_connection_close(self): global_message_buffer.cancel_wait(self.future)
重点就在这里,能够看到其内部的 post 方法被 gen.coroutine 修饰器修饰,也就是说这个 post 方法如今是 协程 ( coroutine ) 方式工做。 对于协程比较陌生的童鞋,你能够直接把它看成是单线程解决 io ( 网络请求 ) 密集运算被阻塞而致使低效率的解决方案。 固然这样理解协程还比较笼统,以后我会详细写一篇关于协程的文章,但在这里这样理解是没有问题的。
如今来看代码内容,首先获取 cursor ,一个用来标识咱们已经获取的消息的指针,这样 tornado 就不会把你已经获取的消息重复的发给你。 而后调用 global_message_buffer.wait_for_messages(cursor=cursor)
获取一个 future 对象。 future 对象是 tornado 实现的一个特殊的类的实例,它的做用就是包含以后 ( 将来 ) 将会返回的数据,咱们如今不用关心 Future() 内部如何实现,只要记住上面它的做用就行。 关于 Future 的解读我会放到阅读 Future 源码时讲。
而后看最关键的这句: messages = yield self.future
注意这里的 yield 就是 hold updates 请求的关键,它到这里至关于暂停了整个 post 函数 ( updates 请求被 hold )同时也至关于 updates 此次网络请求被阻塞,这个时候协程发挥做用,把这个函数暂停的地方的全部信息保存挂起,而后把工做线程释放,这样 tornado 能够继续接受 new、 updates 等请求而后运行相应的方法处理请求。
当有新的消息返回时,tornado 底层的 ioloop 实例将会调用 gen.send(value) 返回新消息( value )给每一个被暂停的方法的 yield 处, 此时协程依次恢复这些被暂停的方法, 同时用得到的返回消息继续执行方法, 这时 messages = yield self.future
继续执行,messages 得到 yield 的返回值 value ( python 中调用 gen.send(value) 将会把 value 值返回到 yield 处并替换原前 yield 后的值 ),而后判断下用户是否已经离开,若是还在线则返回新消息。
明白了以上流程,咱们最后来看 MessageBuffer:
class MessageBuffer(object): def __init__(self): self.waiters = set() self.cache = [] self.cache_size = 200 def wait_for_messages(self, cursor=None): # Construct a Future to return to our caller. This allows # wait_for_messages to be yielded from a coroutine even though # it is not a coroutine itself. We will set the result of the # Future when results are available. result_future = Future() if cursor: new_count = 0 for msg in reversed(self.cache): if msg["id"] == cursor: break new_count += 1 if new_count: result_future.set_result(self.cache[-new_count:]) return result_future self.waiters.add(result_future) return result_future def cancel_wait(self, future): self.waiters.remove(future) # Set an empty result to unblock any coroutines waiting. future.set_result([]) def new_messages(self, messages): logging.info("Sending new message to %r listeners", len(self.waiters)) for future in self.waiters: future.set_result(messages) self.waiters = set() self.cache.extend(messages) if len(self.cache) > self.cache_size: self.cache = self.cache[-self.cache_size:]
初始化方法中 self.waiters 就是一个等待新消息的 listener 集合 ( 直接理解成全部被 hold 住的 updates 请求队列可能更清晰 )
self.cache 就是储存全部聊天消息的列表,self.cache_size = 200
则定义了 cache 的大小 是存 200 条消息。
而后先来看简单的 new_messages:
遍历 waiters 列表,而后给全部的等待者返回新消息,同时清空等待者队列。 而后把消息加到缓存 cache 里,若是缓存大于限制则取最新的 200 条消息。这里只要注意到 future.set_result(messages)
就是用来给 future 对象添加返回数据 ( 以前被 yield 暂停的地方此时由于 set_result() 方法将会得到 "将来" 的数据 ) 这一点便可。
而后来看 wait_for_messages :
def wait_for_messages(self, cursor=None): # Construct a Future to return to our caller. This allows # wait_for_messages to be yielded from a coroutine even though # it is not a coroutine itself. We will set the result of the # Future when results are available. result_future = Future() if cursor: new_count = 0 for msg in reversed(self.cache): if msg["id"] == cursor: break new_count += 1 if new_count: result_future.set_result(self.cache[-new_count:]) return result_future self.waiters.add(result_future) return result_future
首先初始化一个 Future 对象,而后根据 cursor 判断哪些消息已经获取了哪些还没获取,若是缓存中有对于这个 waiter 还没获取过的消息,则直接调用 set_result() 返回这些缓存中已有的但对于这个 waiter 来讲是新的的数据。 若是这个 waiter 已经有缓存中的全部数据,那么就把它加到等待者队列里保持等待,直到有新消息来时调用 new_messages 再返回。
而最后一个 cancel_wait 就很简单了,当有用户退出聊天室时,直接从 self.waiters 中移除他所对应的等待者。
当明白了整个代码的运行流程后,咱们能够基于这个简单的 demo 而写出更加丰富的例子,好比加入 session ,作登录、作好友关系,作单聊作群聊等等。
chatdemo with room是我添加的一个简单功能,输入聊天室房号后再进行聊天,只有同一房间中的人才能收到彼此的消息。
以上就是鄙人对整个 chatdemo.py 的解读。 在阅读此 demo 时,我没有参考其余源码解读,只是经过阅读 tornado 底层的源码而得出的我的的理解,所以确定会有不少理解不成熟甚至错误的地方,还望你们多多指教。
做者:rapospectre