使用python作web开发面临的一个最大的问题就是性能,在解决C10K问题上显的有点吃力。有些异步框架Tornado、Twisted、Gevent 等就是为了解决性能问题。这些框架在性能上有些提高,可是也出现了各类古怪的问题难以解决。html
在python3.6中,官方的异步协程库asyncio正式成为标准。在保留便捷性的同时对性能有了很大的提高,已经出现许多的异步框架使用asyncio。node
使用较早的异步框架是aiohttp,它提供了server端和client端,对asyncio作了很好的封装。可是开发方式和最流行的微框架flask不一样,flask开发简单,轻量,高效。python
微服务是最近最火开发模式,它解决了复杂性问题,提升开发效率,便于部署等优势。git
正是结合这些优势, 以Sanic为基础,集成多个流行的库来搭建微服务。 Sanic框架是和Flask类似的异步协程框架,简单轻量,而且性能很高。github
本项目就是以Sanic为基础搭建的微服务框架。web
项目地址: sanic-mssql
Example数据库
使用sanic异步框架,有较高的性能,可是使用不当会形成blocking, 对于有IO请求的都要选用异步库。 添加库要慎重。
sanic使用uvloop异步驱动,uvloop基于libuv使用Cython编写,性能比nodejs还要高。
功能说明:express
@app.listener('before_server_start') async def before_srver_start(app, loop): queue = asyncio.Queue() app.queue = queue loop.create_task(consume(queue, app.config.ZIPKIN_SERVER)) reporter = AioReporter(queue=queue) tracer = BasicTracer(recorder=reporter) tracer.register_required_propagators() opentracing.tracer = tracer app.db = await ConnectionPool(loop=loop).init(DB_CONFIG)
@app.middleware('request') async def cros(request): if request.method == 'POST' or request.method == 'PUT': request['data'] = request.json span = before_request(request) request['span'] = span @app.middleware('response') async def cors_res(request, response): span = request['span'] if 'span' in request else None if response is None: return response result = {'code': 0} if not isinstance(response, HTTPResponse): if isinstance(response, tuple) and len(response) == 2: result.update({ 'data': response[0], 'pagination': response[1] }) else: result.update({'data': response}) response = json(result) if span: span.set_tag('http.status_code', "200") if span: span.set_tag('component', request.app.name) span.finish() return response
对抛出的异常进行处理,返回统一格式json
建立task消费queue中对span,用于日志追踪
因为使用的是异步框架,能够将一些IO请求并行处理
Example:
async def async_request(datas): # async handler request results = await asyncio.gather(*[data[2] for data in datas]) for index, obj in enumerate(results): data = datas[index] data[0][data[1]] = results[index] @user_bp.get('/<id:int>') @doc.summary("get user info") @doc.description("get user info by id") @doc.produces(Users) async def get_users_list(request, id): async with request.app.db.acquire(request) as cur: record = await cur.fetch( """ SELECT * FROM users WHERE id = $1 """, id) datas = [ [record, 'city_id', get_city_by_id(request, record['city_id'])] [record, 'role_id', get_role_by_id(request, record['role_id'])] ] await async_request(datas) return record
get_city_by_id, get_role_by_id是并行处理。
Peewee is a simple and small ORM. It has few (but expressive) concepts, making it easy to learn and intuitive to use。ORM使用peewee, 只是用来作模型设计和migration, 数据库操做使用asyncpg。
Example:
# models.py class Users(Model): id = PrimaryKeyField() create_time = DateTimeField(verbose_name='create time', default=datetime.datetime.utcnow) name = CharField(max_length=128, verbose_name="user's name") age = IntegerField(null=False, verbose_name="user's age") sex = CharField(max_length=32, verbose_name="user's sex") city_id = IntegerField(verbose_name='city for user', help_text=CityApi) role_id = IntegerField(verbose_name='role for user', help_text=RoleApi) class Meta: db_table = 'users' # migrations.py from sanic_ms.migrations import MigrationModel, info, db class UserMigration(MigrationModel): _model = Users # @info(version="v1") # def migrate_v1(self): # migrate(self.add_column('sex')) def migrations(): try: um = UserMigration() with db.transaction(): um.auto_migrate() print("Success Migration") except Exception as e: raise e if __name__ == '__main__': migrations()
asyncpg is the fastest driver among common Python, NodeJS and Go implementations使用asyncpg为数据库驱动, 对数据库链接进行封装, 执行数据库操做。
不使用ORM作数据库操做,一个缘由是性能,ORM会有性能的损耗,而且没法使用asyncpg高性能库。另外一个是单个微服务是很简单的,表结构不会很复杂,简单的SQL语句就能够处理来,不必引入ORM。使用peewee只是作模型设计
Example:
sql = "SELECT * FROM users WHERE name=$1" name = "test" async with request.app.db.acquire(request) as cur: data = await cur.fetchrow(sql, name) async with request.app.db.transaction(request) as cur: data = await cur.fetchrow(sql, name)
使用aiohttp中的client,对客户端进行了简单的封装,用于微服务之间访问。Don’t create a session per request. Most likely you need a session per application which performs all requests altogether.
A session contains a connection pool inside, connection reusage and keep-alives (both are on by default) may speed up total performance.
Example:
@app.listener('before_server_start') async def before_srver_start(app, loop): app.client = Client(loop, url='http://host:port') async def get_role_by_id(request, id): cli = request.app.client.cli(request) async with cli.get('/cities/{}'.format(id)) as res: return await res.json() @app.listener('before_server_stop') async def before_server_stop(app, loop): app.client.close()
对于访问不一样的微服务能够建立多个不一样的client,这样每一个client都会keep-alives
使用官方logging, 配置文件为logging.yml, sanic版本要0.6.0及以上。JsonFormatter将日志转成json格式,用于输入到ESEnter OpenTracing: by offering consistent, expressive, vendor-neutral APIs for popular platforms, OpenTracing makes it easy for developers to add (or switch) tracing implementations with an O(1) configuration change. OpenTracing also offers a lingua franca for OSS instrumentation and platform-specific tracing helper libraries. Please refer to the Semantic Specification.
@logger(type='method', category='test', detail='detail', description="des", tracing=True, level=logging.INFO) async def get_city_by_id(request, id): cli = request.app.client.cli(request)
api文档使用swagger标准。
Example:
from sanic_ms import doc @user_bp.post('/') @doc.summary('create user') @doc.description('create user info') @doc.consumes(Users) @doc.produces({'id': int}) async def create_user(request): data = request['data'] async with request.app.db.transaction(request) as cur: record = await cur.fetchrow( """ INSERT INTO users(name, age, city_id, role_id) VALUES($1, $2, $3, $4, $5) RETURNING id """, data['name'], data['age'], data['city_id'], data['role_id'] ) return {'id': record['id']}
在返回时,不要返回sanic的response,直接返回原始数据,会在Middleware中对返回的数据进行处理,返回统一的格式,具体的格式能够[查看]
单元测试使用unittest。 mock是本身建立了MockClient,由于unittest尚未asyncio的mock,而且sanic的测试接口也是发送request请求,因此比较麻烦. 后期可使用pytest。
Example:
from sanic_ms.tests import APITestCase from server import app class TestCase(APITestCase): _app = app _blueprint = 'visit' def setUp(self): super(TestCase, self).setUp() self._mock.get('/cities/1', payload={'id': 1, 'name': 'shanghai'}) self._mock.get('/roles/1', payload={'id': 1, 'name': 'shanghai'}) def test_create_user(self): data = { 'name': 'test', 'age': 2, 'city_id': 1, 'role_id': 1, } res = self.client.create_user(data=data) body = ujson.loads(res.text) self.assertEqual(res.status, 200)
coverage erase coverage run --source . -m sanic_ms tests coverage xml -o reports/coverage.xml coverage2clover -i reports/coverage.xml -o reports/clover.xml coverage html -d reports
使用 app.error_handler = CustomHander() 对抛出的异常进行处理
Example:
from sanic_ms.exception import ServerError @visit_bp.delete('/users/<id:int>') async def del_user(request, id): raise ServerError(error='内部错误',code=10500, message="msg")