官方文档html
安装:
pip install sanic
启动
# Minimal Sanic application: a single JSON route served by four workers.
from sanic import Sanic
from sanic.response import text, json

# Give the app an explicit name; recent Sanic releases refuse an unnamed app.
app = Sanic(__name__)


# Handlers are written with ``async def`` so Sanic can run them asynchronously.
@app.route('/')  # GET is the default method
async def index(request):
    """Return a fixed JSON success payload."""
    return json({"code": 0, "data": "请求成功"})


if __name__ == '__main__':
    # workers=4 asks Sanic to start four worker processes.
    app.run(host='0.0.0.0', port=5000, workers=4)
run 参数说明
host(默认 "127.0.0.1"):服务器监听的主机地址。
port(默认 8000):服务器的端口。
debug(默认 False):启用调试输出(会降低服务器速度)。
ssl(默认 None):用于工作进程 SSL 加密的 SSLContext。
sock(默认 None):服务器用来接受连接的 Socket。
workers(默认 1):要启动的工作进程数。
loop(默认 None):与 asyncio 兼容的事件循环;如果没有指定,Sanic 会创建自己的事件循环。
protocol(默认 HttpProtocol):asyncio.Protocol 的子类。
@app.route("/index", strict_slashes=True, # 严格模式 url 没有最后的 " / " name="my_index", # 路由别名 app.url_for("my_index") ) async def index(request, tag):
# Path-parameter examples: untyped, int, number, and regex-constrained segments.
from sanic import Sanic
from sanic.response import json, text

# Give the app an explicit name; recent Sanic releases refuse an unnamed app.
app = Sanic(__name__)


# Untyped parameter: any path segment is accepted.
@app.route('/index/<tag>', methods=['GET'])
async def index(request, tag):
    """Echo the captured ``tag`` in a JSON body."""
    print(tag)
    # ensure_ascii=False keeps non-ASCII text readable; indent=4 pretty-prints.
    # The payload carries ``tag``; the original passed the builtin ``int``
    # type here, which cannot be serialised.
    res = json({"code": 1, "data": {1: tag}}, status=200, ensure_ascii=False, indent=4)
    return res


# Integer parameter only.  The handler has its own name so the two routes do
# not end up with the same route name.
@app.route('/tag/<int:int>', methods=['GET'])
async def index_int(request, int):  # keyword name is dictated by the route pattern
    """Echo the captured integer in a JSON body."""
    print(int)
    # ensure_ascii=False keeps non-ASCII text readable; indent=4 pretty-prints.
    res = json({"code": 1, "data": {1: int}}, status=200, ensure_ascii=False, indent=4)
    return res


# Numeric parameter (floating point values are accepted).
@app.route('/number/<number:number>')
async def number(request, number):
    """Return the captured number as plain text."""
    print(request, number)
    return text("number--{}".format(number))


# Exactly one ASCII letter.  ``[A-z]`` would also match the punctuation that
# sits between ``Z`` and ``a`` in ASCII, so the class is spelled out.
@app.route('/name/<name:[A-Za-z]>')
async def name(request, name):
    """Return the captured single letter as plain text."""
    print(name)
    return text("{}".format(name))


# Custom regular expression, POST only.  A raw string keeps ``\w`` from being
# read as a (invalid) string escape.
@app.route(r'/re/<re:[\w]{0,4}>', methods=['POST'])
async def folder_handler(request, re):
    """Return the regex-matched segment as plain text."""
    return text('re - {}'.format(re))
# Shortcut decorators: ``app.get`` / ``app.post`` fix the HTTP method.
@app.get('/get')
async def get(request):
    """Return the query-string arguments of a GET request as text."""
    return text(f"{request.args}")


# Named ``post`` so it no longer shadows (and duplicates the route name of)
# the GET handler above.
@app.post('/post')
async def post(request):
    """Return the parsed JSON body of a POST request as text."""
    return text(f"{request.json}")
# Registering handlers with ``app.add_route`` instead of decorators.
# ``text`` is the plain-text response helper from ``sanic.response``; the
# original called the undefined names ``one`` / ``tow`` / ``three``.
async def demo1(request):
    """Return a fixed plain-text body."""
    return text('OK')


async def demo2(request, name):
    """Return the captured path segment, labelled as a folder."""
    return text('Folder - {}'.format(name))


async def demo3(request, name):
    """Return the captured single letter, labelled as a person."""
    return text('Person - {}'.format(name))


app.add_route(demo1, '/one')
app.add_route(demo2, '/tow/<name>')
# ``[A-Za-z]`` instead of ``[A-z]``: the latter also matches ``[ \ ] ^ _`` and
# the backtick, which lie between ``Z`` and ``a`` in ASCII.
app.add_route(demo3, '/three/<name:[A-Za-z]>', methods=['GET'])
# Composing a view out of plain handler functions with CompositionView.
from sanic import Sanic
from sanic.views import CompositionView
from sanic.response import text

app = Sanic(__name__)


async def post_handler(request):
    """Print the request's JSON body and acknowledge it."""
    payload = request.json
    print(payload)
    return text('it is ok!')


# Bind the handler to POST on the view, then mount the view on a URL.
view = CompositionView()
view.add(["POST"], post_handler)
app.add_route(view, "/post_info")
# ----- Vary the response by the request's Host header -----
from sanic import Sanic
from sanic.response import text

# Give the app an explicit name; recent Sanic releases refuse an unnamed app.
app = Sanic(__name__)


@app.route('/get', methods=['GET'], host='zhangfei.com:5000')
async def get_handler(request):
    """Serve /get when the Host header is ``zhangfei.com:5000``."""
    return text('GET request - {}'.format(request.args))


# Used when the Host header does not match zhangfei.com.  It has its own
# function name so the two routes do not share one route name.
@app.route('/get', methods=['GET'])
async def get_handler_default(request):
    """Serve /get for every other host."""
    return text('GET request in default - {}'.format(request.args))
url_for
# Generating URLs with ``app.url_for``.
from sanic import Sanic
from sanic.response import text, json, redirect

# Give the app an explicit name; recent Sanic releases refuse an unnamed app.
app = Sanic(__name__)


@app.route('/')
async def index(request):
    """Build the URL of the ``demo`` route and redirect to it."""
    # ``tag`` fills the path parameter; the other keywords become query args.
    url = app.url_for('demo', tag=10, args=[1, 2, 3], arg_1="aaa")
    return redirect(url)  # 302 status code


@app.route('/demo/<tag>')
async def demo(request, tag):
    """Print the received query arguments and echo ``tag`` as text."""
    # Numbers arrive as strings and every value is delivered as a list.
    print(request.args)  # {'args': ['1', '2', '3'], 'arg_1': ['aaa']}
    return text('i am demo {}'.format(tag))
# Serving static content: one single file and one whole directory.
from sanic import Sanic, Blueprint

# Give the app an explicit name; recent Sanic releases refuse an unnamed app.
app = Sanic(__name__)

# /home serves exactly one file.
app.static("/home", "./static/demo.html")
# /static/<path> serves files from the ./static directory.
app.static("/static", "./static")

if __name__ == '__main__':
    app.run()
# WebSocket endpoints, registered by decorator and by method call.
from sanic import Sanic
from sanic.response import text, json, redirect

# Give the app an explicit name; recent Sanic releases refuse an unnamed app.
app = Sanic(__name__)


@app.websocket('/ws')
async def ws(request, ws):
    """Send a greeting, then print whatever the client sends back, forever."""
    while True:
        data = "链接成功!"
        await ws.send('msg = {}'.format(data))
        # Wait for the client's reply before sending the next message.
        res = await ws.recv()
        print(res)


async def wss(request, ws):
    """Placeholder handler that does nothing with the connection."""
    pass


# Same registration as the decorator above, done through a method call.
app.add_websocket_route(wss, '/wss')

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<script>
    // Open a WebSocket to the local server and log every message it sends.
    var ws = new WebSocket('ws://127.0.0.1:5000/ws');
    ws.addEventListener('message', function (event) {
        console.log(event.data);
    });
</script>
</body>
</html>
@app.route('/query', methods=['POST'])
async def ret_query(request):
    """Print the commonly used request attributes, then echo a few as JSON."""
    # Attributes are printed in this order; sample values come from a request
    # to http://127.0.0.1:5000/query?page=1.
    inspected = (
        'json',          # parsed JSON body
        'args',          # query arguments, e.g. {'page': ['1']}
        'url',           # full URL, e.g. http://127.0.0.1:5000/query?page=1
        'query_string',  # raw query string, e.g. page=1
        'form',          # submitted form data, e.g. {'name': ['123456789']}
        'body',          # raw request bytes, e.g. b' {"desk_id": 44}'
        'ip',            # IP address of the requester
        'scheme',        # request scheme, e.g. http
        'host',          # host header, e.g. 127.0.0.1:5000
        'path',          # request path, e.g. /query
    )
    for attribute in inspected:
        print(getattr(request, attribute))

    payload = {
        "parsed": True,
        "args": request.args,
        "url": request.url,
        "query_string": request.query_string,
    }
    return json(payload)
@app.route('/files', methods=['POST'])
async def ret_file(request):
    """Save the file uploaded in form field ``filename`` to the working directory.

    Returns the text ``ok`` on success.  Responds with status 400 when the
    field is missing or the supplied file name is unusable; the original
    raised ``AttributeError`` on a missing upload.
    """
    import os  # local import: this snippet has no import section of its own

    upload = request.files.get('filename')
    print(request.body)  # raw request bytes
    print(request.form)  # submitted form data, e.g. {'name': ['123456789']}
    if upload is None:
        return text('missing file field "filename"', status=400)

    print(upload.type)  # e.g. image/jpeg
    print(upload.name)  # e.g. aaa.jpg

    # The file name comes from the client: keep only its final component so
    # a value such as "../../etc/passwd" cannot escape the working directory.
    safe_name = os.path.basename(upload.name.replace('\\', '/'))
    if safe_name in ('', '.', '..'):
        return text('invalid file name', status=400)

    # NOTE(review): this is a blocking write inside an async handler; fine
    # for a demo, worth offloading for large uploads.
    with open(safe_name, 'wb') as f:
        f.write(upload.body)
    return text('ok')
# Set this to True to make /json report debug mode.
app.config['DEBUG'] = False


@app.route('/json', methods=['POST'])
async def ret_json(request):
    """Report whether the app is configured for debug or production."""
    # The app (and therefore its config) is reachable from the request.
    mode = 'debug' if request.app.config['DEBUG'] else 'production'
    return json({'status': mode})
# RequestParameters: ``get`` yields the first value, ``getlist`` yields all.
from sanic.request import RequestParameters

args = RequestParameters()
args['titles'] = ['Post 1', 'Post 2']

# First value only => 'Post 1' (result intentionally unused).
args.get('titles')

# Every value => ['Post 1', 'Post 2']
every_title = args.getlist('titles')
print(every_title)
# One route per response helper: text, file, html, raw and json.
from sanic import Sanic
from sanic.response import text, json, html, redirect, file, stream, raw

# Give the app an explicit name; recent Sanic releases refuse an unnamed app.
app = Sanic(__name__)


@app.route('/')
async def index(request):
    """Plain-text response."""
    return text('OK')


# Each handler below has its own name; the original reused ``handle_request``
# for all four, so they shadowed one another and shared one route name.
@app.route('/file')
async def handle_file(request):
    """File response; ``file`` is a coroutine and must be awaited."""
    return await file('aaa.jpg')


@app.route('/html')
async def handle_html(request):
    """HTML response."""
    return html('<p>Hello world!</p>')


# NOTE(review): this decorator appeared commented out in the original, which
# left the handler unreachable; it is enabled here on the assumption that was
# unintended — confirm.
@app.route('/raw')
async def handle_raw(request):
    """Raw bytes response."""
    return raw(b'raw data')


@app.route('/json')
async def handle_json(request):
    """JSON response with an extra response header and an explicit status."""
    return json(
        {'message': 'Hello world!'},
        headers={'X-Served-By': 'sanic'},  # extra response header
        status=200,
    )


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)