To write multiprocessing programs in Python, it helps to first understand a bit of operating-system background.

Unix/Linux provides a special system call, fork(). An ordinary function is called once and returns once, but fork() is called once and returns twice: the operating system copies the current process (the parent) into a new process (the child), and the call then returns separately in the parent and in the child.

The child process always gets a return value of 0, while the parent gets the child's PID. The reason is that one parent can fork many children, so the parent needs to record each child's ID, whereas a child only needs to call getppid() to obtain its parent's ID.
Python's os module wraps the common system calls, including fork, so a child process can be created from Python easily:
```python
import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
```
The code above fails on Windows, because Windows has no fork call.
Python provides the multiprocessing package for cross-platform multiprocess development. It offers a Process class that represents a process object, with an API much like threading.Thread:
```python
from multiprocessing import Process
import os

# Code executed by the child process
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

# Output:
# Parent process 8768.
# Child process will start.
# Run child process test (8572)...
# Child process end.
```
To create a child process, just pass an executable function and its arguments to the Process constructor, then call start() to launch it. The join() method waits for the child process to finish before continuing, and is commonly used to synchronize processes.
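The start()/join() pattern extends naturally to several children: start them all first, then join them all. A minimal sketch (the worker function here is illustrative):

```python
from multiprocessing import Process
import os

def worker(n):
    print('worker %s running in pid %s' % (n, os.getpid()))

if __name__ == '__main__':
    procs = [Process(target=worker, args=(i,)) for i in range(3)]
    for p in procs:
        p.start()        # launch all children before joining any of them
    for p in procs:
        p.join()         # wait for each child to finish
    print('all children finished')
```

Joining only after every child has been started lets the children run concurrently instead of one after another.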
To launch a large number of child processes, create them in batch with a process pool:
```python
from multiprocessing import Pool, current_process
import os, time, random

def long_time_task(name):
    print('Run task %s %s (%s)...' % (name, current_process().name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s %s runs %0.2f seconds.' % (name, current_process().name, (end - start)))
    return name

def done(name):
    print("Task %s %s is done" % (name, current_process().name))

if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,), callback=done)
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

# Output:
# Parent process 10040.
# Run task 0 PoolWorker-4 (5528)...
# Waiting for all subprocesses done...
# Run task 1 PoolWorker-1 (4844)...
# Run task 2 PoolWorker-2 (4892)...
# Run task 3 PoolWorker-3 (7492)...
# Task 0 PoolWorker-4 runs 1.70 seconds.
# Run task 4 PoolWorker-4 (5528)...
# Task 0 MainProcess is done
# Task 2 PoolWorker-2 runs 1.94 seconds.
# Task 2 MainProcess is done
# Task 1 PoolWorker-1 runs 2.26 seconds.
# Task 1 MainProcess is done
# Task 3 PoolWorker-3 runs 2.27 seconds.
# Task 3 MainProcess is done
# Task 4 PoolWorker-4 runs 1.83 seconds.
# Task 4 MainProcess is done
# All subprocesses done.

# With the blocking p.apply(long_time_task, args=(i,)) instead (apply takes
# no callback parameter), the output is:
# Parent process 7624.
# Run task 0 PoolWorker-3 (2128)...
# Task 0 PoolWorker-3 runs 2.98 seconds.
# Run task 1 PoolWorker-4 (5460)...
# Task 1 PoolWorker-4 runs 1.51 seconds.
# Run task 2 PoolWorker-2 (8780)...
# Task 2 PoolWorker-2 runs 0.66 seconds.
# Run task 3 PoolWorker-1 (7044)...
# Task 3 PoolWorker-1 runs 1.13 seconds.
# Run task 4 PoolWorker-3 (2128)...
# Task 4 PoolWorker-3 runs 2.94 seconds.
# Waiting for all subprocesses done...
# All subprocesses done.
```
Calling join() on a Pool object waits for all submitted tasks to finish; close() must be called before join(), and after close() no new tasks can be submitted to the pool.

Here Pool(4) means up to 4 worker processes run concurrently; if the size is omitted, it defaults to the number of CPUs (os.cpu_count()).
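For simple map-style workloads, a Pool can also be used as a context manager together with map(), which blocks until all results are ready and returns them in input order, so the close()/join() bookkeeping disappears. A sketch, with an illustrative square function:

```python
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    # The "with" block tears the pool down automatically on exit;
    # map() distributes the inputs across workers and blocks for results.
    with Pool(4) as p:
        results = p.map(square, range(5))
    print(results)  # [0, 1, 4, 9, 16]
```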
The p.apply_async() in the code above is the asynchronous variant of apply(): apply_async() submits the task and returns immediately, while apply() is the blocking version, so the main process waits until the child function has finished. (Despite the name, Pool.apply() has nothing to do with Python 2's built-in apply() function; they are not equivalent.)
apply_async() can take a callback function, whose argument is the return value of the child function (this enables various patterns, such as passing back a task id and updating the task's status when it finishes).

apply() has no callback: since it blocks anyway, it simply waits for and returns the child function's return value directly.
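Besides callbacks, apply_async() returns an AsyncResult object whose get() blocks until the return value arrives, which is often simpler than wiring up a callback. A sketch (the cube function is illustrative):

```python
from multiprocessing import Pool

def cube(x):
    return x ** 3

if __name__ == '__main__':
    with Pool(2) as p:
        r = p.apply_async(cube, args=(3,))  # returns an AsyncResult immediately
        print(r.get(timeout=10))            # blocks until the worker's return value is ready
```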
For inter-process communication, multiprocessing provides several ways to exchange data, including Queue, Pipe, and Value.

Taking Queue as an example, the parent process creates two child processes: one writes data into the Queue, the other reads data out of it:
```python
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue
import os, time, random

# Code executed by the writer process:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# Code executed by the reader process:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__ == '__main__':
    # The parent creates the Queue and passes it to both children:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # Start the writer pw:
    pw.start()
    # Start the reader pr:
    pr.start()
    # Wait for pw to finish:
    pw.join()
    # pr loops forever and cannot be joined; terminate it instead:
    pr.terminate()

# Output:
# Process to read: 8868
# Process to write: 6832
# Put A to queue...
# Get A from queue.
# Put B to queue...
# Get B from queue.
# Put C to queue...
# Get C from queue.
```
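Pipe, mentioned above as another communication option, is a lighter-weight channel between exactly two endpoints. A minimal sketch:

```python
from multiprocessing import Process, Pipe

def child(conn):
    conn.send('hello from child')  # objects sent through a Pipe are pickled
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # a connected pair of endpoints (duplex by default)
    p = Process(target=child, args=(child_conn,))
    p.start()
    print(parent_conn.recv())         # blocks until the child sends something
    p.join()
```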
On Unix/Linux, the multiprocessing module wraps the fork() call, so we don't need to worry about its details. Since Windows has no fork call, multiprocessing has to "emulate" its effect: every Python object passed from the parent must be serialized with pickle and sent to the child process. So if a multiprocessing call fails on Windows, first consider whether pickling failed.
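A quick way to check the pickling requirement is to try pickling the object directly. For example, a lambda cannot be pickled, so it cannot be shipped to a child under the "spawn" start method that Windows uses:

```python
import pickle

# Under the "spawn" start method (the Windows default), the target function
# and its arguments are pickled and sent to the child process, so anything
# unpicklable fails up front.
try:
    pickle.dumps(lambda: None)  # lambdas are not picklable
    print('pickled OK')
except Exception as e:
    print('pickle failed: %s' % type(e).__name__)
```

The same check applies to arguments: module-level functions and plain data pickle fine, while lambdas, nested functions, and open file handles do not.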
When several child processes operate on the same data, the results can get scrambled. For example, start 2 processes that repeatedly add 1 and 3 to a shared variable:
```python
# -*- coding: utf-8 -*-
import time
from multiprocessing import Process, Value

# Update the shared value
def write(v, n):
    for i in range(10):
        time.sleep(0.1)
        v.value += n
        print(v.value)

if __name__ == '__main__':
    """
    typecode_to_type = {
        'c': ctypes.c_char,
        'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
        'h': ctypes.c_short, 'H': ctypes.c_ushort,
        'i': ctypes.c_int,   'I': ctypes.c_uint,
        'l': ctypes.c_long,  'L': ctypes.c_ulong,
        'f': ctypes.c_float, 'd': ctypes.c_double
    }
    """
    # An int value initialized to 0; see the typecode table above for other types
    v = Value("i", 0)
    p1 = Process(target=write, args=(v, 1))
    p2 = Process(target=write, args=(v, 3))
    # Start p1, which adds 1 to v:
    p1.start()
    # Start p2, which adds 3 to v:
    p2.start()
    # Wait for both to finish:
    p1.join()
    p2.join()

# Output:
# 1 4 5 8 9 12 13 16 17 20 21 24 25 28 29 32 33 36 37 40
```
The output is interleaved: it does not follow the expected pattern of process 1 printing 1, 2, 3, 4, 5... and process 2 printing 3, 6, 9... This is where an inter-process lock comes in:
```python
# -*- coding: utf-8 -*-
import time
from multiprocessing import Process, Value, Lock

def write(v, n, lock):
    with lock:
        for i in range(10):
            time.sleep(0.1)
            v.value += n
            print(v.value)

if __name__ == '__main__':
    """
    typecode_to_type = {
        'c': ctypes.c_char,
        'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
        'h': ctypes.c_short, 'H': ctypes.c_ushort,
        'i': ctypes.c_int,   'I': ctypes.c_uint,
        'l': ctypes.c_long,  'L': ctypes.c_ulong,
        'f': ctypes.c_float, 'd': ctypes.c_double
    }
    """
    lock = Lock()
    # An int value initialized to 0; see the typecode table above for other types
    v = Value("i", 0)
    p1 = Process(target=write, args=(v, 1, lock))
    p2 = Process(target=write, args=(v, 3, lock))
    # Start p1, which adds 1 to v:
    p1.start()
    # Start p2, which adds 3 to v:
    p2.start()
    # Wait for both to finish:
    p1.join()
    p2.join()

# Output:
# 1 2 3 4 5 6 7 8 9 10 13 16 19 22 25 28 31 34 37 40
```
Now the result is correct, but because each process holds the lock for its entire loop, the two run serially, which costs some efficiency.
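As an alternative to passing a separate Lock, a Value created by multiprocessing already carries its own lock, exposed via get_lock(). Holding it only around each read-modify-write keeps the increments safe without serializing the whole loop. A minimal sketch:

```python
from multiprocessing import Process, Value

def write(v, n):
    for i in range(10):
        # Hold the Value's built-in lock only for the update itself,
        # so the two processes still interleave between increments.
        with v.get_lock():
            v.value += n

if __name__ == '__main__':
    v = Value('i', 0)
    ps = [Process(target=write, args=(v, n)) for n in (1, 3)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()
    print(v.value)  # 10*1 + 10*3 = 40
```

The final total is always 40 here, even though the intermediate order of updates varies from run to run.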
On Unix/Linux, multiprocessing can be implemented with the fork() call.

For cross-platform multiprocessing, use the multiprocessing module.

Inter-process communication is implemented via Queue, Pipe, and similar mechanisms.