理解Python并发编程-PoolExecutor篇

原文连接python

 

以前咱们使用多线程(threading)和多进程(multiprocessing)完成常规的需求,在启动的时候start、jon等步骤不能省,复杂的须要还要用1-2个队列。随着需求愈来愈复杂,若是没有良好的设计和抽象这部分的功能层次,代码量越多调试的难度就越大。有没有什么好的方法把这些步骤抽象一下呢,让咱们不关注这些细节,轻装上阵呢?设计模式

答案是:有的。多线程

从Python3.2开始一个叫作concurrent.futures被归入了标准库,而在Python2它属于第三方的futures库,须要手动安装:并发

 

pip install futures

```                                             


这个模块中有2个类:ThreadPoolExecutor和ProcessPoolExecutor,也就是对threading和multiprocessing的进行了高级别的抽象, 暴露出统一的接口,帮助开发者很是方便的实现异步调用: ```python import time from concurrent.futures import ProcessPoolExecutor, as_completed NUMBERS = range(25, 38) def fib(n): if n<= 2: return 1 return fib(n-1) + fib(n-2) start = time.time() with ProcessPoolExecutor(max_workers=3) as executor: for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)): print 'fib({}) = {}'.format(num, result) print 'COST: {}'.format(time.time() - start)



感觉下是否是很轻便呢?看一下花费的时间:异步

 

python fib_executor.py fib(25) = 75025 fib(26) = 121393 fib(27) = 196418 fib(28) = 317811 fib(29) = 514229 fib(30) = 832040 fib(31) = 1346269 fib(32) = 2178309 fib(33) = 3524578 fib(34) = 5702887 fib(35) = 9227465 fib(36) = 14930352 fib(37) = 24157817 COST: 10.8920350075



除了用map,另一个经常使用的方法是submit。若是你要提交的任务的函数是同样的,就能够简化成map。可是假如提交的任务函数是不同的,或者执行的过程之可能出现异常(使用map执行过程当中发现问题会直接抛出错误)就要用到submit:函数

 

from concurrent.futures import ThreadPoolExecutor, as_completed NUMBERS = range(30, 35) def fib(n): if n == 34: raise Exception("Don't do this") if n<= 2: return 1 return fib(n-1) + fib(n-2) with ThreadPoolExecutor(max_workers=3) as executor: future_to_num = {executor.submit(fib, num): num for num in NUMBERS} for future in as_completed(future_to_num): num = future_to_num[future] try: result = future.result() except Exception as e: print 'raise an exception: {}'.format(e) else: print 'fib({}) = {}'.format(num, result) with ThreadPoolExecutor(max_workers=3) as executor: for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)): print 'fib({}) = {}'.format(num, result)



执一下:tornado

 

python fib_executor_with_raise.py fib(30) = 832040 fib(31) = 1346269 raise an exception: Don't do this fib(32) = 2178309 fib(33) = 3524578 Traceback (most recent call last): File "fib_executor_with_raise.py", line 28, in <module> for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)): File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 580, in map yield future.result() File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 400, in result return self.__get_result() File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 359, in __get_result reraise(self._exception, self._traceback) File "/Library/Python/2.7/site-packages/concurrent/futures/_compat.py", line 107, in reraise exec('raise exc_type, exc_value, traceback', {}, locals_) File "/Library/Python/2.7/site-packages/concurrent/futures/thread.py", line 61, in run result = self.fn(*self.args, **self.kwargs) File "fib_executor_with_raise.py", line 9, in fib raise Exception("Don't do this") Exception: Don't do this



能够看到,第一次捕捉到了异常,可是第二次执行的时候错误直接抛出来了。this

上面说到的map,有些同窗立刻会说,这不是进程(线程)池的效果吗?看起来确实是的:spa

 

import time

from multiprocessing.pool import Pool NUMBERS = range(25, 38) def fib(n): if n<= 2: return 1 return fib(n-1) + fib(n-2) start = time.time() pool = Pool(3) results = pool.map(fib, NUMBERS) for num, result in zip(NUMBERS, pool.map(fib, NUMBERS)): print 'fib({}) = {}'.format(num, result) print 'COST: {}'.format(time.time() - start)



好像代码量更小哟。好吧,看一下花费的时间:线程

 
    
 

python fib_pool.py

fib(25) = 75025

fib(26) = 121393

fib(27) = 196418

fib(28) = 317811

fib(29) = 514229

fib(30) = 832040

fib(31) = 1346269

fib(32) = 2178309

fib(33) = 3524578

fib(34) = 5702887

fib(35) = 9227465

fib(36) = 14930352

fib(37) = 24157817

COST: 17.1342718601
 
    
 

WhatTF居然花费了1.7倍的时间。为何?

BTW,有兴趣的同窗能够对比下ThreadPool和ThreadPoolExecutor,因为GIL的缘故,对比的差距必定会更多。

原理

咱们就拿ProcessPoolExecutor介绍下它的原理,引用官方代码注释中的流程图:

 

|======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ | | => | Work Ids | => | | => | Call Q | => | | | | +----------+ | | +-----------+ | | | | | ... | | | | ... | | | | | | 6 | | | | 5, call() | | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | | Executor | | Thread | | | | | +----------- + | | +-----------+ | | | | <=> | Work Items | <=> | | <= | Result Q | <= | | | | +------------+ | | +-----------+ | | | | | 6: call() | | | | ... | | | | | | future | | | | 4, result | | | | | | ... | | | | 3, except | | | +----------+ +------------+ +--------+ +-----------+ +---------+



咱们结合源码和上面的数据流分析一下:

  1. executor.map会建立多个_WorkItem对象,每一个对象都传入了新建立的一个Future对象。
  2. 把每一个_WorkItem对象而后放进一个叫作「Work Items」的dict中,键是不一样的「Work Ids」。
  3. 建立一个管理「Work Ids」队列的线程「Local worker thread」,它能作2件事:
    1. 从「Work Ids」队列中获取Work Id, 经过「Work Items」找到对应的_WorkItem。若是这个Item被取消了,就从「Work Items」里面把它删掉,不然从新打包成一个_CallItem放入「Call Q」这个队列。executor的那些进程会从队列中取_CallItem执行,并把结果封装成_ResultItems放入「Result Q」队列中。
    2. 从「Result Q」队列中获取_ResultItems,而后从「Work Items」更新对应的Future对象并删掉入口。

看起来就是一个「生产者/消费者」模型罢了,错了。咱们要注意,整个过程并非多个进程与任务+结果-2个队列直接通讯的,而是经过一个中间的「Local worker thread」,它就是让效率提高的重要缘由之一!!!

设想,当某一段程序提交了一个请求,指望获得一个答复。但服务程序对这个请求可能很慢,在传统的单线程环境下,调用函数是同步的,也就是说它必须等到服务程序返回结果后,才能进行其余处理。而在Future模式下,调用方式改成异步,而原先等待返回的时间段,在主调用函数中,则可用于处理其余事物。

Future

Future是常见的一种并发设计模式,在多个其余语言中均可以见到这种解决方案。

一个Future对象表明了一些还没有就绪(完成)的结果,在「未来」的某个时间就绪了以后就能够获取到这个结果。好比上面的例子,咱们指望并发的执行一些参数不一样的fib函数,获取所有的结果。传统模式就是在等待queue.get返回结果,这个是同步模式,而在Future模式下,调用方式改成异步,而原先等待返回的时间段,因为「Local worker thread」的存在,这个时候能够完成其余工做

在tornado中也有对应的实现。2013年的时候,我曾经写过一篇博客使用tornado让你的请求异步非阻塞,最后也提到了用concurrent.futures实现异步非阻塞的完成耗时任务。

 

原文连接

相关文章
相关标签/搜索