python协程系列

 

声明:本文针对的是python3.4之后的版本的,由于从3.4开始才引入asyncio,后面的3.5 3.6 3.7版本是向前兼容的,只不过语法上面有稍微的改变。好比在3.4版本中使用@asyncio.coroutine装饰器和yield from语句,可是在3.5之后的版本中使用async、await两个关键字代替,虽然语法上稍微有所差别,可是原理是同样的。本文用最通俗的语言解释了pythonasyncio背后的一些核心概念,简要解析了asyncio的设计架构,并给出了使用python进行asyncio异步编程的通常模板。python

1、一些最重要的概念
一、协程(coroutine)——本质就是一个函数express

所谓的“协程”就是一个函数,这个函数须要有两个基本的组成要素,第一,须要使用@asyncio.coroutine进行装饰;第二,函数体内必定要有yield from 返回的的generator,或者是说使用yield from 返回另外一个协程对象。编程

固然,这两个条件并非硬性规定的,若是没有这两个条件,依然是函数,只不过是普通函数而已。安全

怎么判断一个函数是否是协程?经过asyncio.iscoroutine(obj)和asyncio.iscoroutinefunction(func)加以判断,返回true,则是。网络

那么协程函数有什么做用呢?多线程

(1)result = yield from future架构

做用一:返回future的结果。什么是future?后面会讲到。当协程函数执行到这一句,协程会被悬挂起来,知道future的结果被返回。若是是future被中途取消,则会触发CancelledError异常。因为task是future的子类,后面也会介绍,关于future的全部应用,都一样适用于task并发

(2)result = yield from coroutineapp

等候另外一个协程函数返回结果或者是触发异常 异步

(3)result= yield from task

返回一个task的结果

(4)return expression

做为一个函数,他自己也是能够返回某一个结果的

(5)raise exception 

二、事件循环——event_loop

协程函数,不是像普通函数那样直接调用运行的,必须添加到事件循环中,而后由事件循环去运行,单独运行协程函数是不会有结果的,看一个简单的例子:

import time
import asyncio
async def say_after_time(delay,what):
await asyncio.sleep(delay)
print(what)

async def main():
print(f"开始时间为: {time.time()}")
await say_after_time(1,"hello")
await say_after_time(2,"world")
print(f"结束时间为: {time.time()}")

loop=asyncio.get_event_loop() #建立事件循环对象
#loop=asyncio.new_event_loop() #与上面等价,建立新的事件循环
loop.run_until_complete(main()) #经过事件循环对象运行协程函数
loop.close()
在python3.6版本中,若是咱们单独像执行普通函数那样执行一个协程函数,只会返回一个coroutine对象(python3.7)以下所示:

>>> main()
<coroutine object main at 0x1053bb7c8>
(1)获取事件循环对象的几种方式:

下面几种方式能够用来获取、设置、建立事件循环对象loop

loop=asyncio.get_running_loop() 返回(获取)在当前线程中正在运行的事件循环,若是没有正在运行的事件循环,则会显示错误;它是python3.7中新添加的

loop=asyncio.get_event_loop() 得到一个事件循环,若是当前线程尚未事件循环,则建立一个新的事件循环loop;

loop=asyncio.set_event_loop(loop) 设置一个事件循环为当前线程的事件循环;

loop=asyncio.new_event_loop() 建立一个新的事件循环

(2)经过事件循环运行协程函数的两种方式:

(1)方式一:建立事件循环对象loop,即asyncio.get_event_loop(),经过事件循环运行协程函数

(2)方式二:直接经过asyncio.run(function_name)运行协程函数。可是须要注意的是,首先run函数是python3.7版本新添加的,前面的版本是没有的;其次,这个run函数老是会建立一个新的事件循环并在run结束以后关闭事件循环,因此,若是在同一个线程中已经有了一个事件循环,则不能再使用这个函数了,由于同一个线程不能有两个事件循环,并且这个run函数不能同时运行两次,由于他已经建立一个了。即同一个线程中是不容许有多个事件循环loop的。

asyncio.run()是python3.7 新添加的内容,也是后面推荐的运行任务的方式,由于它是高层API,后面会讲到它与asyncio.run_until_complete()的差别性,run_until_complete()是相对较低层的API。

注意:到底什么是事件循环?如何理解?

能够这样理解:线程一直在各个协程方法之间永不停歇的游走,遇到一个yield from 或者await就悬挂起来,而后又走到另一个方法,依次进行下去,知道事件循环全部的方法执行完毕。实际上loop是BaseEventLoop的一个实例,咱们能够查看定义,它到底有哪些方法可调用。

三、什么是awaitable对象——便可暂停等待的对象

有三类对象是可等待的,即 coroutines, Tasks, and Futures.

coroutine:本质上就是一个函数,一前面的生成器yield和yield from为基础,再也不赘述;

Tasks: 任务,顾名思义,就是要完成某件事情,其实就是对协程函数进一步的封装;

Future:它是一个“更底层”的概念,他表明一个一步操做的最终结果,由于一步操做通常用于耗时操做,结果不会当即获得,会在“未来”获得异步运行的结果,故而命名为Future。

三者的关系,coroutine能够自动封装成task,而Task是Future的子类。

四、什么是task任务

如前所述,Task用来 并发调度的协程,即对协程函数的进一步包装?那为何还须要包装呢?由于单纯的协程函数仅仅是一个函数而已,将其包装成任务,任务是能够包含各类状态的,异步编程最重要的就是对异步操做状态的把控了。

(1)建立任务(两种方法):

方法一:task = asyncio.create_task(coro())   # 这是3.7版本新添加的

方法二:task = asyncio.ensure_future(coro())

也可使用

loop.create_future()

loop.create_task(coro)

也是能够的。

备注:关于任务的详解,会在后面的系列文章继续讲解,本文只是归纳性的说明。

(2)获取某一个任务的方法:

方法一:task=asyncio.current_task(loop=None)

返回在某一个指定的loop中,当前正在运行的任务,若是没有任务正在运行,则返回None;

若是loop为None,则默认为在当前的事件循环中获取,

方法二:asyncio.all_tasks(loop=None)

返回某一个loop中尚未结束的任务

五、什么是future?

Future是一个较低层的可等待(awaitable)对象,他表示的是异步操做的最终结果,当一个Future对象被等待的时候,协程会一直等待,直到Future已经运算完毕。

Future是Task的父类,通常状况下,已不用去管它们二者的详细区别,也没有必要去用Future,用Task就能够了,

返回 future 对象的低级函数的一个很好的例子是 loop.run_in_executor().

2、asyncio的基本架构
前面介绍了asyncio里面最为核心的几个概念,若是可以很好地理解这些概念,对于学习协程是很是有帮助的,可是按照我我的的风格,我会先说asyncio的架构,理解asyncio的设计架构有助于更好地应用和理解。

asyncio分为高层API和低层API,咱们均可以使用,就像我前面在讲matplotlib的架构的时候所讲的同样,咱们前面所讲的Coroutine和Tasks属于高层API,而Event Loop 和Future属于低层API。固然asyncio所涉及到的功能远不止于此,咱们只看这么多。下面是是高层API和低层API的概览:

High-level APIs

Coroutines and Tasks(本文要写的)
Streams
Synchronization Primitives
Subprocesses
Queues
Exceptions
Low-level APIs

Event Loop(下一篇要写的)
Futures
Transports and Protocols
Policies
Platform Support
所谓的高层API主要是指那些asyncio.xxx()的方法,

一、常见的一些高层API方法

(1)运行异步协程

asyncio.run(coro, *, debug=False)  #运行一个一步程序,参见上面

(2)建立任务

task=asyncio.create_task(coro)  #python3.7  ,参见上面

task = asyncio.ensure_future(coro()) 

(3)睡眠

await asyncio.sleep(delay, result=None, *, loop=None)

这个函数表示的是:当前的那个任务(协程函数)睡眠多长时间,而容许其余任务执行。这是它与time.sleep()的区别,time.sleep()是当前线程休息,注意他们的区别哦。

另外若是提供了参数result,当当前任务(协程)结束的时候,它会返回;

loop参数将会在3.10中移除,这里就再也不说了。

(4)并发运行多个任务
await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

它自己也是awaitable的。

*coros_or_futures是一个序列拆分操做,若是是以个协程函数,则会自动转换成Task。

当全部的任务都完成以后,返回的结果是一个列表的形式,列表中值的顺序和*coros_or_futures完成的顺序是同样的。

return_exceptions:False,这是他的默认值,第一个出发异常的任务会当即返回,而后其余的任务继续执行;

                             True,对于已经发生了异常的任务,也会像成功执行了任务那样,等到全部的任务执行结束一块儿将错误的结果返回到最终的结果列表里面。

若是gather()自己被取消了,那么绑定在它里面的任务也就取消了。

(5)防止任务取消

await asyncio.shield(*arg, *, loop=None)

它自己也是awaitable的。顾名思义,shield为屏蔽、保护的意思,即保护一个awaitable 对象防止取消,通常状况下不推荐使用,并且在使用的过程当中,最好使用try语句块更好。

try:
res = await shield(something())
except CancelledError:
res = None
(6)设置timeout——必定要好好理解

await asyncio.wait_for(aw, timeout, *, loop=None)

若是aw是一个协程函数,会自动包装成一个任务task。参见下面的例子:

import asyncio

async def eternity():
print('我立刻开始执行')
await asyncio.sleep(3600) #当前任务休眠1小时,即3600秒
print('终于轮到我了')

async def main():
# Wait for at most 1 second
try:
print('等你3秒钟哦')
await asyncio.wait_for(eternity(), timeout=3) #休息3秒钟了执行任务
except asyncio.TimeoutError:
print('超时了!')

asyncio.run(main())

'''运行结果为:
等你3秒钟哦
我立刻开始执行
超时了!
'''
为何?首先调用main()函数,做为入口函数,当输出‘等你3秒钟哦’以后,main挂起,执行eternity,而后打印‘我立刻开始执行’,而后eternity挂起,并且要挂起3600秒,大于3,这时候出发TimeoutError。修改一下:‘’

import asyncio

async def eternity():
print('我立刻开始执行')
await asyncio.sleep(2) #当前任务休眠2秒钟,2<3
print('终于轮到我了')

async def main():
# Wait for at most 1 second
try:
print('等你3秒钟哦')
await asyncio.wait_for(eternity(), timeout=3) #给你3秒钟执行你的任务
except asyncio.TimeoutError:
print('超时了!')

asyncio.run(main())

'''运行结果为:
等你3秒钟哦
我立刻开始执行
终于轮到我了
'''
总结:当异步操做须要执行的时间超过waitfor设置的timeout,就会触发异常,因此在编写程序的时候,若是要给异步操做设置timeout,必定要选择合适,若是异步操做自己的耗时较长,而你设置的timeout过短,会涉及到她还没作完,就抛出异常了。

(7)多个协程函数时候的等候

await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

与上面的区别是,第一个参数aws是一个集合,要写成集合set的形式,好比:

{func(),func(),func3()}

表示的是一系列的协程函数或者是任务,其中协程会自动包装成任务。事实上,写成列表的形式也是能够的。

注意:该函数的返回值是两个Tasks/Futures的集合:

(done, pending)

其中done是一个集合,表示已经完成的任务tasks;pending也是一个集合,表示尚未完成的任务。

常见的使用方法为:done, pending = await asyncio.wait(aws)

参数解释:

timeout (a float or int), 同上面的含义同样,须要注意的是,这个不会触发asyncio.TimeoutError异常,若是到了timeout还有任务没有执行完,那些没有执行完的tasks和futures会被返回到第二个集合pending里面。

return_when参数,顾名思义,他表示的是,何时wait函数该返回值。只可以去下面的几个值。 

Constant Description
FIRST_COMPLETED first_completes.当任何一个task或者是future完成或者是取消,wait函数就返回
FIRST_EXCEPTION 当任何一个task或者是future触发了某一个异常,就返回,.若是是全部的task和future都没有触发异常,则等价与下面的 ALL_COMPLETED.
ALL_COMPLETED 当全部的task或者是future都完成或者是都取消的时候,再返回。
以下面例子所示:

import asyncio
import time

a=time.time()

async def hello1(): #大约2秒
print("Hello world 01 begin")
yield from asyncio.sleep(2)
print("Hello again 01 end")

async def hello2(): #大约3秒
print("Hello world 02 begin")
yield from asyncio.sleep(3)
print("Hello again 02 end")

async def hello3(): #大约4秒
print("Hello world 03 begin")
yield from asyncio.sleep(4)
print("Hello again 03 end")

async def main(): #入口函数
done,pending=await asyncio.wait({hello1(),hello2(),hello3()},return_when=asyncio.FIRST_COMPLETED)
for i in done:
print(i)
for j in pending:
print(j)

asyncio.run(main()) #运行入口函数

b=time.time()
print('---------------------------------------')
print(b-a)

'''运行结果为:
Hello world 02 begin
Hello world 01 begin
Hello world 03 begin
Hello again 01 end
<Task finished coro=<hello1() done, defined at e:\Python学习\基础入门\asyncio3.4详解\test11.py:46> result=None>
<Task pending coro=<hello3() running at e:\Python学习\基础入门\asyncio3.4详解\test11.py:61> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object
at 0x000001FA8D394438>()]>>
<Task pending coro=<hello2() running at e:\Python学习\基础入门\asyncio3.4详解\test11.py:55> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object
at 0x000001FA8D394378>()]>>
---------------------------------------
2.033155679702759
'''
从上面能够看出,hello1()试运行结束了的,hello2()和hello3()还没结束。
(8)asyncio.as_completed()函数

这个函数我没有找到合适的中文名称去描述,因此哪位大神若是知道,望告知,不胜感激!它的函数原型以下:

asyncio.as_completed(aws, *, loop=None, timeout=None)

第一个参数aws:同上面同样,是一个集合{}集合里面的元素是coroutine、task或者future

第三个参数timeout:意义和上面讲的的同样

那到底什么做用呢?其实很简单,我的感受有点鸡肋,从一个例子看起:

import asyncio
import time
import threading

a=time.time()

@asyncio.coroutine
def hello1():
print("Hello world 01 begin")
yield from asyncio.sleep(5) #大约5秒
print("Hello again 01 end")
return '哈哈1'

@asyncio.coroutine
def hello2():
print("Hello world 02 begin")
yield from asyncio.sleep(3) #大约3秒
print("Hello again 02 end")
return '哈哈2'

@asyncio.coroutine
def hello3():
print("Hello world 03 begin")
yield from asyncio.sleep(4) #大约4秒
print("Hello again 03 end")
return '哈哈3'

async def main():
s=asyncio.as_completed({hello1(),hello2(),hello3()})
for f in s:
result=await f
print(result)

asyncio.run(main())

b=time.time()
print('---------------------------------------')
print(b-a)

'''运行结果为:
Hello world 03 begin
Hello world 01 begin
Hello world 02 begin
Hello again 01 end
哈哈1
Hello again 02 end
哈哈2
Hello again 03 end
哈哈3
---------------------------------------
4.0225794315338135
'''

结论:asyncio.as_completed()函数返回的是一个可迭代(iterator)的对象,对象的每一个元素就是一个future对象,不少小伙伴说,这不是至关于没变吗?其实返回的future集合是对参数的future集合从新组合,组合的顺序就是,最早执行完的协程函数(coroutine、task、future)最早返回,从上面的代码可知,参数为

aws={hello1(),hello2(),hello3()},由于hello1大约花费5秒、hello2大约花费3秒、hello3大约花费4秒。返回的结果为

s={hello2()、hello3()、hello(1)},由于hello2时间最短,故而放在前面,hello1时间最长,故而放在最后面。而后对返回的集合s开始迭代。

二、Task 类详解

先来看一下Task类的简单介绍(英文原文文档)。

class asyncio.Task(coro, *, loop=None)

A Future-like object that runs a Python coroutine. Not thread-safe.

Tasks are used to run coroutines in event loops. If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the completion of the Future. When the Future is done, the execution of the wrapped coroutine resumes.

Event loops use cooperative scheduling: an event loop runs one Task at a time. While a Task awaits for the completion of a Future, the event loop runs other Tasks, callbacks, or performs IO operations.

Use the high-level asyncio.create_task() function to create Tasks, or the low-level loop.create_task() or ensure_future() functions. Manual instantiation of Tasks is discouraged.

To cancel a running Task use the cancel() method. Calling it will cause the Task to throw a CancelledError exception into the wrapped coroutine. If a coroutine is awaiting on a Future object during cancellation, the Future object will be cancelled.

cancelled() can be used to check if the Task was cancelled. The method returns True if the wrapped coroutine did not suppress the CancelledError exception and was actually cancelled.

asyncio.Task inherits from Future all of its APIs except Future.set_result() and Future.set_exception().

Tasks support the contextvars module. When a Task is created it copies the current context and later runs its coroutine in the copied context.

上面的文字描述中推出了几个很是重要的信息,特在此总结以下:

(1)他是做为一个python协程对象,和Future对象很像的这么一个对象,但不是线程安全的;他继承了Future全部的API,,除了Future.set_result()和Future.set_Exception();

(2)使用高层API  asyncio.ccreate_task()建立任务,或者是使用低层API loop.create_task()或者是loop.ensure_future()建立任务对象;

(3)相比于协程函数,任务时有状态的,可使用Task.cancel()进行取消,这会触发CancelledError异常,使用cancelled()检查是否取消。

下面介绍Task类常见的一些使用函数

(1)cancel()

Request the Task to be cancelled.

其实前面已经有所介绍,最好是使用他会出发CancelledError异常,因此须要取消的协程函数里面的代码最好在try-except语句块中进行,这样方便触发异常,打印相关信息,可是Task.cancel()没有办法保证任务必定会取消,而Future.cancel()是能够保证任务必定取消的。能够参见下面的一个例子:

import asyncio

async def cancel_me():
print('cancel_me(): before sleep')
try:
await asyncio.sleep(3600) #模拟一个耗时任务
except asyncio.CancelledError:
print('cancel_me(): cancel sleep')
raise
finally:
print('cancel_me(): after sleep')

async def main():
#经过协程建立一个任务,须要注意的是,在建立任务的时候,就会跳入到异步开始执行
#由于是3.7版本,建立一个任务就至关因而运行了异步函数cancel_me
task = asyncio.create_task(cancel_me())
#等待一秒钟
await asyncio.sleep(1)
print('main函数休息完了')
#发出取消任务的请求
task.cancel()
try:
await task #由于任务被取消,触发了异常
except asyncio.CancelledError:
print("main(): cancel_me is cancelled now")

asyncio.run(main())

'''运行结果为:
cancel_me(): before sleep
main函数休息完了
cancel_me(): cancel sleep
cancel_me(): after sleep
main(): cancel_me is cancelled now
'''
运行过程分析:

首先run函数启动主函数入口main,在main中,由于第一句话就是调用异步函数cancel_me()函数,因此先打印出了第一句话;

而后进入cancel_me中的try语句,遇到await,暂停,这时候返回main中执行,可是有在main中遇到了await,也会暂停,可是因为main中只须要暂停1秒,而cancel_me中要暂停3600秒,因此等到main的暂停结束后,接着运行main,因此打印出第二句话;

接下来遇到取消任务的请求task.cancel(),而后继续执行main里面的try,又遇到了await,接着main进入暂停,接下来进入到cancel_me函数中,可是因为main中请求了取消任务,因此那个耗时3600秒的任务就再也不执行了,直接触发了Cancelled_Error异常,打印出第三句话,接下来又raise一个异常信息;

接下来执行cancel_me的finally,打印出第四句话,此时cancel_me执行完毕,因为他抛出了一个异常,返回到主程序main中,触发异常,打印出第五句话。

(2)done()

当一个被包装得协程既没有触发异常、也没有被取消的时候,意味着它是done的,返回true。

(3)result()

返回任务的执行结果,

当任务被正常执行完毕,则返回结果;

当任务被取消了,调用这个方法,会触发CancelledError异常;

当任务返回的结果是无用的时候,则调用这个方法会触发InvalidStateError;

当任务出发了一个异常而中断,调用这个方法还会再次触发这个使程序中断的异常。

(4)exception()

返回任务的异常信息,触发了什么异常,就返回什么异常,若是任务是正常执行的无异常,则返回None;

当任务被取消了,调用这个方法会触发CancelledError异常;

当任务没有作完,调用这个方法会触发InvalidStateError异常。

下面还有一些不经常使用的方法,以下:

(5)add_done_callback(callback, *, context=None)

(6)remove_done_callback(callback)

(7)get_stack(*, limit=None)

(8)print_stack(*, limit=None, file=None)

(9)all_tasks(loop=None),这是一个类方法

(10)current_task(loop=None),这是一个类方法

三、异步函数的结果获取

对于异步编程、异步函数而言,最重要的就是异步函数调用结束以后,获取异步函数的返回值,咱们能够有如下几种方式来获取函数的返回值,第一是直接经过Task.result()来获取;第二种是绑定一个回调函数来获取,即函数执行完毕后调用一个函数来获取异步函数的返回值。

(1)直接经过result来获取

import asyncio
import time


async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模拟耗时任务3秒
print("Hello again 01 end")
return a+b

coroutine=hello1(10,5)
loop = asyncio.get_event_loop() #第一步:建立事件循环
task=asyncio.ensure_future(coroutine) #第二步:将多个协程函数包装成任务列表
loop.run_until_complete(task) #第三步:经过事件循环运行
print('-------------------------------------')
print(task.result())
loop.close()

'''运行结果为
Hello world 01 begin
Hello again 01 end
-------------------------------------
15
'''
(2)经过定义回调函数来获取

import asyncio
import time


async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模拟耗时任务3秒
print("Hello again 01 end")
return a+b

def callback(future): #定义的回调函数
print(future.result())

loop = asyncio.get_event_loop() #第一步:建立事件循环
task=asyncio.ensure_future(hello1(10,5)) #第二步:将多个协程函数包装成任务
task.add_done_callback(callback) #并被任务绑定一个回调函数

loop.run_until_complete(task) #第三步:经过事件循环运行
loop.close() #第四步:关闭事件循环


'''运行结果为:
Hello world 01 begin
Hello again 01 end
15
'''
注意:所谓的回调函数,就是指协程函数coroutine执行结束时候会调用回调函数。并经过参数future获取协程执行的结果。咱们建立的task和回调里的future对象,其实是同一个对象,由于task是future的子类。

3、asyncio异步编程的基本模板
事实上,在使用asyncio进行异步编程的时候,语法形式每每是多样性的,虽然理解异步编程的核心思想很重要,可是实现的时候终究仍是要编写语句的,本次给出的模板,是两个不一样的例子,例子一是三个异步方法,它们都没有参数,没有返回值,都模拟一个耗时任务;例子二是三个异步方法,都有参数,都有返回值。

一、python3.7以前的版本

(1)例子一:无参数、无返回值

import asyncio
import time

a=time.time()

async def hello1():
print("Hello world 01 begin")
await asyncio.sleep(3) #模拟耗时任务3秒
print("Hello again 01 end")

async def hello2():
print("Hello world 02 begin")
await asyncio.sleep(2) #模拟耗时任务2秒
print("Hello again 02 end")

async def hello3():
print("Hello world 03 begin")
await asyncio.sleep(4) #模拟耗时任务4秒
print("Hello again 03 end")

loop = asyncio.get_event_loop() #第一步:建立事件循环
tasks = [hello1(), hello2(),hello3()] #第二步:将多个协程函数包装成任务列表
loop.run_until_complete(asyncio.wait(tasks)) #第三步:经过事件循环运行
loop.close() #第四步:取消事件循环

'''运行结果为:
Hello world 02 begin
Hello world 03 begin
Hello world 01 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
'''
(2)例子二:有参数、有返回值

import asyncio
import time


async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模拟耗时任务3秒
print("Hello again 01 end")
return a+b

async def hello2(a,b):
print("Hello world 02 begin")
await asyncio.sleep(2) #模拟耗时任务2秒
print("Hello again 02 end")
return a-b

async def hello3(a,b):
print("Hello world 03 begin")
await asyncio.sleep(4) #模拟耗时任务4秒
print("Hello again 03 end")
return a*b

loop = asyncio.get_event_loop() #第一步:建立事件循环
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
tasks = [task1,task2,task3] #第二步:将多个协程函数包装成任务列表
loop.run_until_complete(asyncio.wait(tasks)) #第三步:经过事件循环运行
print(task1.result()) #而且在全部的任务完成以后,获取异步函数的返回值
print(task2.result())
print(task3.result())
loop.close() #第四步:关闭事件循环

'''运行结果为:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''
(3)总结:四步走(针对python3.7以前的版本)

第一步·:构造事件循环

loop=asyncio.get_running_loop() #返回(获取)在当前线程中正在运行的事件循环,若是没有正在运行的事件循环,则会显示错误;它是python3.7中新添加的

loop=asyncio.get_event_loop() #得到一个事件循环,若是当前线程尚未事件循环,则建立一个新的事件循环loop;

loop=asyncio.set_event_loop(loop) #设置一个事件循环为当前线程的事件循环;

loop=asyncio.new_event_loop() #建立一个新的事件循环
第二步:将一个或者是多个协程函数包装成任务Task

#高层API
task = asyncio.create_task(coro(参数列表))   # 这是3.7版本新添加的
task = asyncio.ensure_future(coro(参数列表))

#低层API
loop.create_future(coro)
loop.create_task(coro)

'''须要注意的是,在使用Task.result()获取协程函数结果的时候,使用asyncio.create_task()却会显示错
可是使用asyncio.ensure_future却正确,本人暂时不知道缘由,哪位大神知道,望告知,不胜感激!'''
第三步:经过事件循环运行

loop.run_until_complete(asyncio.wait(tasks)) #经过asyncio.wait()整合多个task

loop.run_until_complete(asyncio.gather(tasks)) #经过asyncio.gather()整合多个task

loop.run_until_complete(task_1) #单个任务则不须要整合

loop.run_forever() #可是这个方法在新版本已经取消,再也不推荐使用,由于使用起来不简洁

'''
使用gather或者wait能够同时注册多个任务,实现并发,但他们的设计是彻底不同的,在前面的2.1.(4)中已经讨论过了,主要区别以下:
(1)参数形式不同
gather的参数为 *coroutines_or_futures,即如这种形式
tasks = asyncio.gather(*[task1,task2,task3])或者
tasks = asyncio.gather(task1,task2,task3)
loop.run_until_complete(tasks)
wait的参数为列表或者集合的形式,以下
tasks = asyncio.wait([task1,task2,task3])
loop.run_until_complete(tasks)
(2)返回的值不同
gather的定义以下,gather返回的是每个任务运行的结果,
results = await asyncio.gather(*tasks)
wait的定义以下,返回dones是已经完成的任务,pending是未完成的任务,都是集合类型
done, pending = yield from asyncio.wait(fs)
(3)后面还会讲到他们的进一步使用
'''
简单来讲:async.wait会返回两个值:done和pending,done为已完成的协程Task,pending为超时未完成的协程Task,需经过future.result调用Task的result。而async.gather返回的是已完成Task的result。

第四步:关闭事件循环

loop.close()

'''
以上示例都没有调用 loop.close,好像也没有什么问题。因此到底要不要调 loop.close 呢?
简单来讲,loop 只要不关闭,就还能够再运行:
loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()
可是若是关闭了,就不能再运行了:
loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3)) # 此处异常
建议调用 loop.close,以完全清理 loop 对象防止误用
'''
二、python3.7版本

在最新的python3.7版本中,asyncio又引进了一些新的特性和API,

(1)例子一:无参数、无返回值

import asyncio
import time


async def hello1():
print("Hello world 01 begin")
await asyncio.sleep(3) #模拟耗时任务3秒
print("Hello again 01 end")

async def hello2():
print("Hello world 02 begin")
await asyncio.sleep(2) #模拟耗时任务2秒
print("Hello again 02 end")

async def hello3():
print("Hello world 03 begin")
await asyncio.sleep(4) #模拟耗时任务4秒
print("Hello again 03 end")

async def main():
results=await asyncio.gather(hello1(),hello2(),hello3())
for result in results:
print(result) #由于没返回值,故而返回None

asyncio.run(main())

'''运行结果为:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
None
None
None
'''
(2)例子二:有参数、有返回值

import asyncio
import time


async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模拟耗时任务3秒
print("Hello again 01 end")
return a+b

async def hello2(a,b):
print("Hello world 02 begin")
await asyncio.sleep(2) #模拟耗时任务2秒
print("Hello again 02 end")
return a-b

async def hello3(a,b):
print("Hello world 03 begin")
await asyncio.sleep(4) #模拟耗时任务4秒
print("Hello again 03 end")
return a*b

async def main():
results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5))
for result in results:
print(result)

asyncio.run(main())

'''运行结果为:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''
(3)总结:两步走(针对python3.7)

第一步:构建一个入口函数main

他也是一个异步协程函数,即经过async定义,而且要在main函数里面await一个或者是多个协程,同前面同样,我能够经过gather或者是wait进行组合,对于有返回值的协程函数,通常就在main里面进行结果的获取。

第二步:启动主函数main

这是python3.7新添加的函数,就一句话,即

asyncio.run(main())

注意:

再也不须要显式的建立事件循环,由于在启动run函数的时候,就会自动建立一个新的事件循环。并且在main中也不须要经过事件循环去掉用被包装的协程函数,只须要向普通函数那样调用便可 ,只不过使用了await关键字而已。

4、协程编程的优势:
一、无cpu分时切换线程保存上下文问题(协程上下文怎么保存)

二、遇到io阻塞切换(怎么实现的)

三、无需共享数据的保护锁(为何)

四、系列文章下篇预告——介绍低层的API,事件循环究竟是怎么实现的以及future类的实现。
---------------------

原文:https://blog.csdn.net/qq_27825451/article/details/86218230

 

 

=============================================================================================================================

 

1、事件循环EventLoop
事件循环是asyncio的核心,异步任务的运行、任务完成以后的回调、网络IO操做、子进程的运行,都是经过事件循环完成的。在前一篇文章中,已经提到过,在python3.7中,咱们甚至彻底不用管事件循环,只须要使用高层API,即asyncio中的方法,咱们不多直接与事件循环打交道,可是为了更加熟悉asyncio的运行原理,最好仍是了解EventLoop的设计原理。

一、事件循环的建立、获取、设置(上文已经介绍过了)

(1)asyncio.get_running_loop()。python3.7新添加的

(2)asyncio.get_event_loop()

(3)asyncio.set_event_loop(loop)

(4)asyncio.new_event_loop()

二、运行和中止事件循环

(1)loop.run_until_complete(future)。运行事件循环,直到future运行结束

(2)loop.run_forever()。在python3.7中已经取消了,表示事件循环会一直运行,直到遇到stop。

(3)loop.stop()。中止事件循环

(4)loop.is_running()。若是事件循环依然在运行,则返回True

(5)loop.is_closed()。若是事件循环已经close,则返回True

(6)loop.close()。关闭事件循环

三、建立Future和Task

(1)loop.create_future(coroutine) ,返回future对象

(2)loop.create_task(corootine) ,返回task对象

(3)loop.set_task_factory(factory)

(4)loop.get_task_factory()

四、事件循环的时钟

loop.time()。能够这么理解,事件循环内部也维护着一个时钟,能够查看事件循环如今运行的时间点是多少,就像普通的time.time()相似,它返回的是一个浮点数值,好比下面的代码。

import asyncio

async def hello1(a,b):
print('准备作加法运算')
await asyncio.sleep(3)
return a+b

loop=asyncio.get_event_loop()
t1=loop.time() #开始时间
print(t1)
loop.run_until_complete(hello1(3,4))
t2=loop.time() #结束时间
print(t2)
print(t2-t1) #时间间隔
'''运行结果为:
28525.671
准备作加法运算
28528.703
3.0320000000028813
'''
五、计划执行回调函数(CallBacks)

(1)loop.call_later(delay, callback, *args, context=None)

首先简单的说一下它的含义,就是事件循环在delay多长时间以后才执行callback函数,它的返回值是asyncio.TimerHandle类的一个实例对象。

(2)loop.call_at(when, callback, *args, context=None)

即在某一个时刻进行调用计划的回调函数,第一个参数再也不是delay而是when,表示一个绝对的时间点,结合前面的loop.time使用,它的使用方法和call_later()很相似。它的返回值是asyncio.TimerHandle类的一个实例对象。

(3)loop.call_soon(callback, *args, context=None)

在下一个迭代的时间循环中马上调用回调函数,用法同上面。它的返回值是asyncio.Handle类的一个实例对象。

(4)loop.call_soon_threadsafe(callback, *args, context=None)

这是call_soon()函数的线程安全版本,计划回调函数必须在另外一个线程中使用。

须要注意的是:上面的几个回调函数都只使用了“位置参数”哦,asyncio中,大部分的计划回调函数都不支持“关键字参数”,若是是想要使用关键字参数,则推荐使用functools.aprtial()对方法进一步包装,详细能够参考前面的python标准库系列文章。

如:

# will schedule "print("Hello", flush=True)"
loop.call_soon(
functools.partial(print, "Hello", flush=True))
下面来看一下具体的使用例子。

import asyncio

def callback(n):
print('我是回调函数,参数为: {0} '.format(n))


async def main(loop):
print('在异步函数中注册回调函数')
loop.call_later(2, callback, 1)
loop.call_later(1, callback, 2)
loop.call_soon(callback, 3)

await asyncio.sleep(4)


loop = asyncio.get_event_loop()
print('进入事件循环')
loop.run_until_complete(main(loop))
print('关闭事件循环')
loop.close()

'''运行结果为:
进入事件循环
在异步函数中注册回调函数
我是回调函数,参数为: 3
我是回调函数,参数为: 2
我是回调函数,参数为: 1
关闭事件循环
'''
再看一个简单的例子:

import asyncio

def callback(a, loop):
print("个人参数为 {0},执行的时间为{1}".format(a,loop.time()))


#call_later, call_at
if __name__ == "__main__":
try:
loop = asyncio.get_event_loop()
now = loop.time()
loop.call_later(5, callback, 5, loop) #第一个参数设置的时间5.5秒后执行,
loop.call_at(now+2, callback, 2, loop) #在指定的时间,运行,当前时间+2秒
loop.call_at(now+1, callback, 1, loop)
loop.call_at(now+3, callback, 3, loop)
loop.call_soon(callback, 4, loop)
loop.run_forever() #要用这个run_forever运行,由于没有传入协程,这个函数在3.7中已经被取消
except KeyboardInterrupt:
print("Goodbye!")

'''运行结果为:
个人参数为 4,执行的时间为266419.843
个人参数为 1,执行的时间为266420.843
个人参数为 2,执行的时间为266421.859
个人参数为 3,执行的时间为266422.859
个人参数为 5,执行的时间为266424.843
'''
总结注意事项:

(1)CallBack函数只可以定义为同步方法,不可以定义为async方法,及不能使用async和@asyncio.coroutine修饰;

(2)每个CallBack方法只会调用一次,若是在同一个时刻有另个CallBack方法须要调用,则他们的执行顺序是不肯定的;

(3)注意使用functools.partial()去修饰带有关键字参数的CallBack方法;

(4)如何理解?对于通常的异步函数,咱们须要将它放在时间循环里面,而后经过事件循环去循环调用它,而由于CallBack并非异步函数,它是定义为普通的同步方法,因此不可以放在时间循环里面,可是若是我依然想要让事件循环去执行它怎么办呢?那就不放进事件循环,直接让事件循环“当即、稍后、在何时”去执行它不就好了嘛,call的含义就是“执行”。

2、底层API之Future
一、Future的定义概览

Future的本质是一个类。他表示的是异步操做的最终将要返回的结果,故而命名为Future,它不是线程安全的。Future对象是awaitable的,参见系类文章的前面,

class asyncio.Future(*, loop=None)

二、asyncio中关于Future的几个方法

(1)asyncio.isfuture(obj) 。判断一个对象是否是Future,注意python中一切皆对象哦,包括函数,当obj是下面几种状况时返回true:

asyncio.Future的实例对象
asyncio.Task的实例对象
一个具备 _asyncio_future_blocking 属性的对象
(2)asyncio.ensure_future(obj, *, loop=None)。将一个obj包装成Future

(3)asyncio.wrap_future(future, *, loop=None)

将concurrent.futures.Future对象包装成一个 asyncio.Future 对象。

三、Future对象的经常使用方法

(1)result()。返回Future执行的结果返回值

若是Future被执行完成,若是使用set_result()方法设置了一个结果,那个设置的value就会被返回;

若是Future被执行完成,若是使用set_exception()方法设置了一个异常,那么使用这个方法也会触发异常;

若是Future被取消了,那么使用这个方法会触发CancelledError异常;

若是Future的结果不可用或者是不可达,那么使用这个方法也会触发InvalidStateError异常;

(2)set_result(result)

标记Future已经执行完毕,而且设置它的返回值。

(3)set_exception(exception)

标记Future已经执行完毕,而且触发一个异常。

(4)done()

若是Future1执行完毕,则返回 True 。

(5)cancelled()

判断任务是否取消。

(6)add_done_callback(callback, *, context=None)

在Future完成以后,给它添加一个回调方法,这个方法就至关因而loop.call_soon()方法,参见前面,以下例子:

若是要回调带有关键字参数的函数,也须要使用partial方法哦。

(7)remove_done_callback(callback)

(8)cancel()

(9)exception()

(10)get_loop()。返回Future所绑定的事件循环

3、集中回答如下几个问题
经过前面的讲解,已经讲清楚了asyncio架构里面的一些基本东西,如今能够来集中回答如下一些常见的问题了,弄清楚这希尔问题,能够方便咱们更加深刻的理解协程。

一、不少个协程一块儿运行有建立新的线程吗?

协程运行时,都是在一个线程中运行的,没有建立新的线程。以下

import asyncio
import time
import threading

a=time.time()

async def hello1():
    print(f"Hello world 01 begin,my thread is:{threading.currentThread()}")
    await asyncio.sleep(3)
    print("Hello again 01 end")

async def hello2():
    print(f"Hello world 02 begin,my thread is:{threading.currentThread()}")
    await asyncio.sleep(2)
    print("Hello again 02 end")

async def hello3():
    print(f"Hello world 03 begin,my thread is:{threading.currentThread()}")
    await asyncio.sleep(1)
    print("Hello again 03 end")

loop = asyncio.get_event_loop()
tasks = [hello1(), hello2(),hello3()]
loop.run_until_complete(asyncio.wait(tasks))

loop.close()


b=time.time()
print('---------------------------------------')
print(b-a)
'''运行结果为:
Hello world 03 begin,my thread is:<_MainThread(MainThread, started 4168)>
Hello world 02 begin,my thread is:<_MainThread(MainThread, started 4168)>
Hello world 01 begin,my thread is:<_MainThread(MainThread, started 4168)>
Hello again 03 end
Hello again 02 end
Hello again 01 end
---------------------------------------
2.994506597518921
'''
从上面那个能够看出,三个不一样的协程函数都是在一个线程完成的。可是并非意味着,多个协程函数只能在一个线程中执行,一样能够建立新的线程,其实咱们彻底能够在新的线程中从新建立一个事件循环,具体的实例参见后面。

二、线程必定效率更高吗?

也不是绝对的,固然在通常状况下,异步方式的执行效率是更高的,就好比上面的三个函数,若是按照同步的方式执行,则一共须要6秒的时间,可是采用协程则只须要最长的那个时间3秒,这天然是提升了工做效率,那是否是必定会提升呢?也不必定,这与协程的调用方式是由密切关系的。以下所示:

import asyncio
import time
import threading

a=time.time()

async def hello1():
print(f"Hello world 01 begin,my thread is:{threading.currentThread()}")
await asyncio.sleep(3)
print("Hello again 01 end")

async def hello2():
print(f"Hello world 02 begin,my thread is:{threading.currentThread()}")
await asyncio.sleep(2)
print("Hello again 02 end")

async def hello3():
print(f"Hello world 03 begin,my thread is:{threading.currentThread()}")
await hello2()
await hello1()
print("Hello again 03 end")

loop = asyncio.get_event_loop()
tasks = [hello3()]
loop.run_until_complete(asyncio.wait(tasks))

loop.close()

b=time.time()
print('---------------------------------------')
print(b-a)

'''运行结果为:
Hello world 03 begin,my thread is:<_MainThread(MainThread, started 13308)>
Hello world 02 begin,my thread is:<_MainThread(MainThread, started 13308)>
Hello again 02 end
Hello world 01 begin,my thread is:<_MainThread(MainThread, started 13308)>
Hello again 01 end
Hello again 03 end
---------------------------------------
5.008373498916626
'''
咱们发现一个问题,上面执行的顺序彻底不是异步执行,执行的时间也没有获得改善,究其缘由,是由于上面是经过hello3去调用hello1和hello2的,这和同步调用的方式彻底是同样的,即便我定义的都是异步方法,它既没有提升执行效率,还会有阻塞。

结论:在有不少个异步方式的时候,必定要尽可能避免这种异步函数的直接调用,这和同步是没什么区别的,必定要经过事件循环loop,“让事件循环在各个异步函数之间不停游走”,这样才不会形成阻塞。

三、协程会不会有阻塞呢?

异步方式依然会有阻塞的,当咱们定义的不少个异步方法彼此之间有一来的时候,好比,我必需要等到函数1执行完毕,函数2须要用到函数1的返回值,如上面的例子2所示,就会形成阻塞,这也是异步编程的难点之一,如何合理配置这些资源,尽可能减小函数之间的明确依赖,这是很重要的。

四、协程的4种状态

协程函数相比于通常的函数来讲,咱们能够将协程包装成任务Task,任务Task就在于能够跟踪它的状态,我就知道它具体执行到哪一步了,通常来讲,协程函数具备4种状态,能够经过相关的模块进行查看,请参见前面的文章,他的四种状态为:

Pending
Running
Done
Cacelled
 建立future的时候,task为pending,事件循环调用执行的时候固然就是running,调用完毕天然就是done,若是须要中止事件循环,中途须要取消,就须要先把task取消,即为cancelled。

4、多任务实现并发
python异步协程函数的最终目的是实现并发,这样才能提升工做效率。

咱们常常看见下面这样的代码,即:

tasks = asyncio.gather(*[task1,task2,task3])
loop.run_until_complete(tasks)

#或者是
tasks = asyncio.wait([task1,task2,task3])
loop.run_until_complete(tasks)

#甚至能够写在一块儿,即
loop.run_until_complete(asyncio.gather(*[task1,task2,task3])
#或者是
asyncio.gather(asyncio.wait([task1,task2,task3]))
上面这些都是一些简单的应用,能够同时进行多任务,进行并发,可是若是咱们每个任务都有返回值,并且须要获取这些返回值,这样作显然还不够,还须要作进一步的处理。

asyncio实现并发的思想是同样的,只是实现的手段稍有区别,主要有如下几种实现方式:

(1)使用gather同时注册多个任务,实现并发

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

注意事项:

gather的返回值是它所绑定的全部任务的执行结果,并且顺序是不变的,即返回的result的顺序和绑定的顺序是保持一致的。

除此以外,它是awaitable的,因此,若是须要获取多个任务的返回值,既然是awaitable的,就须要将它放在一个函数里面,因此咱们引入一个包装多个任务的入口main,这也是python3.7的思想。以下:

# import asyncio
# import time
# import threading

# a=time.time()

# async def hello1():
# print(f"Hello world 01 begin,my thread is:{threading.currentThread()}")
# await asyncio.sleep(3)
# print("Hello again 01 end")

# async def hello2():
# print(f"Hello world 02 begin,my thread is:{threading.currentThread()}")
# await asyncio.sleep(2)
# print("Hello again 02 end")

# async def hello3():
# print(f"Hello world 03 begin,my thread is:{threading.currentThread()}")
# await hello2()
# await hello1()
# print("Hello again 03 end")

# loop = asyncio.get_event_loop()
# tasks = [hello3()]
# loop.run_until_complete(asyncio.wait(tasks))

# loop.close()


# b=time.time()
# print('---------------------------------------')
# print(b-a)

import asyncio
import time


async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模拟耗时任务3秒
print("Hello again 01 end")
return a+b

async def hello2(a,b):
print("Hello world 02 begin")
await asyncio.sleep(2) #模拟耗时任务2秒
print("Hello again 02 end")
return a-b

async def hello3(a,b):
print("Hello world 03 begin")
await asyncio.sleep(4) #模拟耗时任务4秒
print("Hello again 03 end")
return a*b

async def main(): #封装多任务的入口函数
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
results=await asyncio.gather(task1,task2,task3)
for result in results: #经过迭代获取函数的结果,每个元素就是相对应的任务的返回值,顺序都没变
print(result)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

'''运行结果为:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''
(2)使用wait能够同时注册多个任务,实现并发

await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

它与gather不一样的地方是他的参数是集合类型,并且他的返回类型是这样一个形式,即

 (done, pending).   #返回dones是已经完成的任务,pending是未完成的任务,都是集合类型,不一样的是每个元素再也不是返回值,而是某一个task哦,

相同的是它依然也是awaitable的,故而也须要定义在一个异步函数main()中,以下。

#前面的代码和上面同样
async def main(): #封装多任务的入口函数
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
done,pending=await asyncio.wait([task1,task2,task3])
for done_task in done:
print(done_task.result()) #这里返回的是一个任务,不是直接的返回值,故而须要使用result函数进行获取


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

#运行结果也同样
(3)使用as_completed能够同时注册多个任务,实现并发

这个方法使用的比较少,与前面的两个gather和wait不一样的是,它不是awaitable。使用实例参见前面的一篇文章,参见以下:

(4)主调方获取任务的运行结果

上面的运行结果,都是在main()函数里面获取的运行结果,那可不能够再也不main()里面获取结果呢,,固然是能够的,咱们能够这样作,

async def main(): #封装多任务的入口函数
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))

return await asyncio.gather(task1,task2,task3) #不在这里获取结果,只是返回


loop = asyncio.get_event_loop()
results=loop.run_until_complete(main()) #在这里再获取返回函数值,而后迭代获取
for result in results:
print(result)
loop.close()

#y运行结果同上
或者是以下:

async def main(): #封装多任务的入口函数
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))

return await asyncio.wait([task1,task2,task3]) #不在这里获取结果,只是返回


loop = asyncio.get_event_loop()
done,pending=loop.run_until_complete(main()) #在这里再获取返回函数值,而后迭代获取
for done_task in done:
print(done_task.result())
loop.close()
5、Future补充下一篇预告
一、Future补充

asyncio中的Future类是模仿concurrent.futures.Future类而设计的,关于concurrent.futures.Future,能够查阅相关的文档。它们之间的主要区别是:

(1)asyncio.Future对象是awaitable的,可是concurrent.futures.Future对象是不可以awaitable的;

(2)asyncio.Future.result()和asyncio.Future.exception()是不接受关键字参数timeout的;

(3)当Future没有完成的时候,asyncio.Future.result()和asyncio.Future.exception()将会触发一个InvalidStateError异常;

(4)使用asyncio.Future.add_done_callback()注册的回调函数不会当即执行,它可使用loop.call_soon代替;

(5)asyncio里面的Future和concurrent.futures.wait()以及concurrent.futures.as_completed()是不兼容的。

有兴趣的小伙伴能够本身学一下concurrent.futures哦!

 

=============================================================================================================================

 

本文为系列文章的第七篇,将介绍如何使用多线程结合异步编程asyncio,开发出真正“不假死”的应用程序;以及如何模拟一个timer,实现定时操做。

1、异步方法依然会假死(freezing)
什么是程序的假死,这里再也不多描述,特别是在编写桌面程序的时候,若是是使用单个线程,同步函数的方式,假死是不可避免的,可是有时候咱们即便是使用了异步函数的方式依然是不可避免的,依然会假死,这是为何呢,下面会经过几个例子来详细说明。

一、通常程序的调用方“假死”


import asyncio
import time
import threading

#定义一个异步操做
async def hello1(a,b):
print(f"异步函数开始执行")
await asyncio.sleep(3)
print("异步函数执行结束")
return a+b

#在一个异步操做里面调用另外一个异步操做
async def main():
c=await hello1(10,20)
print(c)
print("主函数执行")

loop = asyncio.get_event_loop()
tasks = [main()]
loop.run_until_complete(asyncio.wait(tasks))

loop.close()

'''运行结果为:
异步函数开始执行(在此处要等待3秒)
异步函数执行结束
30
主函数执行
'''
注意一个问题:咱们前面所讲的例子中,没有出现等待,是由于各个异步方法之间是“彻底并列”关系,彼此之间没有依赖,因此,我能够将全部的异步操做“gather”起来,而后经过事件循环,让事件循环在多个异步方法之间来回调用,永不中止,故而没有出现等待。

可是,现实中不可能全部的异步方法都是彻底独立的,没有任何关系的,在上面的这个例子中,就是很好的说明,hello1是一个耗时任务,耗时大约3秒,main也是一个异步方法,可是main中须要用到hello1中的返回结果,因此他必需要等待hello1运行结束以后再才能继续执行,这就是为何会获得上面结果的缘由。这也再一次说明,异步依然是会有阻塞的。

咱们也能够这样理解,由于我给事件循环只注册了一个异步方法,那就是main,当在main里面遇到了await,事件循环挂起,转而寻找其余的异步方法,可是因为只注册了一个异步方法给事件循环,他没有其余的方法可执行了,因此只能等待,让hello1执行完了,再继续执行。

二、窗体程序的假死

(1)同步假死

import tkinter as tk # 导入 Tkinter 库
import time

class Form:
def __init__(self):
self.root=tk.Tk()
self.root.geometry('500x300')
self.root.title('窗体程序') #设置窗口标题

self.button=tk.Button(self.root,text="开始计算",command=self.calculate)
self.label=tk.Label(master=self.root,text="等待计算结果")

self.button.pack()
self.label.pack()
self.root.mainloop()

def calculate(self):
time.sleep(3) #模拟耗时计算
self.label["text"]=300

if __name__=='__main__':
form=Form()
运行的结果就是,我单机一下“开始计算”按钮,而后窗体会假死,这时候没法移动窗体、也没法最大化最小化、3秒钟以后,“等待计算结果”的label会显示出3,而后前面移动的窗体等操做接着发生,最终效果以下:

 

上面的窗体会假死,这无可厚非,由于,全部的操做都是同步方法,只有一个线程,负责维护窗体状态的线程和执行好使计算的线程是同一个,当遇到time.sleep()的时候天然会遇到阻塞。那若是咱们将耗时任务换成异步方法呢?代码以下:

(2)异步假死

import tkinter as tk # 导入 Tkinter 库
import asyncio

class Form:
def __init__(self):
self.root=tk.Tk()
self.root.geometry('500x300')
self.root.title('窗体程序') #设置窗口标题

self.button=tk.Button(self.root,text="开始计算",command=self.get_loop)
self.label=tk.Label(master=self.root,text="等待计算结果")

self.button.pack()
self.label.pack()

self.root.mainloop()

#定义一个异步方法,模拟耗时计算任务
async def calculate(self):
await asyncio.sleep(3)
self.label["text"]=300

#asyncio任务只能经过事件循环运行,不能直接运行异步函数
def get_loop(self):
self.loop=asyncio.get_event_loop()
self.loop.run_until_complete(self.calculate())
self.loop.close()


if __name__=='__main__':
form=Form()
咱们发现,窗体依然会形成阻塞,状况和前面的同步方法是同样的,为何会这样呢?由于这个地方虽然启动了事件循环,可是拥有事件循环的那个线程同时还须要维护窗体的状态,始终只有一个线程在运行,当单击“开始计算”按钮,开始执行get_loop函数,在get_loop里面启动异步方法calculate,而后遇到await,这个时候事件循环暂停,可是因为事件循环只注册了calculate一个异步方法,也没其余事情干,因此只能等待,形成假死阻塞。

解决办法就是我专门再建立一个线程去执行一些计算任务,维护窗体状态的线程就之专门负责维护状态,后面再详说。

2、多线程结合asyncio解决调用时的假死
一、asyncio专门实现Concurrency and Multithreading(多线程和并发)的函数介绍

为了让一个协程函数在不一样的线程中执行,咱们可使用如下两个函数

(1)loop.call_soon_threadsafe(callback, *args),这是一个很底层的API接口,通常不多使用,本文也暂时不作讨论。

(2)asyncio.run_coroutine_threadsafe(coroutine,loop)

第一个参数为须要异步执行的协程函数,第二个loop参数为在新线程中建立的事件循环loop,注意必定要是在新线程中建立哦,该函数的返回值是一个concurrent.futures.Future类的对象,用来获取协程的返回结果。

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)   # 在新线程中运行协程

result = future.result()   #等待获取Future的结果

二、不阻塞的多线程并发实例

asyncio.run_coroutine_threadsafe(coroutine,loop)的意思很简单,就是我在新线程中建立一个事件循环loop,而后在新线程的loop中不断不停的运行一个或者是多个coroutine。参考下面代码:

import asyncio

import asyncio,time,threading

#须要执行的耗时异步任务
async def func(num):
print(f'准备调用func,大约耗时{num}')
await asyncio.sleep(num)
print(f'耗时{num}以后,func函数运行结束')

#定义一个专门建立事件循环loop的函数,在另外一个线程中启动它
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()

#定义一个main函数
def main():
coroutine1 = func(3)
coroutine2 = func(2)
coroutine3 = func(1)

new_loop = asyncio.new_event_loop() #在当前线程下建立时间循环,(未启用),在start_loop里面启动它
t = threading.Thread(target=start_loop,args=(new_loop,)) #经过当前线程开启新的线程去启动事件循环
t.start()

asyncio.run_coroutine_threadsafe(coroutine1,new_loop) #这几个是关键,表明在新线程中事件循环不断“游走”执行
asyncio.run_coroutine_threadsafe(coroutine2,new_loop)
asyncio.run_coroutine_threadsafe(coroutine3,new_loop)

for i in "iloveu":
print(str(i)+" ")

if __name__ == "__main__":
main()

'''运行结果为:
i 准备调用func,大约耗时3
l 准备调用func,大约耗时2
o 准备调用func,大约耗时1
v
e
u
耗时1以后,func函数运行结束
耗时2以后,func函数运行结束
耗时3以后,func函数运行结束
'''
咱们发现,main是在主线程中的,而三个协程函数是在新线程中的,它们是在一块儿执行的,没有形成主线程main的阻塞。下面再看一下窗体函数中的实现。

三、tkinter+threading+asyncio

import tkinter as tk # 导入 Tkinter 库
import time
import asyncio
import threading

class Form:
def __init__(self):
self.root=tk.Tk()
self.root.geometry('500x300')
self.root.title('窗体程序') #设置窗口标题

self.button=tk.Button(self.root,text="开始计算",command=self.change_form_state)
self.label=tk.Label(master=self.root,text="等待计算结果")

self.button.pack()
self.label.pack()

self.root.mainloop()

async def calculate(self):
await asyncio.sleep(3)
self.label["text"]=300

def get_loop(self,loop):
self.loop=loop
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
def change_form_state(self):
coroutine1 = self.calculate()
new_loop = asyncio.new_event_loop() #在当前线程下建立时间循环,(未启用),在start_loop里面启动它
t = threading.Thread(target=self.get_loop,args=(new_loop,)) #经过当前线程开启新的线程去启动事件循环
t.start()

asyncio.run_coroutine_threadsafe(coroutine1,new_loop) #这几个是关键,表明在新线程中事件循环不断“游走”执行


if __name__=='__main__':
form=Form()
运行上面的代码,咱们发现,此时点击“开始计算”按钮执行耗时任务,没有形成窗体的任何阻塞,我能够最大最小化、移动等等,而后3秒以后标签会自动显示运算结果。为何会这样?

上面的代码中,get_loop()、change_form_state()、__init__()都是定义在主线程中的,窗体的状态维护也是主线程,二耗时计算calculate()是一个异步协程函数。

如今单击“开始计算按钮”,这个事件发生以后,会触发主线程的chang_form_state函数,而后在该函数中,会建立新的线程,经过新的线程建立一个事件循环,而后将协程函数注册到新线程中的事件循环中去,达到的效果就是,主线程作主线程的,新线程作新线程的,不会形成任何阻塞。

四、multithreading+asyncio总结

第一步:定义须要异步执行的一系列操做,及一系列协程函数;

第二步:在主线程中定义一个新的线程,而后在新线程中产生一个新的事件循环;

第三步:在主线程中,经过asyncio.run_coroutine_threadsafe(coroutine,loop)这个方法,将一系列异步方法注册到新线程的loop里面去,这样就是新线程负责事件循环的执行。

3、使用asyncio实现一个timer
所谓的timer指的是,指定一个时间间隔,让某一个操做隔一个时间间隔执行一次,如此周而复始。不少编程语言都提供了专门的timer实现机制、包括C++、C#等。可是 Python 并无原生支持 timer,不过能够用 asyncio.sleep 模拟。

大体的思想以下,将timer定义为一个异步协程,而后经过事件循环去调用这个异步协程,让事件循环不断在这个协程中反反复调用,只不过隔几秒调用一次便可。

简单的实现以下(本例基于python3.7):

async def delay(time):
await asyncio.sleep(time)

async def timer(time,function):
while True:
future=asyncio.ensure_future(delay(time))
await future
future.add_done_callback(function)

def func(future):
print('done')

if __name__=='__main__':
asyncio.run(timer(2,func))

'''运行结果为:
done
done
done
done
done
done
done
done
done
done
done
.
.
.
.每隔2秒打印一个done
'''
几个注意点:asyncio.sleep()自己就是一个协程函数,故而能够将它封装成一个Task或者是Future,等待时间结束也就是任务完成,绑定回调函数。固然,自己python语法灵活,上面只是其中一种实现而已。
---------------------
做者:LoveMIss-Y
来源:CSDN
原文:https://blog.csdn.net/qq_27825451/article/details/86483493
版权声明:本文为博主原创文章,转载请附上博文连接!

 

 

 

 

 

 

 

 

 

--------------------- 原文:https://blog.csdn.net/qq_27825451/article/details/86292513

相关文章
相关标签/搜索