python的multiprocessing模块是用来建立多进程的,下面对multiprocessing总结一下使用记录。html
multiprocessing建立多进程在windows和linux系统下的对比python
import ospid = os.fork() # 建立一个子进程if pid == 0: print('这是子进程') print(os.getpid(),os.getppid())else: print('这是父进程') print(os.getpid())os.wait() # 等待子进程结束释放资源
fork函数被调用后会返回两次,pid为0的表明子进程,其余返回子进程的id号表示父进程。linux
getpid和getppid函数能够获取本进程和父进程的id号;windows
fork方式的缺点:安全
兼容性差,只能在类linux系统下使用,windows系统不可以使用;app
扩展性差,当须要多条进程的时候,进程管理变得很复杂;异步
会产生“孤儿”进程和“僵尸”进程,须要手动回收资源。async
优势:函数
是系统自带的接近低层的建立方式,运行效率高。ui
建立方式一:
from multiprocessing import Queue, Processimport osdef test(): time.sleep(2) print('this is process {}'.format(os.getpid()))if __name__ == '__main__': p = Process(target=test) p.start() # 子进程 开始执行 p.join() # 等待子进程结束 print('ths peocess is ended')
建立方式二:
from multiprocessing import Queue, Processimport osclass MyProcess(Process): def run(self): time.sleep(2) print('this is process {}'.format(os.getpid())) def __del__(self): print('del the process {}'.format(os.getpid()))if __name__ == '__main__': p = MyProcess() p.start() print('ths process is ended')# 结果:ths process is ended this is process 7600del the process 7600del the process 12304
说明:
Process对象能够建立进程,但Process对象不是进程,其删除与否与系统资源是否被回收没有直接的关系。
上例看到del方法被调用了两次,Process进程建立时,子进程会将主进程的Process对象彻底复制一份,这样在主进程和子进程各有一个Process对象,可是p1.start()启动的是子进程,主进程中的Process对象做为一个静态对象存在。
主进程执行完毕后会默认等待子进程结束后回收资源,不须要手动回收资源;
join()函数用来控制子进程结束的顺序,主进程会阻塞等待子进程结束,其内部也有一个清除僵尸进程的函数,能够回收资源;
当子进程执行完毕后,会产生一个僵尸进程,其会被join函数回收,或者再有一条进程开启,start函数也会回收僵尸进程,因此不必定须要写join函数。
windows系统在子进程结束后会当即自动清除子进程的Process对象,而linux系统子进程的Process对象若是没有join函数和start函数的话会在主进程结束后统一清除。
class Process(object): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): pass # Process对象是python用来建立进程的类 group:扩展保留字段; target:目标代码,通常是咱们须要建立进程执行的目标函数。 name:进程的名字,若是不指定会自动分配一个; args:目标函数的普通参数; kwargs:目标函数的键值对参数; # 方法 start():建立一个子进程并执行,该方法一个Process实例只能执行一次,其会建立一个进程执行该类的run方法。 run():子进程须要执行的代码; join():主进程阻塞等待子进程直到子进程结束才继续执行,能够设置等待超时时间timeout. terminate():使活着的进程终止; is_alive():判断子进程是否还活着。
若是须要建立大量的进程,就须要使用Pool了。
from multiprocessing import Queue, Process, Poolimport osdef test(): time.sleep(2) print('this is process {}'.format(os.getpid()))def get_pool(n=5): p = Pool(n) # 设置进程池的大小 for i in range(10): p.apply_async(test) p.close() # 关闭进程池 p.join()if __name__ == '__main__': get_pool() print('ths process is ended')
分析:厦门厦工叉车怎么样?
如上,进程池Pool被建立出来后,即便实际须要建立的进程数远远大于进程池的最大上限,p1.apply_async(test)代码依旧会不停的执行,并不会停下等待;至关于向进程池提交了10个请求,会被放到一个队列中;
当执行完p1 = Pool(5)这条代码后,5条进程已经被建立出来了,只是尚未为他们各自分配任务,也就是说,不管有多少任务,实际的进程数只有5条,计算机每次最多5条进程并行。
当Pool中有进程任务执行完毕后,这条进程资源会被释放,pool会按先进先出的原则取出一个新的请求给空闲的进程继续执行;
当Pool全部的进程任务完成后,会产生5个僵尸进程,若是主线程不结束,系统不会自动回收资源,须要调用join函数去回收。
join函数是主进程等待子进程结束回收系统资源的,若是没有join,主程序退出后无论子进程有没有结束都会被强制杀死;
建立Pool池时,若是不指定进程最大数量,默认建立的进程数为系统的内核数量.
class Pool(object): def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None): pass # 初始化参数 processes:进程池的大小,默认cpu内核的数量 initializer:建立进程执行的目标函数,其会按照进程池的大小建立相应个数的进程; initargs:目标函数的参数 context:代码的上下文 # 方法 apply():使用阻塞方式调用func; apply_async():使用非阻塞方式条用func; close():关闭Pool,使其再也不接受新的任务; terminate():无论任务是否完成,当即终止; join():主进程阻塞,等待子进程的退出,必须在close()后面使用; map(self, func, iterable, chunksize=None):多进程执行一个函数,传入不一样的参数; starmap(self, func, iterable, chunksize=None):和map相似,但iterable参数可解压缩; starmap_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):使用异步的方式的starmap,callback为返回后的处理函数 map_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):异步方式的map
实例
from multiprocessing import Poolimport osdef test(n): time.sleep(1) print('this is process {}'.format(os.getpid())) return ndef test1(n, m): print(n, m) print('this is process {}'.format(os.getpid()))def back_func(values): # 多进程执行完毕会返回全部的结果的列表 print(values)def back_func_err(values): # 多进程执行完毕会返回全部错误的列表 print(values)def get_pool(n=5): p = Pool(n) # p.map(test, (i for i in range(10))) # 阻塞式多进程执行 # p.starmap(test1, zip([1,2,3],[3,4,5])) # 阻塞式多进程执行多参数函数 # 异步多进程执行函数 p.map_async(test, (i for i in range(5)), callback=back_func, error_callback=back_func_err) # 异步多进程执行多参数函数 p.starmap_async(test1, zip([1,2,3],[3,4,5]), callback=back_func, error_callback=back_func_err) print('-----') p.close() p.join()if __name__ == '__main__': get_pool() print('ths process is ended')
进程虽然不像线程那样共享内存的数据,而是每一个进程有单独的内存,但多进程也是共享文件系统的,即硬盘系统;当多进程同时写入文件操做时,可能形成数据的破坏,所以进程也存在同步锁。
from multiprocessing import Pool, Lockmuex = Lock()def test(): if muex.acquire(): f = open('./test_pro.txt', 'r+', encoding='utf-8') x = f.read() if not x: f.write('0') else: f.seek(0) f.write(str(int(x)+1)) f.close() muex.release()if __name__ == '__main__': p = Pool(5) for i in range(10): p.apply_async(test) p.close() p.join() with open('./test_pro.txt', 'r+', encoding='utf-8') as f: print(f.read())
进程锁能够保证文件系统的安全,可是它使得并行变成了串行,效率降低了,也可能形成死锁问题,通常避免用锁机制。