线上有一个相关百科的服务,返回一个query中说起的百科词条。该服务是用python实现的,之前经过thrift接口访问,现要将其改成经过HTTP访问。以前没有搭建HTTPServer的经验,所以想用python的web Framework来作这件事,因而有了下面的工做。第一部分是框架选择,这一部分没有太仔细考虑,只是大概看了一些文章。第二部分是根据所须要的功能,学习及测试在框架上应该如何实现。第三部分是实际的代码。第四部分是下一步的学习。html
python有不少开源的web framework。从知乎上找了几篇综述型的简介,大致包括:Django、Bottle、Flask、web2py、Tornado。看中了介绍中说起Tornado的速度与并发量,因而打算用tornado来实现。因此按目前的了解,或许Tornado并不是实现本工做的最佳方案,只是一个可行方案。python
tornado具备web framework的功能,所以用它开发web服务很是方便:web
- 实现处理请求的Handler,该类继承自
tornado.web.RequestHandler
,实现用于处理请求的对应方法如:get、post等。返回内容用self.write
方法输出。- 实例化一个Application。构造函数的参数是一个Handlers列表,经过正则表达式,将请求与Handler对应起来。经过dict将Handler须要的其余对象以参数的方式传递给Handler的initialize方法。
- 初始化一个
tornado.httpserver.HTTPServer
对象,构造函数的参数是上一步的Application对象。- 为HTTPServer对象绑定一个端口。
- 开始IOLoop。
原服务是一个内存占用大,IO密集,计算量适中的服务。正则表达式
- 内存占用大。须要加载一个比较大的词表,其中每一个词对应一个id列表,这一部分是C++实现的,经过boost.python封装为python可调用的so。原服务单进程占用内存超过5G。
- IO密集。计算过程当中大量访问redis读取term及baikeid的属性信息,用于过滤及rank计算。也访问在线分词服务,获取各term的NLP分析。
- 计算量适中。划词匹配、rank计算有必定计算量,可是整体来看计算量不是特别大。python单进程天天500多万的访问量,单CPU利用率也就40%-50%之间。
关于服务的分析:redis
- 内存占用大。内存占用大,但绝大部分是只读的。不适合独立启动多个进程,适合多线程或用子进程。
- IO密集。适合将IO操做都变为异步请求,或者用多线程模型。
- 计算量适中。因为python解释器使用GIL,多线程只能提升IO的并发能力,不能提升计算的并发能力。所以能够考虑经过子进程的方式,适当增长提供服务的进程数,提升整个系统服务能力的上限。
因为tornado的亮点是异步请求,因此这里首先想到的是将全部请求都改造为异步的。可是这里遇到一个问题,就是异步函数内必定不能有阻塞调用出现,不然整个IOLoop都会被卡住。这就要求完全地去改造服务,将全部IO或是用时较长的请求都改造为异步函数。这个工程量是很是大的,须要去修改已有的代码。所以,咱们考虑用线程池的方式去实现。当一个线程阻塞在某个请求或IO时,其余线程或IOLoop会继续执行。json
另一个瓶颈就是GIL限制了CPU的并发数量,所以考虑用子进程的方式增长进程数,提升服务能力上限。浏览器
综合上面的分析,大体用如下方案:多线程
- 经过子进程的方式复制多个进程,使子进程中的只读页指向同一个物理页。
- 线程池。回避异步改造的工做量,增长IO的并发量。
首先测试线程池,测试用例为:并发
对sleep页面同时发出两个请求:app
- 在线程池中运行的函数(这里是
self.block_task
)可以同时执行。表现为在控制台交替打印出数字。- 两个get请求几乎同时返回,在浏览器上显示返回的内容。
线程池的测试代码以下:
import os import sys import time import tornado.httpserver import tornado.ioloop import tornado.options import tornado.web import tornado.gen from tornado.concurrent import run_on_executor from concurrent.futures import ThreadPoolExecutor from tornado.options import define, options class HasBlockTaskHandler(tornado.web.RequestHandler): executor = ThreadPoolExecutor(20) #起线程池,由当前RequestHandler持有 @tornado.gen.coroutine def get(self): strTime = time.strftime("%Y-%m-%d %H:%M:%S") print "in get before block_task %s" % strTime result = yield self.block_task(strTime) print "in get after block_task" self.write("%s" % (result)) @run_on_executor def block_task(self, strTime): print "in block_task %s" % strTime for i in range(1, 16): time.sleep(1) print "step %d : %s" % (i, strTime) return "Finish %s" % strTime if __name__ == "__main__": tornado.options.parse_command_line() app = tornado.web.Application(handlers=[(r"/sleep", HasBlockTaskHandler)], autoreload=False, debug=False) http_server = tornado.httpserver.HTTPServer(app) http_server.bind(8888) tornado.ioloop.IOLoop.instance().start()
整个代码里有几个位置值得关注:
executor = ThreadPoolExecutor(20)
。这是给Handler类初始化了一个线程池。其中concurrent.futures
不属于tornado,是python的一个独立模块,在python3中是内置模块,python2.7须要本身安装。- 修饰符
@run_on_executor
。这个修饰符将同步函数改造为在executor(这里是线程池)上运行的异步函数,内部实现是将被修饰的函数submit到executor,返回一个Future对象。- 修饰符
@tornado.gen.coroutine
。被这个修饰符修饰的函数,是一个以同步函数方式编写的异步函数。本来经过callback方式编写的异步代码,有了这个修饰符,能够经过yield一个Future的方式来写。被修饰的函数在yield了一个Future对象后将会被挂起,Future对象的结果返回后继续执行。
运行代码后,在两个不一样浏览器上访问sleep页面,获得了想要的效果。这里有一个小插曲,就是若是在同一浏览器的两个tab上进行测试,是没法看到想要的效果。第二个get请求会被block,直到第一个get请求返回,服务端才开始处理第二个get请求。这让我一度以为多线程没有生效,用了半天时间查了不少资料,才看到是浏览器把相同的第二个请求block了,具体连接参考这里。
因为tornado很方便地支持多进程模型,多进程的使用要简单不少,在以上例子中,只须要对启动部分稍做改动便可。具体代码以下所示:
if __name__ == "__main__": tornado.options.parse_command_line() app = tornado.web.Application(handlers=[(r"/sleep", HasBlockTaskHandler)], autoreload=False, debug=False) http_server = tornado.httpserver.HTTPServer(app) http_server.bind(8888) print tornado.ioloop.IOLoop.initialized() http_server.start(5) tornado.ioloop.IOLoop.instance().start()
须要注意的地方有两点:
app = tornado.web.Application(handlers=[(r"/sleep", HasBlockTaskHandler)], autoreload=False, debug=False)
,在生成Application对象时,要将autoreload和debug两个参数至为False。也就是须要保证在fork子进程以前IOLoop是未被初始化的。这个能够经过tornado.ioloop.IOLoop.initialized()
函数来跟。http_server.start(5)
在启动IOLoop以前经过start函数设置进程数量,若是设置为0表示每一个CPU都启动一个进程。
最后的效果是能够看到n+1个进程在运行,且公用同一个端口。
大部分逻辑代码是封装好的,服务的代码以下:
import os import sys import json import tornado.httpserver import tornado.ioloop import tornado.options import tornado.httpclient import tornado.web import tornado.gen from tornado.concurrent import run_on_executor from concurrent.futures import ThreadPoolExecutor from tornado.options import define, options import rela_baike_server from rela_baike_server import RelaBaikeRequest, RelaBaikeResult, RelaBaikeServer import logging from logging.handlers import TimedRotatingFileHandler logging.basicConfig() import pdb g_log_prefix = '../log/rela_baike_tornado.' def getLogger(strPrefixBase): strPrefix = "%s%d" % (strPrefixBase, os.getpid()) logger = logging.getLogger("RELA_BAIKE") logger.propagate = False handler = TimedRotatingFileHandler(strPrefix, 'H', 1) handler.suffix = "%Y%m%d_%H%M%S.log" formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) return logger def makeResponseBody(retCode, errReason, dicSummary): dicRes = {} dicRes['retCode'] = retCode if retCode != 0: dicRes['error'] = errReason else: dicRes['data'] = dicSummary return json.dumps(dicRes) class RelaBaikeHandler(tornado.web.RequestHandler): executor = ThreadPoolExecutor(50) def initialize(self, relaServer, logger): self.__serverRelaBaike = relaServer self.__logger = logger @tornado.gen.coroutine def get(self): lstSummary = [] retCode = 0 errReason = "" try: utfQuery = self.get_argument('query').encode('utf8').strip() except: errorReason = 'Query encoding not utf-8.' strRes = makeResponseBody(-1, errorReason, lstSummary) self.write(strRes) return if utfQuery == "": strRes = makeResponseBody(0, '', lstSummary) self.write(strRes) return error, errReason, lstSummary = yield self.getRelaBaike(utfQuery) strRes = makeResponseBody(error, errReason, lstSummary) self.write(strRes) def __logResponse(self, utfQuery, relaResult): succ = relaResult.isSuccess() if succ: self.__logger.info("%s\tSucc\t%s" % (utfQuery, "|".join([str(item[0]) for item in relaResult]))) else: self.__logger.info("%s\tError:%d" % (utfQuery, relaResult.getError())) @run_on_executor def getRelaBaike(self, utfQuery): error = 0 lstSummary = [] relaBaikeRequest = RelaBaikeRequest(content=utfQuery) relaBaikeResult = self.__serverRelaBaike.getRelaBaike(relaBaikeRequest) self.__logResponse(utfQuery, relaBaikeResult) if relaBaikeResult.isSuccess(): for item in relaBaikeResult: baikeid = item[0] try: dicSummary = json.loads(item[1]) except: return -2, 'summary format error' ,lstSummary lstSummary.append(dicSummary) else: return relaBaikeResult.getError(), rela_baike_server.g_dic_error.get(relaBaikeResult.getError(), 'other error') ,lstSumm ary return 0, 'success',lstSummary def start(): port = int(sys.argv[1]) serverRelaBaike = rela_baike_server.getRelaBaikeServer() logger = getLogger(g_log_prefix) app = tornado.web.Application(handlers=[(r"/rela_baike", RelaBaikeHandler, dict(relaServer=serverRelaBaike, logger=logger))]) http_server = tornado.httpserver.HTTPServer(app) http_server.bind(port) http_server.start(2) tornado.ioloop.IOLoop.instance().start() if __name__ == "__main__": start()
代码所涉及的特性基本上不超过前面的测试例子,除了下两几点:
- 在*Handler类里增长了一个
def initialize(self, relaServer, logger)
函数。这是为了把一些初始化好的对象传到Handler类里。app = tornado.web.Application(handlers=[(r"/rela_baike", RelaBaikeHandler, dict(relaServer=serverRelaBaike, logger=logger))])
。前面handler的initialize函数参数,对应于Application初始化时,每一个handler对应的dict。