Python线程池及其原理和使用(超级详细)

系统启动一个新线程的成本是比较高的,由于它涉及与操做系统的交互。在这种情形下,使用线程池能够很好地提高性能,尤为是当程序中须要建立大量生存期很短暂的线程时,更应该考虑使用线程池。

线程池在系统启动时即建立大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。

此外,使用线程池能够有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会致使系统性能急剧降低,甚至致使 Python 解释器崩溃,而线程池的最大线程数参数能够控制系统中并发线程的数量不超过此数。html

线程池的使用

线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于建立线程池,而 ProcessPoolExecutor 用于建立进程池。

若是使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。

Exectuor 提供了以下经常使用方法:编程

  • submit(fn, *args, **kwargs):将 fn 函数提交给线程池。*args 表明传给 fn 函数的参数,*kwargs 表明以关键字参数的形式为 fn 函数传入参数。
  • map(func, *iterables, timeout=None, chunksize=1):该函数相似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式当即对 iterables 执行 map 处理。
  • shutdown(wait=True):关闭线程池。


程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。因为线程任务会在新线程中以异步方式执行,所以,线程执行的函数至关于一个“未来完成”的任务,因此 Python 使用 Future 来表明。并发

Future 提供了以下方法:异步

  • cancel():取消该 Future 表明的线程任务。若是该任务正在执行,不可取消,则该方法返回 False;不然,程序会取消该任务,并返回 True。
  • cancelled():返回 Future 表明的线程任务是否被成功取消。
  • running():若是该 Future 表明的线程任务正在执行、不可被取消,该方法返回 True。
  • done():若是该 Funture 表明的线程任务被成功取消或执行完成,则该方法返回 True。
  • result(timeout=None):获取该 Future 表明的线程任务最后返回的结果。若是 Future 表明的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
  • exception(timeout=None):获取该 Future 表明的线程任务所引起的异常。若是该任务成功完成,没有异常,则该方法返回 None。
  • add_done_callback(fn):为该 Future 表明的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。


在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池再也不接收新任务,但会将之前全部的已提交任务执行完成。当线程池中的全部任务都执行完成后,该线程池中的全部线程都会死亡。

使用线程池来执行线程任务的步骤以下:函数

  1. 调用 ThreadPoolExecutor 类的构造器建立一个线程池。
  2. 定义一个普通函数做为线程任务。
  3. 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
  4. 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。


下面程序示范了如何使用线程池来执行线程任务:性能

 1 def test(value1, value2=None):
 2     print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
 3     time.sleep(2)
 4     return 'finished'
 5 
 6 def test_result(future):
 7     print(future.result())
 8 
 9 if __name__ == "__main__":
10     import numpy as np
11     from concurrent.futures import ThreadPoolExecutor
12     threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")
13     for i in range(0,10):
14         future = threadPool.submit(test, i,i+1)
15         
16     threadPool.shutdown(wait=True)
1 结果:
2 
3 test__0 threading is printed 0, 1
4 test__1 threading is printed 1, 2
5 test__2 threading is printed 2, 3
6 test__3 threading is printed 3, 4
7 test__1 threading is printed 4, 5
8 test__0 threading is printed 5, 6
9 test__3 threading is printed 6, 7

 

获取执行结果

前面程序调用了 Future 的 result() 方法来获取线程任务的运回值,但该方法会阻塞当前主线程,只有等到钱程任务完成后,result() 方法的阻塞才会被解除。

若是程序不但愿直接调用 result() 方法阻塞线程,则可经过 Future 的 add_done_callback() 方法来添加回调函数,该回调函数形如 fn(future)。当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象做为参数传给该回调函数。
直接调用result函数结果spa

 1 def test(value1, value2=None):
 2     print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
 3     time.sleep(2)
 4     return 'finished'
 5 
 6 def test_result(future):
 7     print(future.result())
 8 
 9 if __name__ == "__main__":
10     import numpy as np
11     from concurrent.futures import ThreadPoolExecutor
12     threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")
13     for i in range(0,10):
14         future = threadPool.submit(test, i,i+1)
15 #         future.add_done_callback(test_result)
16         print(future.result())
17         
18     threadPool.shutdown(wait=True)
19     print('main finished')
1 结果:
2 
3 test__0 threading is printed 0, 1
4 finished
5 test__0 threading is printed 1, 2
6 finished
7 test__1 threading is printed 2, 3
8 finished

去掉上面注释部分,调用future.add_done_callback函数,注释掉第16行操作系统

 1 test__0 threading is printed 0, 1
 2 test__1 threading is printed 1, 2
 3 test__2 threading is printed 2, 3
 4 test__3 threading is printed 3, 4
 5 finished
 6 finished
 7 finished
 8 test__1 threading is printed 4, 5
 9 test__0 threading is printed 5, 6
10 finished

 

另外,因为线程池实现了上下文管理协议(Context Manage Protocol),所以,程序可使用 with 语句来管理线程池,这样便可避免手动关闭线程池,如上面的程序所示。

此外,Exectuor 还提供了一个 map(func, *iterables, timeout=None, chunksize=1) 方法,该方法的功能相似于全局函数 map(),区别在于线程池的 map() 方法会为 iterables 的每一个元素启动一个线程,以并发方式来执行 func 函数。这种方式至关于启动 len(iterables) 个线程,井收集每一个线程的执行结果。

例如,以下程序使用 Executor 的 map() 方法来启动线程,并收集线程任务的返回值:.net

示例换成多参数的:线程

def test(value1, value2=None):
    print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
#     time.sleep(2)


if __name__ == "__main__":
    import numpy as np
    from concurrent.futures import ThreadPoolExecutor
    threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")
    for i in range(0,10):
#         test(str(i), str(i+1))
        threadPool.map(test, [i],[i+1]) # 这是运行一次test的参数,众所周知map可让test执行屡次,即一个[]表明一个参数,一个参数赋予不一样的值即增长[]的长度如从[1]到[1,2,3]
    threadPool.shutdown(wait=True)

 

上面程序使用 map() 方法来启动 4个线程(该程序的线程池包含 4 个线程,若是继续使用只包含两个线程的线程池,此时将有一个任务处于等待状态,必须等其中一个任务完成,线程空闲出来才会得到执行的机会),map() 方法的返回值将会收集每一个线程任务的返回结果。
经过上面程序能够看出,使用 map() 方法来启动线程,并收集线程的执行结果,不只具备代码简单的优势,并且虽然程序会以并发方式来执行 test() 函数,但最后收集的 test() 函数的执行结果,依然与传入参数的结果保持一致。

 

编写这个文档主要是由于示例文档[1]没有多参数的。网上不少资料都是基于threadpool方法传参见[2]

Reference:

[1] http://c.biancheng.net/view/2627.html

[2] https://www.cnblogs.com/gongxijun/p/6862333.html

相关文章
相关标签/搜索