concurrent.futures模块简单介绍(线程池,进程池)

1、基类Executor

Executor类是ThreadPoolExecutor 和ProcessPoolExecutor 的基类。它为咱们提供了以下方法:html

submit(fn, *args, **kwargs):提交任务。以 fn(*args **kwargs) 方式执行并返回 Future 对像。python

fn:函数地址。app

*args:位置参数。异步

**kwargs:关键字参数。函数

map(func, *iterables, timeout=None, chunksize=1):性能

func:函数地址。测试

iterables:一个可迭代对象,以迭代的方式将参数传递给函数。spa

timeout:这个参数没弄明白,若是是None等待全部进程结束。线程

chunksize:使用 ProcessPoolExecutor 时,这个方法会将 iterables 分割任务块,并做为独立的任务提交到执行池中。这些块的数量能够由 chunksize 指定设置。 对很长的迭代器来讲,设置chunksize 值比默认值 1 能显著地提升性能。 chunksize 对 ThreadPoolExecutor 没有效果。code

shutdown(wait=True):若是为True会等待线程池或进程池执行完成后释放正在使用的资源。若是 wait 为 False,将当即返回,全部待执行的期程完成执行后会释放已分配的资源。 无论 wait 的值是什么,整个 Python 程序将等到全部待执行的期程完成执行后才退出。

2、线程池对象

ThreadPoolExecutor 是 Executor 的子类,下面介绍ThreadPoolExecutor 的参数。

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=()):

max_workers:线程池的数量。

thread_name_prefix:线程名前缀。默认线程名ThreadPoolExecutor-线程数。

initializer:一个函数或方法,在启用线程前会调用这个函数(给线程池添加额外任务)

initargs :以元祖的方式给initializer中的函数传递参数。

这里须要说明的是除了max_workers这个参数外其它三个参数基本不多用。max_workers很好理解就是线程池的数量。

下面来讲initializer和initargs 这两个奇怪的家伙。

示例一:

from concurrent.futures import ThreadPoolExecutor
def work():
    print('工做线程')
def test(num):
    print('test:',num)
executor = ThreadPoolExecutor(max_workers=2,initializer=test(7))  # 开启2个线程  initializer指定参数test(7)
executor.submit(work)  
executor.submit(work)

# 打印内容以下
test: 7
工做线程
工做线程

示例二:

from concurrent.futures import ThreadPoolExecutor
def work():
    print('工做线程')
def test(num):
    print('test:',num)
executor = ThreadPoolExecutor(max_workers=2,initializer=test,initargs=(7,)) # 这里咱们使用initargs=(7,)的方式给test传递参数。
executor.submit(work)
executor.submit(work)

# 打印内容以下
test: 7
工做线程
工做线程
test: 7

经过示例一和示例二咱们能够发现initializer=test(7)时,test函数只被调用了1次,当initializer=test,initargs=(7,)时,test被调用了2次。具体缘由没有去分析。感受没什么用。之后有时间看看源码在补上。

3、进程池对象

ProcessPoolExecutor 也是 Executor 的子类,下面是ProcessPoolExecutor 参数介绍:

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

max_workers:工做进程数。若是 max_workers 为 None 或未给出,它将默认为机器的处理器个数。 若是 max_workers 小于等于 0,则将引起 ValueError。 在 Windows 上,max_workers 必须小于等于 61,不然将引起 ValueError。 若是 max_workers 为 None,则所选择的默认最多为 61,即便存在更多处理器。

mp_context :能够是一个多进程上下文或是 None。 它将被用来启动工做进程。 若是 mp_context 为 None 或未给出,将使用默认的多进程上下文。

initializer:一个函数或方法,在启用线程前会调用这个函数。

initargs :以元祖的方式给initializer中的函数传递参数。

关于说initializer和initargs 与ThreadPoolExecutor 相似这里很少说了。


 

4、建立线程池

from concurrent.futures import ThreadPoolExecutor
import time
def work(num):
    time.sleep(1)
    print('工做线程:',num)
if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=5)  # 建立线程池,数量为5
    for i in range(5):
        executor.submit(work, i)
    print('主线程')

# 打印内容以下
主线程
工做线程:   0
工做线程:   1
工做线程:   2
工做线程:   3
工做线程:   4
# 使用shutdown等待全部线程结束后在打印主线程 from concurrent.futures import ThreadPoolExecutor import time def work(num): time.sleep(1) print('工做线程:',num) if __name__ == '__main__': executor = ThreadPoolExecutor(max_workers=5) # 建立线程池,数量为5 for i in range(5): executor.submit(work, i) executor.shutdown(wait=True) # 等待线程池结束 print('主线程') # 打印内容以下 工做线程: 0 工做线程: 1 工做线程: 2 工做线程: 3 工做线程: 4 主线程

若是想要在线程执行的过程当中添加额外的功能,可使用initializer参数,以下:

from concurrent.futures import ThreadPoolExecutor

def work(num):
    print('工做线程:',num)
def test(num):
    print('额外任务:',num)
if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=5,initializer=test,initargs=(7,)) # 添加额外任务
    for i in range(5):
        executor.submit(work, i)
    executor.shutdown(wait=True)
    print('主线程')

# 打印内容以下
额外任务: 7
工做线程: 0
额外任务: 7
工做线程: 1
额外任务: 7
工做线程: 2 
额外任务: 7
工做线程: 3 
额外任务: 7
工做线程: 4 
主线程

5、进程池

进程池与线程池用法基本一致,只是名字和实现不同而已。

from concurrent.futures import ProcessPoolExecutor
import time
def work(num):
    time.sleep(1)
    print('工做进程:',num)
if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=5)  # 建立进程池,数量为5
    for i in range(5):
        executor.submit(work, i)
    print('主线程')

# 打印内容以下
主线程
工做进程: 0
工做进程: 1
工做进程: 2
工做进程: 3
工做进程: 4

# 使用shutdown等待全部线程结束后在打印主线程
from concurrent.futures import ProcessPoolExecutor
import time
def work(num):
    time.sleep(1)
    print('工做进程:',num)
if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=5)  # 建立进程池,数量为5
    for i in range(5):
        executor.submit(work, i)
    executor.shutdown(wait=True)  # 等待进程池结束
    print('主线程')
# 打印内容以下
工做进程: 0
工做进程: 1
工做进程: 2
工做进程: 3
工做进程: 4
主线程

若是想要在线程执行的过程当中添加额外的功能,可使用initializer参数,以下:

from concurrent.futures import ProcessPoolExecutor

def work(num):
    print('工做进程:',num)
def test(num):
    print('额外任务:',num)
if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=5,initializer=test,initargs=(7,)) # 添加额外任务
    for i in range(5):
        executor.submit(work, i)
    executor.shutdown(wait=True)
    print('主线程')

# 打印内容以下
额外任务: 7
工做进程: 0
工做进程: 1
工做进程: 2
工做进程: 3
工做进程: 4
额外任务: 7
额外任务: 7
额外任务: 7
额外任务: 7
主线程

 


 

6、Future Objects

future类封装了可调用文件的异步执行。future的实例由executor.submit()时被建立的,除了测试以外不该该直接实例化future对象,因此为了获取future对象咱们能够f=executor.submit()便可。

class concurrent.futures.Future类中的方法:

cancel():尝试取消执行线程池中的函数调用。若是调用当前正在执行或已完成运行,而且没法取消,则方法将返回false,不然调用将被取消,方法将返回true。

cancelled():若是线程池中的函数执行成功返回True,调用失败返回false。

running():若是线程池中的调用当前正在执行且没法取消,则返回true。

done():若是呼叫成功取消或完成运行,则返回true。不然返回false

result(timeout=None):返回线程函数的返回值。若是线程函数未执行完成,则此方法将最多等待timeout秒,若是线程函数未在超时秒内完成,则将引起concurrent.futures.TimeoutError。超时能够是int或float。若是未指定超时 timeout=None,则会阻塞,一直等待函数执行完成。若是在线程函数完成以前使用future对象取消了执行,则将引起CancelederRor。若是调用raised,此方法将引起相同的异常。

exception(timeout=None):返回线程函数引起的异常。若是线程函数还没有完成,则此方法将最多等待timeout秒。若是线程函数未在超时秒内完成,则将引起concurrent.futures.TimeoutError。超时能够是int或float。若是未指定超时或无超时timeout=None,则会一直等待。若是在线程函数完成以前使用future对象取消了执行,则将引起CancelederRor若是线程函数完成但未引起,则返回None。

add_done_callback(fn):将可调用fn附加到future对象。当future对象被取消或结束运行时,将调用fn,其中future对象是唯一的参数。添加的可调用对象是按照添加顺序调用的,而且老是在属于添加它们的进程的线程中调用。若是Callable引起异常子类,它将被记录并忽略。若是可调用引起BaseException子类,则行为未定义。


 

7、Module Functions

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):将fs绑定一个future实例,若是future执行完成或取消执行fs函数。

fs:fs是一个函数绑定在future实例(可能由不一样的执行器实例建立)。返回2个命名元组的集合。第一组名为“done”,包含等待完成,完成前(完成或future对象取消)。第二组名为“not_done”,包含未完成的future(未完成或正在运行的future)。

timeout:若是为None一直等待,不然会等待timeout秒。

return_when :必须是以下范围。

Constant

Description

FIRST_COMPLETED

当任何future 完成或取消或者线程函数执行完成时。

FIRST_EXCEPTION

当future经过引起异常而结束时,线程函数将返回。若是没有future引起异常,那么它至关于全部已完成的。

ALL_COMPLETED

当全部future完成或取消时,函数将返回。

 

concurrent.futures.as_completed(fs, timeout=None):返回一个future迭代器。

fs:可迭代对象的future。

timeout:超时时间,若是为None会一直阻塞直到执行完成。不然将等待timeout秒。

from concurrent.futures._base import as_completed
from concurrent.futures import ThreadPoolExecutor

def work(num):
    return num ** 2
if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=5)
    future_list = []  # 存放future对象
    for i in range(5):
        future_list.append(executor.submit(work, i))
    for future in as_completed(future_list):   # 这是一个无聊的用法
        res = future.result()
        print(f'结果:{res}')  # 打印工做线程返回的结果
# 打印结果以下

结果:0
结果:4
结果:16
结果:1
结果:9

 

参考文档:https://docs.python.org/3/library/concurrent.futures.html

相关文章
相关标签/搜索