asyncio的核心概念与基本架构python
本文针对的是python3.4之后的版本的,由于从3.4开始才引入asyncio,后面的3.5 3.6 3.7版本是向前兼容的,只不过语法上面有稍微的改变。好比在3.4版本中使用@asyncio.coroutine装饰器和yield from语句,可是在3.5之后的版本中使用async、await两个关键字代替,虽然语法上稍微有所差别,可是原理是同样的。express
(1)result = yield from future,返回future的结果。编程
(2)result = yield from coroutine,等候另外一个协程函数返回结果或者是触发异常 安全
(3)result= yield from task,返回一个task的结果架构
(4)return expression,做为一个函数抛出返回值并发
(5)raise exception异步
如何理解事件循环:async
线程一直在各个协程方法之间永不停歇的游走,遇到一个yield from 或者await就悬挂起来,而后又走到另一个方法,依次进行下去,知道事件循环全部的方法执行完毕。实际上loop是BaseEventLoop的一个实例,咱们能够查看定义,它到底有哪些方法可调用异步编程
协程函数,不是像普通函数那样直接调用运行的,必须添加到事件循环中,而后由事件循环去运行,单独运行协程函数是不会有结果的。函数
import time import asyncio async def say_after_time(delay,what): await asyncio.sleep(delay) print(what) async def main(): print(f"开始时间为: {time.time()}") await say_after_time(1,"hello") await say_after_time(2,"world") print(f"结束时间为: {time.time()}") ''' 直接运行 ''' # >>> main() # <coroutine object main at 0x1053bb7c8> ''' 须要经过事件循环来调用''' loop=asyncio.get_event_loop() #建立事件循环对象 #loop=asyncio.new_event_loop() #与上面等价,建立新的事件循环 loop.run_until_complete(main()) #经过事件循环对象运行协程函数 loop.close()
(1)获取事件循环对象的几种方式:
loop=asyncio.get_running_loop()
,返回(获取)在当前线程中正在运行的事件循环,若是没有正在运行的事件循环,则会显示错误
loop=asyncio.get_event_loop()
,得到一个事件循环,若是当前线程尚未事件循环,则建立一个新的事件循环loop
loop=asyncio.set_event_loop(loop)
, 设置一个事件循环为当前线程的事件循环;
loop=asyncio.new_event_loop()
,建立一个新的事件循环
(2)经过事件循环运行协程函数的两种方式:
建立事件循环对象loop,即 asyncio.get_event_loop()
,经过事件循环运行协程函数
直接经过 asyncio.run(function_name)
运行协程函数。
可是须要注意的是,首先run函数是python3.7版本新添加的,前面的版本是没有的;其次,这个run函数老是会建立一个新的事件循环并在run结束以后关闭事件循环,因此,若是在同一个线程中已经有了一个事件循环,则不能再使用这个函数了,由于同一个线程不能有两个事件循环,并且这个run函数不能同时运行两次,由于他已经建立一个了。即同一个线程中是不容许有多个事件循环loop的。 asyncio.run()是python3.7 新添加的内容,也是后面推荐的运行任务的方式,由于它是高层API,后面会讲到它与asyncio.run_until_complete()的差别性,run_until_complete()是相对较低层的API。
有三类对象是可等待的,即 coroutines
, Tasks
, and Futures
.
coroutine
:本质上就是一个函数,一前面的生成器yield和yield from为基础,再也不赘述;
Tasks
: 任务,顾名思义,就是要完成某件事情,其实就是对协程函数进一步的封装;
Future
:它是一个“更底层”的概念,他表明一个异步操做的最终结果,由于异步操做通常用于耗时操做,结果不会当即获得,会在“未来”获得异步运行的结果,故而命名为 Future。
三者的关系,coroutine
能够自动封装成 task
,而Task是 Future
的子类。
Task用来 并发调度的协程, 单纯的协程函数仅仅是一个函数而已,将其包装成任务,任务是能够包含各类状态的,异步编程最重要的就是对异步操做状态的把控了。
(1)建立任务(两种方法):
方法一:task = asyncio.create_task(coro()) # 这是3.7版本新添加的
方法二:task = asyncio.ensure_future(coro())
,也可使用loop.create_future()
,loop.create_task(coro)
也是能够的。
(2)获取某一个任务的方法:
方法一:task=asyncio.current_task(loop=None);返回在某一个指定的loop中,当前正在运行的任务,若是没有任务正在运行,则返回None;若是loop为None,则默认为在当前的事件循环中获取,
方法二:asyncio.all_tasks(loop=None);返回某一个loop中尚未结束的任务;
Future是一个较低层的可等待(awaitable)对象,他表示的是异步操做的最终结果,当一个Future对象被等待的时候,协程会一直等待,直到Future已经运算完毕。 Future是Task的父类,通常状况下,已不用去管它们二者的详细区别,也没有必要去用Future,用Task就能够了,返回 future 对象的低级函数的一个很好的例子是 loop.run_in_executor().
asyncio分为高层API和低层API。咱们前面所讲的Coroutine和Tasks属于高层API,而Event Loop 和Future属于低层API。所谓的高层API主要是指那些asyncio.xxx()的方法。
High-level APIs
●Coroutines and Tasks(本文要写的) ●Streams ●Synchronization Primitives ●Subprocesses ●Queues ●Exceptions
Low-level APIs
●Event Loop(下一篇要写的) ●Futures ●Transports and Protocols ●Policies ●Platform Support
1)运行异步协程 asyncio.run(coro, *, debug=False) #运行一个一步程序,参见上面 2)建立任务 task=asyncio.create_task(coro) #python3.7 ,参见上面 task = asyncio.ensure_future(coro()) 3)睡眠 await asyncio.sleep(delay, result=None, *, loop=None) 这个函数表示的是:当前的那个任务(协程函数)睡眠多长时间,而容许其余任务执行。这是它与time.sleep()的区别,time.sleep()是当前线程休息 4)并发运行多个任务 await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False) 它自己也是awaitable的。当全部的任务都完成以后,返回的结果是一个列表的形式、 5)防止任务取消 await asyncio.shield(*arg, *, loop=None) 6)设置timeout await asyncio.wait_for(aw, timeout, *, loop=None) 当异步操做须要执行的时间超过waitfor设置的timeout,就会触发异常,因此在编写程序的时候,若是要给异步操做设置timeout,必定要选择合适,
若是异步操做自己的耗时较长,而你设置的timeout过短,会涉及到她还没作完,就抛出异常了。 7)多个协程函数时候的等候 await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED) 与上面的区别是,第一个参数aws是一个集合,要写成集合set的形式,好比: {func(),func(),func3()} 表示的是一系列的协程函数或者是任务,其中协程会自动包装成任务。事实上,写成列表的形式也是能够的。 该函数的返回值是两个Tasks/Futures的集合: (done, pending) 其中done是一个集合,表示已经完成的任务tasks;pending也是一个集合,表示尚未完成的任务。 常见的使用方法为:done, pending = await asyncio.wait(aws)
(1)他是做为一个python协程对象,和Future对象很像的这么一个对象,但不是线程安全的;他继承了Future全部的API,,除了Future.set_result()和Future.set_Exception();
(2)使用高层API asyncio.create_task()建立任务,或者是使用低层API loop.create_task()或者是loop.ensure_future()建立任务对象;
(3)相比于协程函数,任务时有状态的,可使用Task.cancel()进行取消,这会触发CancelledError异常,使用cancelled()检查是否取消。
cancel函数:
import asyncio async def cancel_me(): print('cancel_me(): before sleep') try: await asyncio.sleep(3600) #模拟一个耗时任务 except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): #经过协程建立一个任务,须要注意的是,在建立任务的时候,就会跳入到异步开始执行 #由于是3.7版本,建立一个任务就至关因而运行了异步函数cancel_me task = asyncio.create_task(cancel_me()) #等待一秒钟 await asyncio.sleep(1) print('main函数休息完了') #发出取消任务的请求 task.cancel() try: await task #由于任务被取消,触发了异常 except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main()) '''运行结果为: cancel_me(): before sleep main函数休息完了 cancel_me(): cancel sleep cancel_me(): after sleep main(): cancel_me is cancelled now '''
两种方法:第一种是直接经过Task.result()来获取;第二种是绑定一个回调函数来获取,即函数执行完毕后调用一个函数来获取异步函数的返回值。
1,经过result函数
import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模拟耗时任务3秒 print("Hello again 01 end") return a+b coroutine=hello1(10,5) loop = asyncio.get_event_loop() #第一步:建立事件循环 task=asyncio.ensure_future(coroutine) #第二步:将多个协程函数包装成任务列表 loop.run_until_complete(task) #第三步:经过事件循环运行 print('-------------------------------------') print(task.result()) loop.close() '''运行结果为 Hello world 01 begin Hello again 01 end ------------------------------------- 15 '''
2, 经过定义回调函数
import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模拟耗时任务3秒 print("Hello again 01 end") return a+b def callback(future): #定义的回调函数 print(future.result()) loop = asyncio.get_event_loop() #第一步:建立事件循环 task=asyncio.ensure_future(hello1(10,5)) #第二步:将多个协程函数包装成任务 task.add_done_callback(callback) #并被任务绑定一个回调函数 loop.run_until_complete(task) #第三步:经过事件循环运行 loop.close() #第四步:关闭事件循环 '''运行结果为: Hello world 01 begin Hello again 01 end 15 '''
所谓的回调函数,就是指协程函数coroutine执行结束时候会调用回调函数。并经过参数future获取协程执行的结果。咱们建立的task和回调里的future对象,其实是同一个对象,由于task是future的子类。
import asyncio import time from functools import partial async def get_url(): print('start get url') await asyncio.sleep(2) # await 后面跟的必须是一个 await 对象 print('end get url') return 'stack' def test(url,future): print(url,'hello, stack') if __name__ == '__main__': start = time.time() loop = asyncio.get_event_loop() # loop.run_until_complete(get_url()) # 只是提交了一个请求,时间2s tasks = [get_url() for i in range(10)] # get_future = asyncio.ensure_future(get_url()) # 得到返回值用法1,源码上依然是先判断loop,而后调用create_task # get_future = loop.create_task(get_url()) # 方法2,还能够继续添加函数,执行逻辑 # get_future.add_done_callback(partial(test, 'Stack')) # 函数自己在得到调用时须要一个任意形数,参数便是 get_future 自己,不然报错 # 若是函数须要传递参数,须要经过 偏函数 partial 模块来解决,以及函数的形参须要放在前面 loop.run_until_complete(asyncio.wait(tasks)) # 提交了10次,时间也是2s # loop.run_until_complete(asyncio.gather(*tasks)) 效果同上 # gather 和 wait 的区别 # gather是更高一级的抽象,且使用更加灵活,可使用分组,以及取消任务 print(time.time() - start) # print(get_future.result()) # 接收返回值
针对3.7的版本
import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模拟耗时任务3秒 print("Hello again 01 end") return a+b async def hello2(a,b): print("Hello world 02 begin") await asyncio.sleep(2) #模拟耗时任务2秒 print("Hello again 02 end") return a-b async def hello3(a,b): print("Hello world 03 begin") await asyncio.sleep(4) #模拟耗时任务4秒 print("Hello again 03 end") return a*b async def main(): results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5)) for result in results: print(result) asyncio.run(main()) '''运行结果为: Hello world 01 begin Hello world 02 begin Hello world 03 begin Hello again 02 end Hello again 01 end Hello again 03 end 15 5 50 '''
总结:
第一步:构建一个入口函数main 它也是一个异步协程函数,即经过async定义,而且要在main函数里面await一个或者是多个协程,同前面同样,我能够经过gather或者是wait进行组合,对于有返回值的协程函数,通常就在main里面进行结果的获取。
第二步:启动主函数main 这是python3.7新添加的函数,就一句话,即 asyncio.run(main())