1、管道(不推荐使用,了解便可)数据库
进程间通讯(IPC)方式二:管道(不推荐使用,了解便可),会致使数据不安全的状况出现,后面还会提到为何编程
会带来数据不安全的问题。数组
1 #建立管道的类: 2 Pipe([duplex]):在进程之间建立一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的链接对象,强调一点:必须在产生Process对象以前产生管道 3 #参数介绍: 4 dumplex:默认管道是全双工的,若是将duplex射成False,conn1只能用于接收,conn2只能用于发送。 5 #主要方法: 6 conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接收,recv方法会一直阻塞。若是链接的另一端已经关闭,那么recv方法会抛出EOFError。 7 conn1.send(obj):经过链接发送对象。obj是与序列化兼容的任意对象 8 #其余方法: 9 conn1.close():关闭链接。若是conn1被垃圾回收,将自动调用此方法 10 conn1.fileno():返回链接使用的整数文件描述符 11 conn1.poll([timeout]):若是链接上的数据可用,返回True。timeout指定等待的最长时限。若是省略此参数,方法将当即返回结果。若是将timeout射成None,操做将无限期地等待数据到达。 12 13 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。若是进入的消息,超过了这个最大值,将引起IOError异常,而且在链接上没法进行进一步读取。若是链接的另一端已经关闭,不再存在任何数据,将引起EOFError异常。 14 conn.send_bytes(buffer [, offset [, size]]):经过链接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,而后调用c.recv_bytes()函数进行接收 15 16 conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或相似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。若是消息长度大于可用的缓冲区空间,将引起BufferTooShort异常。
1 from multiprocessing import Process, Pipe 2 3 def f(conn): 4 conn.send("Hello 妹妹") #子进程发送了消息 5 conn.close() 6 7 if __name__ == '__main__': 8 parent_conn, child_conn = Pipe() #创建管道,拿到管道的两端,双工通讯方式,两端均可以收发消息 9 p = Process(target=f, args=(child_conn,)) #将管道的一段给子进程 10 p.start() #开启子进程 11 print(parent_conn.recv()) #主进程接受了消息 12 p.join()
1 应该特别注意管道端点的正确管理问题。若是是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为什么在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。若是忘记执行这些步骤,程序可能在消费者中的recv()操做上挂起(就是阻塞)。管道是由操做系统进行引用计数的,必须在全部进程中关闭管道的相同一端就会能生成EOFError异常。所以,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点
from multiprocessing import Process, Pipe def f(parent_conn,child_conn): #parent_conn.close() #不写close将不会引起EOFError while True: try: print(child_conn.recv()) except EOFError: child_conn.close() break if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(parent_conn,child_conn,)) p.start() child_conn.close() parent_conn.send('hello') parent_conn.close() p.join()
主进程将管道的两端都传送给子进程,子进程和主进程共用管道的两种报错状况,都是在recv接收的时候报错的:安全
1.主进程和子进程中的管道的相同一端都关闭了,出现EOFError;并发
2.若是你管道的一端在主进程和子进程中都关闭了,可是你还用这个关闭的一端去接收消息,那么就会出现OSError;app
2、数据共享(了解便可)异步
基于消息传递的并发编程是大势所趋async
即使是使用线程,推荐作法也是将程序设计为大量独立的线程集合分布式
经过消息队列交换数据,这样极大的减小了对使用锁定和其余同步手段的需求,还能够扩展到分布式系统中ide
注意:进程之间应当是尽可能避免通讯,即便须要通讯,也应该选择进程安全的工具来避免加锁带来的问题。应该尽可能避免使用
共享数据的方法,之后会尝试使用数据库来解决进程之间的数据共享问题
进程之间数据共享模块之一Manager模块:
1 进程间数据是独立的,能够借助于队列或管道实现通讯,两者都是基于消息传递的 2 虽然进程间数据独立,但能够经过Manager实现数据共享,事实上Manager的功能远不止于此 3 4 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. 5 6 A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
多进程共同去处理共享数据的时候,就和咱们多进程同时去操做一个文件中的数据同样,不加锁就会出现错误的结果,进程
不安全的,因此也须要加锁
1 from multiprocessing import Manager,Process,Lock 2 def work(d,lock): 3 with lock: #不加锁而操做共享的数据,确定会出现数据错乱 4 d['count']-=1 5 6 if __name__ == '__main__': 7 lock=Lock() 8 with Manager() as m: 9 dic=m.dict({'count':100}) 10 p_l=[] 11 for i in range(100): 12 p=Process(target=work,args=(dic,lock)) 13 p_l.append(p) 14 p.start() 15 for p in p_l: 16 p.join() 17 print(dic)
总结下,进程之间的通讯:队列、管道、数据共享
3、进程池
multiprocess.pool 模块
建立进程池的类:若是指定numprocess为3,则进程池会从无到有建立三个进程,而后自始至终使用这三个
进程去执行使用全部的任务(高级一些的进程池能够根据你的并发量,搞成动态增长或减小进程池中进程数量的操做),
不会开启其余进程,提升操做系统效率,减小空间的占用等
1 numprocess:要建立的进程数,若是省略,将默认使用cpu_count()的值 2 initializer:是每一个工做进程启动时要执行的可调用对象,默认为None 3 initargs:是要传给initializer的参数组 4 5 p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。 6 '''须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async()''' 7 8 p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。 9 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。''' 10 11 p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成 12 13 P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用 14 15 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具备如下方法 16 obj.get():返回结果,若是有必要则等待结果到达。timeout是可选的。若是在指定时间内尚未到达,将引起一场。若是远程操做中引起了异常,它将在调用此方法时再次被引起。 17 obj.ready():若是调用完成,返回True 18 obj.successful():若是调用完成且没有引起异常,返回True,若是在结果就绪以前调用此方法,引起异常 19 obj.wait([timeout]):等待结果变为可用。 20 obj.terminate():当即终止全部工做进程,同时不执行任何清理或结束任何挂起工做。若是p被垃圾回收,将自动调用此函数
1 import time 2 from multiprocessing import Pool,Process 3 4 #针对range(100)这种参数的 5 # def func(n): 6 # for i in range(3): 7 # print(n + 1) 8 9 def func(n): 10 print(n) 11 # 结果: 12 # (1, 2) 13 # alex 14 def func2(n): 15 for i in range(3): 16 print(n - 1) 17 if __name__ == '__main__': 18 #1.进程池的模式 19 s1 = time.time() #咱们计算一下开多进程和进程池的执行效率 20 poll = Pool(5) #建立含有5个进程的进程池 21 # poll.map(func,range(100)) #异步调用进程,开启100个任务,map自带join的功能 22 poll.map(func,[(1,2),'alex']) #异步调用进程,开启100个任务,map自带join的功能 23 # poll.map(func2,range(100)) #若是想让进程池完成不一样的任务,能够直接这样搞 24 #map只限于接收一个可迭代的数据类型参数,列表啊,元祖啊等等,若是想作其余的参数之类的操做,须要用后面咱们要学的方法。 25 # t1 = time.time() - s1 26 # 27 # #2.多进程的模式 28 # s2 = time.time() 29 # p_list = [] 30 # for i in range(100): 31 # p = Process(target=func,args=(i,)) 32 # p_list.append(p) 33 # p.start() 34 # [pp.join() for pp in p_list] 35 # t2 = time.time() - s2 36 # 37 # print('t1>>',t1) #结果:0.5146853923797607s 进程池的效率高 38 # print('t2>>',t2) #结果:12.092015027999878s