Python_学习之多协程html
1、yield和yield from区别
2、gevent构建多协程
3、asyncio构建多协程
一、名词简介
二、经常使用方法(api)
Loop
await
Task
Future
三、asyncio 经过yield from构建多任务协程
四、asyncio经过async和await【官方推荐】
五、asyncio.run() 构建循环事件【官方推荐】
六、实例操做
6.一、批量异步处理类型相同的数据?
6.二、但愿不等待全部完成了才对结果处理,而是消费了就对结果进行处理?
6.三、动态的异步消费,好比动态接收mq或redis插入的数据?
6.四、如何将线程池或进程池与协程一块儿用,且让不支持协程的模块支持?
6.5. 在flask中应用
6.六、代替requests实现的aiohttp支持异步python
前面已经分布介绍了Python中常常用的多线程和多线程,有兴趣能够参考一下,但愿能给你一点帮助,本文主要记录下Python中的多线程用法。web
协程又称微线程,是一种用户态的上下文切换技术,由程序(用户)自身控制代码与代码之间的切换,效率高,安全,无锁机制。redis
Python中实现的方式主要如下几种:编程
yield,yield from 生成器json
greenlet为第三方模块【没有解决遇IO自动切换,只是阻塞】,生产中主要用gevent模块来实现flask
asyncio,Python3.4引入的模块【主流】api
async&awiat ,Python3.5中引入的两个关键字,结合asynio模块使用【主流】缓存
1、yield和yield from区别安全
def study_yield1(items): """一次所有返回个列表""" yield items def study_yield2(items): """一个一个返回,yield from 等价于 for i in items: yield i""" yield from items item = ["I", "Like", "Python"] for i in study_yield1(item): print(i) # ['I', 'Like', 'Python'] for j in study_yield2(item): print(j) # 'I', 'Like', 'Python' # 函数中有yield关键字,便是生成器函数,遇到yield关键字就返回后面的值,同时记录状态,下次调用从该状态处执行,而不是从新开始 # yield 直接返回一个列表,而yield from 是一个一个返回,本质将后面可迭代对象转成生成器了,所以后面可接生成器函数 # yield from 内部解决了不少异常
2、gevent构建多协程
官方文档:http://www.gevent.org/index.html 安装 pip install gevent # 多任务 def asynchronous_func(func_name, job_list): """多少个任务,就开启多少个协程处理""" g_list = [gevent.spawn(func_name, job) for job in job_list] gevent.joinall(g_list) return [g.value for g in g_list]
3、asyncio构建多协程
官方文档:https://docs.python.org/zh-cn/3.8/library/asyncio.html
借用廖雪峰老师的话,asyncio的编程模型就是一个消息循环,咱们从asyncio模块中直接获取一个EventLoop的引用,而后把须要执行的协程扔到EventLoop中执行,就实现了异步IO。
Python3.4引入asyncio,经过装饰器@asyncio.coroutine标识函数是一个协程,使用yield from来驱动即遇IO切换到另外一个任务。
一、名词简介
异步IO:发起一个IO操做,因其耗时,不用等其结束,能够作其余的事情,结束时会发来通知告知。
事件循环loop:管理全部的事件【任务】,在整个程序运行过程当中不断循环执行并追踪事件发生的顺序将它们放到队列中,空闲时,调用相应的事件处理者来处理这些事件。
任务对象Task:是Future的子类,做用是将一个协程打包成一个task对象,为这个协程自动排一个日程准备当即执行,并追踪其状态。
结果对象Future:表示一个异步运算的最终结果的处理,asyncio 中须要 Future 对象以便容许经过 async/await 使用基于回调的代码。
可等待对象:若是一个对象能够在await语句中使用,那么他就是可等待对象。主要有三种类型:协程、任务Task、Future
二、经常使用方法(api)
Loop
关于循环的文章可参考:https://mp.weixin.qq.com/s/fCWQAT-O27mbi8UvKIrjWw
loop = asyncio.get_event_loop()
获取一个标准事件循环loop对象,全部协程都是经过它来循环做业的,能够把它理解为一个队列
loop.run_until_complete(future_obj)
阻塞调用,入参为Future对象,做用是运行全部的协程,直到全部的协程都处理完了返回结果或异常才结束。
loop.close()
关闭事件循环,清除全部队列并当即关闭执行器,不会等没有完成的任务完成,幂等【相同的参数执行相同的函数结果必须同样】不可逆。
await
asyncio.wait(可等待对象awaitable,timeout=None)
内部将咱们传入的任务封装成task对象,返回值为元祖,一个为完成的done列表,一个为还未完成的pending列表,若是设置timeout,在规定时间内,没返回的都放到未完成列表中,协程返回的结果顺序是无序的,完成的结果调用d.result()方法获取任务的返回值
asyncio.gather(可等待对象)
功能通asyncio.wait(),但返回的结果顺序,是按放入任务的顺序,有序的
asyncio.sleep(秒数,result="默认返回的结果")
模拟IO操做,这种休眠不会阻塞事件循环,前面加上await后将控制权交给主事件循环,不能用time.sleep(),因其会释放GIL,从而阻塞整个主线程,继而阻塞整个事件循环。
Task
Task:是Future的子类,做用是将一个协程打包成一个task对象,为这个协程自动排一个日程准备当即执行【白话就是将多个协程任务,排一个时间表自动并发的执行(遇到io就自动切另外一个任务)】。
底层接口:loop.create_task()
asyncio.create_task(协程)
将一个协程打包为一个task,排入日程准备执行,返回task对象,Python3.7加入的,3.7以前经过asyncio.ensure_future(协程)实现
Future
Future:表示一个异步运算的最终结果,是一个awaitable对象,协程能够等待 Future 对象直到它们有结果或异常集合或被取消。在 asyncio 中须要 Future 对象以便容许经过 async/await 使用基于回调的代码。
asyncio.create_future(协程)
三、asyncio 经过yield from构建多任务协程
Python3.4引入asyncio,经过装饰器@asyncio.coroutine标识函数是一个协程,使用yield from来驱动即遇IO切换到另外一个任务。
# 最多见用法 import time import asyncio @asyncio.coroutine def task(n): print(f"in {task.__name__}:{n} start") yield from asyncio.sleep(n) # 模拟IO时间 print(f"in {task.__name__}:{n} end") return f"01_{n}" @asyncio.coroutine def main(): # 构建任务集合 tasks = [ asyncio.ensure_future(task(1)), asyncio.ensure_future(task(2)), ] #或者以下也行,加入asyncio.wait() 会自动将协程转成task对象 #tasks = [task(1), task(2)] print(tasks) done, pending = yield from asyncio.wait(tasks,timeout=None) # 返回两个列表,done是完成的任务返回的结果列表,pending是未完成的列表,调用result()获得返回值 # timeout默认为None,表示一直等任务都完成,若是设置了时间,则规定时间没有返回则放入未完成列表中 # 完成列表每一个元素调用result()方法就能够获得对应任务的结果 task_result = [d.result() for d in done] return task_result for d in done: print(f"协程任务结果为:{d.result()}") if __name__ == '__main__': start = time.time() # 建立主线程的事件循环对象 loop = asyncio.get_event_loop() # 装载任务 result = loop.run_until_complete(main()) # 关闭循环 loop.close() print(f"任务结果为:{result}") print(f"总耗时:{time.time()-start}") """结果为: [<Task pending name='Task-2' coro=<task() running at ...>>, <Task pending name='Task-3' coro=<task() running at...>>] def main(): in task:1 start in task:2 start in task:1 end in task:2 end 协程任务结果为:01_2 协程任务结果为:01_1 任务结果为:['01_2', '01_1'] 总耗时:2.002303123474121 """
四、asyncio经过async和await【官方推荐】
python3.5后官方推荐为了区分生成器和协程,其实就是将@asyncio.coroutine 换成了 async,yield from 换成了 await
import asyncio async def task01(n): print(1) await asyncio.sleep(n) print(2) return n async def task02(n): print(3) await asyncio.sleep(n) print(4) return n if __name__ == '__main__': tasks = [task01(3), task02(2)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) loop.close()
五、asyncio.run() 构建循环事件【官方推荐】
python3.7才有run方法
""" asyncio.run() 函数用来运行最高层级的入口点 "main()" 函数 此函数运行传入的协程,负责管理 asyncio 事件循环并 完结异步生成器。 当有其余 asyncio 事件循环在同一线程中运行时,此函数不能被调用。 若是 debug 为 True,事件循环将以调试模式运行。 此函数---->老是会建立一个新的事件循环并在结束时关闭之<-----。 它应当被用做 asyncio 程序的主入口点,理想状况下应当只被调用一次。 """ import asyncio async def task(m): print(f"start{m}") await asyncio.sleep(m) # IO耗时,经过await 挂起当前协程,事件循环去执行其余的协程,等IO耗时结束了,再继续。 print(f"end{m}") return m async def main1(): # 入口 # 建立多个任务 task1 = asyncio.create_task(task(3)) task2 = asyncio.create_task(task(2)) ret1 = await task1 ret2 = await task2 ret = asyncio.run(main1()) # 写法2: async def main2(): tasks = [task(3), task(2)] done, p = await asyncio.wait(tasks, timeout=None) return [d.result() for d in done] ret = asyncio.run(main2()) # 写法3: tasks = [task(3), task(2)] done,p = asyncio.run(asyncio.wait(tasks)) ret = [d.result() for d in done] # 不容许这样写tasks = [asyncio.create_task(task(3)), asyncio.create_task(task(2))] """ 由于这时候尚未loop,看create_task源码 def create_task(coro, *, name=None): """Schedule the execution of a coroutine object in a spawn task. Return a Task object. """ loop = events.get_running_loop() task = loop.create_task(coro) _set_task_name(task, name) return task """ 注:经过asyncio模块执行异步,全部的流程第一步是先有事件循环,而后才能将可等待对象封装成task对象,放入事件循环,才能实现异步,决不容许先建task对象,而后建loop,这样会报错
六、实例操做
6.一、批量异步处理类型相同的数据?
import asyncio async def task(data): """消费消息,解析消息,处理消息,返回处理结果成功失败""" # 假设data的格式为{"order_id":订单号,"num":数量} await asyncio.sleep(2) if data["num"]: return {"order_id": data["order_id"], "check": 1} else: return {"order_id": data["order_id"], "check": 0} if __name__ == '__main__': import time import random start_time = time.time() messages = [{"order_id": str(o), "num": random.choice([0, 1])} for o in range(835001, 835501)] jobs = [task(d) for d in messages] done, pending = asyncio.run(asyncio.wait(jobs)) print(f"消费结果为:{[r.result() for r in done]}") print(f"总耗时:{time.time()-start_time}")
6.二、但愿不等待全部完成了才对结果处理,而是消费了就对结果进行处理?
import asyncio success_count = 0 fail_count = 0 async def task(data): """消费消息,解析消息,处理消息,返回处理结果成功失败""" # 假设data的格式为{"order_id":订单号,"num":数量} print(f"开始执行:{data}") await asyncio.sleep(2) if data["num"]: res = {"order_id": data["order_id"], "check": 1} else: res = {"order_id": data["order_id"], "check": 0} print(f"执行完成:{data}") return res def my_call_back(future): """对消费的消息结果进行实时处理,好比统计成功率,用于实时可视化展现""" time.sleep(1) result = future.result() global success_count, fail_count if result["check"] == 1: # 能够是操做缓存 success_count += 1 else: fail_count += 1 print(f"{result['order_id']}如今成功数:{success_count}") print(f"{result['order_id']}如今失败数:{fail_count}") if __name__ == '__main__': import time import random start_time = time.time() messages = [{"order_id": str(o), "num": random.choice([0, 1])} for o in range(835001, 835006)] jobs = [] loop = asyncio.get_event_loop() for m in messages: job = loop.create_task(task(m)) job.add_done_callback(my_call_back) # 加入回调函数 jobs.append(job) loop.run_until_complete(asyncio.wait(jobs)) loop.close() print(f"总耗时:{time.time() - start_time}") print(f"最终成功数:{success_count}") print(f"最终失败数:{fail_count}") # 注意:回调函数不能是协程,不能是协程,不能是协程!!!
6.三、动态的异步消费,好比动态接收mq或redis插入的数据?
# 生产者代码忽略 # 消费者 """ 解决方案是:建立一个线程,用于让事件循环永远执行 """ import asyncio import threading def always_run_loop(event_loop): asyncio.set_event_loop(event_loop) event_loop.run_forever() def get_redis(): """获取链接""" import redis conn_pool = redis.ConnectionPool(host="127.0.0.1", port=6379, max_connections=10) return redis.Redis(connection_pool=conn_pool) async def call_back_task(data): """消费数据""" print(data) await asyncio.sleep(2) return data if __name__ == '__main__': redis_pool = get_redis() loop = asyncio.new_event_loop() loop_th = threading.Thread(target=always_run_loop, args=(loop,)) loop_th.setDaemon(True) loop_th.start() while True: message = redis_pool.rpop("check") if message: # 异步动态添加到协程中 asyncio.run_coroutine_threadsafe(call_back_task(message), loop)
6.四、如何将线程池或进程池与协程一块儿用,且让不支持协程的模块支持?
import asyncio import requests async def download_image(url): # 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其余任务) print("开始下载:", url) loop = asyncio.get_event_loop() # requests模块默认不支持异步操做,因此就使用线程池来配合实现了。 future = loop.run_in_executor(None, requests.get, url) response = await future print('下载完成') # 图片保存到本地文件 file_name = url.rsplit('_')[-1] with open(file_name, mode='wb') as file_object: file_object.write(response.content) if __name__ == '__main__': url_list = [ 'https://img.yituyu.com/gallery/1110/01_PnhbzecG.jpg', 'https://img.yituyu.com/gallery/1110/02_rWAsk0kY.JPG', 'https://img.yituyu.com/gallery/1114/00_8Q85y28B.jpg' ] tasks = [download_image(url) for url in url_list] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
6.5. 在flask中应用
import asyncio from flask import Flask app = Flask(__name__) async def first(): await asyncio.sleep(20) return 'first' async def second(): await asyncio.sleep(10) return 'second' async def third(): await asyncio.sleep(10) return 'third' def ordinary_generator(): import sys if not sys.platform.startswith("win"): import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) for future in asyncio.as_completed([first(), second(), third()]): print('reached') yield loop.run_until_complete(future) @app.route('/') def healthcheck(): """ Retrieves the health of the service. """ import time time_s = time.time() for element in ordinary_generator(): print(element) print(f"{time.time() - time_s}") return "Health check passed" if __name__ == '__main__': app.run(debug=True)
6.六、代替requests实现的aiohttp支持异步
pip install aiohttp
官方文档:https://docs.aiohttp.org/en/stable/
import asyncio import aiohttp # 客户端使用 async def get_page(session, url): async with session.get(url) as response: if response.status == 200: text = await response.content.read() with open(f"图片-{url.split('/')[-1]}", "wb") as f: f.write(text) f.flush() async def my_main(): async with aiohttp.ClientSession() as session: urls = [ "http://pic1.win4000.com/wallpaper/4/53ec50e410310.jpg", "http://pic1.win4000.com/m00/a5/d1/8ab24d2d749ad08fe2b99830d5b30065.jpg", "http://pic1.win4000.com/m00/f8/40/a0f4ea98e5b518c410b189a36704f459.jpg" ] tasks = [asyncio.create_task(get_page(session, url)) for url in urls] await asyncio.wait(tasks) asyncio.run(my_main()) # 服务端 from aiohttp import web async def health_check(request): print(f"version:请求HTTP版本->{request.version}") print(f"method:请求HTTP方法->{request.method}") print(f"scheme:是http仍是https->{request.scheme}") print(f"secure:是不是https,返回bool->{request.secure}") print(f"host:服务器地址->{request.host}") print(f"remote:请求来源的地址->{request.remote}") print(f"url:请求url全路径->{request.url}") print(f"rel_url:请求url相对路径,无host->{request.rel_url}") print(f"path_qs:包含路径及参数->{request.path_qs}") print(f"path:url解码过的->{request.path}") print(f"raw_path:url未解码前的信息->{request.raw_path}") print(f"query:获取get请求url中的参数->{request.query}") print(f"query_string:原始请求数据->{request.query_string}") print(f"headers:获取header头部信息->{request.headers}") print(f"content_type:获取请求消息的格式->{request.content_type}") print(f"keep_alive:是否保持长连接->{request.keep_alive}") print(f"cookies:{request.cookies}") print(f"content:{request.content}") print(f"match_info:{request.match_info}->路由解析结果") return web.Response(text="aiohttp server is ok") async def save_db(request): data = await request.post() print(f"获取post请求体:{data}") return web.json_response({"key": 1111}) # 建立应用实例 app = web.Application() # 注册路由 app.add_routes( [ web.get('/', health_check), web.get('/{version}', health_check), web.post('/send', save_db) ] ) if __name__ == '__main__': web.run_app(app, host="127.0.0.88", port=9527)