协程实现了在单线程下的并发,每一个协程共享线程的几乎全部的资源,除了协程本身私有的上下文栈;协程的切换属于程序级别的切换,对于操做系统来讲是无感知的,所以切换速度更快、开销更小、效率更高,在有多IO操做的业务中能极大提升效率。html
python并发编程之asyncio协程(三)github
asyncio在python3.4后被内置在python中,使得python的协程建立变得更加方便。session
import asyncio import os # async 关键字定义一个协程 async def target_func1(): print('the func start') print(os.getpid()) print('the func end') def run(): # 建立一个协程对象 coroutine = target_func1() # 建立一个事件循环 loop = asyncio.get_event_loop() loop.run_until_complete(coroutine) # 将协程对象添加到事件循环,运行直到结束 print(os.getpid()) loop.close() # 关闭事件循环 def run1(): # 建立一个事件循环 loop = asyncio.get_event_loop() # 建立一个协程对象 coroutine = target_func1(loop) loop.create_task(coroutine) # 建立一个任务并添加到事件循环中 loop.run_forever() # 开启无限循环,须要在异步函数中调用stop()使中止 loop.close() if __name__ == '__main__': run() # 结果 the func start 4876 the func end 4876
以上可知,全部的代码段都是在一个进程的单线程中执行。多线程
被async修饰的函数调用后会生成协程函数,能够经过send唤醒执行。并发
async def target_func1(): print('the func start') print(os.getpid()) print('the func end') coroutine = target_func1() try: coroutine.send(None) # 唤醒协程 except StopIteration: print('xx') coroutine.close() # 关闭
async关键字能够定义一个协程对象,被async修饰的函数变成了一个协程对象而不是一个普通的函数。
async def target_func1(): pass coroutine = target_func1() print(coroutine)
await用于控制事件的执行顺序,它只能在异步函数中使用,即被async关键字定义的协程函数,不然报错。当执行到await时,当前协程挂起,转而去执行await后面的协程,完毕后再回到当前协程继续往下。
# async 关键字定义一个协程 async def target_func1(): print('the func start') x = await target_func2() # 当前协程挂起 print(x) print('the func end') return 1 async def target_func2(): """ 目标函数2 :return: """ time.sleep(2) print('the func end2') return 0
asyncio.get_event_loop():建立一个事件循环,全部的异步函数都须要在事件循环中运行; asyncio.ensure_future():建立一个任务 asyncio.gather(*fs):添加并行任务 asyncio.wait(fs):添加并行任务,能够是列表 loop.run_until_complete(func):添加协程函数同时启动阻塞直到结束 loop.run_forever():运行事件无限循环,直到stop被调用 loop.create_task():建立一个任务并添加到循环 loop.close():关闭循环 loop.time():循环开始后到当下的时间 loop.stop():中止循环 loop.is_closed() # 判断循环是否关闭 loop.create_future():建立一个future对象,推荐使用这个函数而不要直接建立future实例 loop.call_soon() # 设置回调函数,不能接受返回的参数,须要用到future对象,当即回调 loop.call_soon_threadsafe() # 线程安全的对象 loop.call_later() # 异步返回后开始算起,延迟回调 loop.call_at() # 循环开始多少s回调 loop.call_exception_handler() # 错误处理
Future:主要用来保存任务的状态; Task:Future的子类,扩展了Future的功能;
# Future from asyncio import Future # future = Future() # future.result() # 获取任务的结果 # future.remove_done_callback(fn) # 删除全部的回调函数并返回个数 # future.set_result('result') # 设置任务的结果,必须在result()以前执行,不然报错 # future.exception() # 获取任务的错误信息 # future.set_exception('bad') # 设置任务的错误信息 # future.add_done_callback('fn') # 添加回调函数 # Task current_task():返回循环当前的任务,类方法 all_tasks():返回事件循环全部的任务 get_stack():获取其余协程的堆栈列表 print_stack:输出其余协程的堆栈列表 cancel:取消任务
async def target_func3(name): """ :return: """ await asyncio.sleep(1) print(name) return 0 def run1(): # 建立一个事件循环 loop = asyncio.get_event_loop() x = loop.run_until_complete(asyncio.gather(target_func3('A'),target_func3('B'),target_func3('C'),)) print(x) # 等待返回结果,一个列表,按照事件添加的顺序,可是计算的顺序是不定的 loop.close() if __name__ == '__main__': run1()
run_forever()不能直接获得异步函数的返回结果,须要使用Future类来做为第三方保存结果,同时设置回调函数;
from asyncio import Future from functools import partial async def target_func0(name, future): """ 目标函数2 :return: """ time.sleep(1) print(name) future.set_result(name) # 设置返回结果 def got_result(loop, future): print(future.result()) # 处理结果 loop.stop() # 循环中止 def run(): loop = asyncio.get_event_loop() future = Future(loop=loop) res = asyncio.ensure_future(target_func0('A', future)) # 生成一个Task任务 print(res) future.add_done_callback(partial(got_result, loop)) # 回调函数默认只能有一个参数future,必须使用偏函数 # print(future.result()) # future上下文必须先调用future.set_result。 loop.run_forever() loop.close() if __name__ == '__main__': run()
协程里调用等待另外的协程完成后才能返回。
import asyncio import time # async 关键字定义一个协程 async def target_func1(): print('the func start') x = await target_func2() # 等待协程完成,控制执行顺序 print(x) print('the func end') return 1 async def target_func2(): """ 目标函数2 :return: """ time.sleep(2) print('the func end2') return 0 def run1(): # 建立一个事件循环 loop = asyncio.get_event_loop() x = loop.run_until_complete(target_func1()) print(x) loop.close() if __name__ == '__main__': run()
import asyncio import time from functools import partial # async 关键字定义一个协程 async def target_func1(): print('the func end') return 1 def get_res(loop): print('xxxx') loop.stop() def run1(): # 建立一个事件循环 loop = asyncio.get_event_loop() loop.create_task(target_func1()) # loop.call_soon(partial(get_res, loop)) # 设置回调函数,不能接受返回的参数,须要用到future对象 # loop.call_soon_threadsafe() # 线程安全的对象 # loop.call_later(delay=5, callback=partial(get_res, loop)) # 异步返回后开始算起,延迟5秒回调 # loop.call_at(when=8000,callback=partial(get_res, loop)) # 循环开始第8秒回调 # loop.call_exception_handler() # 错误处理 loop.run_forever() loop.close() if __name__ == '__main__': run1()
使用协程的目的是在系统发生io阻塞的时候,能够交出CUP的控制权,让其去执行其余的任务。实际使用时通常的场景有本地IO和网络IO。
# 使用asyncio+aiohttp,若是想异步化,网络请求须要抛弃requests包 import asyncio import time from aiohttp import ClientSession async def target2(): print('start2') async with ClientSession() as session: async with session.get(url='http://www.baidu.com') as rsp: data = await rsp.read() print('end2') return data def run1(): # 建立一个事件循环 loop = asyncio.get_event_loop() tasks = [target2() for i in range(100)] ts = asyncio.gather(*tasks) t = time.time() loop.run_until_complete(ts) print(time.time()-t) loop.close() if __name__ == '__main__': run1()
核心思想:将文件读写的while循环换成事件循环。
可参考:https://github.com/lyyyuna/script_collection/blob/master/aysncfile/asyncfile.py
asyncio模块也有本身的queue实现生产消费模式,只要有三种队列:Queue(先进先出),PriorityQueue(优先级队列),LifoQueue(栈),可是Queue不是线程安全的类,也就是说在多进程或多线程的状况下不要使用这个队列。
import asyncio import time from asyncio import Queue # async 关键字定义一个协程 async def target_func1(q:Queue): for i in range(100): await q.put(i) async def target_func2(q:Queue): for i in range(100): x = await q.get() print(x) def run1(): # 建立一个事件循环 loop = asyncio.get_event_loop() q = Queue(100) task = asyncio.gather(target_func1(q), target_func2(q)) loop.run_until_complete(task) loop.close() if __name__ == '__main__': run1()
Queue的get(),join(),put()方法返回的都是协程,须要使用await关键字。