在写python多线程代码的时候,会用到multiprocessing这个包,这篇文章总结了一些这个包在多进程管理方面的一些原理和代码分析。python
1. 问题一:是否须要显式调用pool的close和join方法,不调用的话,子进程是否没法退出?bootstrap
首先初始化Pool的时候,指定processes的个数,就是pool中worker的个数,pool初始化的时候,会把worker以daemon=True的子进程方式启动起来。api
def _repopulate_pool(self): """Bring the number of pool processes up to the specified number, for use after reaping workers which have exited. """ for i in range(self._processes - len(self._pool)): w = self.Process(target=worker, args=(self._inqueue, self._outqueue, self._initializer, self._initargs, self._maxtasksperchild) ) self._pool.append(w) w.name = w.name.replace('Process', 'PoolWorker') w.daemon = True w.start() debug('added worker')
推荐在使用完pool以后,用thread pool的时候调用close()和join()方法,这样能够把pool中的worker都释放掉(等待子任务结束)。可是若是不显式的调用,在主进程退出的时候,这些子进程也会退出(缘由是设置了daemon这个flag)。多线程
def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers, active_children=active_children, current_process=current_process): # NB: we hold on to references to functions in the arglist due to the # situation described below, where this function is called after this # module's globals are destroyed. global _exiting info('process shutting down') debug('running all "atexit" finalizers with priority >= 0') _run_finalizers(0) if current_process() is not None: # NB: we check if the current process is None here because if # it's None, any call to ``active_children()`` will throw an # AttributeError (active_children winds up trying to get # attributes from util._current_process). This happens in a # variety of shutdown circumstances that are not well-understood # because module-scope variables are not apparently supposed to # be destroyed until after this function is called. However, # they are indeed destroyed before this function is called. See # issues 9775 and 15881. Also related: 4106, 9205, and 9207. for p in active_children(): if p._daemonic: info('calling terminate() for daemon %s', p.name) p._popen.terminate() for p in active_children(): info('calling join() for process %s', p.name) p.join() debug('running the remaining "atexit" finalizers') _run_finalizers()
主进程退出的时候,会调用_exit_function, 若是看到active的children是_daemonic的就会调用其terninate方法,让子进程退出。exit是经过这个调用注册的,atexit.register(_exit_function),本质是利用系统的退出hook方法,在退出的时候触发对应的函数。app
2. 问题二:若是启动以后,kill -9主进程,子进程会不会没法退出?函数
以下代码是pool中worker的主代码逻辑,若是kill -9主进程,子进程若是没有在处理做业,由于主进程退出了,get()方法从queue中拿task的时候,就会发生exception,这样worker会退出。若是子进程正在处理任务,任务结束的时候,须要往queue中扔回结果,由于主进程退出了,因此也会exception,worker同样会退出。this
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) put = outqueue.put get = inqueue.get if hasattr(inqueue, '_writer'): inqueue._writer.close() outqueue._reader.close() if initializer is not None: initializer(*initargs) completed = 0 while maxtasks is None or (maxtasks and completed < maxtasks): try: task = get() except (EOFError, IOError): debug('worker got EOFError or IOError -- exiting') break if task is None: debug('worker got sentinel -- exiting') break job, i, func, args, kwds = task try: result = (True, func(*args, **kwds)) except Exception, e: result = (False, e) try: put((job, i, result)) except Exception as e: wrapped = MaybeEncodingError(e, result[1]) debug("Possible encoding error while sending result: %s" % ( wrapped)) put((job, i, (False, wrapped))) completed += 1 debug('worker exiting after %d tasks' % completed)
worker退出的时候,看以下代码spa
## process.py def _bootstrap(self): from . import util global _current_process try: self._children = set() self._counter = itertools.count(1) try: sys.stdin.close() sys.stdin = open(os.devnull) except (OSError, ValueError): pass _current_process = self util._finalizer_registry.clear() util._run_after_forkers() util.info('child process calling self.run()') try: self.run() exitcode = 0 finally: util._exit_function()
子进程run()会结束,而后调用_exit_function()清理一些子进程,调用_run_finalizers()结束进程。线程
可是若是子进程在pool的worker中跑的是长时间不退出的task,那这个子进程就会没法退出,一直在运行。若是task都是短做业,即便主进程被kill -9,子进程也会在做业跑完以后都退出。debug