在tornado的框架中非堵塞通常指得是网络I/O层面的socket数据接收模式(select或者epoll),不论用哪一个模式,最终程序都会收到数据并处理数据(这个数据要么被转发、要么被解析和处理)。html
非堵塞的弊端: 若是处理一个密集计算的请求须要花费10秒钟(就是堵塞了10秒钟),当两个或多个请求同时到达时,只要第一个被接受处理没结束,其余所有请求都要等,而且挨个挨个等到被轮询结束。这就是单线程事件还回机制(非堵塞机制), 对堵塞零容忍, 任何一个地方堵住了还回线程,其余所有请求都被堵住。python
也就是说采用了非堵塞模式以后,最好不要用堵塞(常规解析数据的函数)的代码块来解析数据。mysql
异步的做用是将堵塞代码错开来,不放在当前接受数据的线程中处理,git
要么丢到rabbitmq/zeromq/activemq中交给另一个进程去处理,要么用其余线程或进程来处理。github
让监听数据的这个socket收到数据后直接抛给其余程序来处理,而后立马保持监听状态,这样子程序的循环能力就很是强。web
再就是要提的一点,tornado自己的ioloop就采用epool/select/kqueue来完成非堵塞动做,我们使用tornado只要把异步的代码写好就能够很好的发挥出tornado的优点了。sql
传统的I/O(socket)堵塞编程模式流程:数据库
while True:编程
1. socket accept (等待)windows
2. socket receive (接受数据)
3. handle data (处理数据)
4. socket send (返回结果)
while True:
1. events = epoll poll (主动拉取列表)
2. for file_descriptor, event in events: (查找是否有新的请求)
3. async handle data
3.1 标注状态(running)
3.2 异步丢给其余函数经过线程的方式执行.
3.3 线程执行完毕后修改状态为(finish), 而且经过回掉的方式注册进 ioloop中(ioloop.add_done_callback或者ioloop.add_future)
4. socket send (返回结果)
1. python >= 2.7 < 3.x
2. pip install requests tornado futures
centos 7 | blockingServer.py | 192.168.1.100 | 构建用于测试的堵塞环节 |
windows 8 | blockingClient.py nbAsync.py nbFuture.py nbCoroutine.py nbGenTask.py |
192.168.1.101 | 验证经常使用异步非堵塞写法 |
centos 7 | siege、ab | 192.168.1.102 | 并发环境 |
在192.168.1.100服务器上运行用于测试的堵塞服务器(其实是非堵塞模式,只不过是每一个链接都要等待5秒钟).
# 目的是提供一个堵塞的环境用来证实tornado结合经常使用的异步写法都是非堵塞高效模式.
python blockingServer.py
# -.- coding:utf-8 -.- import tornado.web import tornado.gen import tornado.ioloop import tornado.options import tornado.httpserver class BlockingHandler(tornado.web.RequestHandler): @tornado.gen.coroutine def get(self, *args, **kwargs): # 若是这条命令没看懂的话,请参考这个连接: http://www.tornadoweb.org/en/stable/faq.html yield tornado.gen.sleep(5) self.write('ok') class Application(tornado.web.Application): def __init__(self): handlers = [ ('/blocking', BlockingHandler), ] super(Application, self).__init__(handlers) if __name__ == "__main__": tornado.options.define("port", default=88, help="run on the given port", type=int) tornado.options.parse_command_line() http_server = tornado.httpserver.HTTPServer(Application()) http_server.listen(tornado.options.options.port) tornado.ioloop.IOLoop.current().start()
# 文件名: blockingClient.py
# -.- coding:utf-8 -.- # __author__ = 'zhengtong' import tornado.ioloop import tornado.web import tornado.options import tornado.httpserver import requests class Application(tornado.web.Application): def __init__(self): handlers = [ ('/blocking', BlockHandler), ('/non_blocking', NonBlockHandler), ] super(Application, self).__init__(handlers) class BlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): response = requests.get('http://192.168.1.100:88/blocking') # blocked here. result = dict(response.headers) result.update({'content': response.content}) self.write(result) class NonBlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): self.write('non_blocking') if __name__ == "__main__": tornado.options.define("port", default=80, help="run on the given port", type=int) tornado.options.parse_command_line() http_server = tornado.httpserver.HTTPServer(Application()) http_server.listen(tornado.options.options.port) tornado.ioloop.IOLoop.current().start()
1. 在192.168.1.102压力测试服务器上运行以下并发测试命令.
# 发起10个并发,持续60秒钟. [root@localhost ~]# siege http://192.168.1.101/blocking -c10 -t60s
2. 在 windows 8 (192.168.1.101)上用浏览器来访问以下连接.
http://192.168.1.101/non_blocking
siege:
** SIEGE 4.0.2 ** Preparing 10 concurrent users for battle. The server is now under siege... HTTP/1.1 200 5.07 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 10.15 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 15.23 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 20.31 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 25.38 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 30.47 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 35.55 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 40.63 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 45.71 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 45.53 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 45.51 secs: 212 bytes ==> GET /blocking Lifting the server siege... Transactions: 11 hits Availability: 100.00 % Elapsed time: 59.65 secs Data transferred: 0.00 MB Response time: 29.05 secs Transaction rate: 0.18 trans/sec Throughput: 0.00 MB/sec Concurrency: 5.36 Successful transactions: 11 Failed transactions: 0 Longest transaction: 45.71 Shortest transaction: 5.07
浏览器:
non_block也是等待状态,必需要等block执行完成后,才会执行non_block.
siege在60秒钟内,只获得了11个结果,证实堵塞很是严重, 而且浏览器也是出于一直等待的状态.
也就是说在tornado中若是写堵塞代码,只有单线程在运行的tornado,会死的很难看,刚接触tornado的同窗甚至都不知道为何会这样,根本没有像据说那样tornado是一个极其高效的web框架。
经过结果能够看出,不采用异步的方式就没法发挥出它的能力。
# 文件名: nbAysnc.py
# -.- coding:utf-8 -.- # __author__ = 'zhengtong' import tornado.ioloop import tornado.web import tornado.options import tornado.httpserver # import requests # 不用requests, 后面再讨论用requests也能异步非堵塞. import tornado.httpclient # 采用tornado自带的异步httpclient客户端 class Application(tornado.web.Application): def __init__(self): handlers = [ ('/blocking', BlockHandler), ('/non_blocking', NonBlockHandler), ] super(Application, self).__init__(handlers) class BlockHandler(tornado.web.RequestHandler): @tornado.web.asynchronous def get(self, *args, **kwargs): client = tornado.httpclient.AsyncHTTPClient() client.fetch('http://192.168.1.100:88/blocking', callback=self.on_response) def on_response(self, content): result = dict(content.headers) result.update({'content': content.body}) self.write(result) self.finish() class NonBlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): self.write('non_blocking') if __name__ == "__main__": tornado.options.define("port", default=80, help="run on the given port", type=int) tornado.options.parse_command_line() http_server = tornado.httpserver.HTTPServer(Application()) http_server.listen(tornado.options.options.port) tornado.ioloop.IOLoop.current().start()
# 这里提供一个不采用任何装饰器的写法, 比较raw ioloop, 运行结果是一致的,效率也是一致的.
# 文件名: nbAsync_NoAsyncDecorator.py
# -.- coding:utf-8 -.- # __author__ = 'zhengtong' import tornado.ioloop import tornado.web import tornado.options import tornado.httpserver import tornado.concurrent # import requests # 不仅用requests import tornado.httpclient # 采用tornado自带的异步httpclient客户端 class Application(tornado.web.Application): def __init__(self): handlers = [ ('/blocking', BlockHandler), ('/non_blocking', NonBlockHandler), ] super(Application, self).__init__(handlers) class BlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): # def get上方移除了tornado.web.asynchonous装饰器 self._auto_finish = False client = tornado.httpclient.AsyncHTTPClient() future = client.fetch('http://192.168.1.100:88/blocking') # 在这里添加callback也行 tornado.ioloop.IOLoop.current().add_future(future, callback=self.on_response) def on_response(self, content): result = dict(content.headers) result.update({'content': content.body}) self.write(result) self.finish() class NonBlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): self.write('non_blocking') if __name__ == "__main__": # 经过define 能够为options增长变量. tornado.options.define("port", default=80, help="run on the given port", type=int) tornado.options.parse_command_line() http_server = tornado.httpserver.HTTPServer(Application()) http_server.listen(tornado.options.options.port) tornado.ioloop.IOLoop.current().start()
参考<1. tornado + 非异步代码(堵塞的代码) >章节的测试方法.
siege:
Lifting the server siege... Transactions: 100 hits Availability: 100.00 % Elapsed time: 59.13 secs Data transferred: 0.02 MB Response time: 5.61 secs Transaction rate: 1.69 trans/sec Throughput: 0.00 MB/sec Concurrency: 9.48 Successful transactions: 100 Failed transactions: 0 Longest transaction: 10.62 Shortest transaction: 5.07
浏览器:
访问non_blocking页面正常并且响应很快。
siege一直在持续并发请求的同时用浏览器来访问non_blocking和blocking页面都可以获得响应,也就证实tornado已经开始发挥它的功效了。
采用了异步非堵塞模式后,被命中只有110次,落差很大,心理很是不平衡。其实这并非问题,这里面有多重限制因此才会致使这个结果。
1. AsyncHttpClient自己的限制(默认状况下只容许同时发起10个客户端). 详情请参考tornado源码的 simple_httpclient.py文件
2. ioloop自己的限制(为了保证线程的稳定性,默认只开启了10个线程来支持并发). 详情请参考tornado源码的 netutil.py文件
能够经过设定参数来提升并发能力(将 tornado.httpclient.AsyncHTTPClient() 改成 tornado.httpclient.AsyncHTTPClient(max_clients=100)).
max_clients由默认的10改成100后,测试结果的hits也随之增长了十倍.
Lifting the server siege... Transactions: 1099 hits Availability: 100.00 % Elapsed time: 59.71 secs Data transferred: 0.22 MB Response time: 5.10 secs Transaction rate: 18.41 trans/sec Throughput: 0.00 MB/sec Concurrency: 93.81 Successful transactions: 1099 Failed transactions: 0 Longest transaction: 6.34 Shortest transaction: 5.06
# 文件名: nbFuture.py
# 备注: 在第二章节中的移除tornado.web.asynchonous装饰器的写法一样适合futures. 详情请参考源码文件: nbFuture_NoAsyncDecorator.py
# -.- coding:utf-8 -.- # __author__ = 'zhengtong' import tornado.ioloop import tornado.web import tornado.options import tornado.httpserver import tornado.concurrent # import requests # 不仅用requests import tornado.httpclient # 采用tornado自带的异步httpclient客户端 class Application(tornado.web.Application): def __init__(self): handlers = [ ('/blocking', BlockHandler), ('/non_blocking', NonBlockHandler), ] super(Application, self).__init__(handlers) class BlockHandler(tornado.web.RequestHandler): @tornado.web.asynchronous def get(self, *args, **kwargs): client = tornado.httpclient.AsyncHTTPClient() future = tornado.concurrent.Future() fetch_future = client.fetch('http://192.168.1.100:88/blocking', callback=self.on_response) fetch_future.add_done_callback(lambda x: future.set_result(x.result())) def on_response(self, content): result = dict(content.headers) result.update({'content': content.body}) self.write(result) self.finish() class NonBlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): self.write('non_blocking') if __name__ == "__main__": # 经过define 能够为options增长变量. tornado.options.define("port", default=80, help="run on the given port", type=int) tornado.options.parse_command_line() http_server = tornado.httpserver.HTTPServer(Application()) http_server.listen(tornado.options.options.port) tornado.ioloop.IOLoop.current().start()
参考<1. tornado + 非异步代码(堵塞的代码) >章节的测试方法.
于<2. tornado.web.asynchronous >的测试结果基本一致.
future是官方特别推荐用来练习的一种编码方式,由于这样会比较深刻的了解tornado的运做原理。
future的add_done_callback方法,是告诉ioloop当future的状态变动为完成的时候,就调用包裹在add_done_callback中的函数(或匿名函数).
future还提供了一组produce方法和consumer方法, 用于管理future的状态.
# 文件名: nbGenTask.py
# -.- coding:utf-8 -.- # __author__ = 'zhengtong' import tornado.ioloop import tornado.web import tornado.options import tornado.httpserver import tornado.concurrent import tornado.gen # 导入tornado.gen模块 # import requests # 不仅用requests import tornado.httpclient # 采用tornado自带的异步httpclient客户端 class Application(tornado.web.Application): def __init__(self): handlers = [ ('/blocking', BlockHandler), ('/non_blocking', NonBlockHandler), ] super(Application, self).__init__(handlers) class BlockHandler(tornado.web.RequestHandler): @tornado.gen.coroutine def get(self, *args, **kwargs): client = tornado.httpclient.AsyncHTTPClient() content = yield tornado.gen.Task(client.fetch, ('http://192.168.1.100:88/blocking')) result = dict(content.headers) result.update({'content': content.body}) self.write(result) self.finish() class NonBlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): self.write('non_blocking') if __name__ == "__main__": tornado.options.define("port", default=80, help="run on the given port", type=int) tornado.options.parse_command_line() http_server = tornado.httpserver.HTTPServer(Application()) http_server.listen(tornado.options.options.port) tornado.ioloop.IOLoop.current().start()
参考<1. tornado + 非异步代码(堵塞的代码) >章节的测试方法.
于<2. tornado.web.asynchronous >的测试结果基本一致.
tornado.gen.Task须要配合tornado.gen.coroutine装饰器来完成代码的运行,由于Task利用了yield,它的隐藏方法run()利用了gen.send()方法,因此gen模块必需要用coroutine装饰器.
利用coroutine的方式比较明显的一个地方是,代码不用再分开了, 这个是Python语言的一个特性,yield关键字能够赋值给一个变量, 所以就不须要callback了.
这样有什么好处? 本地变量和全局变量不用传递了,默认就是共享的,这个算不算很爽?
# 文件名: nbFuture.py
# -.- coding:utf-8 -.- # __author__ = 'zhengtong' import tornado.ioloop import tornado.web import tornado.options import tornado.httpserver import tornado.concurrent import tornado.gen import requests import tornado.concurrent # 导入 tornado.concurrent 并发模块 class Application(tornado.web.Application): def __init__(self): handlers = [ ('/blocking', BlockHandler), ('/non_blocking', NonBlockHandler), ] super(Application, self).__init__(handlers) # 建议设定为CPU核心数量 * 4或8或16也是能够接受的, 取决于计算量,计算量越大设定的值应该越小. self.executor = tornado.concurrent.futures.ThreadPoolExecutor(16) class BlockHandler(tornado.web.RequestHandler): @property def executor(self): return self.application.executor @tornado.gen.coroutine def get(self, *args, **kwargs): print dir(self) content = yield self.executor.submit(requests.get, ('http://192.168.1.100:88/blocking')) result = dict(content.headers) result.update({'content': content.content}) self.write(result) class NonBlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): self.write('non_blocking') if __name__ == "__main__": # 经过define 能够为options增长变量. tornado.options.define("port", default=80, help="run on the given port", type=int) tornado.options.parse_command_line() http_server = tornado.httpserver.HTTPServer(Application()) http_server.listen(tornado.options.options.port) tornado.ioloop.IOLoop.current().start()
参考<1. tornado + 非异步代码(堵塞的代码) >章节的测试方法.
于<2. tornado.web.asynchronous >的测试结果基本一致.
自从了coroutine、threadpool、processpool以后,tornado算是一个里程碑式的解放了对异步的要求, 缘由是tornado的异步库只针对httpclient, 没有针对mysql或者其余数据库的异步库(本身写一个异步库难度过高,由于展转十几个源码文件的重度调用以及每一个类中的状态控制)。
coroutine结合threadpool让编写异步代码再也不拆成多个函数,变量可以共享,堵塞的代码(例如 requests、mysql.connect、密集计算)能够不影响ioloop,造成真正的闭合.
https://github.com/tornadoweb/tornado/wiki
http://scotdoyle.com/python-epoll-howto.html
tornado源码