【python3】asyncio:异步的同步写法

基本概念:
 
  • Asynchronous I/O(异步 I/O)。当代码需要执行一个耗时的 I/O 操作的时候, 它只发出 I/O 的指令, 并不等待 I/O 的结果, 然后去执行其它的代码, 以提高效率。
  • event loop(事件循环)。把基本的 I/O 操作转换为需要处理的事件, 通过事件循环做事件的监测和事件触发等处理工作。
  • coroutines(协程)。 线程是由操作系统控制切换的, 使用协程可以收回控制权, 并且将异步编程同步化, 注册到事件循环中的事件处理器就是协程对象, 它由事件循环来调用, 当程序阻塞等待读取或者写入数据的时候, 进行上下文的切换可以让效率最大化。
  • tasks(任务)。asyncio 模块非常容易和方便的执行并发任务, 并且可以实现创建、取消等管理任务。
 
我们用的比较多的就是携程, 异步转同步的内容
asyncio 的事件循环有多种方法启动协程, 最简单的方案是 run_until_complete():
 
 
import asyncio
 
async def coroutine ( ) : # 使用 async 创建一个协程
    print ( 'in coroutine' )
    return 'result'
 
 
if __name__ == '__main__' :
    event_loop = asyncio . get_event_loop ( ) # 创建一个默认的事件循环
    try :
        print ( 'starting coroutine' )
        coro = coroutine ( )
        print ( 'entering event loop' )
        result = event_loop . run_until_complete ( coro ) # 通过调用事件循环的 run_until_complete() 启动协程
        print ( f'it returned: { result } ' )
    finally :
        print ( 'closing event loop' )
        event_loop . close ( ) # 关闭事件循环
 
# 输出:
starting coroutine
entering event loop
in coroutine
it returned : result
closing event loop
 
 
总结一下几个关键点:
  1. event_loop = asyncio.get_event_loop()
  2. result =event_loop.run_until_complete(用async修饰的函数)
  3. event_loop.close() # 关闭事件循环
 
 
在上边的例子上拓展,再加一个实际上的使用场景:
 
import asyncio
 
async def main ( ) :
    print ( 'waiting for chain1' )
    result1 = await chain1 ( )
    print ( 'waiting for chain2' )
    result2 = await chain2 ( result1 )
    return ( result1 , result2 )
 
 
async def chain1 ( ) :
   print ( 'chain1' )
    return 'result1'
 
 
async def chain2 ( arg ) :
    print ( 'chain2' )
    return f'Derived from { arg } '
 
 
if __name__ == '__main__' :
    event_loop = asyncio . get_event_loop ( )
    try :
        return_value = event_loop . run_until_complete ( main ( ) )
        print ( f'return value: { return_value } ' )
    finally : 
        event_loop . close ( )
 
 
# 输出:
waiting for chain1
chain1
waiting for chain2
chain2
return value : ( 'result1' , 'Derived from result1' )
 
然后我们再此基础上进一步拓展:
先补充一下run_in_executor的参数
 
awaitable loop.run_in_executor(executor, func, *args)
参数 :
executor 可以是  ThreadPoolExecutor / ProcessPool  , 如果是None 则使用默认线程池
func:需要执行的方法
*args带的参数
 
 
领导让写一个爬虫 小李是一个老搬砖工,一个爬虫而已,轻车熟路啦,reqeusts请求,BeautifulSoup解析,最后保存数据完工。
因为最近看了asyncio, 他分析了一下,决定使用多线程发起网络请求,多进程来解析dom,最后保存依旧使用多线程的方式来实现。
 
class XiaoLiSpider(object):
    def __init__(self) -> None:
        self._urls = [
            " http://www.fover.cn/da/ "
            " http://www.fover.cn/sha/ "
            " http://www.fover.cn/bi/ "
         ]
        self._loop = asyncio.get_event_loop()
        self._thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
        self._process_pool = concurrent.futures.ProcessPoolExecutor()
 
    #我们最终要执行的方法,爬
    def crawl(self):
        a = []
        for url in self._urls:
            a.append(self. _little_spider (url))
            self._loop.run_until_complete(asyncio.gather(*a))
            self._loop.close()
            self._thread_pool.shutdown()
            self._process_pool.shutdown()
 
    #爬虫
    async def _little_spider(self, url):
        response = await self._loop. run_in_executor (
        self._thread_pool, self._request, url)
        urls = await self._loop. run_in_executor (self._process_pool,
        self._biu_soup,
        response.text)
        self._save_data(urls)
        print(urls)
 
    def _request(self, url):
        return requests.get(url=url,timeout=10)
 
    @ classmethod
    def _biu_soup(cls, response_str):
        soup = BeautifulSoup(response_str, 'lxml')
        a_tags = soup.find_all("a")
        a = []
        for a_tag in a_tags:
            a.append(a_tag['href'])
            return a
 
    def _save_data(self,urls):
        #保存思路如上,这里偷懒了大家根据实际需要写
         pass
 
 
再总结一下几个关键点:
 
1、asyncio.gather(*a) 这里将任务做个集合
2、run_in_executor()是一个好方法,可以方便的让我们使用线程池和进程池
3、注意在使用 进程池 执行任务的时候,需要加上 @classmethod装饰,因为多进程不共享内存,当然网上有更加详细的解释,大家可以上网搜一下具体的资料,我这里简答的描述为由于内存共享问题,所以多进程调用方法必须是无副作用的。
4、用完记得关。close()、shutdown() 牢记心间,要不然定时任务跑多了,你会发现一堆进程在那边吃着cpu耗着memory在看戏
 

加速Asyncio

uvloop,这个使用库可以有效的加速asyncio,本库基于libuv,也就是nodejs用的那个库。
学网络I/O的时候难免会碰到这样或那样的异步IO库,比如libevent、libev、libuv
Libevent、 libev、 libuv三个网络库,都是 c语言实现的异步事件库 Asynchronousevent library)
 
github地址:
使用它也非常方便
 
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
 
没错就是2行代码,就可以提速asyncio,效果大概是这样的:
 
 
 
小李看完捋了捋自己的头发,想着如果有一天自己的脑袋也像老王这样油亮光滑,自己的编程水平也和他差不多了吧。