wsgiref:Python 自带的 WSGI 模块,用于构建简单服务器
from wsgiref.simple_server import make_server
def index(env, res):
    """WSGI application that greets the name given in the request path.

    Args:
        env: WSGI environ dict; only ``PATH_INFO`` is read.
        res: WSGI ``start_response`` callable, invoked once with the status
            line and the response headers.

    Returns:
        A one-element list holding the UTF-8 encoded HTML body. The greeting
        falls back to ``index`` when the path is just ``/``.
    """
    # Local import keeps this block self-contained.
    from html import escape

    # Declare the charset so clients decode the UTF-8 body correctly.
    res('200 ok', [('Content-Type', 'text/html; charset=utf-8')])
    # Drop the leading "/" of the request path.
    name = env['PATH_INFO'][1:]
    print(name)  # the HTTP method is available as env['REQUEST_METHOD']
    # PATH_INFO is client-controlled: escape it before embedding it in HTML.
    body = '<h1>hello %s </h1>' % (escape(name) or 'index')
    return [body.encode('utf8')]
# Create the server: bind the WSGI application `index` to 127.0.0.1:3000.
server = make_server('127.0.0.1', 3000, index)
print('Server start at 3000...')
# Listen for requests; this call keeps handling them and does not return
# until the server is shut down.
server.serve_forever()
aiohttp:基于 asyncio 实现的 HTTP 框架
import asyncio
from aiohttp import web
async def index(request):
    """aiohttp handler for ``/{name}`` that replies with an HTML greeting.

    Written as a native coroutine because ``asyncio.coroutine`` was removed
    in Python 3.11.

    Args:
        request: the incoming request; ``request.match_info['name']`` supplies
            the name to greet.

    Returns:
        A ``web.Response`` carrying the UTF-8 encoded HTML body, declared as
        ``text/html``.
    """
    # Local import keeps this block self-contained.
    from html import escape

    # Artificial delay; awaiting it yields control back to the event loop.
    await asyncio.sleep(0.5)
    name = request.match_info['name']
    # The name comes from the URL: escape it before embedding it in HTML.
    text = '<h1>hello, %s!</h1>' % (escape(name) or 'index')
    # Without an explicit content type a bytes body is served as
    # application/octet-stream, which browsers download instead of rendering.
    return web.Response(
        body=text.encode('utf8'),
        content_type='text/html',
        charset='utf-8',
    )
async def init(loop):
    """Create the aiohttp application and start serving it on 127.0.0.1:3000.

    Written as a native coroutine because ``asyncio.coroutine`` was removed
    in Python 3.11.

    Args:
        loop: the asyncio event loop used to build the application and to
            create the listening server.

    Returns:
        The server object produced by ``loop.create_server``.
    """
    app = web.Application(loop=loop)
    # Route GET /{name} to the `index` handler.
    app.router.add_route('GET', '/{name}', index)
    # Create the server.
    # NOTE(review): newer aiohttp releases deprecate ``loop=`` and
    # ``make_handler()`` in favour of ``web.AppRunner`` -- confirm against the
    # installed version before upgrading.
    srv = await loop.create_server(app.make_handler(), '127.0.0.1', 3000)
    print('Server started at http://127.0.0.1:3000...')
    return srv
# Get the event loop.
loop = asyncio.get_event_loop()
# Run the init() coroutine until it has created the server and returned.
loop.run_until_complete(init(loop))
# Keep the loop running so the server can handle requests.
loop.run_forever()
from jinja2 import Environment, FileSystemLoader
def init_jinja2():
    """Build a Jinja2 environment that loads templates from the working directory.

    Returns:
        An ``Environment`` whose ``FileSystemLoader`` is rooted at the
        absolute path of the current working directory at call time.
    """
    # No import of ``os`` is visible next to this snippet, so import it here
    # to avoid a NameError.
    import os

    template_dir = os.path.abspath('.')
    return Environment(loader=FileSystemLoader(template_dir))
# Module-level Jinja2 environment, built once when this statement runs.
__template__ = init_jinja2()
....
# Load example.html, render it with the keyword arguments in `kw`, and encode
# the rendered text as UTF-8 bytes.
# NOTE(review): `kw` is not defined in this excerpt -- presumably the template
# context dict; confirm against the surrounding code.
body = __template__.get_template('example.html').render(**kw).encode('utf-8')
....
aiomysql:为 asyncio 提供异步 MySQL IO 的驱动
import asyncio
import aiomysql
# Event loop that connect() passes to aiomysql.connect(); starts as a
# placeholder and is assigned before the coroutine is run.
loop = None
async def connect():
    """Query the ``users`` table on the local MySQL server and print the result.

    Prints the cursor description followed by every fetched row. Reads the
    module-level ``loop`` and passes it to ``aiomysql.connect``. Written as a
    native coroutine because ``asyncio.coroutine`` was removed in Python 3.11.
    The cursor and the connection are closed even if the query raises.
    """
    # Open the connection.
    # NOTE(review): credentials are hard-coded; load them from configuration
    # outside of demo code.
    conn = await aiomysql.connect(
        host='127.0.0.1',
        port=3306,
        user='root',
        password='password',
        db='awesome',
        loop=loop,
    )
    try:
        # Open a cursor.
        cur = await conn.cursor()
        try:
            # Execute the SQL statement.
            await cur.execute('SELECT * FROM users')
            print(cur.description)
            # Fetch the whole result set.
            rows = await cur.fetchall()
            print(rows)
        finally:
            await cur.close()
    finally:
        conn.close()
# Get the event loop and store it in the module-level name that connect() reads.
loop = asyncio.get_event_loop()
# Run the connect() coroutine to completion.
loop.run_until_complete(connect())