前面文章咱们提到了上下文管理器,可是这个上下文管理器只适用于同步代码,不能用于异步代码(async def形式),不过不用担忧今天咱们就来讨论在异步中如何使用上下文管理器。
特别提醒本教程所使用的Python版本为Python3.7。html
异步上下文管理器。相似于同步上下文管理器,咱们知道使用with能够实现一个上下文管理的器,而对于异步上下文管理器其根本表现形式为async with,下面的一段代码告诉你async with是如何运做的。并发
import asyncio class AContext: def __init__(self): print("in init") async def __aenter__(self): print("in aenter") async def __aexit__(self, exc_type, exc_val, exc_tb): print("in aexit") async def main(): async with AContext() as ac: print("in with", ac) if __name__ == '__main__': print("start") asyncio.run(main())
输出内容app
start in init in aenter in with None in aexit
下面说下async with和with的不一样地方
语法上,with实现了enter和exit两个方法,async with实现了相似方法
aenter和aexit在同步的基础上加个a,实际上就是表明asynchronous。
实现上,使用普通的函数就能够实现with,可是async with须要经过异步函数的形式去实现,就像上面的例子同样。异步
从Python 3.7开始,有两种方法能够编写异步上下文管理器。一种就是前面提到的魔法函数的实现,另一种就是contextlib的另一个模块asynccontextmanager。经过装饰器的方式实现一个异步上下文管理器async
import asyncio from contextlib import asynccontextmanager from concurrent.futures.thread import ThreadPoolExecutor class AsyncFile(object): def __init__(self, file, loop=None, executor=None): if not loop: loop = asyncio.get_running_loop() # 获取当前运行事件循环 if not executor: executor = ThreadPoolExecutor(10) # 线程池数量10 self.file = file self.loop = loop self.executor = executor self.pending = [] self.result = [] def write(self, string): """ 实现异步写操做 :param string: 要写的内容 :return: """ self.pending.append( self.loop.run_in_executor( self.executor, self.file.write, string, ) ) def read(self, i): """ 实现异步读操做 :param i: :return: """ self.pending.append( self.loop.run_in_executor(self.executor, self.file.read, i,) ) def readlines(self): self.pending.append( self.loop.run_in_executor(self.executor, self.file.readlines, ) ) @asynccontextmanager async def async_open(path, mode="w"): with open(path, mode=mode) as f: loop = asyncio.get_running_loop() file = AsyncFile(f, loop=loop) try: yield file finally: file.result = await asyncio.gather(*file.pending, loop=loop)
上面的代码经过使用asyncio中run_in_executor运行一个线程,来使得阻塞操做变为非阻塞操做,达到异步非阻塞的目的。
AsyncFile类提供了一些方法,这些方法将用于将write、read和readlines的调用添加到pending列表中。这些任务经过finally块中的事件循环在ThreadPoolExecutor进行调度。
yield 前半段用来表示_aenter_()
yield 后半段用来表示_aexit_()
使用finally之后能够保证连接资源等使用完以后可以关闭。函数
若是调用前面示例中的异步上下文管理器,则须要使用关键字async with来进行调用。另外带有async with的语句只能在异步函数中使用。oop
from asynciodemo.asyncwith import async_open import asyncio import tempfile import os async def main(): tempdir = tempfile.gettempdir() path = os.path.join(tempdir, "run.txt") print(f"临时文件目录:{path}") async with async_open(path, mode='w') as f: f.write("公众号: ") f.write("Python") f.write("学习") f.write("开发") if __name__ == '__main__': asyncio.run(main())
使用方法和with相似能够经过使用as,而后使用其句柄,惟一须要注意的就是要在异步函数中使用。学习
在以前的一些异步教程里和你们说了关于协程中的几个同步方法,asyncio.wait和asyncio.gather,这里咱们能够配合这些方法经过异步上下文管理器来实现同步任务,请看以下代码线程
import asyncio # 同步挂起协程 class Sync(): def __init__(self): self.pending = [] self.finished = None def schedule_coro(self, coro, shield=True): #若是去掉asyncio.shield,在取消fut函数的时候,就会致使coro协程也出错。 fut = asyncio.shield(coro) if shield else asyncio.ensure_future(coro) self.pending.append(fut) return fut async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): # 退出async with的时候,任务列表中的任务进行并发操做。 self.finished = await asyncio.gather(*self.pending, return_exceptions=True) async def workload1(): await asyncio.sleep(2) print("These coroutines will be executed return 41") return 41 async def workload2(): await asyncio.sleep(2) print("These coroutines will be executed return 42") return 42 async def workload3(): await asyncio.sleep(2) print("These coroutines will be executed return 43") return 43 async def main(): async with Sync() as sync: # 使用异步上下文能够建立同步协程程序 sync.schedule_coro(workload1()) sync.schedule_coro(workload2()) sync.schedule_coro(workload3()) print("All scheduled corotines have retuned or throw:", sync.finished) if __name__ == '__main__': asyncio.run(main())
输出code
These coroutines will be executed return 41 These coroutines will be executed return 42 These coroutines will be executed return 43 All scheduled corotines have retuned or throw: [41, 42, 43]
1.程序是同步的形式并发输出的。 2. schedule_coro的做用是将协程workload1,workload2,workload3添加到任务列表pending,退出async with的时候,任务列表中的任务进行并发操做。