Python开发【模块】:aiohttp(一)

AIOHTTP

用于asyncio和Python的异步HTTP客户端/服务器html

 

主要特色:python

  • 支持客户端和HTTP服务器。
  • 支持服务器WebSockets和 客户端WebSockets开箱即用,没有回调地狱。
  • Web服务器具备中间件, 信号和可插拔路由。

 

 入门

客户端:web

import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://python.org')
        print(html)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())  

服务端:redis

from aiohttp import web

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)

app = web.Application()
app.add_routes([web.get('/', handle),
                web.get('/{name}', handle)])

web.run_app(app)

  

 Web服务

 一、post、get接收参数数据数据库

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import asyncio
from aiohttp import web


class Application(object):
    '''
    aiohttp 接口
    '''

    def __init__(self):
        pass

    async def prepare_init(self):
        '''
        预加载
        :return:
        '''
        self.hearders_setting = [
            web.post('/', self.post),
            web.get('/', self.get),
        ]

    async def get(self,request):
        print(request.app['db'])
        arguments = request.query
        print(arguments)
        # < MultiDictProxy('model': 'aiohttp') >
        return web.Response(text='get')

    async def post(self,request):
        arguments = await request.post()
        print(arguments)
        # < MultiDictProxy('model': 'aiohttp') >
        # 获取ip地址
        print(request.transport.get_extra_info('peername'))
        # ('127.0.0.1', 53245)
        print(request.path)
        # /
        print(request.raw_path)
        return web.Response(text='post')

    async def app_factory(self):
        '''
        配置app
        :return:
        '''
        await self.prepare_init()
        app = web.Application()
        app.add_routes(self.hearders_setting)
        app['db'] = 'db'
        return app

    def run_forever(self):
        '''
        开启端口
        :return:
        '''
        web.run_app(self.app_factory())


Application().run_forever()
1 import requests
2 
3 
4 requests.post(url='http://127.0.0.1:8080/',data={'model':'aiohttp'},headers={'Content-Type': 'application/x-www-form-urlencoded'})
5 requests.get(url='http://127.0.0.1:8080/',params={'model':'aiohttp'},headers={'Content-Type': 'application/x-www-form-urlencoded'})
请求脚本

 

二、web处理程序取消json

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import asyncio
from aiohttp import web

async def something():
    await asyncio.sleep(2)
    print('handle out')

async def post(request):
    arguments = await request.post()
    await something()
    return web.Response(text='post')

注:当客户端请求过程当中,中断请求,此时handle out 并不会打印执行,直接取消运行;服务器

官方:await 若是客户端断开链接而不读取整个响应的BODY,则能够取消每一个 Web处理程序执行。这种行为与经典的WSGI框架(如Flask和Django)大相径庭websocket

有时它是一种理想的行为:在处理GET请求时,代码可能从数据库或其余Web资源获取数据,提取可能很慢。网络

  取消此很是好:对等已经断开链接,没有理由经过从DB获取数据而没有任何机会将其发送回对等来浪费时间和资源(内存等)。session

  但有时取消很糟糕:根据POST要求,一般须要将数据保存到数据库,而无论对等关闭。

取消预防能够经过如下几种方式实施:

  • 应用于asyncio.shield()将数据保存到DB的协同程序
  • 产生DB保存的新任务
  • 使用aiojobs或其余第三方库

①  shield

import asyncio
from aiohttp import web

async def something():
    await asyncio.sleep(2)
    print('handle out')

async def post(request):
    arguments = await request.post()
    await asyncio.shield(something())
    return web.Response(text='post')

asyncio.shield()工做得很好。惟一的缺点是你须要将Web处理程序分红两个异步函数:一个用于处理程序自己,另外一个用于受保护的代码。

async def handler(request):
    await asyncio.shield(write_to_redis(request))
    await asyncio.shield(write_to_postgres(request))
    return web.Response(text='OK')

在REDIS中保存数据后可能会发生取消, write_to_postgres不会被调用

② 建立任务

import asyncio
from aiohttp import web

async def something():
    await asyncio.sleep(2)
    print('handle out')

async def post(request):
    arguments = await request.post()
    request.loop.create_task(something())
    return web.Response(text='post')

产生一项新任务的状况要糟糕得多:没有地方能够await 产生任务 ;request.loop.create_task(something()) 这个没有办法awit获取值,若是加上awit,则致使中断时,something中止工做

 

三、数据共享

aiohttp.web不鼓励使用全局变量,每一个变量都应该有本身的非全局上下文。

因此,ApplicationRequest 支持一个collections.abc.MutableMapping接口(即它们是相似dict的对象),容许它们用做数据存储。

① 应用程序的配置

要存储相似全局变量,请随意将它们保存在 Application实例中:

app['my_private_key'] = data

并在Web处理程序中获取它

async def handler(request):
    data = request.app['my_private_key']

嵌套应用程序状况下,所需的查找策略可能以下:

  1. 搜索当前嵌套应用程序中的键。
  2. 若是未找到密钥,请继续在父应用程序中进行搜索。

为此,请使用Request.config_dict只读属性:

async def handler(request):
    data = request.config_dict['my_private_key']

② 请求的存储

Variables that are only needed for the lifetime of a Request, can be stored in a Request:

async def handler(request):
  request['my_private_key'] = "data"
  ...

这对于中间件和 信号处理程序来讲很是有用,能够存储数据以供链中的下一个处理程序进一步处理。

 ③ 响应的存储

StreamResponseResponse对象也支持collections.abc.MutableMapping接口。当您但愿在处理程序中的全部工做完成后与信号和中间件共享数据时,这很是有用:

async def handler(request):
  [ do all the work ]
  response['my_metric'] = 123
  return response

 

四、中间件

aiohttp.web提供了一种经过中间件自定义请求处理程序的强大机制 。

一个中间件是能够修改请求或响应中的协程。例如,这是一个附加 到响应的简单中间件:'wink'

from aiohttp.web import middleware

@middleware
async def middleware(request, handler):
    resp = await handler(request)
    resp.text = resp.text + ' wink'
    return resp

注意:该示例不适用于流式响应或websockets

每一个中间件都应该接受两个参数,一个request实例和一个处理程序,并返回响应或引起异常。若是异常不是HTTPException它的实例,则500 HTTPInternalServerError在处理中间件链以后将 其转换为。

警告:第二个参数应该彻底命名为handler

建立时Application,这些中间件将传递给仅限关键字的middlewares参数:

app = web.Application(middlewares=[middleware_1,
                                   middleware_2])

在内部,经过以相反的顺序将中间件链应用于原始处理程序来构造单个请求处理程序,并由RequestHandler做为常规处理程序调用。

因为中间件自己就是协程,所以await在建立新的处理程序时可能会执行额外的 调用,例如调用数据库等。

中间件一般会调用处理程序,可是他们可能会选择忽略它,例如,若是用户没有访问底层资源的权限,则显示403 Forbidden页面或引起HTTPForbidden异常。它们还可能呈现处理程序引起的错误,执行一些预处理或后处理,如处理CORS等。

如下代码演示了中间件的执行顺序:

from aiohttp import web

async def test(request):
    print('Handler function called')
    return web.Response(text="Hello")

@web.middleware
async def middleware1(request, handler):
    print('Middleware 1 called')
    response = await handler(request)
    print('Middleware 1 finished')
    return response

@web.middleware
async def middleware2(request, handler):
    print('Middleware 2 called')
    response = await handler(request)
    print('Middleware 2 finished')
    return response


app = web.Application(middlewares=[middleware1, middleware2])
app.router.add_get('/', test)
web.run_app(app) 

输出

Middleware 1 called
Middleware 2 called
Handler function called
Middleware 2 finished
Middleware 1 finished

中间件的常见用途是实现自定义错误页面。如下示例将使用JSON响应呈现404错误,由于可能适合JSON REST服务:

from aiohttp import web

@web.middleware
async def error_middleware(request, handler):
    try:
        response = await handler(request)
        if response.status != 404:
            return response
        message = response.message
    except web.HTTPException as ex:
        if ex.status != 404:
            raise
        message = ex.reason
    return web.json_response({'error': message})

app = web.Application(middlewares=[error_middleware])

中间件工厂

一个中间件工厂是建立与传递参数的中间件功能。例如,这是一个简单的中间件工厂:

def middleware_factory(text):
    @middleware
    async def sample_middleware(request, handler):
        resp = await handler(request)
        resp.text = resp.text + text
        return resp
    return sample_middleware

请记住,与常规中间件相反,您须要中间件工厂的结果而不是功能自己。所以,当将中间件工厂传递给应用程序时,您实际须要调用它:

app = web.Application(middlewares=[middleware_factory(' wink')])

  

五、信号

虽然middleware能够在准备响应以前或以后定制请求处理程序,但在准备响应时不能定制响应。For this aiohttp.web provides signals.

例如,中间件只能为未准备好的 响应更改HTTP标头(请参阅参考资料StreamResponse.prepare()),但有时咱们须要一个钩子来更改流式响应和WebSockets的HTTP标头。这能够经过订阅Application.on_response_prepare信号来完成 :

async def on_prepare(request, response):
    response.headers['My-Header'] = 'value'

app.on_response_prepare.append(on_prepare)

此外,能够订阅Application.on_startup和 Application.on_cleanup信号以进行应用程序组件设置并相应地拆除。

如下示例将正确初始化并配置aiopg链接引擎:

from aiopg.sa import create_engine

async def create_aiopg(app):
    app['pg_engine'] = await create_engine(
        user='postgre',
        database='postgre',
        host='localhost',
        port=5432,
        password=''
    )

async def dispose_aiopg(app):
    app['pg_engine'].close()
    await app['pg_engine'].wait_closed()

app.on_startup.append(create_aiopg)
app.on_cleanup.append(dispose_aiopg)

信号处理程序不该返回值,但能够修改传入的可变参数。

信号处理程序将按顺序运行,以便添加它们。aiohttp 3.0开始,全部处理程序必须是异步的

 

六、清理上下文

Application.on_startupApplication.on_cleanup 对仍有陷阱:信号处理程序彼此独立。

E.g. we have [create_pg, create_redis] in startup signal and [dispose_pg,dispose_redis] in cleanup.

If, for example, create_pg(app) call fails create_redis(app) is not called. But on application cleanup both dispose_pg(app) and dispose_redis(app) are still called: 

清理信号不知道启动/清理对及其执行状态。

解决方案是Application.cleanup_ctx用法:

async def pg_engine(app):
    app['pg_engine'] = await create_engine(
        user='postgre',
        database='postgre',
        host='localhost',
        port=5432,
        password=''
    )
    yield
    app['pg_engine'].close()
    await app['pg_engine'].wait_closed()

app.cleanup_ctx.append(pg_engine)

属性是列表生成器,代码以前 yield是(称为上初始化阶段的启动),码 以后yield被上执行清理。生成器必须只有一个yield

aiohttp保证当且仅当启动代码成功完成时才调用 清理代码。

Python 3.6+支持异步生成器,在Python 3.5上请使用async_generator 库。

版本3.1中的新功能。

 

 七、后台任务

有时须要在应用程序启动后执行一些异步操做。

更重要的是,在一些复杂的系统中,可能须要在事件循环中运行一些后台任务以及应用程序的请求处理程序。例如,监听消息队列或其余网络消息/事件源(例如,ZeroMQ,Redis Pub / Sub,AMQP等)以对应用程序内的接收消息做出反应。

例如,后台任务能够在zmq.SUB套接字上侦听ZeroMQ ,处理并将检索到的消息转发到经过WebSocket链接的客户端,这些客户端存储在应用程序中的某个位置(例如,在application['websockets']列表中)。

为了运行这种短时间和长期运行的后台任务,aiohttp提供了注册Application.on_startup将与应用程序的请求处理程序一块儿运行的信号处理程序的能力。

例如,须要运行一个快速任务和两个长时间运行的任务,这些任务将一直存在,直到应用程序处于活动状态。相应的后台任务能够注册为Application.on_startup 信号处理程序,以下例所示:

async def listen_to_redis(app):
    try:
        sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
        ch, *_ = await sub.subscribe('news')
        async for msg in ch.iter(encoding='utf-8'):
            # Forward message to all connected websockets:
            for ws in app['websockets']:
                ws.send_str('{}: {}'.format(ch.name, msg))
    except asyncio.CancelledError:
        pass
    finally:
        await sub.unsubscribe(ch.name)
        await sub.quit()


async def start_background_tasks(app):
    app['redis_listener'] = app.loop.create_task(listen_to_redis(app))


async def cleanup_background_tasks(app):
    app['redis_listener'].cancel()
    await app['redis_listener']


app = web.Application()
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
web.run_app(app)

任务listen_to_redis()将永远运行。要正确关闭它,Application.on_cleanup信号处理程序可用于向其发送取消。

 

八、处理异常错误

https://aiohttp-demos.readthedocs.io/en/latest/tutorial.html#aiohttp-demos-polls-middlewares

 

九、request中获取add_routes中url

    async def post(self, request):
        url = [resource._path for resource in request.app.router._resources]
        print(url)
        # ['/', '/6773', '/', '/1234/']
        Response = web.Response(text='post')
        return Response
相关文章
相关标签/搜索