因为和线程类似,这里的前几个示例都是从线程示例中修改的。python
内容目录
- multiprocessing基础
- 可导入的目标函数
- 肯定当前进程
- 守护进程 Daemon
- 等待进程 join()
- 终止进程 terminate()
- 进程退出状态
- 调试 log_to_stderr
- 子类化进程
生成进程的最简单的方法是用目标函数实例化一个进程对象,并调用start()来让它开始工做。bootstrap
import multiprocessing def worker(): """worker function""" print('Worker') if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process(target=worker) jobs.append(p) p.start()
运行结果:并发
Worker Worker Worker Worker Worker
传递参数:app
import multiprocessing def worker(num): """thread worker function""" print('Worker:', num) if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) jobs.append(p) p.start()
运行结果:函数
Worker: 1 Worker: 0 Worker: 3 Worker: 2 Worker: 4
# main.py import multiprocessing import multiprocessing_import_worker if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process( target=multiprocessing_import_worker.worker, ) jobs.append(p) p.start()
其中worker()函数在multiprocessing_import_worker.py中定义:操作系统
# multiprocessing_import_worker.py def worker(): """worker function""" print('Worker') return
运行结果:线程
Worker Worker Worker Worker Worker
import multiprocessing import time def worker(): name = multiprocessing.current_process().name print(name, 'Starting') time.sleep(2) print(name, 'Exiting') def my_service(): name = multiprocessing.current_process().name print(name, 'Starting') time.sleep(3) print(name, 'Exiting') if __name__ == '__main__': service = multiprocessing.Process( name='my_service', target=my_service, ) worker_1 = multiprocessing.Process( name='worker 1', target=worker, ) worker_2 = multiprocessing.Process( # default name target=worker, ) worker_1.start() worker_2.start() service.start()
运行结果:能够看到默认的进程名字对应于Process-3, 和线程很相似。调试
worker 1 Starting Process-3 Starting my_service Starting worker 1 Exiting Process-3 Exiting my_service Exiting
默认状况下主进程会在子进程所有执行完毕后才退出,若是子进程设置为守护进程,便再也不阻塞主进程退出。
为了将进程标记为守护进程只需将daemon属性设置为True。默认状况下,进程不是守护进程。日志
import multiprocessing import time import sys def daemon(): p = multiprocessing.current_process() print('Starting:', p.name, p.pid) sys.stdout.flush() time.sleep(2) print('Exiting :', p.name, p.pid) sys.stdout.flush() def non_daemon(): p = multiprocessing.current_process() print('Starting:', p.name, p.pid) sys.stdout.flush() print('Exiting :', p.name, p.pid) sys.stdout.flush() if __name__ == '__main__': d = multiprocessing.Process( name='daemon', target=daemon, ) d.daemon = True n = multiprocessing.Process( name='non-daemon', target=non_daemon, ) n.daemon = False d.start() time.sleep(1) n.start()
运行结果:能够看到d进程还没执行完主进程就退出了。code
Starting: daemon 24852 Starting: non-daemon 23248 Exiting : non-daemon 23248
在主程序退出以前,守护进程会自动终止,这将避免留下孤儿进程。
要等到进程完成工做再退出,请使用join()方法。
import multiprocessing import time import sys def daemon(): name = multiprocessing.current_process().name print('Starting:', name) time.sleep(2) print('Exiting :', name) def non_daemon(): name = multiprocessing.current_process().name print('Starting:', name) print('Exiting :', name) if __name__ == '__main__': d = multiprocessing.Process( name='daemon', target=daemon, ) d.daemon = True n = multiprocessing.Process( name='non-daemon', target=non_daemon, ) n.daemon = False d.start() time.sleep(1) n.start() d.join() n.join()
运行结果:
Starting: non-daemon Exiting : non-daemon Starting: daemon Exiting : daemon
默认状况下,join()无限期阻塞。也能够使用超时参数。
在一个进程调用terminate()会杀死子进程
import multiprocessing import time def slow_worker(): print('Starting worker') time.sleep(0.1) print('Finished worker') if __name__ == '__main__': p = multiprocessing.Process(target=slow_worker) print('BEFORE:', p, p.is_alive()) p.start() print('DURING:', p, p.is_alive()) p.terminate() print('TERMINATED:', p, p.is_alive()) p.join() print('JOINED:', p, p.is_alive())
运行结果:在终止它以后,使用join()是很重要的,以便让进程管理代码有时间来更新对象的状态以反映终止。
BEFORE: <Process(Process-1, initial)> False DURING: <Process(Process-1, started)> True TERMINATED: <Process(Process-1, started)> True JOINED: <Process(Process-1, stopped[SIGTERM])> False
当进程退出时产生的状态码能够经过exitcode属性访问。容许的范围列在下面的表格中。
Exit Code | Meaning |
---|---|
== 0 | row 1 col 2 |
> 0 | the process had an error, and exited with that code |
< 0 | the process had an error, and exited with that code |
import multiprocessing import sys import time def exit_error(): sys.exit(1) def exit_ok(): return def return_value(): return 1 def raises(): raise RuntimeError('There was an error!') def terminated(): time.sleep(3) if __name__ == '__main__': jobs = [] funcs = [ exit_error, exit_ok, return_value, raises, terminated, ] for f in funcs: print('Starting process for', f.__name__) j = multiprocessing.Process(target=f, name=f.__name__) jobs.append(j) j.start() jobs[-1].terminate() for j in jobs: j.join() print('{:>15}.exitcode = {}'.format(j.name, j.exitcode))
运行结果:
Starting process for exit_error Starting process for exit_ok Starting process for return_value Starting process for raises Starting process for terminated Process raises: exit_error.exitcode = 1 exit_ok.exitcode = 0 Traceback (most recent call last): File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 258, in _bootstrap self.run() File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run self._target(*self._args, **self._kwargs) File "E:\MyPython\image.py", line 19, in raises raise RuntimeError('There was an error!') RuntimeError: There was an error! return_value.exitcode = 0 raises.exitcode = 1 terminated.exitcode = -15
exit(0):无错误退出
exit(1):有错误退出
退出代码是告诉解释器的(或操做系统)
在调试并发性问题时,能够使用multiprocessing所提供的对象的内部构件。有一个方便的模块级函数为logtostderr()。它使用logging设置一个记录器对象,并添加一个处理程序,以便将日志消息发送到标准错误通道。
import multiprocessing import logging import sys def worker(): print('Doing some work') sys.stdout.flush() if __name__ == '__main__': multiprocessing.log_to_stderr(logging.DEBUG) p = multiprocessing.Process(target=worker) p.start() p.join()
运行结果:认状况下,logging级别被设置为NOTSET不会产生任何消息。
Doing some work [INFO/Process-1] child process calling self.run() [INFO/Process-1] process shutting down [DEBUG/Process-1] running all "atexit" finalizers with priority >= 0 [DEBUG/Process-1] running the remaining "atexit" finalizers [INFO/Process-1] process exiting with exitcode 0 [INFO/MainProcess] process shutting down [DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0 [DEBUG/MainProcess] running the remaining "atexit" finalizers
若想直接操做日志请使用get_logger()
获取
import multiprocessing import logging import sys def worker(): print('Doing some work') sys.stdout.flush() if __name__ == '__main__': multiprocessing.log_to_stderr() logger = multiprocessing.get_logger() logger.setLevel(logging.INFO) p = multiprocessing.Process(target=worker) p.start() p.join()
运行结果:
[INFO/Process-1] child process calling self.run() Doing some work [INFO/Process-1] process shutting down [INFO/Process-1] process exiting with exitcode 0 [INFO/MainProcess] process shutting down
经过multiprocessing.Process
能够建立进程,也能够经过自定义子类建立进程。
import multiprocessing class Worker(multiprocessing.Process): def run(self): print('In {}'.format(self.name)) return if __name__ == '__main__': jobs = [] for i in range(5): p = Worker() jobs.append(p) p.start() for j in jobs: j.join()
运行结果:
In Worker-2 In Worker-4 In Worker-3 In Worker-1 In Worker-5