Let's start with an example:
import time
from concurrent.futures import ThreadPoolExecutor

def foo():
    print('enter at {} ...'.format(time.strftime('%X')))
    time.sleep(5)
    print('exit at {} ...'.format(time.strftime('%X')))

executor = ThreadPoolExecutor()
executor.submit(foo)
executor.shutdown()
Output:
enter at 16:20:31 ...
exit at 16:20:36 ...
By default, shutdown(wait=True) blocks the calling thread until the worker threads have finished. Even shutdown(wait=False) merely closes the pool without blocking: worker threads that are in the middle of a task are not stopped right away, but keep running until the task completes. Even patching the source to call t.join(0) on each newly started worker thread t fails to stop it immediately (join(timeout) only waits; it never kills a thread). So what exactly guarantees that, when the pool is shut down, worker threads that are still executing tasks are not terminated?
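To make the claim concrete, here is a minimal sketch (task and its 3-second sleep are invented for illustration): shutdown(wait=False) returns at once, yet the submitted task still runs to completion before the interpreter exits.

import time
from concurrent.futures import ThreadPoolExecutor

def task():
    print('task enter at {} ...'.format(time.strftime('%X')))
    time.sleep(3)
    print('task exit at {} ...'.format(time.strftime('%X')))

executor = ThreadPoolExecutor()
executor.submit(task)
executor.shutdown(wait=False)   # returns immediately instead of blocking
print('shutdown returned at {} ...'.format(time.strftime('%X')))
# "task exit" is still printed about 3 seconds later: the running worker
# is not terminated, and the interpreter waits for it before exiting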
Let's look at the ThreadPoolExecutor source:
class ThreadPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = thread_name_prefix

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            # Wrap the target function in a _WorkItem; work_item.run() will call fn()
            w = _WorkItem(f, fn, args, kwargs)

            # Put the work item on the queue
            self._work_queue.put(w)
            # Start a new thread that keeps pulling work items off the queue
            # and calls work_item.run() on each one
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # This callback runs when the executor object is garbage-collected
        # (e.g. after `del executor`)
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            # Use the module-level _worker function as the new thread's target
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            # This step matters: it is the key to thread t not being cut off
            # by t.join(0). See the _python_exit function below.
            _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
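A quick aside on the submit() path above: it returns a _base.Future, and result() on that future blocks until the wrapped call has run. A minimal usage sketch (pow is just an arbitrary callable):

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(pow, 2, 10)   # pow(2, 10) is wrapped in a _WorkItem
print(future.result())                 # blocks until a worker runs it, prints 1024
executor.shutdown()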
submit(func) does two things:

It wraps the target function in a _WorkItem and puts it on the work queue.
It calls _adjust_thread_count() to start a new worker thread when one is needed.
_adjust_thread_count() also does two things:
It starts a new thread that runs the _worker function, whose job is to keep taking work items out of the queue and calling work_item.run(), i.e. executing func().
It binds the new thread to the work queue via _threads_queues[t] = self._work_queue, which prevents the thread from being cut off by join(0); a simplified sketch of this producer/consumer pattern follows below.
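Stripped of the pool bookkeeping, the pattern these two steps implement is an ordinary producer/consumer hand-off. A simplified sketch (simple_submit and worker_loop are invented names, not part of the real source):

import queue
import threading

work_queue = queue.Queue()

def worker_loop():
    # Consumer: keep taking callables off the queue and running them,
    # just like _worker does with _WorkItem.run()
    while True:
        item = work_queue.get()
        if item is None:          # sentinel, the same trick shutdown() uses
            return
        item()

t = threading.Thread(target=worker_loop)
t.daemon = True
t.start()

def simple_submit(fn):
    # Producer: the counterpart of submit() putting a _WorkItem on the queue
    work_queue.put(fn)

simple_submit(lambda: print('hello from the worker thread'))
work_queue.put(None)              # tell the worker to exit
t.join()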
Now look at the source of the _worker function:
def _worker(executor_reference, work_queue):
    try:
        while True:
            # Keep pulling work items off the queue
            work_item = work_queue.get(block=True)
            if work_item is not None:
                # Execute func()
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue

            # Dereference the weakref to get the executor back
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            # When shutdown() runs, it sets executor._shutdown to True and puts
            # a None on the queue. Once work_item.run() finishes, the loop goes
            # back to the queue, but because shutdown() enqueued None, the item
            # it gets is None, so control reaches this branch. Seeing that
            # executor._shutdown is True, it puts another None on the queue
            # before returning. The None from shutdown() ends one worker
            # thread; the None re-queued here notifies the next one. This
            # chain reaction lets every thread finish the logic in func and
            # then exit.
            if _shutdown or executor is None or executor._shutdown:
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
As you can see, the job of _worker is to run in the new thread, repeatedly fetch work items from the queue and call work_item.run(); once shutdown begins, each finishing worker puts None back on the queue to tell the other threads to exit.
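This None chain reaction is easy to reproduce outside the pool. In the sketch below (a standalone toy, not the stdlib code), three workers share one queue, a single None is enqueued, and each exiting worker re-queues it for the next:

import queue
import threading

q = queue.Queue()

def worker(name):
    while True:
        item = q.get()
        if item is None:
            # Pass the sentinel on before exiting, exactly like _worker:
            # one None is enough to shut down every thread in turn
            q.put(None)
            print(name, 'exited')
            return
        print(name, 'processed', item)

threads = [threading.Thread(target=worker, args=('worker-%d' % i,))
           for i in range(3)]
for t in threads:
    t.start()

for n in range(5):
    q.put(n)
q.put(None)     # a single sentinel triggers the chain reaction
for t in threads:
    t.join()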
Next, let's see how the _threads_queues[t] = self._work_queue assignment in _adjust_thread_count() manages to keep a join(0) call from forcibly stopping a thread that is still running.
import atexit
import weakref

_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    # For each thread t recorded in _threads_queues, call t.join() to wait
    # unconditionally for the worker thread to finish
    for t, q in items:
        t.join()

atexit.register(_python_exit)
The atexit module registers a function to be run when the logic in the MainThread has finished; here it is _python_exit that gets registered. When _python_exit runs, its t.join() calls execute and block unconditionally. A curious question: since this happens after the MainThread has finished, which thread is t.join() actually executed in? It turns out to be a dummy thread named _DummyThread.
import atexit
import threading
import weakref
import time

threads_queues = weakref.WeakKeyDictionary()

def foo():
    print('enter at {} ...'.format(time.strftime('%X')))
    time.sleep(5)
    print('exit at {} ...'.format(time.strftime('%X')))

def _python_exit():
    items = list(threads_queues.items())
    print('current thread in _python_exit --> ', threading.current_thread())
    for t, _ in items:
        t.join()

atexit.register(_python_exit)

if __name__ == '__main__':
    t = threading.Thread(target=foo)
    t.setDaemon(True)
    t.start()
    threads_queues[t] = foo

    print(time.strftime('%X'))
    t.join(timeout=2)
    print(time.strftime('%X'))
    t.join(timeout=2)
    print(time.strftime('%X'))
    print('current thread in main -->', threading.current_thread())
    print(threading.current_thread(), 'end')
Output:
enter at 17:13:44 ...
17:13:44
17:13:46
17:13:48
current thread in main --> <_MainThread(MainThread, started 12688)>
<_MainThread(MainThread, started 12688)> end
current thread in _python_exit -->  <_DummyThread(Dummy-2, started daemon 12688)>
exit at 17:13:49 ...
This example shows that when thread t starts, foo blocks for 5 seconds. The MainThread calls t.join(timeout=2) twice, waiting 2 seconds each time for a total of 4 seconds; yet after the second t.join(timeout=2) returns, thread t still has not been stopped. The main thread then finishes, _python_exit is invoked, and inside the _DummyThread it calls t.join() again, which keeps waiting until child thread t prints exit at 17:13:49 ... and runs to completion.
Summary:
join() can be called on the same thread multiple times; the waits simply stack up. And join(n) never terminates a thread: it only waits up to n seconds for the thread to finish on its own. Once _python_exit is registered with atexit, another thread calling t.join(n) in an attempt to end thread t has no such effect, because _python_exit always runs last and calls t.join() with no timeout, guaranteeing that thread t runs to completion instead of being stopped midway.
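One last sketch to underline the point: join(timeout) merely waits, it never stops the target thread, so is_alive() is still True when the timeout expires.

import threading
import time

t = threading.Thread(target=time.sleep, args=(5,))
t.start()

t.join(timeout=2)        # waits at most 2 seconds, then simply gives up waiting
print(t.is_alive())      # True: the thread was not stopped
t.join()                 # an untimed join really waits for completion
print(t.is_alive())      # False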