做为Python程序员,平时不多使用并发编程,偶尔使用也只须要派生出一批独立的线程,而后放到队列中,批量执行。因此,不夸张的说,虽然我知道线程、进程、并行、并发的概念,但每次使用的时候可能还须要再打开文档回顾一下。python
如今这一篇仍是 《流畅的python》读书笔记,译者在这里把future 翻译为“期物”,我以为不太合适,既然future不能找到一个合适的词汇,暂时仍是直接使用 future 吧。程序员
future 是一种对象,表示异步执行的操做。这个概念是 concurrent.futures模块和asyncio包的基础。编程
concurrent.futures 模块是Python3.2 引入的,对于Python2x 版本,Python2.5 以上的版本能够安装 futures 包来使用这个模块。安全
从Python3.4起,标准库中有两个为Future的类:concurrent.futures.Future 和 asyncio.Future。这两个类做用相同:两个Future类的实例都表示可能已经完成或未完成的延迟计算。bash
Future 封装待完成的操做,可放入队列,完成的状态能够查询,获得结果(或抛出异常)后能够获取结果(或异常)。网络
咱们知道,若是程序中包含I/O操做,程序会有很高的延迟,CPU会处于等待状态,这时若是咱们不使用并发会浪费不少时间。并发
咱们先举个例子:app
下边是有两段代码,主要功能都是从网上下载人口前20的国际的国旗:
第一段代码(flagss.py)是依序下载:下载完一个图片后保存到硬盘,而后请求下一张图片;
第二段代码(flagss_threadpool.py)使用 concurrent.futures 模块,批量下载10张图片。异步
运行分别运行两段代码3次,结果以下:async
images.py 的结果以下
$ python flags.py BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 20 flags downloaded in 6.18s $ python flags.py BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 20 flags downloaded in 5.67s $ python flags.py BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 20 flags downloaded in 6.55s
能够看到,依次下载10张图片,平均须要6秒
flags_threadpool.py 的结果以下:
$ python flags_threadpool.py NG EG VN BR JP FR DE CN TR BD PK MX PH US RU IN ET CD ID IR 20 flags downloaded in 2.12s $ python flags_threadpool.py BR IN DE FR TR RU EG NG JP CN ID ET PK MX PH US IR CD VN BD 20 flags downloaded in 2.23s $ python flags_threadpool.py CN BR DE ID NG RU TR IN MX US IR BD VN CD PH EG FR JP ET PK 20 flags downloaded in 1.18s
使用 concurrent.futures 后,下载10张图片平均须要2秒
经过上边的结果咱们发现使用 concurrent.futures 后,下载效率大幅提高。
下边咱们来看下这两段代码。
同步执行的代码flags.py:
#! -*- coding: utf-8 -*- import os import time import sys import requests # <1> POP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split() # <2> BASE_URL = 'http://flupy.org/data/flags' # <3> DEST_DIR = 'images/' # <4> # 保存图片 def save_flag(img, filename): # <5> path = os.path.join(DEST_DIR, filename) with open(path, 'wb') as fp: fp.write(img) # 下载图片 def get_flag(cc): # <6> url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower()) # 这里咱们使用 requests 包,须要先经过pypi安装 resp = requests.get(url) return resp.content # 显示一个字符串,而后刷新sys.stdout,目的是在一行消息中看到进度 def show(text): # <7> print(text, end=' ') sys.stdout.flush() def download_many(cc_list): # <8> for cc in sorted(cc_list): # <9> image = get_flag(cc) show(cc) save_flag(image, cc.lower() + '.gif') return len(cc_list) def main(download_many): # <10> t0 = time.time() count = download_many(POP20_CC) elapsed = time.time() - t0 msg = '\n{} flags downloaded in {:.2f}s' print(msg.format(count, elapsed)) if __name__ == '__main__': main(download_many) # <11>
使用 concurrent.future 并发的代码 flags_threadpool.py
#! -*- coding: utf-8 -*- from concurrent import futures from flags import save_flag, get_flag, show, main # 设定ThreadPoolExecutor 类最多使用几个线程 MAX_WORKERS = 20 # 下载一个图片 def download_one(cc): image = get_flag(cc) show(cc) save_flag(image, cc.lower() + '.gif') return cc def download_many(cc_list): # 设定工做的线程数量,使用约需的最大值与要处理的数量直接较小的那个值,以避免建立多余的线程 workers = min(MAX_WORKERS, len(cc_list)) # <4> # 使用工做的线程数实例化ThreadPoolExecutor类; # executor.__exit__方法会调用executor.shutdown(wait=True)方法, # 它会在全部线程都执行完毕前阻塞线程 with futures.ThreadPoolExecutor(workers) as executor: # <5> # map 与内置map方法相似,不过download_one 函数会在多个线程中并发调用; # map 方法返回一个生成器,所以能够迭代, # 迭代器的__next__方法调用各个Future 的 result 方法 res = executor.map(download_one, sorted(cc_list)) # 返回获取的结果数量;若是有现成抛出异常,会在这里抛出 # 这与隐式调用next() 函数从迭代器中获取相应的返回值同样。 return len(list(res)) # <7> return len(results) if __name__ == '__main__': main(download_many)
上边的代码,咱们对 concurrent.futures 的使用有了大体的了解。但 future 在哪里呢,咱们并无看到。
Future 是 concurrent.futures 模块和 asyncio 包的重要组件。从Python3.4起,标准库中有两个为Future的类:concurrent.futures.Future 和 asyncio.Future。这两个Future做用相同。
Future 封装待完成的操做,可放入队列,完成的状态能够查询,获得结果(或抛出异常)后能够获取结果(或异常)。
Future 表示终将发生的事情,而肯定某件事情会发生的惟一方式是执行的时间已经排定。所以只有把某件事交给 concurrent.futures.Executor 子类处理时,才会建立 concurrent.futures.Future 实例。
例如,调用Executor.submit() 方法的参数是一个可调用的对象,调用这个方法后会为传入的可调用对象排期,并返回一个Future。
Future 有三个重要的方法:
.done() 返回布尔值,表示Future 是否已经执行
.add_done_callback() 这个方法只有一个参数,类型是可调用对象,Future运行结束后会回调这个对象。
.result() 若是 Future 运行结束后调用result(), 会返回可调用对象的结果或者抛出执行可调用对象时抛出的异常,若是是 Future 没有运行结束时调用 f.result()方法,这时会阻塞调用方所在的线程,直到有结果返回。此时result 方法还能够接收 timeout 参数,若是在指定的时间内 Future 没有运行完毕,会抛出 TimeoutError 异常。
asyncio.Future.result 方法不支持设定超时时间,若是想获取 Future 的结果,可使用 yield from 结构
为了加深对 Future 的理解,如今咱们修改下 flags_threadpool.py download_many 函数。
def download_many(cc_list): cc_list = cc_list[:5] with futures.ThreadPoolExecutor(max_workers=3) as executor: to_do = [] # 用于建立并排定 future for cc in sorted(cc_list): # submit 方法排定可调用对象的执行时间而后返回一个future,表示这个待执行的操做 future = executor.submit(download_one, cc) to_do.append(future) msg = 'Scheduled for {}: {}' print(msg.format(cc, future)) results = [] # 用于获取future 结果 # as_completed 接收一个future 列表,返回值是一个迭代器,在运行结束后产出future for future in futures.as_completed(to_do): res = future.result() msg = '{} result: {!r}' print(msg.format(future, res)) results.append(res) return len(results)
如今执行代码,运行结果以下:
Scheduled for BR: <Future at 0x10d43cb70 state=running> Scheduled for CN: <Future at 0x10d4434a8 state=running> Scheduled for ID: <Future at 0x10d443ef0 state=running> Scheduled for IN: <Future at 0x10d443978 state=pending> Scheduled for US: <Future at 0x10d44f748 state=pending> BR <Future at 0x10d43cb70 state=finished returned str> result: 'BR' IN <Future at 0x10d443978 state=finished returned str> result: 'IN' CN <Future at 0x10d4434a8 state=finished returned str> result: 'CN' ID <Future at 0x10d443ef0 state=finished returned str> result: 'ID' US <Future at 0x10d44f748 state=finished returned str> result: 'US' 5 flags downloaded in 1.47s
从结果能够看到,future 的 repr() 方法会显示状态,前三个 是running 是由于咱们设定了三个进程,因此后两个是pendding 状态。若是将max_workers参数设置为5,结果就会全都是 running。
虽然,使用 future 的脚步比第一个脚本的执行速度快了不少,但因为受GIL的限制,下载并非并行的。
CPython 解释器自己不是线程安全的,所以解释器被一个全局解释器锁保护着,它确保任什么时候候都只有一个Python线程执行。
然而,Python标准库中全部执行阻塞型I/O操做的函数,在等待系统返回结果时都会释放GIL。这意味着I/O密集型Python程序能从中受益:一个Python线程等待网络响应时,阻塞型I/O函数会释放GIL,再运行一个线程。
Python 标准库中全部阻塞型I/O函数都会释放GIL,容许其余线程运行。time.sleep()函数也会释放GIL。
那么如何在CPU密集型做业中使用 concurrent.futures 模块绕开GIL呢?
答案是 使用 ProcessPoolExecutor 类。
使用这个模块能够在作CPU密集型工做是绕开GIL,利用全部可用核心。
ThreadPoolExecutor 和 ProcessPoolExecutor 都实现了通用的 Executor 接口,因此,咱们能够轻松的将基于线程的方案改成使用进程的方案。
好比下边这样:
def download_many(cc_list): workers = min(MAX_WORKERS, len(cc_list)) with futures.ThreadPoolExecutor(workers) as executor: pass # 改为 def download_many(cc_list): with futures.ProcessPoolExecutor() as executor: pass
须要注意的是,ThreadPoolExecutor 须要指定 max_workers 参数,
而 ProcessPoolExecutor 的这个参数是可选的默认值是 os.cup_count()(计算机cpu核心数)。
ProcessPoolExecutor 的价值主要体如今CPU密集型做业上。
使用Python处理CPU密集型工做,应该试试PyPy,会有更高的执行速度。
如今咱们回到开始的代码,看下 Executor.map 函数。
文档中对map函数的介绍以下。
map(func, *iterables, timeout=None, chunksize=1)
等同于 map(func, *iterables),不一样的是 func 是异步执行的,而且能够同时进行对 func 的多个调用。若是调用 __next__(),则返回的迭代器提出 concurrent.futures.TimeoutError,而且在从 Executor.map() 的原始调用起的 timeout 秒以后结果不可用。 timeout 能够是int或float。若是未指定 timeout 或 None,则等待时间没有限制。若是调用引起异常,那么当从迭代器检索其值时,将引起异常。当使用 ProcessPoolExecutor 时,此方法将 iterables 分红多个块,它做为单独的任务提交到进程池。这些块的(近似)大小能够经过将 chunksize 设置为正整数来指定。对于很是长的迭代,与默认大小1相比,使用大值 chunksize 能够显着提升性能。使用 ThreadPoolExecutor,chunksize 没有效果。
在 3.5 版更改: 添加了 chunksize 参数。
Executor.map 还有个特性比较有用,那就是这个函数返回结果的顺序于调用开始的顺序是一致的。若是第一个调用称其结果用时10秒,其余调用只用1秒,代码会阻塞10秒,获取map方法返回的生成器产出的第一个结果。
若是不是获取到全部结果再处理,一般会使用 Executor.submit + Executor.as_completed 组合使用的方案。
Executor.submit + Executor.as_completed 这个组合更灵活,由于submit方法能处理不一样的可调用对象和参数,而executor.map 只能处理参数不一样的同一个可调用对象。此外,传给futures.as_completed 函数的期物集合能够来自不一样的 Executor 实例。
futures 有三个异常类:
exception concurrent.futures.CancelledError 在future取消时引起。
exception concurrent.futures.TimeoutError 在future操做超过给定超时时触发。
exception concurrent.futures.process.BrokenProcessPool
从 RuntimeError 派生,当 ProcessPoolExecutor 的一个工人以非干净方式终止(例如,若是它从外部被杀死)时,引起此异常类。
咱们先看一下,future.result() 出现异常的处理状况。代码改动以下:
# 将第一个 CN 改成CN1 也能够是其它任意错误代码 POP20_CC = ('CN1 IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split() def get_flag(cc): # <6> url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower()) resp = requests.get(url) if resp.status_code != 200: # <1> resp.raise_for_status() # 若是不是200 抛出异常 return resp.content def download_one(cc): try: image = get_flag(cc) # 捕获 requests.exceptions.HTTPError except requests.exceptions.HTTPError as exc: # # 若是有异常 直接抛出 raise else: save_flag(image, cc.lower() + '.gif') return cc
如今执行代码,会发现 download_one 中的异常传递到了download_many 中,而且致使抛出了异常,未执行完的其它future 也都中断。
为了能保证其它没有错误的future 能够正常执行,这里咱们须要对future.result() 作异常处理。
改动结果以下:
def download_many(cc_list): cc_list = cc_list[:5] with futures.ThreadPoolExecutor(max_workers=20) as executor: to_do_map = {} for cc in sorted(cc_list): future = executor.submit(download_one, cc) to_do_map[future] = cc msg = 'Scheduled for {}: {}' print(msg.format(cc, future)) results = [] for future in futures.as_completed(to_do_map): try: res = future.result() except requests.exceptions.HTTPError as exc: # 处理可能出现的异常 error_msg = '{} result {}'.format(cc, exc) else: error_msg = '' if error_msg: cc = to_do_map[future] # <16> print('*** Error for {}: {}'.format(cc, error_msg)) else: msg = '{} result: {!r}' print(msg.format(future, res)) results.append(res) return len(results)
这里咱们用到了一个对 futures.as_completed 函数特别有用的惯用法:构建一个字典,把各个future映射到其余数据(future运行结束后可能用的)上。这样,虽然 future生成的顺序虽然已经乱了,依然便于使用结果作后续处理。
一篇写完了没有总结总感受少点什么,因此。
Python 自 0.9.8 版就支持线程了,concurrent.futures 只不过是使用线程的最新方式。
futures.ThreadPoolExecutor 类封装了 threading 模块的组件,使使用线程变得更加方便。
顺便再推荐一下 《流畅的python》,绝对值得一下。
下一篇笔记应该是使用 asyncio 处理并发。
最后,感谢女友支持。
>欢迎关注 | >请我喝芬达 |
---|---|
![]() |
![]() |