Python学习之路36-使用future处理并发

《流畅的Python》笔记。python

本篇主要讨论concurrent.futures模块,并用它实现一个简单的并发操做。安全

1. 前言

咱们都知道,若是有大量数据要处理,或者要处理大量连接,异步操做会比顺序操做快不少。Python中,concurrentasyncio则是标准库中进行了高度封装的两个异步操做包。它们在底层使用了Python提供的更基础的两个模块,分别是multiprocessingthreading微信

future(全小写)并不具体指某个类的实例,并且笔者查了老多资料也没看到哪一个类叫作future,它泛指用于异步操做的对象。concurrent.futuresasyncio这两个模块中有两个名为Future的类:concurrent.futures.Futureasyncio.Future。这两个类的做用相同,都表示可能已经完成或还没有完成的延迟计算。这两个Future的实例并不该该由咱们手动建立,而应交由并发框架(也就是前面那两个模块)来实例化。多线程

本篇主要介绍concurrent.futures模块的简单使用,并会将其和顺序计算进行对比,其中还会涉及GIL和阻塞型I/O的概念。asyncio将在下一篇进行介绍。并发

2. 顺序执行

首先实现一个下载各国国旗的程序,随后再将它与并发版本进行对比。如下是顺序执行的版本,它下载人口前20的国家的国旗,并保存到本地:app

# 代码2.1,flags.py
import os, time, sys  # 这么引用只是为了节省篇幅,并不提倡
import requests  # 第三方库

POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
# 若是想测试本身的并发程序,为了不被误认为是DOS攻击,请自建http服务
BASE_URL = "http://flupy.org/data/flags"  
DEST_DIR = "downloads/"

def save_flag(img, filename):  # 保存图片到本地
    path = os.path.join(DEST_DIR, filename)
    with open(path, "wb") as fp:
        fp.write(img)

def get_flag(cc):   # 请求图片
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):  # 每获取一张图片就给出一个提示
    print(text, end=" ")
    sys.stdout.flush()

def download_one(cc):   # 下载一张图片
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + ".gif")
    return cc   # 这个return主要是给后面的并发程序用的,此处不要这行代码也能够

def download_many(cc_list):   # 下载多张图片
    for cc in sorted(cc_list):
        download_one(cc)
    return len(cc_list)

def main(download_many):   # 主程序,接收一个函数为参数
    t0 = time.time()   # 开始时间
    count = download_many(POP20_CC)
    elapsed = time.time() - t0   # 结束时间
    msg = "\n{} flags downloaded in {:.2f}s"
    print(msg.format(count, elapsed))

if __name__ == "__main__":
    main(download_many)

# 结果
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 14.83s   # 耗时,只作了一次
复制代码

3. concurrent.futures

如今咱们用concurrent.futures模块将上述代码改写为线程版本,使其异步执行,其中有大部分函数延用上述代码。框架

3.1 futures.as_completed

首先实现一个更具备细节的版本,咱们手动提交线程,而后再运行。这个版本只是为了讲述细节,因此并无所有下载,最大线程数也没有设置得很高:异步

# 代码3.1,flags_threadpool.py
from concurrent import futures
from flags import save_flag, get_flag, download_one, show, main

def download_many_ac(cc_list):
    cc_list = cc_list[:5] # 只下载前五个用于测试
    with futures.ThreadPoolExecutor(len(cc_list) / 2) as executor:
        to_do = {}   # 有意写出字典,其实也能够是列表或集合,但这是个惯用方法
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do[future] = cc
            msg = "Scheduled for {}: {}"
            print(msg.format(cc, future))
        results = []
        for future in futures.as_completed(to_do):
            res = future.result()
            msg = "{} result: {!r}"
            print(msg.format(future, res))
            results.append(res)
        return len(results)

if __name__ == "__main__":
    main(download_many_ac)

# 结果:
Scheduled for BR: <Future at 0x1cbca5ab0f0 state=running>
Scheduled for CN: <Future at 0x1cbcb339b00 state=running>
Scheduled for ID: <Future at 0x1cbcb3490f0 state=running>
Scheduled for IN: <Future at 0x1cbcb349748 state=pending>
Scheduled for US: <Future at 0x1cbcb3497f0 state=pending>
CN <Future at 0x1cbcb339b00 state=finished returned str> result: 'CN'
BR <Future at 0x1cbca5ab0f0 state=finished returned str> result: 'BR'
IN <Future at 0x1cbcb349748 state=finished returned str> result: 'IN'
US <Future at 0x1cbcb3497f0 state=finished returned str> result: 'US'
ID <Future at 0x1cbcb3490f0 state=finished returned str> result: 'ID'

5 flags downloaded in 2.39s  # 20个一块儿下载只须要1.6s左右
复制代码

解释async

  • concurrent.futures中有一个名为Executor的抽象基类,由它定义执行异步操做的接口。在这个模块中有它的两个具体类:的ThreadPoolExecutorProcessPoolExecutor,前者是线程,后者是进程。Executor的第一个参数指定最大运行线程数。函数

  • Executor.submit(func, *args, **kwargs)方法会在线程中执行func(*args, **kwargs),它将这个方法封装成Future对象并返回(假设这个实例叫作future)。submit方法会对future进行排期,若是运行的线程数没达到最大线程数,则future会被当即运行,并将其状态置为running;不然就等待,并将其状态置为pending这同时也代表,线程在submit方法中启动

  • futures.as_completed函数的第一个参数是一个future序列,在内部会被转换成set。它返回一个迭代器,在future运行结束后产出future。在使用这个函数时还有一个惯用方法:将future放到一个字典中。由于as_completed返回的future的顺序不必定是传入时的顺序,使用字典能够很轻松的作一些后续处理。

  • 上述代码中,从第31-35行的最开始两个字母是由show函数输出的。光看上述结果,会让人以为线程是在as_completed中启动的,而之因此结果输出得这么整齐,是由于for循环里只是“提交”,实际运行是在线程中。若是在每次循环最后都执行sleep(2),你将会看到这样的结果:

    # 代码3.2
    Scheduled for BR: <Future at 0x13e6b30b2b0 state=running>
    BR Scheduled for CN: <Future at 0x13e6b5820b8 state=running>
    CN Scheduled for ID: <Future at 0x13e6c099278 state=running>
    -- snip --
    复制代码
  • concurrent.futures.Future有一个**result方法,它返回future中可调用对象运行完成后的结果,或者从新抛出可调用对象运行时的异常**。若是future还未运行完成,调用future.result()阻塞调用方所在的线程,直到有结果可返回;它还能够接受一个timeout参数用于指定运行时间,若是在timeout时间内future没有运行完毕,将抛出TimeoutError异常。

3.2 Executor.map

代码3.1中,咱们自行提交线程,其实,上述可改成更简洁的版本:使用Executor.map批量提交,只须要新建一个download_many函数,其他不变:

# 代码3.3
def download_many(cc_list):
    with futures.ThreadPoolExecutor(len(cc_list)) as executor:
        res = executor.map(download_one, sorted(cc_list))
    return len(list(res))

# 结果:
JP RUBR  EG CN VN BD TR FR ID NG DE IN PK ET PH IR US CD MX 
20 flags downloaded in 1.69s
复制代码

Executor.map()方法和内置的map函数相似,它将第一个参数(可调用对象)映射到第二个参数(可迭代对象)的每个元素上以建立Future列表。Executor.map()方法内部也是经过调用Future.submit来建立Future对象。

3.3 比较

从上面代码能够看出,虽然使用Executor.map()的代码量比较少,但Executor.submit()futures.as_completed()的组合更灵活。

Executor.map()更适合于须要批量处理的状况,好比同一函数(或者可调用对象)不一样参数。而Executor.submit()则更适合于零散的状况,好比不一样函数同一参数,不一样函数不一样参数,甚至两个线程毫无关联。

4. 补充

本文主体部分已经结束,下面是一些补充。

4.1 I/O密集型和GIL

CPython自己并非线程安全的,所以有全局解释器锁(Global Interpreter Lock, GIL),一次只容许使用一个线程执行Python字节码。

以这个为基础,按理说上述全部代码将都不能并行下载,由于一次只能运行一个线程,而且线程版本的运行时间应该比顺序版本的还要多才对(线程切换耗时)。但结果也代表,两个线程版本的耗时都大大下降了。

这是由于,Python标准库中全部执行阻塞型I/O操做的函数,在等待操做系统返回结果时都会释放GIL。这就意味着,GIL几乎对I/O密集型处理并无什么影响,依然可使用多线程。

4.2 CPU密集型

concurrent.futures中还有一个ProcessPoolExecutor类,它实现的是真正的并行计算。它和ThreadPoolExecutro同样,继承自Executor,二者实现了共同的接口,所以使用concurrent.futures编写的代码能够轻松地在线程版本与进程版本之间转换,好比要讲上述代码改成进程版本,只需更改download_many()中的一行代码:

# 代码3.4
with futures.ThreadPoolExecutor(len(cc_list)) as executor:
# 改成:
with futures.ProcessPoolExecutor() as executor:
复制代码

也能够指定进程数,但默认是os.cpu_count()的返回值,即电脑的CPU核心数。

这个类很是适合于CPU密集型做业上。使用这个类实现的上述代码虽然比线程版本慢一些,但依然比顺序版本快不少。

4.3 进度条

若是你用最新版pip下载过第三方库,你会发如今下载时会有一个文字进度条。在Python中想要实现这种效果可使用第三方库tqdm,如下是它的一个简单用法:

# 代码3.5
import tqdm
from time import sleep

for i in tqdm.tqdm(range(1000)):
    sleep(0.01)

# 结果:
 40%|████      |  400/1000 [00:10<00:00, 98.11it/s]
复制代码

迎你们关注个人微信公众号"代码港" & 我的网站 www.vpointer.net ~

相关文章
相关标签/搜索