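1. What is a future?
(1) The standard library puts it this way: "The Future class encapsulates the asynchronous execution of a callable."
Fluent Python puts it this way: a future (期物) encapsulates a pending operation; it can be put in a queue, its completion state can be queried, and its result (or exception) can be retrieved once it is available.
An Dao's rendering of "future" as 期物 is quite apt, and the books he translates are reliably good.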
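(2) From the source code, a Future object has five states and wraps a lock (the condition variable self._condition), a state value, a result, an exception, and a list of callbacks. The state is queried through cancelled(), running(), and done().
set_result() and set_exception() store the result or the exception and then trigger the callbacks. A callback takes exactly one argument, the future itself, so inside the callback you can read the result of the operation bound to that future.
result() returns the final result, or raises the stored exception if there is one. exception() returns the stored exception without raising it.
In concurrent.futures.ThreadPoolExecutor, the thread that sets the result or exception is not the thread that reads it. This is where self._condition comes in, and it is also why result() blocks when it is called before the operation bound to the future has finished.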
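The behaviour described above can be tried on a Future created by hand. This is only a minimal sketch: in real code futures are created by an executor, and the names on_done and seen are made up for the illustration.

```python
from concurrent.futures import Future

seen = []  # hypothetical list used to record what the callback receives


def on_done(fut):
    # A done-callback gets the future itself as its only argument.
    seen.append(fut.result())


f = Future()
f.add_done_callback(on_done)
pending_done = f.done()       # nothing has been set yet
f.set_result(42)              # stores the result and runs on_done
finished_done = f.done()

g = Future()
g.set_exception(ValueError("boom"))
stored = g.exception()        # returns the exception, does not raise it
try:
    g.result()                # raises the stored exception
    raised = None
except ValueError as exc:
    raised = exc

print(pending_done, finished_done, seen, repr(stored), raised is stored)
```

Calling set_result() or set_exception() directly is normally the executor's job; doing it here just makes each step visible.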
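(3) In concurrent.futures, rather than saying that a future encapsulates an operation, it is more accurate to say that each future is bound to an operation.
Note: when an exception occurs inside the thread pool, it is not raised directly. It is stored in the future through the future's set_exception() method. Once the operation bound to the future has finished, calling result() raises the exception that occurred inside the pool.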
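A small sketch of that note, assuming a deliberately failing task (the function name fail is made up for the illustration): submit() returns normally, and the exception only surfaces when the future is asked for it.

```python
from concurrent.futures import ThreadPoolExecutor


def fail():
    # Hypothetical task that always fails inside the worker thread.
    raise ValueError("raised inside the worker thread")


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(fail)   # does not raise here
    stored = future.exception()      # waits for completion, returns the exception
    try:
        future.result()              # re-raises the stored exception
        reraised = None
    except ValueError as exc:
        reraised = exc

print(repr(stored), reraised is stored)
```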
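2. ThreadPoolExecutor: where are the threads created, and how is their number controlled?
(1) ThreadPoolExecutor holds a task queue and a set of thread objects.
(2) The __init__ method shows that, in the version of the source quoted here, the default maximum number of threads is (number of CPUs * 5). Python 3.8 and later use min(32, os.cpu_count() + 4) instead.
    if max_workers is None:
        max_workers = (os.cpu_count() or 1) * 5
(3) submit() binds a Future object and an operation together in a _WorkItem. The _WorkItem's run() method stores the result or the exception of fn in the corresponding future. Each fn has exactly one future, and submit() returns it, so code outside the pool can use the future to query the state of fn and to get its result or its exception. The _WorkItem is then put on the task queue.
(4) submit() (through _adjust_thread_count()) checks the number of threads and starts a new one if the maximum has not been reached. The new thread's target is _worker, whose job is to take _WorkItem objects off the task queue and call run() on them.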
    class _WorkItem(object):
        def __init__(self, future, fn, args, kwargs):
            self.future = future
            self.fn = fn
            self.args = args
            self.kwargs = kwargs

        def run(self):
            if not self.future.set_running_or_notify_cancel():
                return

            try:
                result = self.fn(*self.args, **self.kwargs)
            except BaseException as e:
                self.future.set_exception(e)
            else:
                self.future.set_result(result)


    def _worker(executor_reference, work_queue):
        try:
            while True:
                work_item = work_queue.get(block=True)
                if work_item is not None:
                    work_item.run()
                    # Delete references to object. See issue16284
                    del work_item
                    continue
                executor = executor_reference()
                # Exit if:
                #   - The interpreter is shutting down OR
                #   - The executor that owns the worker has been collected OR
                #   - The executor that owns the worker has been shutdown.
                if _shutdown or executor is None or executor._shutdown:
                    # Notice other workers
                    work_queue.put(None)
                    return
                del executor
        except BaseException:
            _base.LOGGER.critical('Exception in worker', exc_info=True)


    class ThreadPoolExecutor(_base.Executor):
        def submit(self, fn, *args, **kwargs):
            with self._shutdown_lock:
                if self._shutdown:
                    raise RuntimeError('cannot schedule new futures after shutdown')

                f = _base.Future()
                w = _WorkItem(f, fn, args, kwargs)

                self._work_queue.put(w)
                self._adjust_thread_count()
                return f
        submit.__doc__ = _base.Executor.submit.__doc__

        def _adjust_thread_count(self):
            # When the executor gets lost, the weakref callback will wake up
            # the worker threads.
            def weakref_cb(_, q=self._work_queue):
                q.put(None)
            # TODO(bquinlan): Should avoid creating new threads if there are more
            # idle threads than items in the work queue.
            if len(self._threads) < self._max_workers:
                t = threading.Thread(target=_worker,
                                     args=(weakref.ref(self, weakref_cb),
                                           self._work_queue))
                t.daemon = True
                t.start()
                self._threads.add(t)
                _threads_queues[t] = self._work_queue
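One way to observe the thread cap from outside, without touching private attributes, is to have each task report the thread it ran on. This is a rough sketch; the task name which_thread and the numbers are arbitrary choices for the illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def which_thread(_):
    # Hypothetical task: stay busy briefly so tasks pile up in the queue,
    # then report which worker thread ran this task.
    time.sleep(0.05)
    return threading.current_thread().name


with ThreadPoolExecutor(max_workers=2) as executor:
    names = list(executor.map(which_thread, range(6)))

workers = set(names)
print(len(names), "tasks ran on", len(workers), "worker thread(s)")
```

Six tasks are submitted, but they should never be spread over more than max_workers distinct threads, because the extra work items simply wait in the queue until a worker picks them up.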
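3. Why does Executor.map return results in the order of the iterable, while as_completed yields whichever futures finish first?
(1) Executor implements the map method, and ThreadPoolExecutor inherits it.
(2) Executor.map first calls submit() for every item, handing the operation fn to the thread pool and collecting a list of futures. It then iterates over that list and fetches the result of each future in turn.
(3) as_completed first checks the state of all fs (the futures) and yields those that have already finished. It then keeps waiting for further futures to finish and yields each one as it completes. The order is therefore independent of the submission order: tasks that finish first are returned first.
(4) The code at the end of this article compares the behaviour of map and as_completed.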
    class Executor(object):
        def map(self, fn, *iterables, timeout=None, chunksize=1):
            if timeout is not None:
                end_time = timeout + time.time()

            fs = [self.submit(fn, *args) for args in zip(*iterables)]

            # Yield must be hidden in closure so that the futures are submitted
            # before the first iterator value is required.
            def result_iterator():
                try:
                    for future in fs:
                        if timeout is None:
                            yield future.result()
                        else:
                            yield future.result(end_time - time.time())
                finally:
                    for future in fs:
                        future.cancel()
            return result_iterator()
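The ordering difference can also be seen without any network access. The sketch below assumes a made-up task, sleep_then_return, whose delays are chosen so that the first submitted task finishes last.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def sleep_then_return(delay):
    # Hypothetical task: finish after `delay` seconds and return the delay.
    time.sleep(delay)
    return delay


delays = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as executor:
    # map: results come back in submission order, however long each task took.
    map_order = list(executor.map(sleep_then_return, delays))

    # as_completed: futures are yielded as they finish.
    futures = [executor.submit(sleep_then_return, d) for d in delays]
    completed_order = [f.result() for f in as_completed(futures)]

print("map:         ", map_order)
print("as_completed:", completed_order)
```

map always reports the delays in the order they were submitted. as_completed typically reports the shortest delay first, although the exact order depends on thread scheduling.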
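4. Why do Executor.map and as_completed block while futures are unfinished?
(1) The lock behind self._condition.wait(timeout) is the reason result() blocks, and Executor.map blocks because it calls result() on each future. The client code that calls result() and the pool code that calls set_result() or set_exception() run in different threads. Only after the task bound to the future has finished does the pool thread call self._condition.notify_all() inside set_result() or set_exception(), which wakes the waiting client thread. At that point the block is released and the finished future's result is available. as_completed waits in a similar way, on an event that a future sets when it finishes.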
    class Future(object):
        def result(self, timeout=None):
            with self._condition:
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()

                self._condition.wait(timeout)

                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()
                else:
                    raise TimeoutError()
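Both branches after the wait can be exercised with a hand-made Future. This is a sketch under simplifying assumptions: a threading.Timer stands in for the pool's worker thread, and the timeouts are arbitrary.

```python
import threading
from concurrent.futures import Future, TimeoutError

future = Future()

# Nobody sets a result yet, so the wait expires and result() raises.
try:
    future.result(timeout=0.1)
    timed_out = False
except TimeoutError:
    timed_out = True

# Another thread (standing in for a pool worker) sets the result shortly
# afterwards; its notify_all() wakes the blocked result() call below.
threading.Timer(0.1, future.set_result, args=("done",)).start()
value = future.result(timeout=5)

print(timed_out, value)
```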
Appendix: test code
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import as_completed
    import requests
    from requests.exceptions import ConnectionError
    from functools import partial
    from os import cpu_count


    def get_url(url):
        # Fetch the URL and return it together with the HTTP status code.
        try:
            r = requests.get(url)
        except ConnectionError:
            raise ConnectionError('Check the network connection!')

        return url, r.status_code


    URLS = [
        'https://my.oschina.net/u/2255341/blog',
        'https://github.com/timeline.json',
        'http://www.oschina.net/',
    ]

    if __name__ == '__main__':
        # get_url('https://github.com/timeline.json')
        executor = ThreadPoolExecutor(max_workers=2)

        # map: results are printed in the order of URLS.
        for res in executor.map(get_url, URLS):
            print(res)

        print('------------------------------------------------')

        # as_completed: results are printed in the order the requests finish.
        for future in as_completed(map(partial(executor.submit, get_url), URLS)):
            res = future.result()
            print(res)
(py3env) ➜ concurrent_futures git:(master) ✗ python download.py
('https://my.oschina.net/u/2255341/blog', 403)
('https://github.com/timeline.json', 410)
('http://www.oschina.net/', 403)
------------------------------------------------
('https://my.oschina.net/u/2255341/blog', 403)
('http://www.oschina.net/', 403)
('https://github.com/timeline.json', 410)
Code: https://github.com/kagxin/recipe/blob/master/concurrent_futures/download.py