管道安全
主进程将管道的两端都传送给子进程,子进程和主进程共用管道的两种报错状况,都是在recv接收的时候报错的:并发
1.主进程和子进程中的管道的相同一端都关闭了,出现EOFError;app
2.若是你管道的一端在主进程和子进程中都关闭了,可是你还用这个关闭的一端去接收消息,那么就会出现OSError;异步
因此你关闭管道的时候,就容易出现问题,须要将全部只用这个管道的进程中的两端所有关闭才行。固然也能够经过异常捕获(try:except EOFerror)来处理。async
虽然咱们在主进程和子进程中都打印了一下conn1一端的对象,发现两个再也不同一个地址,可是子进程中的管道和主进程中的管道仍是能够通讯的,由于管道是同一套,系统可以记录。 函数
咱们的目的就是关闭全部的管道,那么主进程和子进程进行通讯的时候,能够给子进程传管道的一端就够了,而且用咱们以前学到的,信息发送完以后,再发送一个结束信号None,那么你收到的消息为None的时候直接结束接收或者说结束循环,就不用每次都关闭各个进程中的管道了。spa
数据共享:操作系统
Manager模块介绍线程
多进程共同去处理共享数据的时候,就和咱们多进程同时去操做一个文件中的数据是同样的,不加锁就会出现错误的结果,进程不安全的,因此也须要加锁code
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: #不加锁而操做共享的数据,确定会出现数据错乱 d['count']-=1 if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) #字典形式 p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
进程池和multiprocess Pool 模块
建立进程池的类:若是指定numprocess为3,则进程池会从无到有建立三个进程,而后自始至终使用这三个进程去执行全部任务(高级一些的进程池能够根据你的并发量,搞成动态增长或减小进程池中的进程数量的操做),不会开启其余进程,提升操做系统效率,减小空间的占用等。
定义池子将建立定量的进程:去完成任务,3个出去再回到池子中但进程不关闭,在出去执行任务,直到将任务执行完成结束
进程池的主要方法:
from multiprocessing import Process,Pool import time def func(n): for i in range(5): n=n=i print(n) if __name__ == '__main__': pool_start_time=time.time() pool= Pool(4) pool.map(func,range(100)) pool_end_time=time.time() pool_dif_time=pool_end_time-pool_start_time p_s_time=time.time() p_list=[] for i in range(100): p2=Process(target=func,args=(i,)) p2.start() p_list.append(p2) [p.join() for p in p_list] p_e_time=time.time() p_dif_time=p_e_time-p_s_time print("进程池时间",pool_dif_time) print("多进程时间",p_dif_time)
进程池的同步异步:
同步:
from multiprocessing import Process,Pool import time def func(i): time.sleep(1) return i**2 if __name__ == '__main__': p=Pool(4) for i in range(10): res=p.apply(func,args=(i,)) #同步的方法多个进程完成任务 能够获得返回值也能够不提取返回值 print(res)
异步:
from multiprocessing import Process,Pool import time import os def func(i): time.sleep(2) # print(os.getpid()) return i**2 if __name__ == '__main__': p=Pool(4) res_list=[] for i in range(10): res=p.apply_async(func,args=(i,)) #异步的方法 获得返回值 res_list.append(res) for i in res_list: print(i.get())
回调函数:
进程池中任何一个任务一旦处理完了,就当即告知主进程:我好了额,你能够处理个人结果了。主进程则调用一个函数去处理该结果,该函数即回调函数,这是进程池特有的,普通进程没有这个机制,可是咱们也能够经过进程通讯来拿到返回值,进程池的这个回调也是进程通讯的机制完成的。
import os from multiprocessing import Pool def func1(n): print('func1>>',os.getpid()) print('func1') return n*n def func2(nn): print('func2>>',os.getpid()) print('func2') print(nn) # import time # time.sleep(0.5) if __name__ == '__main__': print('主进程:',os.getpid()) p = Pool(5) #args里面的10给了func1,func1的返回值做为回调函数的参数给了callback对应的函数,不能直接给回调函数直接传参数,他只能是你任务函数func1的函数的返回值 # for i in range(10,20): #若是是多个进程来执行任务,那么当全部子进程将结果给了回调函数以后,回调函数又是在主进程上执行的,那么就会出现打印结果是同步的效果。咱们上面func2里面注销的时间模块打开看看 # p.apply_async(func1,args=(i,),callback=func2) p.apply_async(func1,args=(10,),callback=func2) p.close() p.join() #结果 # 主进程: 11852 #发现回调函数是在主进程中完成的,其实若是是在子进程中完成的,那咱们直接将代码写在子进程的任务函数func1里面就好了,对不对,这也是为何称为回调函数的缘由。 # func1>> 17332 # func1 # func2>> 11852 # func2 # 100