Python 协程模块 asyncio 使用指南

Python 协程模块 asyncio 使用指南python

前面咱们经过5 分钟入门 Python 协程了解了什么是协程,协程的优势和缺点和如何在 Python 中实现一个协程。没有看过的同窗建议去看看。这篇文章,将再也不对理论性的东西作过多的解说。而是倾向于 asyncio 的使用上,另外为了保证文章时效性这里咱们使用 Python3.8 来进行对后面内容的操做。git

协程的演变

其实早在 Python3.4 的时候就有协程,当时的协程是经过 @asyncio.coroutine 和 yeild from 实现的。在一些很老教程中你可能看到的是下面这种形式:编程

import asyncio

@asyncio.coroutine
def print_hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")

# 建立并获取EventLoop:
loop = asyncio.get_event_loop()
# 执行协程
loop.run_until_complete(print_hello())
loop.close()

由于如今几乎没有人这样写了因此仅做为了解便可。
而后到了 Python3.5 引入了 async/await 语法糖,一直到如今Python3.8 都是用这种形式来表示协程,示例以下。api

import asyncio

async def print_hello():
     print("Hello world!")
     await asyncio.sleep(1)
     print("Hello again!")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        print("开始运行协程")
        coro = print_hello()
        print("进入事件循环")
        loop.run_until_complete(coro)
    finally:
        print("关闭事件循环")
        loop.close()

这种是目前应用范围最广的,能够看到比以前的代码舒服了很多,不用再使用装饰器的形式了。而后就到了 Python3.7 和 Python3.8 ,协程发生了不少细小的变化,可是最大的一个变化就是,启动协程的方法变简单了,一句就能够搞定,不用再像上面那样建立循环而后再仍到事件循环去执行了。安全

import asyncio

async def print_hello():
     print("Hello world!")
     await asyncio.sleep(1)
     print("Hello again!")

if __name__ == '__main__':
        print("开始运行协程")
        asyncio.run(print_hello())
        print("进入事件循环")

怎么样是否是代码更少了,启动协程更简单了。因此这也正是咱们使用 3.8 做为本教程的 Python 版,与时俱进嘛。网络

Asyncio 的组成部分

根据目前的官方文档,总的来讲分为了两部分:高层级 API 和低层级API。
首先看高层级 API 也是接下来重点要讲的。数据结构

高层级API

  • 协程对象和 Tasks 对象
  • 数据流
  • 同步源语
  • 子进程
  • 队列
  • 异常

低层级API

  • 事件循环
  • Futures 对象
  • 传输和协议
  • 策略
  • 平台支持

上面列出了这么多的项目咱们怎么去选择本身所须要的呢,总的来讲对于刚入门的新手或者只是写一个本身用的程序通常都只会用到高级 API 的部分,这部分就属于开箱即用的那种,对于高级用户好比框架开发者,每每能够须要去适应各类须要,须要从新改写一些内部的结构,这个时候就须要用到低层级的 API,可是这两个层级呢只能是一个大概方向吧,主要是方便 API 的查看,下面呢我将围绕者高层级API和低层级API在平常实际工做中常常用到的内容作一些讲解。多线程

了解几个概念

在学习 asyncio 以前须要知道这样的几个概念。架构

事件循环

事件循环是一种处理多并发量的有效方式,在维基百科中它被描述为「一种等待程序分配事件或消息的编程架构」,咱们能够定义事件循环来简化使用轮询方法来监控事件,通俗的说法就是「当A发生时,执行B」。所谓的事件,其实就是函数。事件循环,就是有一个队列,里面存放着一堆函数,从第一个函数开始执行,在函数执行的过程当中,可能会有新的函数继续加入到这个队列中。一直到队列中全部的函数被执行完毕,而且不再会有新的函数被添加到这个队列中,程序就结束了。并发

Future

Future 是一个数据结构,表示还未完成的工做结果。事件循环能够监视Future 对象是否完成。从而容许应用的一部分等待另外一部分完成一些工做。
简答说,Future 就是一个类,用生成器实现了回调。

Task

Task 是 Future 的一个子类,它知道如何包装和管理一个协程的执行。任务所需的资源可用时,事件循环会调度任务容许,并生成一个结果,从而能够由其余协程消费。通常操做最多的仍是 Task。用Task来封装协程,给本来没有状态的协程增长一些状态。

awaitable objects(可等待对象)

若是一个对象能够用在 wait 表达式中,那么它就是一个可等待的对象。在 asyncio 模块中会一直提到这个概念,其中协程函数,Task,Future 都是 awaitable 对象。
用于 await 表达式中的对象。能够是的 coroutine 也能够是实现了 __await__() 方法的对象,参见 PEP 492。类比于 Iterable 对象是 Generator 或实现了__iter__() 方法的对象。

**object.__await__(self)**

必须返回生成器,asyncio.Future 类也实现了该方法,用于兼容 await 表达式。

而 Task 继承自 Future,所以 awaitable 对象有三种:coroutines、Tasks 和 Futures。

await 的目的:

  • 获取协程的结果
  • 挂起当前协程,将控制交由事件循环,切换到其余协程,而后等待结果,最后恢复协程继续执行

启动一个协程

如今咱们使用 async/await 语法来声明一个协程。 代码以下

import asyncio

async def main():
     print('hello')
     await asyncio.sleep(1)
     print('world')

if __name__ == '__main__':
    asyncio.run(main())

asyncio.run 只能用来启动程序入口协程,反过来你在程序中若是使用asyncio.run 就会出错,直接咱们提到对于其余的协程经过await链来实现,这里也是同样的。下面说下代码的含义,首先启动 main 这个协程,main 方法就是先打印 hello,而后在打印过程当中经过使用 asyncio.sleep 来等待1秒,以后再打印 world。前面咱们提到用协程就意味着咱们要一直使用非阻塞的代码,才能达到速度提高,因此这里咱们用了非阻塞版的 time.sleep 即 asyncio.sleep 。

协程中调用协程

以前咱们提到了在协程中,可使用 await 来调用一个协程。
就像下面的代码:

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    #使用f-string拼接字符串
    print(f"开始运行 {time.strftime('%X')}")

    child1=await say_after(1, 'hello') #经过await调用协程,而后接收一下返回值
    child2=await say_after(2, 'world')
    print("child1",child1)
    print("child2",child2)

    print(f"结束运行 {time.strftime('%X')}")
if __name__ == '__main__':
      asyncio.run(main())

运行结果:

开始运行 11:17:26
hello
world
child1 None
child2 None
结束运行 11:17:29
[Finished in 3.1s]

代码是没什么问题,正常运行。可是通常状况下咱们用到更多的是下面的方式。将协程封装为 Task 让本来没有状态标示的协程添加上状态 。
咱们能够经过 asyncio.create_task 方法来实现。

asyncio.create_task

create_task(在3.6版本中须要使用低层级的API asyncio.ensure_future。)是 3.7之后加入的语法,做用是将协程包装为一个任务(Task),相比3.6版本的ensure_future可读性提升。
将上面的代码作以下修改。

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"开始运行 {time.strftime('%X')}")

    child1=asyncio.create_task(say_after(1, 'hello')) #经过await调用协程,而后接收一下返回值
    child2=asyncio.create_task(say_after(2, 'world'))
    print("调用任务child1前",child1)
    print("调用任务child2前",child2)
    await child1
    await child2
    print("调用任务child1后",child1)
    print("调用任务child2前",child2)

    print(f"结束运行 {time.strftime('%X')}")
if __name__ == '__main__':
      asyncio.run(main())

运行结果以下:

开始运行 11:37:54
调用任务child1前 <Task pending name='Task-2' coro=<say_after() running at /Users/chennan/Desktop/2019/aiochatuse/hello.py:4>>
调用任务child2前 <Task pending name='Task-3' coro=<say_after() running at /Users/chennan/Desktop/2019/aiochatuse/hello.py:4>>
hello
world
调用任务child1后 <Task finished name='Task-2' coro=<say_after() done, defined at /Users/chennan/Desktop/2019/aiochatuse/hello.py:4> result=None>
调用任务child2前 <Task finished name='Task-3' coro=<say_after() done, defined at /Users/chennan/Desktop/2019/aiochatuse/hello.py:4> result=None>
结束运行 11:37:56

能够发现,咱们的结果中多了"<Task pending ..."和"<Task finised ..."几行语句。这就是 Task 的一个状态变化,知道状态的好处就是咱们能够根据任务的状态作进一步操做,不像协程函数那样没有状态标示,固然 Task 的状态不仅有这些。
前面说到 Task 是 Future 的子类,因此 Tas k拥有 Future 的一些状态。

Future的状态

大概有以下几种:

  • Pending
  • Running
  • Done
  • Cancelled
    建立 future 的时候,task 为 pending,事件循环调用执行的时候固然就是 running,调用完毕天然就是 done,若是须要中止事件循环,就须要先把 task 取消,状态为 cancel。这里先作了解知道 Task 是有状态的就够了。

    并发运行任务

    一系列的协程能够经过 await 链式的调用,可是有的时候咱们须要在一个协程里等待多个协程,好比咱们在一个协程里等待 1000 个异步网络请求,对于访问次序有没有要求的时候,就可使用另外的关键字asyncio.wait 或 asyncio.gather 来解决了。

    asyncio.gather

    使用方法
asyncio.gather(*aws, loop=None, return_exceptions=False)¶

也就是说使用 gather 语句并发协程,就得用 await 去执行它。这个方法能够接收三个参数,第一个 aws,
aws 通常是一个列表,若是里面的元素是 awaitable 类型,在运行的时候它将自动被包装成 Task,gather 会根据 aws 中元素添加的顺序。顺序执行并返回结果列表。
第二个 loop 能够传入一个事件循环对象,通常不用管,最后一个return_exceptions 默认是 False,若是 return_exceptions 为 True,异常将被视为成功结果,而后添加到结果列表中。
下面是一个10个数字并输出例子。

import asyncio

async def foo(num):
    return num
async def main():
    coro = [asyncio.create_task(foo(i)) for i in range(10) ]
    done= await asyncio.gather(*coro)
    for i in done:
        print(i)
    

if __name__ == '__main__':
    asyncio.run(main())

运行以后结果以下

0
1
2
3
4
5
6
7
8
9

gather 返回的结果是一个列表,迭代这个列表能够看到任务依次输出。
gather 一般被用来阶段性的一个操做,作完第一步才能作第二步,好比下面这样

import asyncio

import time


async def step1(n, start):
    await asyncio.sleep(n)
    print("第一阶段完成")
    print("此时用时", time.time() - start)
    return n


async def step2(n, start):
    await asyncio.sleep(n)
    print("第二阶段完成")
    print("此时用时", time.time() - start)
    return n


async def main():
    now = time.time()
    result = await asyncio.gather(step1(5, now), step2(2, now))
    for i in result:
        print(i)
    print("总用时", time.time() - now)


if __name__ == '__main__':
   asyncio.run(main())

输出内容

第二阶段完成
此时用时 2.0041821002960205
第一阶段完成
此时用时 5.0009942054748535
5
2
总用时 5.001508951187134

能够经过上面结果获得以下结论:
1.step1 和 step2 是并行运行的。
2.gather 会等待最耗时的那个完成以后才返回结果,耗时总时间取决于其中任务最长时间的那个。

asyncio.task

咱们先看一下 wait 的语法结构:

asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)¶

wait 一共有 4 个参数,第一个参数 aws,通常是一个任务列表。
第二个*以后的都是强制关键字参数,即loop,timeout,return_when。
loop通gather的参数是一个事件循环,该参数计划在Python 3.10中删除。
timeout能够指定这组任务的超时时间,请注意,此函数不会引起asyncio.TimeoutErro, 超时的时候会返回已完成的任务。

return_when能够指定什么条件下返回结果,默认是因此任务完成就返回结果列表。return_when的具体参数看下面的表格:
|参数名|含义|
|----|----|
|FIRST_COMPLETED|任何一个future完成或取消时返回|
| FIRST_EXCEPTION |任何一个future出现错误将返回,若是出现异常等价于ALL_COMPLETED|
|ALL_COMPLETED|当全部任务完成或者被取消时返回结果,默认值。|

wait返回的结果是一个元组,第一部分是完成的任务,第二部分是准备中的任务。

done, pending = await asyncio.wait(aws)

其中done表示是完成的任务,能够经过迭代获取每一个任务。
pending表示的是还没执行的任务。
下面看一个例子来进一步了解

import asyncio

async def foo(num):
    await asyncio.sleep(0.99991)
    return num
async def main():
    #coro = foo()
    coro = [asyncio.create_task(foo(i)) for i in range(10) ]
    done, pending = await asyncio.wait(coro,timeout=1,return_when="ALL_COMPLETED")
    
    for coro in done:
        print(coro.result())
    print("pending",pending)
    for item in pending:
         print(item)    

if __name__ == '__main__':
    asyncio.run(main())

运行结果以下:

2
5
3
0
6
4
1
7
pending {<Task pending name='Task-10' coro=<foo() running at /Users/chennan/Desktop/2019/aiochatuse/waitdemo.py:4> wait_for=<Future finished result=None>>, <Task pending name='Task-11' coro=<foo() running at /Users/chennan/Desktop/2019/aiochatuse/waitdemo.py:4> wait_for=<Future finished result=None>>}
<Task pending name='Task-10' coro=<foo() running at /Users/chennan/Desktop/2019/aiochatuse/waitdemo.py:4> wait_for=<Future finished result=None>>
<Task pending name='Task-11' coro=<foo() running at /Users/chennan/Desktop/2019/aiochatuse/waitdemo.py:4> wait_for=<Future finished result=None>>

首先说代码,使用wait实现并发的程序是无序的因此咱们看到数字不是一次出现的。这个是和gather的不一样之处,另外在返回的参数上也有差异,wait返回两个参数done和pending。

上面的代码指定了一个timeout,由于任务没在指定时间完成因此就致使,只有完成的任务输出告终果,没有完成的部分能够看到它们的状态是pending。

总结

最后咱们,总结一下wait和gather的相同之处和不一样之处:
相同之处:均可以完成多个任务的并发操做。
不一样以外:gather适合按照顺序去作的任务,或者按照阶段去作的任务,返回的是结果列表,而wait不讲究任务的顺序,这个在作爬虫中常用到,而后wait能够返回2个结果,done和pending。

任务完成时处理

asyncio.as_completed

as_complete是一个生成器,会管理指定的一个任务列表,并生成他们的结果。每一个协程结束运行时一次生成一个结果。与wait同样,as_complete不能保证顺序,不过执行其余动做以前没有必要等待因此后台操做完成。

咱们看下这个函数都有哪些参数

asyncio.as_completed(aws, *, loop=None, timeout=None)

和前面的wait相似,第一个参数awas,而后loop,最后timeout,须要注意的是timeout若是指定了,那么在指定时间没完成的话会抛出asyncio.exceptions.TimeoutError异常。
下面看一个例子:

import asyncio
import time


async def foo(n):
    print(f'等待{n}秒')
    await asyncio.sleep(n)
    return n


async def main():
    coroutine1 = foo(1)
    coroutine2 = foo(2)
    coroutine3 = foo(4)

    tasks = [asyncio.create_task(coroutine1),asyncio.create_task(coroutine2),asyncio.create_task(coroutine3)]
    for task in asyncio.as_completed(tasks):
        result = await task
        print(f'获取返回结果: {result}')


if __name__ == '__main__':
    now = lambda : time.time()
    start = now()
    asyncio.run(main())
    print(now() - start)

输出结果

等待1秒
等待2秒
等待4秒
获取返回结果: 1
获取返回结果: 2
获取返回结果: 4
4.002715826034546

能够看出整个执行过程总用时取决于 等待时间最长的那个即4秒。
接下来,对上面的代码稍做修改,

for task in asyncio.as_completed(tasks):

改成

for task in asyncio.as_completed(tasks,timeout=2):

其余地方不变,改完运行以后会看到上面提到的错误。

等待1秒
等待2秒
等待4秒
获取返回结果: 1
Traceback (most recent call last):
  File "/Users/chennan/Desktop/2019/aiochatuse/ascomplete.py", line 25, in <module>
    asyncio.run(main())
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 589, in run_until_complete
    return future.result()
  File "/Users/chennan/Desktop/2019/aiochatuse/ascomplete.py", line 18, in main
    result = await task
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py", line 570, in _wait_for_one
    raise exceptions.TimeoutError
asyncio.exceptions.TimeoutError

取消任务的时候保证其余协程运行完毕

在取消任务的时候存在一个问题,首先先看一段代码:

import asyncio

async def coro():
    print('开始休眠')
    await asyncio.sleep(2)
    print('结束休眠')

async def cancel_it(some_task):
    await asyncio.sleep(0.5)
    some_task.cancel()
    print('取消some_task任务')

async def main():
    real_task = asyncio.create_task(coro())
    await cancel_it(real_task)
    await real_task

if __name__ == '__main__':
    asyncio.run(main())

运行以后你会看到以下结果

开始休眠
取消some_task任务
Traceback (most recent call last):
  File "/Users/chennan/Desktop/2019/aiochatuse/shielddemo.py", line 24, in <module>
    asyncio.run(main())      
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 589, in run_until_complete
    return future.result()
asyncio.exceptions.CancelledError

下面说一下代码中的逻辑,在main协程中将coro协程封装为任务real_task,而后cancel_it方法作了一个取消任务的逻辑some_task.cancel()。并打印一句话。而后经过await去运行real_task方法,执行代码以后看到上面的结果。出现了asyncio.exceptions.CancelledError错误,同时看到coro只打印了一个开始休眠,后面的结束休眠没有打印。也就是说咱们在取消一个任务的时候,里面对于的协程也被取消了。若是咱们想在取消任务以后协程还能顺利执行完,就须要用到另一个函数shield.

asyncio.shield

该方法的做用是,在执行cancel取消一个task以后,task里面的协程仍然能够执行结束,不会像上面的coro那样出现错误。

asyncio.shield(aw, *, loop=None)

aw表示须要传入一个 Task。

接下来咱们就使用这个方法对上面的例子作一个修改。
从代码中体会它的做用

import asyncio

async def coro():
    print('开始休眠')
    await asyncio.sleep(2)
    print('结束休眠')

async def cancel_it(some_task):
    await asyncio.sleep(0.5)
    some_task.cancel()
    print('取消some_task任务')

async def main():
    real_task = asyncio.create_task(coro())
    shield = asyncio.shield(real_task)
    await cancel_it(shield)
    await real_task
if __name__ == '__main__':
    asyncio.run(main())

运行以后的结果

开始休眠
取消some_task任务
结束休眠

能够看到尽管some_task任务被取消,可是coro仍然成功的打印了最好的“结束休眠”。经过上面的例子我想你们应该知道shield的做用了。

超时等待

有时候须要等待一个任务完成以后再进行下一个,可是有的时候并不须要运行完就返回。
这个时候可使用wait_for

asyncio.wait_for

该方法的语法以下:

asyncio.wait_for(aw, timeout, *, loop=None)

aw是一个任务,timeout能够指定超时时间。若是发生超时,它将取消该任务并引起asyncio.TimeoutError,此时为了保证任务中协程完成可使用上面说的 shield。

import asyncio

async def foo():
     await asyncio.sleep(1)
     print("in foo")
     
async def eternity():
    # Sleep for one hour
    await foo()
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(asyncio.shield(eternity()), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')
if __name__ == '__main__':
    asyncio.run(main())

输出

in foo
timeout!

按照上面的经验可知道,若是咱们把asyncio.shield去掉以后,“in foo”就没法输出了。

协程配合线程

asyncio.run_coroutine_threadsafe

该方法的语法以下:

asyncio.run_coroutine_threadsafe(coro, loop)

其实在协程中也可使用多线程,有时候咱们须要在主线程中启动一个子线程去作别的任务,这个时候咱们就要用到下面的方法了,先上一个流畅的Python中的代码。

import time
import asyncio
from  threading import Thread

now = lambda: time.time()


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def do_some_work(x):
    print(f'Waiting {x}')
    await asyncio.sleep(x)
    print(f'Done after {x}s')


def more_work(x):
    print(f'More work {x}')
    time.sleep(x)
    print('Finished more work {x}')


start = now()
# 主线程中建立一个 new_loop
new_loop = asyncio.get_event_loop()
# 建立子线程 在其中开启无限事件循环
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print(f'TIME: {time.time() - start}')

# 在主线程中新注册协程对象
# 这样便可在子线程中进行事件循环的并发操做 同时主线程又不会被 block 
# 一共执行的时间大概在 6 s 左右 
asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

上述的例子,主线程中建立一个new_loop,而后在另外的子线程中开启一个无限事件循环。主线程经过run_coroutine_threadsafe新注册协程对象。这样就能在子线程中进行事件循环的并发操做,同时主线程又不会被block。一共执行的时间大概在6s左右。

同步原语

尽管asyncio应用一般做为单线程运行,不过仍被构建为并发应用。因为I/O以及其余外部事件的延迟和中断,每一个协程或任务可能按一种不可预知的顺序执行。为了支持安全的并发执行,asyncio包含了threading和multiprocessing模块中的一些底层原语的实现。
这里介绍两个常常用到的例子

队列(Queue)

asyncio.Queue为协程提供了一个先进先出的数据结构,这与线程的queue.Queue或进程的multiprocessing.Queue很相似,下面先看一个简单的例子,它是一个非阻塞的队列。

import asyncio
from asyncio import Queue

queue=Queue()
async def start():
        [queue.put_nowait(i) for i in range(1, 10)]
        await asyncio.create_task(work()) #put_nowait表示放入元素

async def work():
    try:
        while not queue.empty():#判断队列的元素是否为空
            num = queue.get_nowait()#获取元素
            print(f"获取数字:{num}")
            queue.task_done()#告诉队列该任务处理完。
    except asyncio.CancelledError:
        pass

if __name__ == '__main__':
    asyncio.run(start())

输出结果:

获取数字:1
获取数字:2
获取数字:3
获取数字:4
获取数字:5
获取数字:6
获取数字:7
获取数字:8
获取数字:9

在作爬虫的时候对于url的处理,常常会用到队列的操做。另一个要说的同步原语就是信号量。

信号量(Semaphore)

简单说下什么是信号量,咱们用停车场和车进行比喻。一个停车场一共就5个车位,因此咱们知道能够同时容纳最多5辆车,这五个车位就是信号量。
而后说信号量的行为,当有车离开停车场的时候外面的车就会进来补,好比有2辆车离开,那么就能够再进来2辆车,依次类推,上面这个过程就是描述了信号量这个东西。下面咱们看如何在程序中使用。
asyncio.Semaphore模块就是一个维持并发量的模块,咱们用它起到一个限流的效果。首先来一段代码。

import asyncio

sem=asyncio.Semaphore(3) #信号量指定为3

async def branch(num):
    async with sem:  #经过异步上下文关键子控制并发量
        print(f"获取当前数字:{num}")
        await asyncio.sleep(0.5)


async def main():
     
     tasks=[asyncio.create_task(branch(i)) for i in range(10)] #将协程封装成任务共10个
     await asyncio.wait(tasks) #执行这些任务
      
if __name__ == '__main__':
    asyncio.run(main())

执行以后你会发现

获取当前数字:0
获取当前数字:1
获取当前数字:2
Task exception was never retrieved
future: <Task finished name='Task-11' coro=<branch() done, defined at /Users/chennan/Desktop/2019/aiochatuse/semaphoredemo.py:26> exception=RuntimeError("Task <Task pending name='Task-11' coro=<branch() running at /Users/chennan/Desktop/2019/aiochatuse/semaphoredemo.py:27> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py:478]> got Future <Future pending> attached to a different loop")>

关键就是 attached to a different loop,这个地方说是当前的事件循环发生了改变,这个问题在Python3.6的时候是不会出现的。
为何3.8出错了,这是由于
个人信号量没有在循环内建立。也就是在asyncio.run()建立的循环以外建立了它们,所以它们使用events.get_event_loop()这就致使了新的事件循环产生。 asyncio.run()建立一个新循环,而后在一个循环中建立的future不能在另外一个循环中使用。因此问题就明确了咱们须要在循环以内建立。也就是咱们须要定义一个全局变量,而后在主循环内部给其赋值,看到这,可能你们想到了global,Python 3.7 增长了上下文变量 Context Variables,至于为何不用全局变量,由于可能会被其余协程修改,不安全,在这里也可使用。
因此咱们的代码变成了下面这个样子

import asyncio
from contextvars import ContextVar

concurrent=ContextVar("concurrent")#定义全局上下文管理器

async def branch(num):
    sem=concurrent.get()#获取上下文关键字
    async with sem:
        print(f"获取当前数字:{num}")
        await asyncio.sleep(0.5) #为了看到明显的效果

async def main():
     concurrent.set(asyncio.Semaphore(3)) #上下文管理器赋值
     tasks=[asyncio.create_task(branch(i)) for i in range(10)]
     await asyncio.wait(tasks)
      
if __name__ == '__main__':
    asyncio.run(main())

而后咱们再次输出

获取当前数字:0
获取当前数字:1
获取当前数字:2
获取当前数字:3
获取当前数字:4
获取当前数字:5
获取当前数字:6
获取当前数字:7
获取当前数字:8
获取当前数字:9

能够看到程序每隔3组输出一次,这就达到了咱们想要的效果了。

后记

到目前为止,asyncio 经常使用的操做就是上面这些了,关于更多的asyncio用法你们能够参考官网的api文档学习,同时学习这些内容,也是为了个人后续文章aiohttp异步爬虫模块打下基础,但愿你们可以多动手实践,加深对代码的影响。后续我将和你们一块儿走入异步爬虫aiohttp的实践部分,敬请期待!

相关文章
相关标签/搜索