做者:MING
我的公众号:Python编程时光
我的微信:mrbensonwonpython注:本系列已在微信公众号更新完成。查看最新文章,请关注公众号获取。编程
你们好,并发编程
进入第十章。微信
为了更好地衔接这一节,咱们先来回顾一下上一节的内容。并发
上一节,咱们首先介绍了,如何建立一个协程对象.
主要有两种方法app
async
关键字,@asyncio.coroutine
装饰函数。而后有了协程对象,就须要一个事件循环容器来运行咱们的协程。其主要的步骤有以下几点:dom
为了让你们,对生成器和协程有一个更加清晰的认识,我还介绍了yield
和async/await
的区别。async
最后,咱们还讲了,如何给一个协程添加回调函数。函数
好了,用个形象的比喻,上一节,其实就只是讲了协程中的单任务
。哈哈,是否是还挺难的?但愿你们必定要多看几遍,多敲代码,不要光看哦。oop
那么这一节,咱们就来看下,协程中的多任务
。spa
协程的并发,和线程同样。举个例子来讲,就好像 一我的同时吃三个馒头,咬了第一个馒头一口,就得等这口咽下去,才能去啃第其余两个馒头。就这样交替换着吃。
asyncio
实现并发,就须要多个协程来完成任务,每当有任务阻塞的时候就await,而后其余协程继续工做。
第一步,固然是建立多个协程的列表。
# 协程函数
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
# 协程对象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
# 将协程转成task,并组成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
复制代码
第二步,如何将这些协程注册到事件循环中呢。
有两种方法,至于这两种方法什么区别,稍后会介绍。
asyncio.wait()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
复制代码
asyncio.gather()
# 千万注意,这里的 「*」 不能省略
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
复制代码
最后,return的结果,能够用task.result()
查看。
for task in tasks:
print('Task ret: ', task.result())
复制代码
完整代码以下
import asyncio
# 协程函数
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
# 协程对象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
# 将协程转成task,并组成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task ret: ', task.result())
复制代码
输出结果
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
复制代码
使用async能够定义协程,协程用于耗时的io操做,咱们也能够封装更多的io操做过程,这样就实现了嵌套的协程,即一个协程中await了另一个协程,如此链接起来。
来看个例子。
import asyncio
# 用于内部的协程函数
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
# 外部的协程函数
async def main():
# 建立三个协程对象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
# 将协程转为task,并组成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
# 【重点】:await 一个task列表(协程)
# dones:表示已经完成的任务
# pendings:表示未完成的任务
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print('Task ret: ', task.result())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
复制代码
若是这边,使用的是asyncio.gather()
,是这么用的
# 注意这边返回结果,与await不同
results = await asyncio.gather(*tasks)
for result in results:
print('Task ret: ', result)
复制代码
输出仍是同样的。
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
复制代码
仔细查看,能够发现这个例子彻底是由 上面「协程中的并发
」例子改编而来。结果彻底同样。只是把建立协程对象,转换task任务,封装成在一个协程函数里而已。外部的协程,嵌套了一个内部的协程。
其实你若是去看下asyncio.await()
的源码的话,你会发现下面这种写法
loop.run_until_complete(asyncio.wait(tasks))
复制代码
看似没有嵌套,实际上内部也是嵌套的。
这里也把源码,贴出来,有兴趣能够看下,没兴趣,能够直接跳过。
# 内部协程函数
async def _wait(fs, timeout, return_when, loop):
assert fs, 'Set of Futures is empty.'
waiter = loop.create_future()
timeout_handle = None
if timeout is not None:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
counter = len(fs)
def _on_completion(f):
nonlocal counter
counter -= 1
if (counter <= 0 or
return_when == FIRST_COMPLETED or
return_when == FIRST_EXCEPTION and (not f.cancelled() and
f.exception() is not None)):
if timeout_handle is not None:
timeout_handle.cancel()
if not waiter.done():
waiter.set_result(None)
for f in fs:
f.add_done_callback(_on_completion)
try:
await waiter
finally:
if timeout_handle is not None:
timeout_handle.cancel()
done, pending = set(), set()
for f in fs:
f.remove_done_callback(_on_completion)
if f.done():
done.add(f)
else:
pending.add(f)
return done, pending
# 外部协程函数
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
if not fs:
raise ValueError('Set of coroutines/Futures is empty.')
if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
raise ValueError(f'Invalid return_when value: {return_when}')
if loop is None:
loop = events.get_event_loop()
fs = {ensure_future(f, loop=loop) for f in set(fs)}
# 【重点】:await一个内部协程
return await _wait(fs, timeout, return_when, loop)
复制代码
还记得咱们在讲生成器的时候,有说起过生成器的状态。一样,在协程这里,咱们也了解一下协程(准确的说,应该是Future对象,或者Task任务)有哪些状态。
Pending
:建立future,还未执行Running
:事件循环正在调用执行任务Done
:任务执行完毕Cancelled
:Task被取消后的状态
可手工 python3 xx.py
执行这段代码,
import asyncio
import threading
import time
async def hello():
print("Running in the loop...")
flag = 0
while flag < 1000:
with open("F:\\test.txt", "a") as f:
f.write("------")
flag += 1
print("Stop the loop")
if __name__ == '__main__':
coroutine = hello()
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
# Pending:未执行状态
print(task)
try:
t1 = threading.Thread(target=loop.run_until_complete, args=(task,))
# t1.daemon = True
t1.start()
# Running:运行中状态
time.sleep(1)
print(task)
t1.join()
except KeyboardInterrupt as e:
# 取消任务
task.cancel()
# Cacelled:取消任务
print(task)
finally:
print(task)
复制代码
顺利执行的话,将会打印 Pending
-> Pending:Runing
-> Finished
的状态变化
假如,执行后 立马按下 Ctrl+C,则会触发task取消,就会打印 Pending
-> Cancelling
-> Cancelling
的状态变化。
还记得上面我说,把多个协程注册进一个事件循环中有两种方法吗?
asyncio.wait()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
复制代码
asyncio.gather()
# 千万注意,这里的 「*」 不能省略
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
复制代码
asyncio.gather
和 asyncio.wait
在asyncio中用得的比较普遍,这里有必要好好研究下这两货。
仍是照例用例子来讲明,先定义一个协程函数
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number+1):
print("Task %s: Compute factorial(%s)..." % (name, i))
await asyncio.sleep(1)
f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f))
复制代码
接收的tasks,必须是一个list对象,这个list对象里,存放多个的task。
它能够这样,用asyncio.ensure_future
转为task对象
tasks=[
asyncio.ensure_future(factorial("A", 2)),
asyncio.ensure_future(factorial("B", 3)),
asyncio.ensure_future(factorial("C", 4))
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
复制代码
也能够这样,不转为task对象。
loop = asyncio.get_event_loop()
tasks=[
factorial("A", 2),
factorial("B", 3),
factorial("C", 4)
]
loop.run_until_complete(asyncio.wait(tasks))
复制代码
接收的就比较普遍了,他能够接收list对象,可是 *
不能省略
tasks=[
asyncio.ensure_future(factorial("A", 2)),
asyncio.ensure_future(factorial("B", 3)),
asyncio.ensure_future(factorial("C", 4))
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
复制代码
还能够这样,和上面的 *
做用一致,这是由于asyncio.gather()
的第一个参数是 *coros_or_futures
,它叫 非命名键值可变长参数列表
,能够集合全部没有命名的变量。
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
))
复制代码
甚至还能够这样
loop = asyncio.get_event_loop()
group1 = asyncio.gather(*[factorial("A" ,i) for i in range(1, 3)])
group2 = asyncio.gather(*[factorial("B", i) for i in range(1, 5)])
group3 = asyncio.gather(*[factorial("B", i) for i in range(1, 7)])
loop.run_until_complete(asyncio.gather(group1, group2, group3))
复制代码
asyncio.wait
返回dones
和pendings
dones
:表示已经完成的任务pendings
:表示未完成的任务若是咱们须要获取,运行结果,须要手工去收集获取。
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print('Task ret: ', task.result())
复制代码
asyncio.gather
它会把值直接返回给咱们,不须要手工去收集。
results = await asyncio.gather(*tasks)
for result in results:
print('Task ret: ', result)
复制代码
import asyncio
import random
async def coro(tag):
await asyncio.sleep(random.uniform(0.5, 5))
loop = asyncio.get_event_loop()
tasks = [coro(i) for i in range(1, 11)]
# 【控制运行任务数】:运行第一个任务就返回
# FIRST_COMPLETED :第一个任务彻底返回
# FIRST_EXCEPTION:产生第一个异常返回
# ALL_COMPLETED:全部任务完成返回 (默认选项)
dones, pendings = loop.run_until_complete(
asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
print("第一次完成的任务数:", len(dones))
# 【控制时间】:运行一秒后,就返回
dones2, pendings2 = loop.run_until_complete(
asyncio.wait(pendings, timeout=1))
print("第二次完成的任务数:", len(dones2))
# 【默认】:全部任务完成后返回
dones3, pendings3 = loop.run_until_complete(asyncio.wait(pendings2))
print("第三次完成的任务数:", len(dones3))
loop.close()
复制代码
输出结果
第一次完成的任务数: 1
第二次完成的任务数: 4
第三次完成的任务数: 5
复制代码