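asyncio is a concurrency module introduced in Python 3.4. It uses coroutines and futures to make asynchronous features easier to implement, lets us write code in almost the same style as synchronous code, and does away with annoying callbacks.
Python 3.7, released in June 2018, upgraded parts of the asyncio API, mainly around task management and event loops. A later post will cover the features added in 3.7 in detail.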
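Current state:
At the time of writing, the ecosystem of asyncio-based libraries is still incomplete, and these libraries are not maintained by the official Python project. A Russian group on GitHub develops and maintains some commonly used ones, such as aiomysql, aiopika and aioredis. One point needs stating up front: if you want asyncio's asynchronous behavior, every other library in your code must be asynchronous too, not blocking. For example, we need aiomysql instead of pymysql, and aiohttp instead of requests. If a library you rely on has no asynchronous counterpart yet, things get awkward.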
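To make the cost of a blocking call concrete, here is a minimal sketch. It assumes `time.sleep` as a stand-in for a blocking library call and `asyncio.sleep` as a stand-in for an asynchronous one; the helper names `blocking_worker`, `async_worker`, `timed` and `offloaded` are made up for this illustration. The last helper shows one common workaround when no async library exists: pushing the blocking call onto a thread pool with `loop.run_in_executor`.

```python
import asyncio
import time


async def blocking_worker(delay):
    # time.sleep stands in for a blocking library call: it never yields
    # to the event loop, so no other coroutine can run in the meantime.
    time.sleep(delay)


async def async_worker(delay):
    # asyncio.sleep stands in for an async library call: awaiting it
    # lets the event loop run the other coroutines.
    await asyncio.sleep(delay)


async def timed(worker, count=3, delay=0.2):
    """Run `count` workers concurrently and return the elapsed time."""
    start = time.perf_counter()
    await asyncio.gather(*(worker(delay) for _ in range(count)))
    return time.perf_counter() - start


async def offloaded(count=3, delay=0.2):
    """Run the blocking call in the default thread pool instead."""
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    await asyncio.gather(
        *(loop.run_in_executor(None, time.sleep, delay) for _ in range(count))
    )
    return time.perf_counter() - start


if __name__ == "__main__":
    print("blocking: {:.2f}s".format(asyncio.run(timed(blocking_worker))))
    print("async:    {:.2f}s".format(asyncio.run(timed(async_worker))))
    print("executor: {:.2f}s".format(asyncio.run(offloaded())))
```

The blocking version should take roughly three times the delay because the workers run one after another, while the other two should take roughly one delay.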
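1. event loop: manages and dispatches the execution of different tasks. We register our tasks on the event loop.
2. coroutines: special functions similar to Python generators. A coroutine usually contains the keyword await; when execution reaches an await, the coroutine hands control back to the event loop. When a coroutine is wrapped in a Task (a subclass of Future), the event loop schedules and runs it.
3. futures: represent the result of a task that may or may not have run yet. That result can also be an exception.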
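The three concepts above can be sketched in a few lines. This is an illustration only; `compute` and the values used are made up for the example.

```python
import asyncio


async def compute(x):
    # A coroutine: at `await` it hands control back to the event loop.
    await asyncio.sleep(0)
    return x * 2


async def main():
    loop = asyncio.get_running_loop()

    # Wrapping a coroutine in a Task schedules it on the event loop.
    task = asyncio.create_task(compute(21))
    is_future = isinstance(task, asyncio.Future)  # Task subclasses Future

    # A bare Future is a placeholder for a result that arrives later.
    fut = loop.create_future()
    loop.call_soon(fut.set_result, "ready")

    # A Future can also carry an exception instead of a value.
    failed = loop.create_future()
    failed.set_exception(ValueError("boom"))

    value = await task
    message = await fut
    try:
        await failed
        error = None
    except ValueError as exc:
        error = str(exc)
    return is_future, value, message, error


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Awaiting a future either returns its result or re-raises the exception stored in it, which is why the failed future is awaited inside `try`.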
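asyncio lets us define subtasks as coroutines and schedule them ourselves. With multithreading, scheduling is usually left to the operating system and is out of our hands. Let's start with the following example: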

    import asyncio


    async def foo():
        print("running in foo")
        await asyncio.sleep(0)
        print("back foo")


    async def bar():
        print("running in bar")
        await asyncio.sleep(0)
        print("back bar")


    async def main():
        tasks = [foo(), bar()]
        await asyncio.gather(*tasks)


    asyncio.run(main())

The code above produces the following output:

    running in foo
    running in bar
    back foo
    back bar

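A few notes on the code above: asyncio.run(main()), new in Python 3.7, creates an event loop, runs main() to completion and closes the loop. asyncio.gather schedules foo() and bar() concurrently. await asyncio.sleep(0) hands control back to the event loop, which is why bar starts running before foo prints its second line.
When code runs synchronously, the execution order is linear. When it runs asynchronously, the order is no longer deterministic. A simple crawler example shows this: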

    import time
    import random
    import asyncio

    import aiohttp

    URL = 'https://baidu.com'
    MAX_CLIENTS = 3


    async def aiohttp_get(url):
        # Read what we need while the response is still open; it is
        # released as soon as the `async with` block exits.
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return response.headers.get('Date')


    async def fetch_async(pid):
        start = time.time()
        sleepy_time = random.randint(2, 5)
        print('fetch coroutine {} started, sleeping for {} seconds'.format(
            pid, sleepy_time))
        date = await aiohttp_get(URL)
        # asyncio.sleep simulates each request returning after some delay
        await asyncio.sleep(sleepy_time)
        return 'coroutine {}: {}, took: {:.2f} seconds'.format(
            pid, date, time.time() - start)


    async def main():
        start = time.time()
        futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
        # as_completed yields awaitables in the order they finish
        for i, future in enumerate(asyncio.as_completed(futures)):
            result = await future
            print('{} {}'.format(">>" * (i + 1), result))
        print("all took: {:.2f} seconds".format(time.time() - start))


    asyncio.run(main())

In the code above, every request includes an asyncio.sleep call. This simulates the real-world situation where, when we request several sites, the response times differ because of the network and the target sites.
The output is:

    fetch coroutine 2 started, sleeping for 5 seconds
    fetch coroutine 1 started, sleeping for 3 seconds
    fetch coroutine 3 started, sleeping for 4 seconds
    >> coroutine 1: Wed, 27 Feb 2019 11:27:58 GMT, took: 3.09 seconds
    >>>> coroutine 3: Wed, 27 Feb 2019 11:27:58 GMT, took: 4.08 seconds
    >>>>>> coroutine 2: Wed, 27 Feb 2019 11:27:58 GMT, took: 5.12 seconds
    all took: 5.12 seconds

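The completion-order behavior can also be reproduced without any network access. The sketch below is an assumption-laden stand-in: `fake_fetch` and the fixed delays are invented for the illustration, with `asyncio.sleep` playing the role of network latency. It contrasts `asyncio.as_completed`, which hands back results as they finish, with `asyncio.gather`, which returns them in submission order.

```python
import asyncio

# Hypothetical per-coroutine delays, chosen so the finish order is 2, 3, 1
DELAYS = {1: 0.3, 2: 0.1, 3: 0.2}


async def fake_fetch(pid, delay):
    # asyncio.sleep simulates network latency; no real request is made.
    await asyncio.sleep(delay)
    return pid


async def completion_order():
    coros = [fake_fetch(pid, delay) for pid, delay in DELAYS.items()]
    order = []
    for next_done in asyncio.as_completed(coros):
        order.append(await next_done)
    return order


async def submission_order():
    coros = [fake_fetch(pid, delay) for pid, delay in DELAYS.items()]
    return await asyncio.gather(*coros)


if __name__ == "__main__":
    print(asyncio.run(completion_order()))
    print(asyncio.run(submission_order()))
```

Both helpers take about as long as the slowest coroutine; only the order of the results differs.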
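The return_when=FIRST_COMPLETED parameter of asyncio.wait is for the case where we run several tasks but only care about the one that returns first. An example follows (note that, to reproduce an error, it manages the event loop by hand the way code did before Python 3.7):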

    import time
    import random
    import asyncio

    import aiohttp

    URL = 'https://baidu.com'
    MAX_CLIENTS = 3


    async def aiohttp_get(url):
        # Read the header while the response is still open
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return response.headers.get('Date')


    async def fetch_async(pid):
        start = time.time()
        sleepy_time = random.randint(2, 5)
        print('fetch coroutine {} started, sleeping for {} seconds'.format(
            pid, sleepy_time))
        date = await aiohttp_get(URL)
        # asyncio.sleep simulates each request returning after some delay
        await asyncio.sleep(sleepy_time)
        return 'coroutine {}: {}, took: {:.2f} seconds'.format(
            pid, date, time.time() - start)


    async def main():
        # asyncio.wait needs Tasks or Futures; bare coroutines are
        # rejected on Python 3.11 and later.
        futures = [asyncio.ensure_future(fetch_async(i))
                   for i in range(1, MAX_CLIENTS + 1)]
        done, pending = await asyncio.wait(
            futures, return_when=asyncio.FIRST_COMPLETED
        )
        print(done.pop().result())


    # Pre-3.7 style: create, run and close the loop by hand
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
    loop.close()

Running it produces output like this:

    fetch coroutine 2 started, sleeping for 2 seconds
    fetch coroutine 1 started, sleeping for 5 seconds
    fetch coroutine 3 started, sleeping for 2 seconds
    coroutine 2: Wed, 27 Feb 2019 11:41:19 GMT, took: 2.11 seconds
    Task was destroyed but it is pending!
    task: <Task pending coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000000038E5798>()]>>

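The cause is easy to understand. We started three tasks and closed the loop as soon as the fastest one finished. The other two tasks were still pending, and asyncio treats that as an error, so it prints the warning we saw: Task was destroyed but it is pending!
How do we solve this?
Conceptually, a future is in one of four states: pending, running, done or cancelled.
We can call done() or cancelled() to check whether a future is currently in that state (running() exists only on concurrent.futures.Future, not on asyncio futures). As a reminder, done can mean either that a result was returned or that an exception was raised. We can also call cancel() to cancel a future explicitly. Since Python 3.7, however, asyncio.run does this for us: it cancels any tasks still pending when the main coroutine finishes. So we change the code that produced Task was destroyed but it is pending! as follows: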

    import time
    import random
    import asyncio

    import aiohttp

    URL = 'https://baidu.com'
    MAX_CLIENTS = 3


    async def aiohttp_get(url):
        # Read the header while the response is still open
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return response.headers.get('Date')


    async def fetch_async(pid):
        start = time.time()
        sleepy_time = random.randint(2, 5)
        print('fetch coroutine {} started, sleeping for {} seconds'.format(
            pid, sleepy_time))
        date = await aiohttp_get(URL)
        # asyncio.sleep simulates each request returning after some delay
        await asyncio.sleep(sleepy_time)
        return 'coroutine {}: {}, took: {:.2f} seconds'.format(
            pid, date, time.time() - start)


    async def main():
        # asyncio.wait needs Tasks; bare coroutines are rejected on 3.11+
        futures = [asyncio.create_task(fetch_async(i))
                   for i in range(1, MAX_CLIENTS + 1)]
        done, pending = await asyncio.wait(
            futures, return_when=asyncio.FIRST_COMPLETED
        )
        print(done.pop().result())


    # asyncio.run cancels the still-pending tasks before closing the loop
    asyncio.run(main())

The output is now clean:

    fetch coroutine 2 started, sleeping for 5 seconds
    fetch coroutine 3 started, sleeping for 2 seconds
    fetch coroutine 1 started, sleeping for 2 seconds
    coroutine 3: Wed, 27 Feb 2019 11:54:13 GMT, took: 2.07 seconds

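If the loop has to be managed by hand, the same cleanup can be done explicitly by cancelling whatever is left in `pending`. The following is a minimal sketch of that idea, not the only way to do it; `fake_fetch`, `first_result` and `run_with_manual_loop` are hypothetical names, and `asyncio.sleep` replaces the real HTTP request.

```python
import asyncio


async def fake_fetch(pid, delay):
    # Simulated request; asyncio.sleep stands in for network latency.
    await asyncio.sleep(delay)
    return "coroutine {} done".format(pid)


async def first_result():
    tasks = [asyncio.ensure_future(fake_fetch(pid, delay))
             for pid, delay in [(1, 0.3), (2, 0.05), (3, 0.3)]]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)

    # Cancel the slower tasks, then wait until the cancellations have
    # actually been processed so nothing is pending when the loop closes.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    return done.pop().result(), [task.cancelled() for task in pending]


def run_with_manual_loop():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(first_result())
    finally:
        loop.close()


if __name__ == "__main__":
    print(run_with_manual_loop())
```

Passing `return_exceptions=True` to `asyncio.gather` keeps the `CancelledError` of each cancelled task from propagating into `first_result`.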
Futures have another useful feature: we can attach a callback that runs when the future becomes done.
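A minimal sketch of attaching such a callback with `add_done_callback` might look like this. The names `fake_fetch` and `on_done` are invented for the example, and `asyncio.sleep` stands in for real work.

```python
import asyncio


async def fake_fetch(pid):
    # Simulated work; no real request is made.
    await asyncio.sleep(0.01)
    return "coroutine {} finished".format(pid)


async def main():
    seen = []

    def on_done(task):
        # The event loop calls this with the finished task as its
        # only argument.
        seen.append(task.result())

    task = asyncio.create_task(fake_fetch(1))
    task.add_done_callback(on_done)
    await task
    # Callbacks are scheduled on the loop, so yield once more to make
    # sure the callback has had a chance to run.
    await asyncio.sleep(0)
    return seen


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The callback is a plain function, not a coroutine, so it should stay short and must not block.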
The following code shows how to retrieve the results of futures once they have completed:

    import time
    import random
    import asyncio

    import aiohttp

    URL = 'https://httpbin.org/get'
    MAX_CLIENTS = 3


    async def aiohttp_get(url):
        # Read the header while the response is still open
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return response.headers.get('Date')


    async def fetch_async(pid):
        start = time.time()
        sleepy_time = random.randint(2, 5)
        print('fetch coroutine {} started, sleeping for {} seconds'.format(
            pid, sleepy_time))
        date = await aiohttp_get(URL)
        # asyncio.sleep simulates each request returning after some delay
        await asyncio.sleep(sleepy_time)
        return 'coroutine {}: {}, took: {:.2f} seconds'.format(
            pid, date, time.time() - start)


    async def main():
        # asyncio.wait needs Tasks; bare coroutines are rejected on 3.11+
        futures = [asyncio.create_task(fetch_async(i))
                   for i in range(1, MAX_CLIENTS + 1)]
        # The default return_when is ALL_COMPLETED
        done, pending = await asyncio.wait(futures)
        print(done)
        for future in done:
            print(future.result())


    asyncio.run(main())

The output is:

    fetch coroutine 2 started, sleeping for 5 seconds
    fetch coroutine 1 started, sleeping for 2 seconds
    fetch coroutine 3 started, sleeping for 4 seconds
    {<Task finished coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> result='coroutine 3:... 5.31 seconds'>, <Task finished coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> result='coroutine 1:... 3.34 seconds'>, <Task finished coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> result='coroutine 2:... 6.38 seconds'>}
    coroutine 3: Wed, 27 Feb 2019 12:10:15 GMT, took: 5.31 seconds
    coroutine 1: Wed, 27 Feb 2019 12:10:15 GMT, took: 3.34 seconds
    coroutine 2: Wed, 27 Feb 2019 12:10:15 GMT, took: 6.38 seconds

As we can see, once all tasks have finished, done gives us the result of each task.
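Because a done task may hold an exception rather than a result, calling `result()` on it can raise. One cautious way to read the outcomes is to check `exception()` first, as in the sketch below. The coroutine `flaky` and its deliberate failure are made up for the illustration.

```python
import asyncio


async def flaky(pid):
    # Simulated work in which coroutine 2 fails on purpose
    await asyncio.sleep(0.01)
    if pid == 2:
        raise RuntimeError("coroutine 2 failed")
    return "coroutine {} ok".format(pid)


async def main():
    tasks = [asyncio.create_task(flaky(pid)) for pid in (1, 2, 3)]
    done, pending = await asyncio.wait(tasks)

    report = []
    # Iterate over the list: `done` is a set and has no stable order.
    for task in tasks:
        if task.exception() is not None:
            report.append("error: {}".format(task.exception()))
        else:
            report.append(task.result())
    return len(pending), report


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that `asyncio.wait` itself does not raise when a task fails; the exception only surfaces when `result()` or `exception()` is called on that task.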
We can also give our tasks a timeout:

    import time
    import random
    import asyncio

    import aiohttp

    URL = 'https://httpbin.org/get'
    MAX_CLIENTS = 3


    async def aiohttp_get(url):
        # Read the header while the response is still open
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return response.headers.get('Date')


    async def fetch_async(pid):
        start = time.time()
        sleepy_time = random.randint(2, 5)
        print('fetch coroutine {} started, sleeping for {} seconds'.format(
            pid, sleepy_time))
        date = await aiohttp_get(URL)
        # asyncio.sleep simulates each request returning after some delay
        await asyncio.sleep(sleepy_time)
        return 'coroutine {}: {}, took: {:.2f} seconds'.format(
            pid, date, time.time() - start)


    async def main():
        # asyncio.wait needs Tasks; bare coroutines are rejected on 3.11+
        futures = [asyncio.create_task(fetch_async(i))
                   for i in range(1, MAX_CLIENTS + 1)]
        # Return after the first completion or after 0.01 s, whichever is first
        done, pending = await asyncio.wait(
            futures, return_when=asyncio.FIRST_COMPLETED, timeout=0.01
        )
        print(done)
        for future in done:
            print(future.result())


    asyncio.run(main())

I set the timeout very low here, 0.01 seconds, so by the time done is printed none of the three tasks has finished:

    fetch coroutine 2 started, sleeping for 4 seconds
    fetch coroutine 3 started, sleeping for 3 seconds
    fetch coroutine 1 started, sleeping for 4 seconds
    set()

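Two details of timeouts are worth a small sketch: `asyncio.wait` does not raise or cancel anything when the timeout expires, it simply returns the unfinished tasks in `pending`, whereas `asyncio.wait_for` cancels the awaited coroutine and raises a timeout error. The helper names below are hypothetical and `asyncio.sleep` stands in for a slow request.

```python
import asyncio


async def slow(pid, delay):
    # Simulated slow request
    await asyncio.sleep(delay)
    return pid


async def wait_with_timeout():
    tasks = [asyncio.create_task(slow(pid, 0.5)) for pid in (1, 2, 3)]
    done, pending = await asyncio.wait(tasks, timeout=0.01)
    # asyncio.wait leaves the unfinished tasks running, so clean them up.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)


async def wait_for_single():
    try:
        return await asyncio.wait_for(slow(1, 0.5), timeout=0.01)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the coroutine at this point.
        return "timed out"


if __name__ == "__main__":
    print(asyncio.run(wait_with_timeout()))
    print(asyncio.run(wait_for_single()))
```

Which of the two fits better depends on whether the timeout applies to a group of tasks or to a single awaitable.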
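This post gives an overall tour of what Python asyncio offers; later posts will go into the details. Relatively few companies seem to run asyncio in production so far, and I hope more readers will get in touch to exchange and share their experience with Python and asynchronous Python.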