Because of its Global Interpreter Lock (GIL), Python cannot achieve true parallel computation with threads. We will not expand on that claim here, but there is a distinction we need to spell out first: I/O-bound vs. CPU-bound.
I/O-bound: dominated by frequent file reads and network socket I/O.
CPU-bound: dominated by math and logic that consumes CPU cycles, which is the parallel computation we are talking about here.
The concurrent.futures module, however, can achieve true parallel computation by building on multiprocessing.
The core idea: concurrent.futures runs multiple Python interpreters in parallel as child processes, which lets a Python program use multiple CPU cores to speed up execution. Because each child process is separate from the main interpreter, their Global Interpreter Locks are independent of one another, and each child process can fully occupy one CPU core.
Chapter 1: concurrent.futures performance
The function below is CPU-bound.
# -*- coding:utf-8 -*-
# compute the greatest common divisor of a pair of numbers
def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i

numbers = [
    (1963309, 2265973), (1879675, 2493670), (2030677, 3814172),
    (1551645, 2229620), (1988912, 4736670), (2198964, 7876293)
]
import time

start = time.time()
results = list(map(gcd, numbers))
end = time.time()
print 'Took %.3f seconds.' % (end - start)

Took 2.507 seconds.
Elapsed time: 2.507 seconds.
import time
from concurrent.futures import ThreadPoolExecutor

start = time.time()
pool = ThreadPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time.time()
print 'Took %.3f seconds.' % (end - start)

Took 2.840 seconds.
Elapsed time: 2.840 seconds.
As noted above, gcd is CPU-bound, so because of the GIL, multithreading cannot speed it up. On top of that, starting threads and communicating with the thread pool both have costs, so this program actually runs slower with multiple threads.
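By contrast, threads do pay off for I/O-bound work, because a thread that blocks on I/O releases the GIL and lets other threads run. A minimal sketch, not from the original post, with a hypothetical fake_io standing in for a real file or socket read:

import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(_):
    # simulates a blocking read; like real I/O, sleep releases the GIL
    time.sleep(0.5)

start = time.time()
with ThreadPoolExecutor(max_workers=6) as pool:
    list(pool.map(fake_io, numbers))
print 'Took %.3f seconds.' % (time.time() - start)  # ~0.5s, vs. ~3s run serially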
import time
from concurrent.futures import ProcessPoolExecutor

start = time.time()
pool = ProcessPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time.time()
print 'Took %.3f seconds.' % (end - start)

Took 1.861 seconds.
Elapsed time: 1.861 seconds.
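The speedup tracks the number of cores, so for CPU-bound work it makes sense to size the pool to the machine. A minimal sketch, assuming only the standard library:

import multiprocessing
from concurrent.futures import ProcessPoolExecutor

# one worker per core; extra workers buy nothing for CPU-bound tasks
pool = ProcessPoolExecutor(max_workers=multiprocessing.cpu_count())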
On a machine with two CPU cores, the multiprocess version runs faster than both of the other versions. That is because the ProcessPoolExecutor class uses the low-level machinery provided by the multiprocessing module to perform the following steps:
1) Pass each item of the numbers list to map as input data.
2) Serialize that data into binary form with the pickle module.
3) Send the serialized data over a local socket from the process hosting the main interpreter to the process hosting the child interpreter.
4) In the child process, deserialize the binary data with pickle back into Python objects.
5) Import the Python module that contains the gcd function.
6) Let the child processes compute on their respective inputs in parallel.
7) Serialize the results back into bytes.
8) Copy those bytes back to the main process over the socket.
9) In the main process, deserialize the bytes back into Python objects.
10) Finally, merge the results computed by each child process into a single list and return it to the caller.
multiprocessing carries noticeable overhead precisely because of this: all communication between the main process and its children must go through serialization and deserialization.
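Steps 2-4 and 7-9 are just pickle round trips, which is also why everything handed to ProcessPoolExecutor (the function and its arguments) must be picklable; a lambda, for example, is not. A minimal sketch of what actually travels over the socket, assuming only the standard library:

import pickle

pair = (1963309, 2265973)
payload = pickle.dumps(pair)      # main process: Python object -> bytes for the socket
restored = pickle.loads(payload)  # child process: bytes -> Python object
assert restored == pair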
Chapter 2: concurrent.futures source code analysis
Executor can be thought of as an abstract class that provides the abstract methods submit, map (used above), and shutdown. It is worth noting that Executor implements __enter__ and __exit__, so its instances can be used with the with statement. For details on context managers and the with statement, see this post: http://www.cnblogs.com/kangoroo/p/7627167.html
ThreadPoolExecutor and ProcessPoolExecutor both inherit from Executor and are used to create thread pools and process pools, respectively.
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and
        returns a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, **kwargs):
        """Returns a iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        timeout = kwargs.get('timeout')
        if timeout is not None:
            end_time = timeout + time.time()

        fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                for future in fs:
                    if timeout is None:
                        yield future.result()
                    else:
                        yield future.result(end_time - time.time())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False
Below we walk through each of these methods using ProcessPoolExecutor.
map(self, fn, *iterables, **kwargs)
We already exercised map above. Note that the returned results list is ordered: its order matches the order of the *iterables input.
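To see the ordering guarantee, here is a minimal sketch, not from the original post, using a hypothetical slow_identity helper whose later inputs finish first:

import time
from concurrent.futures import ThreadPoolExecutor

def slow_identity(x):
    time.sleep(1.0 / x)  # x=4 finishes first, x=1 last
    return x

with ThreadPoolExecutor(max_workers=4) as pool:
    print list(pool.map(slow_identity, [1, 2, 3, 4]))  # still prints [1, 2, 3, 4]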
Here we use the with statement, so shutdown is invoked automatically once the tasks complete, and no explicit cleanup code is needed.
import time
from concurrent.futures import ProcessPoolExecutor

start = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(gcd, numbers))
print 'results: %s' % results
end = time.time()
print 'Took %.3f seconds.' % (end - start)
The output is:
results: [1, 5, 1, 5, 2, 3]
Took 1.617 seconds.
submit(self, fn, *args, **kwargs)
submit schedules a callable for parallel execution and returns a Future instance.
The Future object represents the asynchronous execution of the task in that thread/process, which will complete at some point in the future; the Future instance is the handle through which you observe the task's state.
import time
from concurrent.futures import ProcessPoolExecutor

start = time.time()
futures = list()
with ProcessPoolExecutor(max_workers=2) as pool:
    for pair in numbers:
        future = pool.submit(gcd, pair)
        futures.append(future)
print 'results: %s' % [future.result() for future in futures]
end = time.time()
print 'Took %.3f seconds.' % (end - start)
The output is:
results: [1, 5, 1, 5, 2, 3]
Took 2.289 seconds.
submit returns a Future object, and Future provides methods for tracking the task's state: future.running() tells you whether the task is still executing, future.done() whether it has finished, and so on.
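Instead of polling running()/done(), Future also supports add_done_callback, which fires a function as soon as the task finishes. A minimal sketch, not from the original post, reusing gcd from Chapter 1:

from concurrent.futures import ProcessPoolExecutor

def on_done(future):
    # invoked in the main process once the task completes
    print 'gcd result: %s' % future.result()

with ProcessPoolExecutor(max_workers=2) as pool:
    future = pool.submit(gcd, (1963309, 2265973))
    future.add_done_callback(on_done)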
The as_completed function takes two arguments: an iterable of futures and a timeout.
With the default timeout=None, it blocks until the tasks finish and returns an iterator over the futures as they complete; the iterator is implemented with yield.
With timeout>0, it waits up to timeout seconds; if some tasks are still unfinished when the timeout expires, it stops and raises TimeoutError.
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

start = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(gcd, pair) for pair in numbers]
    for future in futures:
        print 'running: %s, done: %s' % (future.running(), future.done())
    print '#### dividing line ####'
    for future in as_completed(futures, timeout=2):
        print 'running: %s, done: %s' % (future.running(), future.done())
end = time.time()
print 'Took %.3f seconds.' % (end - start)
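The timeout=2 above is comfortably longer than the total runtime; if the timeout could actually expire, the loop should be wrapped in a try/except. A minimal sketch, not from the original post:

from concurrent.futures import ProcessPoolExecutor, TimeoutError, as_completed

with ProcessPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(gcd, pair) for pair in numbers]
    try:
        for future in as_completed(futures, timeout=0.1):
            print future.result()
    except TimeoutError:
        print 'some tasks did not finish within the timeout'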
The wait function returns a tuple containing two sets: one of completed futures and one of uncompleted futures.
One advantage of wait is the extra control it offers: its return_when parameter accepts one of three values, FIRST_COMPLETED, FIRST_EXCEPTION, and ALL_COMPLETED, defaulting to ALL_COMPLETED.
import time
from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED

start = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(gcd, pair) for pair in numbers]
    for future in futures:
        print 'running: %s, done: %s' % (future.running(), future.done())
    print '#### dividing line ####'
    done, unfinished = wait(futures, timeout=2, return_when=ALL_COMPLETED)
    for d in done:
        print 'running: %s, done: %s' % (d.running(), d.done())
        print d.result()
end = time.time()
print 'Took %.3f seconds.' % (end - start)
Because return_when is set to ALL_COMPLETED, wait blocks until every task has finished; as the output shows, all 6 tasks completed.
running: True, done: False
running: True, done: False
running: True, done: False
running: True, done: False
running: False, done: False
running: False, done: False
#### dividing line ####
running: False, done: True
running: False, done: True
running: False, done: True
running: False, done: True
running: False, done: True
running: False, done: True
Took 1.518 seconds.
If we change the setting to FIRST_COMPLETED, wait blocks only until the first task finishes and then returns whichever tasks have completed by that point. Note that this applies no concurrency control: all tasks are still submitted up front.
Re-running with that setting produces the following output; you can see that 2 tasks had completed.
running: True, done: False
running: True, done: False
running: True, done: False
running: True, done: False
running: False, done: False
running: False, done: False
#### dividing line ####
running: False, done: True
running: False, done: True
Took 1.517 seconds.
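A common pattern built on FIRST_COMPLETED is to keep re-waiting on the not-yet-finished set, handling each batch of results as soon as it is ready. A minimal sketch, not from the original post:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED

with ProcessPoolExecutor(max_workers=2) as pool:
    pending = set(pool.submit(gcd, pair) for pair in numbers)
    while pending:
        # done holds whatever finished since the last wait; pending shrinks each pass
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            print future.result()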