并发指的是同时启动任务,并行指的是同时运行人物。依赖时间切片和多核,并发也能够是并行。下文中统称为并发,都指的是并行的并发。html
现实中须要解决的问题有两类:python
CPU bound 指的是须要密集 CPU 运行的任务,IO bound 指的是有大量等待 IO 的任务。CPU bound 只能经过多核并行来解决,而 IO bound 则是本文的重点,也是 asyncio 大显身手的地方。git
单核 CPU 的性能有其极限,因此咱们须要并发来提升性能。可是并发又会致使资源的竞争,因此须要引用锁来保护敏感区。可是锁又会下降并发度,因此咱们须要探索无锁的并发方案。github
可使用线程模型来提升并发度,可是每个线程都须要独立的栈空间(64-bit JVM 中是 1024 KB),这还只是不包含任何资源的初始栈空间,并且栈空间的大小和线程切换的开销成正比。因此咱们须要寻找比线程更轻量的解决方案。golang
为了减小线程的切换,咱们能够建立一个等于 CPU 核数的线程池,把须要运算的逻辑放进线程池,不须要时就拿出来换成其余的任务,保持线程的持续运算而不是切换。web
为了更好的使用 CPU 的性能,咱们应该在任务不在须要 CPU 资源时让其从线程池里退出来(好比等待 IO 时),这就须要有一种机制,让任务能够在阻塞时退出,在资源就绪时恢复运行。因此咱们将任务抽象为一种用户态的线程(协程,greenthread、coroutine),当其须要调用阻塞资源时,就在 IO 调度器里注册一个事件,并让出线程资源给其余协程,当资源就绪时,IO 调度器会在有空余线程资源时,从新运行这个协程。django
用户态线程(下文称之为协程)的设计方案通常有三种(按照用户态线程和系统线程的比例):编程
协程的优势在于,这是一种用户态的机制,避免的内核态用户态切换的成本,并且初始栈空间能够设置的很小(Golang 中的 goroutine 仅为 2 KB),这样能够建立比线程更大数量的协程。flask
简单聊几句。后端
我最先据说的异步库就是 twisted,不过听说使用极其复杂,因此望而却步了。
后来在 GoogleAppEngine 上用 web.py 开发后端,接着不久就赶上了 Aaron 不幸被逼自杀, 在选择新的后端框架时据说了 tornado, 被其简单的用法所折服,一直用到如今,这个博客也是用 tornado 开发的,我甚至还本身撸了一整套 RESTful 的框架。
不过其实用了 tornado 一两年后才真正的看懂了它 ioloop 的设计,不得不说 tornado 的注释写的真的很好,强烈推荐学习 tornado 的源码。
tornado 中最难解决的问题就是如何去调度嵌套的异步任务,由于 tornado 是经过 yield 和 decorator 相结合的方式来实现异步任务, 因此致使异步函数很难返回值,在 tornado 里你只能经过 raise 的方式来返回值,这又致使 coroutine 很难正确的捕获到异常,为了解决这个问题我本身写了一个 decorator, 而后每次写 tornado 时都是一大堆的:
@tornado.gen.coroutine @debug_wrapper def xxx(): # ... raise tornado.gen.Return(xxx)
挺烦。
不过 Python 界的主流后端框架除了 tornado 外还有 flask 和 django,那么使用这两种框架的人在遇到密集的 IO 时该怎么办呢? 还好有神奇的 gevent!gevent 经过 patch 的方式将各类常见的 IO 调用封装为协程,而且将整个调度过程彻底封装,用户能够用近乎黑盒的方式来使用, 你惟一须要作的就是先手动 patch 一下,而后用 gevent.spawn 去发起任务,若是须要同步的话就再 joinall 一下。 能够说 gevent 选择了和 golang 同样的思路,gevent.spawn 就像是 golang 里的 goroutine,gevent 再继续优化升级下去,终极目标就是实现 golang 的 runtime 吧。
gevent 的一个例子:
#!/usr/bin/env python # -*- coding: utf-8 -*- """ cost 0.7102580070495605s for url http://httpbin.org/user-agent cost 0.7106029987335205s for url http://httpbin.org/get cost 0.7245540618896484s for url http://httpbin.org/headers cost 0.7327840328216553s for url http://httpbin.org/ cost 1.073429822921753s for url http://httpbin.org/ip total cost 1.0802628993988037s """ import time import gevent import gevent.monkey gevent.monkey.patch_socket() try: import urllib2 except ImportError: import urllib.request as urllib2 TARGET_URLS = ( 'http://httpbin.org/', 'http://httpbin.org/ip', 'http://httpbin.org/user-agent', 'http://httpbin.org/headers', 'http://httpbin.org/get', ) def demo_task(url): start_ts = time.time() r = urllib2.urlopen(url) print('cost {}s for url {}'.format(time.time() - start_ts, url)) def demo_handler(): start_ts = time.time() tasks = [gevent.spawn(demo_task, url) for url in TARGET_URLS] gevent.joinall(tasks) print('total cost {}s'.format(time.time() - start_ts)) def main(): demo_handler() if __name__ == '__main__': main()
Python 3 的官方的解决方案 asyncio 选择了更为白盒的调用方式, 该方案极大的吸取了 tornado 的优势,而且为了解决 tornado 的协程返回,增长了新语法 yield from, 因此在 Python 3.4 的时代,你能够用近乎和 tornado 彻底相同的方法写 asyncio:
# python 3.4 # 注意:Python 3.6 已经不这么写了 import asyncio @asyncio.coroutine def coroutine_demo(): r = yield from coroutine_child_demo() print(r) @asyncio.coroutine def coroutine_child_demo(): asyncio.sleep(1) return 2
不过这么写仍是太丑陋,并且总让人以为 coroutine 只是一个第三方包提供的功能,好在反正 asyncio 包被声明为一个不稳定的开发状态的包, 因此咱们能够继续大改,因此 asyncio 的大幅修改一直到了 Python3.6 才算正式结束。
Python 3.6 做为 asyncio 的第一个稳定版,新的语法已经变成了这样:
import asyncio async def coroutine_demo(): r = awiat coroutine_child_demo() print(r) async def coroutine_child_demo(): asyncio.sleep(1) return 2 if __name__ == '__main__': ioloop = asyncio.get_event_loop() ioloop.run_until_complete(coroutine_demo())
下面会稍微详细的讲解 asyncio 包的用法。
后面的例子里,我都会用 asyncio.sleep
来表示一个耗时的阻塞操做, 你能够将其理解为实际操做中的网络请求或文件读写等 IO 操做。
首先,你要会建立协程:
async def coroutine_demo(): await asyncio.sleep(2) print(coroutine_demo) # <function coroutine_demo at 0x7fd35c4c89d8> print(coroutine_demo()) # <coroutine object coroutine_demo at 0x7fd35c523ca8>
协程都是非阻塞的,当你调用一个协程时(形如 coroutine_demo()
), 这个协程程序就被执行了,直到执行到另外一个协程(asyncio.sleep
), 这时会在 ioloop 里挂起一个事件,而后马上返回。
此时你须要作的,就是继续干你的事情,而且确保你给了这个协程足够的时间执行完成, 因此继续写完这个简短的脚本:
if __name__ == '__main__': ioloop = asyncio.get_event_loop() # 建立事件循环 ioloop coroutine = coroutine_demo() # 启动协程 future = asyncio.ensure_future(coroutine) # 将其封装为 Future 对象 # 而后就只须要将 future 提交给 ioloop,让其等待该 future 对象完成就好了 ioloop.run_untile_complete(future) print('all done')
Future 有点像是一个 lazy object,当你调用一个协程时,这个协程会被注册到 ioloop, 同时该协程会马上返回一个 coroutine 对象,而后你能够用 asyncio.ensure_future
将其封装为一个 Future 对象。
当协程任务结束时,这个 future 对象的状态也会变化,能够经过这个 future 对象来获取该任务的结果值(或异常):
future = asyncio.ensure_future(coroutine_demo())
future.done() # 任务是否结束 # True or False future.result(timeout=None) # 获取任务的结果 # 默认会阻塞等待到任务结束
目前提到了 coroutine、Task 和 future,对于这三者的关系,个人理解以下:
下面举一些用例
先简单的说一下 asyncio 的使用,首先你须要启动一个主函数,在主函数里你实例化 ioloop, 而后在这个 ioloop 里注册任意多的 task,task 也能够注册子 task,以后你能够选择让 ioloop 永久的运行下去, 或者运行到最后一个 task 完成为止。
首先看一个最简单的案例,请求多个 URL:
urls = [ 'https://httpbin.org/', 'https://httpbin.org/get', 'https://httpbin.org/ip', 'https://httpbin.org/headers', ] async def crawler(): async with aiohttp.ClientSession() as session: futures = map(asyncio.ensure_future, map(session.get, urls)) for f in asyncio.as_completed(futures): print(await f) if __name__ == '__main__': ioloop = asyncio.get_event_loop() ioloop.run_untile_complete(asyncio.ensure_future(crawler()))
上面的例子里能够看到,咱们启动了不少了 session.get
的子协程,而后用 asyncio.ensure_future
将其封装为 future
, 而后调用 as_completed
方法监听这一堆的子任务,每当有子任务完成时,就会触发 for 循环对结果进行处理。
asyncio 里除了 as_completed
外,经常使用的还有 asyncio.wait(fs, timeout=None, when=ALL_COMPLETED)
。 方法就是能够等待多个 futures
,when
参数能够设定等待的模式,可接受的参数有:
FIRST_COMPLETED
:等到第一个完成;FIRST_EXCEPTION
:等到一个出错;ALL_COMPLETED
:等待所有完成。因此上面的函数,as_completed
那段还能够写成:
await asyncio.wait(futures) for f in futures: print(f.result())
除了上面举的那些事件触发的任务外,asyncio 还能够依靠时间进行触发。
ioloop = asyncio.get_event_loop() # 一段时间后运行 ioloop.call_later(delay_in_seconds, callback, args) # 指定时间运行 ioloop.call_at(when, callback, *args)
这里须要注意的是,ioloop 使用的是本身的时间,你能够经过 ioloop.time()
获取到 ioloop 当前的时间,因此若是你要用 call_at
,你须要计算出相对于 ioloop 的时间。因此其实这个方法没什么意义,通常用 ioloop.call_later
这个方法用的更多。
携程带来的性能提高很是的显著,以致于你须要考虑一个你之前可能从未考虑过的问题:并发控制。 对资源的控制也是异步编程的难点所在。
举个例子,你须要下载 100 万 张图片,过去你开了 20 个 线程来下载,那么在同一时间最大的并发量就是 20, 对于服务器而言,最多须要处理 20 qps 的请求,对于客户端而言,最多须要在内存里放 20 张 图片的数据,仅此而已。
可是进入协程时代,全部的东西都是非阻塞的,你能够在很短的时间内向远程发起 100 万 的请求, 也可能在内存里挂起 100 万 次请求的数据,这不管对于服务端仍是客户端都是难以接受的。
asyncio 里提供了四种锁:
下面先介绍一个最经常使用的案例,而后再逐一介绍这几个锁的区别。
首先讲一下协程任务的并发控制,asyncio 提供了信号量方法 asyncio.Semaphore(value=1)
, 这个方法会返回一个信号量,你能够初始化一个信号量后,而后在每次发起请求时都去请求这个信号量, 来实现对携程任务数量的控制,好比咱们能够经过信号量来控制对服务器的请求并发数:
# initiallize semaphore concurrency_sem = asyncio.Semaphore(50) async with aiohttp.ClientSession() as session: while 1: # 即便这样写也不用担忧并发数会爆炸啦 # require semaphore # will be blocked when accesses to 50 concurrency async with concurrency_sem: async with session.get(url, timeout=10) as resp: assert resp.status == 200
若是不知道信号量是什么,能够参阅《并行编程中的各类锁》。
信号量能够有效的控制同一时间任务的并发数,可是有时候一些协程任务的执行很是迅速, 致使任务执行返回的数据大量堆积,也就是所咱们须要限制任务的处理总量,而不是并发量, 这时候就能够采用 asyncio.Queue(maxsize=0)
来进行控制, 咱们能够经过设定 maxsize
来设定队列的总长度,当队列满时,put
操做就会被挂起, 直到后续逻辑逐渐消化掉了队列里的任务后,才能继续添加,这样就实现了对任务堆积总量的控制。
好比咱们能够用 Queue 来限制我读取大文件时,不要一会儿把整个文件都读进来, 而是读几行,处理几行:
task_q = asyncio.Queue(maxsize=1000) async def worker_to_process_each_line(): while not task_q.empty(): line = await task_q.get() # do something with this line with open('huge_file_with_many_lines.txt', 'r') as f: worker_to_process_each_line() for line in f: await task_q.put(line)
活用 Semaphore
和 Queue
,基本就能够解决绝大部分的并发控制问题了。
最简单的互斥锁,其实会用 Semaphore 的话彻底不须要用 Lock 了,毕竟 mutex 只是 Semaphore 为 1 时的特例。
lock = Lock() async with lock(): # ...
事件锁,这个锁有两个状态:set
和 unset
,能够调用 evt.wait()
挂起等待,直到这个事件被 set()
:
evt = Event() async def demo(): await evt.wait() # wait for set print('done) demo() print(evt.is_set()) # False evt.set() # release evt # done
就像 Semaphore 能够简单理解为带计数器的 Lock,Condition 也能够简单理解为带计数器的 Event。
一个 Condition 能够被多个协程等待,而后能够按照需求唤醒指定数量的协程。
其实 Condition 是 threading 模块里一直存在的锁,简单介绍一下使用方法, 使用 condition 前须要先获取锁(async with cond
),这是一个互斥锁,调用 wait()
时会自动的释放锁, ,针对 condition 的 notify
、notify_all、
wait必须在获取锁后才能操做,不然会抛出
RuntimeError` 错误。
因此当你 notify 后若是须要当即生效的话,须要退出这个 mutex,而且挂起当前协程等待调度, 其余协程才能顺利的获取 mutex,而且获取到 condition 的信号,执行后续的任务,并在完成后释放锁。
from asyncio import Condition, sleep, get_event_loop, wait, ensure_future async def workers(cond, i): async with cond: # require lock print('worker {} is waiting'.format(i)) await cond.wait() # wait for notify and release lock print('worker {} done, released'.format(i)) async def main(): cond = Condition() fs = list([ensure_future(workers(cond, i)) for i in range(5)]) # run workers await sleep(0.1) for i in range(3): print('notify {} workers'.format(i)) async with cond: # require lock cond.notify(i) # notify await sleep(0.1) # let another coroutine run async with cond: await sleep(0.5) print('notify all') cond.notify_all() await wait(fs) # wait all workers done get_event_loop().run_until_complete(main()) # Output: # worker 0 is waiting # worker 1 is waiting # worker 2 is waiting # worker 3 is waiting # worker 4 is waiting # notify 0 workers # notify 1 workers # worker 0 done, released # notify 2 workers # worker 1 done, released # worker 2 done, released # notify all # worker 3 done, released # worker 4 done, released
上面提到了,python asyncio 的实现方案是 N:1,因此协程是不能跨核的。为了利用多核,你须要建立多进程程序,而且为每个进程初始化一个 ioloop。
咱们可使用 concurrent.futures
里提供的 ProcessPoolExecutor
来轻松的实现多进程。
from concurrent.futures import ProcessPoolExecutor, as_completed from asyncio import get_event_loop, sleep, ensure_future async def coroutine_demo(): await sleep(1) def runner(): ioloop = get_event_loop() future = ensure_future(coroutine_demo()) ioloop.run_until_complete(future) def main(): executor = ProcessPoolExecutor(max_workers=7) # CPU 数 - 1 for futu in as_completed([executor.submit(runner) for _ in range(7)]): result = futu.result() # ...
顺便提一下多线程,有时候须要兼容旧代码,你须要调用过去用线程写的程序,或者有些阻塞无法用 asyncio 解决,你只能包一层线程,可是你又但愿用 asyncio 的方式来调用,这时候就须要用到 run_in_executor
。
代码片断示例:
from concurrent.futures import ThreadPoolExecutor import time executor = ThreadPoolExecutor(max_workers=10) ioloop = get_event_loop() def something_blocking(): time.sleep(5) # 关键代码 ioloop.run_in_executor(executor, something_blocking, *args)
你能够经过 ioloop.set_default_executor(executor)
设置好经常使用的 executor,以后再调用 run_in_executor(None, somthing_blocking, *args)
的时候,第一个参数就能够传 None
了。
由于 asyncio 几乎颠覆了过去 python 的写法逻辑,若是你要使用 asyncio,你几乎须要重构全部的阻塞库,不过感谢活跃的社区,目前各类第三方库发展的速度很是快。
好比你能够在下面这个页面找到各式各样的支持 asyncio 的第三方库:
并且由于 asyncio 已经做为官方的事实标准,因此包括 tornado 在内的第三方异步解决方案目前也开始对 asyncio 提供了支持。我稍后会另写一篇介绍如何将过去的 tornado 项目无缝的迁移到 asyncio 来。
知识点差很少就这些,了解了这些,就能够上手开动了。
为了方便,我写过一个基于 asyncio 的脚本框架,能够按时执行各个任务:https://github.com/Laisky/ramjet
再贴一个给同事写的批量下载 s3 图片的脚本,这个脚本须要读取一个有一千万行的图片文件地址文件, 而后按照每一行的地址去请求服务器下载文件,因此我作了一次最多读取 1000 行,最多发起 10 个 链接的并发控制:
import os import asyncio import datetime import aiohttp import aiofiles async def image_downloader(task_q): async with aiohttp.ClientSession() as session: while not task_q.empty(): url = await task_q.get() try: async with session.get(url, timeout=5) as resp: assert resp.status == 200 content = await resp.read() except Exception as err: print('Error for url {}: {}'.format(url, err)) else: fname = split_fname(url) print('{} is ok'.format(fname)) await save_file(fname, content) def split_fname(url): # do something return 'FILENAME_AFTER_PROCESSED' async def save_file(fname, content): async with aiofiles.open(fname, mode='wb') as f: await f.write(content) async def produce_tasks(task_q): with open('images.txt', 'r') as f: for count, image_url in enumerate(f): image_url = image_url.strip() if os.path.isfile(split_fname(image_url)): continue await task_q.put(image_url) async def run(): task_q = asyncio.Queue(maxsize=1000) task_producer = asyncio.ensure_future(produce_tasks(task_q)) workers = [asyncio.ensure_future(image_downloader(task_q)) for _ in range(10)] try: await asyncio.wait(workers+[task_producer]) except Exception as err: print(err.msg) def main(): print('start at', datetime.datetime.utcnow()) ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asyncio.ensure_future(run())) print('end at', datetime.datetime.utcnow()) if __name__ == '__main__': main()