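Because of its Global Interpreter Lock (GIL), Python cannot achieve true parallel computation with threads. We won't expand on that claim here, but one distinction does need explaining: IO-bound vs. CPU-bound.
IO-bound: the program spends most of its time reading files and network sockets.
CPU-bound: the program spends most of its time on CPU-heavy math and logic, which is the kind of parallel computation we mean here.
The concurrent.futures module can use multiprocessing to achieve true parallel computation.
The core idea: concurrent.futures runs several Python interpreters in parallel as child processes, so a Python program can use multiple CPU cores to run faster. Because the child processes are separate from the main interpreter, each one has its own independent GIL, and each child process can make full use of one CPU core.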
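The idea above can be sketched with ProcessPoolExecutor. This is a minimal illustration, not a benchmark: count_primes and parallel_prime_counts are hypothetical helpers invented for this example, and the workload sizes are arbitrary.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def count_primes(limit):
    """Count primes below limit by trial division (deliberately CPU-heavy)."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def parallel_prime_counts(limits, executor_cls=ProcessPoolExecutor):
    """Run count_primes once per limit; results come back in input order."""
    # With ProcessPoolExecutor each worker is a separate interpreter with
    # its own GIL, so the calls can run on different cores.
    with executor_cls() as pool:
        return list(pool.map(count_primes, limits))


if __name__ == "__main__":
    # The guard matters: worker processes may re-import this module.
    print(parallel_prime_counts([20_000, 30_000, 40_000]))
```

Passing executor_cls=ThreadPoolExecutor runs the same code on threads, which makes it easy to compare the two on a CPU-bound task. The function handed to a process pool has to be picklable, which is why count_primes is defined at module level.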
Future summary
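1. Bundled with Python 3; on Python 2 it has to be installed separately.
2. The Executor object
   An abstract class that provides methods for executing calls asynchronously. It cannot be used directly, only through its subclasses ThreadPoolExecutor and ProcessPoolExecutor.
   2.1 Executor.submit(fn, *args, **kwargs)
       fn: the function to run asynchronously
       *args, **kwargs: the arguments passed to fn
       Schedules the callable for execution and returns a Future object.
   2.2 Executor.map(fn, *iterables, timeout=None, chunksize=1)
       map(task, URLS)  # returns an iterator; the results come back in the same order as the inputs
3. The Future object
   A future can be thought of as an operation that will complete at some point in the future; it is the foundation of asynchronous programming.
   Normally an IO operation blocks, and the program can do nothing else while it waits.
   A future lets us do other work during that waiting time.
   3.1 done(): returns True if the call was cancelled or has finished running.
   3.2 cancel(): tries to cancel the call. If the call is already running or finished and cannot be cancelled, returns False; otherwise the call is cancelled and the method returns True.
   3.3 running(): returns True if the call is currently executing.
   3.4 result(timeout=None): returns the value returned by the call; if the call has not completed yet, this method waits.
       If the wait times out, concurrent.futures.TimeoutError is raised.
       If no timeout is given, the wait has no time limit.
       If the future was cancelled before completing, CancelledError is raised.
       If the call itself raised an exception, result() re-raises it.
4. as_completed(fs, timeout=None):
   Returns an iterator over the Future instances in fs.
   Each future is yielded as it completes.
   A future that appears more than once in fs is yielded only once.
   Every future the iterator yields has already completed.
5. wait(fs, timeout=None, return_when=ALL_COMPLETED):
   Returns a tuple with two elements:
   1. the set of completed futures
   2. the set of uncompleted futures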
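The Future methods in the summary can be exercised without any network access. The sketch below assumes a single-worker thread pool, so the second task stays queued behind the first; demo_future_states, blocked and the two threading.Event objects are hypothetical names used only for this illustration.

```python
import threading
from concurrent.futures import CancelledError, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def demo_future_states():
    """Record what done(), running(), cancel() and result() report."""
    started = threading.Event()   # set once the first task is executing
    gate = threading.Event()      # holds the first task until we release it
    states = {}

    def blocked():
        started.set()
        gate.wait()
        return "first"

    with ThreadPoolExecutor(max_workers=1) as pool:
        first = pool.submit(blocked)
        second = pool.submit(lambda: "second")    # queued behind first
        started.wait()

        states["first_running"] = first.running()
        states["first_done"] = first.done()
        states["first_cancel"] = first.cancel()    # running: cannot cancel
        states["second_cancel"] = second.cancel()  # still pending: cancelled
        states["second_done"] = second.done()      # cancelled counts as done

        try:
            first.result(timeout=0.05)             # still blocked on the gate
        except FutureTimeoutError:
            states["timed_out"] = True

        gate.set()                                 # let the first task finish
        states["first_result"] = first.result()

        try:
            second.result()
        except CancelledError:
            states["second_raised_cancelled"] = True
    return states


if __name__ == "__main__":
    for name, value in demo_future_states().items():
        print(name, value)
```

The events replace time.sleep() so the states are observed deterministically rather than depending on timing.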
    # coding=utf-8
    import time
    from concurrent import futures


    def return_future(msg):
        time.sleep(3)
        return msg


    pool = futures.ThreadPoolExecutor(max_workers=2)
    t1 = pool.submit(return_future, 'hello')
    t2 = pool.submit(return_future, 'world')

    time.sleep(3)
    print(t1.done())    # True once the call has finished
    time.sleep(3)
    print(t2.done())
    print(t1.result())  # get the future's return value
    time.sleep(3)
    print(t2.result())
    print("main thread")
Executor.map(func, *iterables, timeout=None, chunksize=1)
    # coding=utf-8
    import time
    from concurrent.futures import as_completed
    from concurrent.futures import ThreadPoolExecutor as Pool

    import requests

    URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']


    def task(url, timeout=10):
        return requests.get(url=url, timeout=timeout)


    # start the timer before any request is submitted
    start_time = time.time()

    # map() yields results in the same order as URLS
    with Pool() as pool:
        for res in pool.map(task, URLS):
            print("{} {}".format(res.url, len(res.content)))

    # as_completed() yields futures in completion order, not URLS order
    with Pool(max_workers=3) as executor:
        future_task = [executor.submit(task, url) for url in URLS]
        for f in as_completed(future_task):
            # f.result() is what task() returned: a requests Response
            f_ret = f.result()
            print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))

    print("elapsed", time.time() - start_time)
    print("main thread")
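To see the ordering guarantee of map() without depending on the network, here is a small sketch in which the task that finishes last is listed first. slow_square, squares_in_input_order and the delays are made up for the illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(job):
    """Sleep for job[0] seconds, then return the square of job[1]."""
    delay, value = job
    time.sleep(delay)
    return value * value


def squares_in_input_order(jobs):
    """Run all jobs concurrently and collect the results with map()."""
    with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
        # map() returns a lazy iterator; list() consumes it inside the
        # with block. Results follow the order of jobs, not finish time.
        return list(pool.map(slow_square, jobs))


if __name__ == "__main__":
    # The first job is the slowest, yet its result still comes first.
    print(squares_in_input_order([(0.3, 1), (0.1, 2), (0.2, 3)]))
```

If one of the calls raises, the exception surfaces when that result is retrieved from the iterator, so later results are not reached unless the caller handles it.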
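A Future can be thought of as an operation that will complete in the future.
When we perform an IO operation, the program blocks until the result comes back.
During that time it can do nothing else; a Future lets us get other work done while we wait.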
    from concurrent.futures import ThreadPoolExecutor as Pool
    from concurrent.futures import as_completed
    import requests
    import time

    URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']


    def task(url, timeout=10):
        return requests.get(url=url, timeout=timeout)


    # Sequential version, for timing comparison:
    # start_time = time.time()
    # for url in URLS:
    #     ret = task(url)
    #     print("{} {}".format(ret.url, len(ret.content)))
    # print("elapsed", time.time() - start_time)

    with Pool(max_workers=3) as executor:
        # create the future tasks
        future_task = [executor.submit(task, url) for url in URLS]

        for f in future_task:
            if f.running():
                print("%s is running" % str(f))

        for f in as_completed(future_task):
            try:
                # as_completed() only yields completed futures, so
                # result() returns (or raises) immediately
                f_ret = f.result()
                print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
            except Exception as e:
                # the future has already finished, so there is nothing to cancel
                print(e)

    """
    The URLs are not necessarily returned in submission order: the requests run
    concurrently, so waiting for one response does not block the others.
    <Future at 0x1c63990e6d8 state=running> is running
    <Future at 0x1c639922780 state=running> is running
    <Future at 0x1c639922d30 state=running> is running
    <Future at 0x1c63990e6d8 state=finished returned Response>, done, result: http://www.baidu.com/, 2381
    <Future at 0x1c639922780 state=finished returned Response>, done, result: https://www.qq.com?fromdefault, 243101
    <Future at 0x1c639922d30 state=finished returned Response>, done, result: http://sina.com/, 23103
    """
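concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
wait() returns a tuple that contains two sets:
1. the set of completed futures
2. the set of uncompleted futures
wait() gives more control over when to stop waiting. Its return_when argument accepts one of three constants:
FIRST_COMPLETED, FIRST_EXCEPTION and ALL_COMPLETED
The default is ALL_COMPLETED.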
    from concurrent.futures import ThreadPoolExecutor as Pool
    from concurrent.futures import wait, FIRST_COMPLETED
    import requests

    URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']


    def task(url, timeout=10):
        return requests.get(url=url, timeout=timeout)


    with Pool(max_workers=3) as executor:
        future_task = [executor.submit(task, url) for url in URLS]

        for f in future_task:
            if f.running():
                print("%s" % (str(f)))

        # wait() also takes the timeout and return_when arguments.
        # return_when is one of three constants:
        #   FIRST_COMPLETED  return as soon as any future finishes or is cancelled
        #   FIRST_EXCEPTION  return as soon as any future raises an exception;
        #                    if none does, this behaves like ALL_COMPLETED
        #   ALL_COMPLETED    return once every future has finished
        # Called inside the with block, because leaving the block waits
        # for every future and would make FIRST_COMPLETED meaningless.
        results = wait(future_task, return_when=FIRST_COMPLETED)
        done = results[0]  # the set of completed futures
        for d in done:
            print(d)
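The difference between FIRST_COMPLETED and ALL_COMPLETED can be sketched without the network. In this illustration one task is held back by a threading.Event until after the first wait() call returns; wait_demo, quick and held are hypothetical names.

```python
import threading
from concurrent.futures import (ALL_COMPLETED, FIRST_COMPLETED,
                                ThreadPoolExecutor, wait)


def wait_demo():
    """Return what wait() reports before and after the held task finishes."""
    release = threading.Event()

    def quick():
        return "quick"

    def held():
        release.wait()      # blocks until the main thread releases it
        return "held"

    with ThreadPoolExecutor(max_workers=2) as pool:
        fast = pool.submit(quick)
        slow = pool.submit(held)

        # Returns as soon as one future is done; slow is still blocked.
        first = wait([fast, slow], return_when=FIRST_COMPLETED)
        early = (fast in first.done, slow in first.not_done)

        release.set()
        # Returns only when every future is done.
        final = wait([fast, slow], return_when=ALL_COMPLETED)
        return early, len(final.done), len(final.not_done)


if __name__ == "__main__":
    print(wait_demo())
```

The return value of wait() is a named tuple, so the two sets can be read as .done and .not_done instead of by index.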
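concurrent.futures.as_completed(fs, timeout=None)
Returns an iterator over the Future instances in fs.
Each future is yielded as it completes.
A future that appears more than once in fs is yielded only once.
Every future the iterator yields has already completed.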
    from concurrent.futures import ThreadPoolExecutor as Pool
    from concurrent.futures import as_completed
    import requests

    URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']


    def task(url, timeout=10):
        return requests.get(url=url, timeout=timeout)


    with Pool(max_workers=3) as executor:
        # create the future tasks
        future_task = [executor.submit(task, url) for url in URLS]

        for f in future_task:
            if f.running():
                print("%s is running" % str(f))

        for f in as_completed(future_task):
            try:
                # as_completed() only yields completed futures, so
                # result() returns (or raises) immediately
                f_ret = f.result()
                print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
            except Exception as e:
                # the future has already finished, so there is nothing to cancel
                print(e)
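A common companion to as_completed() is a dict that maps each future back to the input that produced it, so a result or an error can be attributed to the right job. The sketch below uses a local stand-in task instead of requests; flaky, collect and the job names are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def flaky(name, delay):
    """Stand-in for a network call: sleeps, then fails for the job 'bad'."""
    time.sleep(delay)
    if name == "bad":
        raise ValueError("simulated failure for " + name)
    return name.upper()


def collect(jobs):
    """Run every job and split the outcomes into results and errors."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
        # Map each future back to the job name that created it.
        future_to_name = {pool.submit(flaky, name, delay): name
                          for name, delay in jobs.items()}
        for fut in as_completed(future_to_name):
            name = future_to_name[fut]
            try:
                results[name] = fut.result()   # re-raises the task's error
            except ValueError as exc:
                errors[name] = str(exc)
    return results, errors


if __name__ == "__main__":
    print(collect({"a": 0.05, "bad": 0.0, "b": 0.01}))
```

Catching the exception per future keeps one failed job from hiding the results of the others.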