《流畅的Python》笔记。python
本篇主要讨论concurrent.futures模块,并用它实现一个简单的并发操做。安全
咱们都知道,若是有大量数据要处理,或者要处理大量连接,异步操做会比顺序操做快不少。Python中,concurrent
和asyncio
则是标准库中进行了高度封装的两个异步操做包。它们在底层使用了Python提供的更基础的两个模块,分别是multiprocessing
和threading
。微信
future
(全小写)并不具体指某个类的实例,并且笔者查了老多资料也没看到哪一个类叫作future
,它泛指用于异步操做的对象。concurrent.futures
和asyncio
这两个模块中有两个名为Future
的类:concurrent.futures.Future
和asyncio.Future
。这两个类的做用相同,都表示可能已经完成或还没有完成的延迟计算。这两个Future
的实例并不该该由咱们手动建立,而应交由并发框架(也就是前面那两个模块)来实例化。多线程
本篇主要介绍concurrent.futures
模块的简单使用,并会将其和顺序计算进行对比,其中还会涉及GIL和阻塞型I/O的概念。asyncio
将在下一篇进行介绍。并发
首先实现一个下载各国国旗的程序,随后再将它与并发版本进行对比。如下是顺序执行的版本,它下载人口前20的国家的国旗,并保存到本地:app
# 代码2.1,flags.py
import os, time, sys # 这么引用只是为了节省篇幅,并不提倡
import requests # 第三方库
POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
# 若是想测试本身的并发程序,为了不被误认为是DOS攻击,请自建http服务
BASE_URL = "http://flupy.org/data/flags"
DEST_DIR = "downloads/"
def save_flag(img, filename): # 保存图片到本地
path = os.path.join(DEST_DIR, filename)
with open(path, "wb") as fp:
fp.write(img)
def get_flag(cc): # 请求图片
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
resp = requests.get(url)
return resp.content
def show(text): # 每获取一张图片就给出一个提示
print(text, end=" ")
sys.stdout.flush()
def download_one(cc): # 下载一张图片
image = get_flag(cc)
show(cc)
save_flag(image, cc.lower() + ".gif")
return cc # 这个return主要是给后面的并发程序用的,此处不要这行代码也能够
def download_many(cc_list): # 下载多张图片
for cc in sorted(cc_list):
download_one(cc)
return len(cc_list)
def main(download_many): # 主程序,接收一个函数为参数
t0 = time.time() # 开始时间
count = download_many(POP20_CC)
elapsed = time.time() - t0 # 结束时间
msg = "\n{} flags downloaded in {:.2f}s"
print(msg.format(count, elapsed))
if __name__ == "__main__":
main(download_many)
# 结果
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
20 flags downloaded in 14.83s # 耗时,只作了一次
复制代码
如今咱们用concurrent.futures
模块将上述代码改写为线程版本,使其异步执行,其中有大部分函数延用上述代码。框架
首先实现一个更具备细节的版本,咱们手动提交线程,而后再运行。这个版本只是为了讲述细节,因此并无所有下载,最大线程数也没有设置得很高:异步
# 代码3.1,flags_threadpool.py
from concurrent import futures
from flags import save_flag, get_flag, download_one, show, main
def download_many_ac(cc_list):
cc_list = cc_list[:5] # 只下载前五个用于测试
with futures.ThreadPoolExecutor(len(cc_list) / 2) as executor:
to_do = {} # 有意写出字典,其实也能够是列表或集合,但这是个惯用方法
for cc in sorted(cc_list):
future = executor.submit(download_one, cc)
to_do[future] = cc
msg = "Scheduled for {}: {}"
print(msg.format(cc, future))
results = []
for future in futures.as_completed(to_do):
res = future.result()
msg = "{} result: {!r}"
print(msg.format(future, res))
results.append(res)
return len(results)
if __name__ == "__main__":
main(download_many_ac)
# 结果:
Scheduled for BR: <Future at 0x1cbca5ab0f0 state=running>
Scheduled for CN: <Future at 0x1cbcb339b00 state=running>
Scheduled for ID: <Future at 0x1cbcb3490f0 state=running>
Scheduled for IN: <Future at 0x1cbcb349748 state=pending>
Scheduled for US: <Future at 0x1cbcb3497f0 state=pending>
CN <Future at 0x1cbcb339b00 state=finished returned str> result: 'CN'
BR <Future at 0x1cbca5ab0f0 state=finished returned str> result: 'BR'
IN <Future at 0x1cbcb349748 state=finished returned str> result: 'IN'
US <Future at 0x1cbcb3497f0 state=finished returned str> result: 'US'
ID <Future at 0x1cbcb3490f0 state=finished returned str> result: 'ID'
5 flags downloaded in 2.39s # 20个一块儿下载只须要1.6s左右
复制代码
解释:async
在concurrent.futures
中有一个名为Executor
的抽象基类,由它定义执行异步操做的接口。在这个模块中有它的两个具体类:的ThreadPoolExecutor
和ProcessPoolExecutor
,前者是线程,后者是进程。Executor
的第一个参数指定最大运行线程数。函数
Executor.submit(func, *args, **kwargs)
方法会在线程中执行func(*args, **kwargs)
,它将这个方法封装成Future
对象并返回(假设这个实例叫作future
)。submit
方法会对future
进行排期,若是运行的线程数没达到最大线程数,则future
会被当即运行,并将其状态置为running
;不然就等待,并将其状态置为pending
。这同时也代表,线程在submit
方法中启动。
futures.as_completed
函数的第一个参数是一个future
序列,在内部会被转换成set
。它返回一个迭代器,在future
运行结束后产出future
。在使用这个函数时还有一个惯用方法:将future
放到一个字典中。由于as_completed
返回的future
的顺序不必定是传入时的顺序,使用字典能够很轻松的作一些后续处理。
上述代码中,从第31-35行的最开始两个字母是由show
函数输出的。光看上述结果,会让人以为线程是在as_completed
中启动的,而之因此结果输出得这么整齐,是由于for
循环里只是“提交”,实际运行是在线程中。若是在每次循环最后都执行sleep(2)
,你将会看到这样的结果:
# 代码3.2
Scheduled for BR: <Future at 0x13e6b30b2b0 state=running>
BR Scheduled for CN: <Future at 0x13e6b5820b8 state=running>
CN Scheduled for ID: <Future at 0x13e6c099278 state=running>
-- snip --
复制代码
concurrent.futures.Future
有一个**result
方法,它返回future
中可调用对象运行完成后的结果,或者从新抛出可调用对象运行时的异常**。若是future
还未运行完成,调用future.result()
将阻塞调用方所在的线程,直到有结果可返回;它还能够接受一个timeout
参数用于指定运行时间,若是在timeout
时间内future
没有运行完毕,将抛出TimeoutError
异常。
在代码3.1
中,咱们自行提交线程,其实,上述可改成更简洁的版本:使用Executor.map
批量提交,只须要新建一个download_many
函数,其他不变:
# 代码3.3
def download_many(cc_list):
with futures.ThreadPoolExecutor(len(cc_list)) as executor:
res = executor.map(download_one, sorted(cc_list))
return len(list(res))
# 结果:
JP RUBR EG CN VN BD TR FR ID NG DE IN PK ET PH IR US CD MX
20 flags downloaded in 1.69s
复制代码
Executor.map()
方法和内置的map
函数相似,它将第一个参数(可调用对象)映射到第二个参数(可迭代对象)的每个元素上以建立Future
列表。Executor.map()
方法内部也是经过调用Future.submit
来建立Future
对象。
从上面代码能够看出,虽然使用Executor.map()
的代码量比较少,但Executor.submit()
和futures.as_completed()
的组合更灵活。
Executor.map()
更适合于须要批量处理的状况,好比同一函数(或者可调用对象)不一样参数。而Executor.submit()
则更适合于零散的状况,好比不一样函数同一参数,不一样函数不一样参数,甚至两个线程毫无关联。
本文主体部分已经结束,下面是一些补充。
CPython自己并非线程安全的,所以有全局解释器锁(Global Interpreter Lock, GIL),一次只容许使用一个线程执行Python字节码。
以这个为基础,按理说上述全部代码将都不能并行下载,由于一次只能运行一个线程,而且线程版本的运行时间应该比顺序版本的还要多才对(线程切换耗时)。但结果也代表,两个线程版本的耗时都大大下降了。
这是由于,Python标准库中全部执行阻塞型I/O操做的函数,在等待操做系统返回结果时都会释放GIL。这就意味着,GIL几乎对I/O密集型处理并无什么影响,依然可使用多线程。
concurrent.futures
中还有一个ProcessPoolExecutor
类,它实现的是真正的并行计算。它和ThreadPoolExecutro
同样,继承自Executor
,二者实现了共同的接口,所以使用concurrent.futures
编写的代码能够轻松地在线程版本与进程版本之间转换,好比要讲上述代码改成进程版本,只需更改download_many()
中的一行代码:
# 代码3.4
with futures.ThreadPoolExecutor(len(cc_list)) as executor:
# 改成:
with futures.ProcessPoolExecutor() as executor:
复制代码
也能够指定进程数,但默认是os.cpu_count()
的返回值,即电脑的CPU核心数。
这个类很是适合于CPU密集型做业上。使用这个类实现的上述代码虽然比线程版本慢一些,但依然比顺序版本快不少。
若是你用最新版pip
下载过第三方库,你会发如今下载时会有一个文字进度条。在Python中想要实现这种效果可使用第三方库tqdm
,如下是它的一个简单用法:
# 代码3.5
import tqdm
from time import sleep
for i in tqdm.tqdm(range(1000)):
sleep(0.01)
# 结果:
40%|████ | 400/1000 [00:10<00:00, 98.11it/s]
复制代码
迎你们关注个人微信公众号"代码港" & 我的网站 www.vpointer.net ~