Python 中的协程 (4) asyncio模块

asyncio的核心概念与基本架构python

  本文针对的是python3.4之后的版本的,由于从3.4开始才引入asyncio,后面的3.5 3.6 3.7版本是向前兼容的,只不过语法上面有稍微的改变。好比在3.4版本中使用@asyncio.coroutine装饰器和yield from语句,可是在3.5之后的版本中使用async、await两个关键字代替,虽然语法上稍微有所差别,可是原理是同样的。express

 

1 asyncio 组成的基本概念

1 协程函数的做用

(1)result = yield from future,返回future的结果。编程

(2)result = yield from coroutine,等候另外一个协程函数返回结果或者是触发异常 安全

(3)result= yield from task,返回一个task的结果架构

(4)return expression,做为一个函数抛出返回值并发

(5)raise exception异步

2 事件循环 event_loop

如何理解事件循环:async

线程一直在各个协程方法之间永不停歇的游走,遇到一个yield from 或者await就悬挂起来,而后又走到另一个方法,依次进行下去,知道事件循环全部的方法执行完毕。实际上loop是BaseEventLoop的一个实例,咱们能够查看定义,它到底有哪些方法可调用异步编程

协程函数,不是像普通函数那样直接调用运行的,必须添加到事件循环中,而后由事件循环去运行,单独运行协程函数是不会有结果的。函数

import time
import asyncio
async def say_after_time(delay,what):
        await asyncio.sleep(delay)
        print(what)

async def main():
        print(f"开始时间为: {time.time()}")
        await say_after_time(1,"hello")
        await say_after_time(2,"world")
        print(f"结束时间为: {time.time()}")
        
''' 直接运行 '''        
# >>> main()
# <coroutine object main at 0x1053bb7c8>       

'''  须要经过事件循环来调用'''
loop=asyncio.get_event_loop()    #建立事件循环对象
#loop=asyncio.new_event_loop()   #与上面等价,建立新的事件循环
loop.run_until_complete(main())  #经过事件循环对象运行协程函数
loop.close()

(1)获取事件循环对象的几种方式:

  1. loop=asyncio.get_running_loop(),返回(获取)在当前线程中正在运行的事件循环,若是没有正在运行的事件循环,则会显示错误

  2. loop=asyncio.get_event_loop() ,得到一个事件循环,若是当前线程尚未事件循环,则建立一个新的事件循环loop

  3. loop=asyncio.set_event_loop(loop), 设置一个事件循环为当前线程的事件循环;

  4. loop=asyncio.new_event_loop() ,建立一个新的事件循环

(2)经过事件循环运行协程函数的两种方式:

  1. 建立事件循环对象loop,即 asyncio.get_event_loop(),经过事件循环运行协程函数

  2. 直接经过 asyncio.run(function_name) 运行协程函数。

  可是须要注意的是,首先run函数是python3.7版本新添加的,前面的版本是没有的;其次,这个run函数老是会建立一个新的事件循环并在run结束以后关闭事件循环,因此,若是在同一个线程中已经有了一个事件循环,则不能再使用这个函数了,由于同一个线程不能有两个事件循环,并且这个run函数不能同时运行两次,由于他已经建立一个了。即同一个线程中是不容许有多个事件循环loop的。 asyncio.run()是python3.7 新添加的内容,也是后面推荐的运行任务的方式,由于它是高层API,后面会讲到它与asyncio.run_until_complete()的差别性,run_until_complete()是相对较低层的API。

3 什么是awaitable对象

有三类对象是可等待的,即 coroutines , Tasks , and Futures .

coroutine :本质上就是一个函数,一前面的生成器yield和yield from为基础,再也不赘述;

Tasks : 任务,顾名思义,就是要完成某件事情,其实就是对协程函数进一步的封装;

Future :它是一个“更底层”的概念,他表明一个异步操做的最终结果,由于异步操做通常用于耗时操做,结果不会当即获得,会在“未来”获得异步运行的结果,故而命名为 Future。

三者的关系,coroutine 能够自动封装成 task ,而Task是 Future 的子类。

4 什么是task任务

Task用来 并发调度的协程, 单纯的协程函数仅仅是一个函数而已,将其包装成任务,任务是能够包含各类状态的,异步编程最重要的就是对异步操做状态的把控了。

(1)建立任务(两种方法):

方法一:task = asyncio.create_task(coro()) # 这是3.7版本新添加的

方法二:task = asyncio.ensure_future(coro()) ,也可使用loop.create_future()loop.create_task(coro) 也是能够的。

(2)获取某一个任务的方法:

方法一:task=asyncio.current_task(loop=None);返回在某一个指定的loop中,当前正在运行的任务,若是没有任务正在运行,则返回None;若是loop为None,则默认为在当前的事件循环中获取,

方法二:asyncio.all_tasks(loop=None);返回某一个loop中尚未结束的任务;

5 什么是future?

  Future是一个较低层的可等待(awaitable)对象,他表示的是异步操做的最终结果,当一个Future对象被等待的时候,协程会一直等待,直到Future已经运算完毕。 Future是Task的父类,通常状况下,已不用去管它们二者的详细区别,也没有必要去用Future,用Task就能够了,返回 future 对象的低级函数的一个很好的例子是 loop.run_in_executor().

  

2 asyncio的基本架构

asyncio分为高层API和低层API。咱们前面所讲的Coroutine和Tasks属于高层API,而Event Loop 和Future属于低层API。所谓的高层API主要是指那些asyncio.xxx()的方法。

High-level APIs

●Coroutines and Tasks(本文要写的) ​ ●Streams ​ ●Synchronization Primitives ​ ●Subprocesses ​ ●Queues ​ ●Exceptions

Low-level APIs

●Event Loop(下一篇要写的) ​ ●Futures ​ ●Transports and Protocols ​ ●Policies ​ ●Platform Support

1 常见的一些高层API方法
1)运行异步协程
asyncio.run(coro, *, debug=False)  #运行一个一步程序,参见上面

2)建立任务
task=asyncio.create_task(coro)  #python3.7  ,参见上面
task = asyncio.ensure_future(coro()) 

3)睡眠
await asyncio.sleep(delay, result=None, *, loop=None)
这个函数表示的是:当前的那个任务(协程函数)睡眠多长时间,而容许其余任务执行。这是它与time.sleep()的区别,time.sleep()是当前线程休息

4)并发运行多个任务
await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
它自己也是awaitable的。当全部的任务都完成以后,返回的结果是一个列表的形式、

5)防止任务取消
await asyncio.shield(*arg, *, loop=None)

6)设置timeout
await asyncio.wait_for(aw, timeout, *, loop=None)
当异步操做须要执行的时间超过waitfor设置的timeout,就会触发异常,因此在编写程序的时候,若是要给异步操做设置timeout,必定要选择合适,
若是异步操做自己的耗时较长,而你设置的timeout过短,会涉及到她还没作完,就抛出异常了。
7)多个协程函数时候的等候 await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED) 与上面的区别是,第一个参数aws是一个集合,要写成集合set的形式,好比: {func(),func(),func3()} 表示的是一系列的协程函数或者是任务,其中协程会自动包装成任务。事实上,写成列表的形式也是能够的。 该函数的返回值是两个Tasks/Futures的集合: (done, pending) 其中done是一个集合,表示已经完成的任务tasks;pending也是一个集合,表示尚未完成的任务。 常见的使用方法为:done, pending = await asyncio.wait(aws)
2 Task 类详解

(1)他是做为一个python协程对象,和Future对象很像的这么一个对象,但不是线程安全的;他继承了Future全部的API,,除了Future.set_result()和Future.set_Exception();

(2)使用高层API asyncio.create_task()建立任务,或者是使用低层API loop.create_task()或者是loop.ensure_future()建立任务对象;

(3)相比于协程函数,任务时有状态的,可使用Task.cancel()进行取消,这会触发CancelledError异常,使用cancelled()检查是否取消。

cancel函数:

import asyncio

async def cancel_me():
    print('cancel_me(): before sleep')
    try:
        await asyncio.sleep(3600) #模拟一个耗时任务
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    #经过协程建立一个任务,须要注意的是,在建立任务的时候,就会跳入到异步开始执行
    #由于是3.7版本,建立一个任务就至关因而运行了异步函数cancel_me
    task = asyncio.create_task(cancel_me()) 
    #等待一秒钟
    await asyncio.sleep(1)
    print('main函数休息完了')
    #发出取消任务的请求
    task.cancel()  
    try:
        await task  #由于任务被取消,触发了异常
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

'''运行结果为:
cancel_me(): before sleep
main函数休息完了
cancel_me(): cancel sleep
cancel_me(): after sleep
main(): cancel_me is cancelled now
'''
3 异步函数的结果获取

两种方法:第一种是直接经过Task.result()来获取;第二种是绑定一个回调函数来获取,即函数执行完毕后调用一个函数来获取异步函数的返回值。

1,经过result函数

import asyncio
import time


async def hello1(a,b):
    print("Hello world 01 begin")
    await asyncio.sleep(3)  #模拟耗时任务3秒
    print("Hello again 01 end")
    return a+b

coroutine=hello1(10,5)
loop = asyncio.get_event_loop()                #第一步:建立事件循环
task=asyncio.ensure_future(coroutine)         #第二步:将多个协程函数包装成任务列表
loop.run_until_complete(task)                  #第三步:经过事件循环运行
print('-------------------------------------')
print(task.result())
loop.close() 

'''运行结果为
Hello world 01 begin
Hello again 01 end
-------------------------------------
15
'''

2, 经过定义回调函数

import asyncio
import time


async def hello1(a,b):
    print("Hello world 01 begin")
    await asyncio.sleep(3)  #模拟耗时任务3秒
    print("Hello again 01 end")
    return a+b

def callback(future):   #定义的回调函数
    print(future.result())

loop = asyncio.get_event_loop()                #第一步:建立事件循环
task=asyncio.ensure_future(hello1(10,5))       #第二步:将多个协程函数包装成任务
task.add_done_callback(callback)                      #并被任务绑定一个回调函数

loop.run_until_complete(task)                  #第三步:经过事件循环运行
loop.close()                                   #第四步:关闭事件循环


'''运行结果为:
Hello world 01 begin
Hello again 01 end
15
'''

  所谓的回调函数,就是指协程函数coroutine执行结束时候会调用回调函数。并经过参数future获取协程执行的结果。咱们建立的task和回调里的future对象,其实是同一个对象,由于task是future的子类。

3 asyncio 的基本模版

针对3.7以前的版本

import asyncio
import time
from functools import partial

async def get_url():
    print('start get url')
    await asyncio.sleep(2)          # await 后面跟的必须是一个 await 对象
    print('end get url')
    return 'stack'

def test(url,future):
    print(url,'hello, stack')

if __name__ == '__main__':
    start = time.time()
    
    loop = asyncio.get_event_loop()

    # loop.run_until_complete(get_url())      # 只是提交了一个请求,时间2s

    tasks = [get_url() for i in range(10)]

    # get_future = asyncio.ensure_future(get_url())
    # 得到返回值用法1,源码上依然是先判断loop,而后调用create_task

    # get_future = loop.create_task(get_url())
    # 方法2,还能够继续添加函数,执行逻辑
    # get_future.add_done_callback(partial(test, 'Stack'))
    # 函数自己在得到调用时须要一个任意形数,参数便是 get_future 自己,不然报错
    # 若是函数须要传递参数,须要经过 偏函数 partial 模块来解决,以及函数的形参须要放在前面


    loop.run_until_complete(asyncio.wait(tasks))  # 提交了10次,时间也是2s
     # loop.run_until_complete(asyncio.gather(*tasks)) 效果同上
     # gather 和 wait 的区别
     # gather是更高一级的抽象,且使用更加灵活,可使用分组,以及取消任务
    print(time.time() - start)
    # print(get_future.result())          # 接收返回值

针对3.7的版本

import asyncio
import time


async def hello1(a,b):
    print("Hello world 01 begin")
    await asyncio.sleep(3)  #模拟耗时任务3秒
    print("Hello again 01 end")
    return a+b

async def hello2(a,b):
    print("Hello world 02 begin")
    await asyncio.sleep(2)   #模拟耗时任务2秒
    print("Hello again 02 end")
    return a-b

async def hello3(a,b):
    print("Hello world 03 begin")
    await asyncio.sleep(4)   #模拟耗时任务4秒
    print("Hello again 03 end")
    return a*b

async def main():
    results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5))
    for result in results:
        print(result)

asyncio.run(main())

'''运行结果为:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''

总结:

第一步:构建一个入口函数main 它也是一个异步协程函数,即经过async定义,而且要在main函数里面await一个或者是多个协程,同前面同样,我能够经过gather或者是wait进行组合,对于有返回值的协程函数,通常就在main里面进行结果的获取。

第二步:启动主函数main 这是python3.7新添加的函数,就一句话,即 asyncio.run(main())

相关文章
相关标签/搜索