1、 管道数据库
建立管道的类安全
Pipe([duplex]):在进程之间建立一条管道,并返回元祖(conn1,conn2),其中conn1,conn2表示管道两端的链接对象,强调一点:必须在产生process对象以前产生管道并发
参数介绍app
dumplex函数
默认管道是全双工的,若是将duplex射成False,conn1只能用于接收,conn2只能用于发送工具
主要方法:ui
Conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接收,recv方法会一直阻塞。若是链接的另外一端已经关闭,那么recv方法会抛出EOFError。spa
Conn1.send(obj):经过链接发送对象。Obj是与序列化兼容的任意对象操作系统
其余方法:线程
Conn1.close():关闭链接。若是conn1被垃圾回收,将自动调用此方法
Conn1.fileno():返回链接使用的整数文件描述符
Conn1.poll(【timeout】):若是链接上的数据可用,返回True。Timeout指定等待的最长时限。若是省略此参数,方法将当即返回结果。若是将timeout射成None,操做将无限期地等待数据到达
Conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。Maxlength指定要接收的最大字节数。若是进入的消息,超过了这个最大值,将引起ioError异常,而且在链接上没法进行进一步读取。若是链接的另一端已经关闭,不再存在任何数据,将引起eoferror异常
Conn.send_bytes(buffer[,offset[,size]]):经过链接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,而后调用c.recv_bytes()函数进行接收
Conn1.recv_bytes_info(buffer[,offset])接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或相似的对象)。Offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。若是消息长度大于可用的缓冲区空间,将引起bufferTooShort异常
from multiprocessing import Pipe,Process def func(conn1,): # conn2.close() msg = conn1.recv() print('>>>>>',msg) if __name__ == '__main__': conn1,conn2 = Pipe() p = Process(target=func,args=(conn1,)) p.start() # conn1.close() # conn2.close() conn2.send('小鬼!') print('主进程结束')
应该特别注意管道端点的正确管理问题。若是生产者或消费者都没有使用管道的某个端点,就应将它关闭。这也说明了为什么生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。若是忘记要执行这些步骤,程序可能在消费者中的recv()操做上挂起(就是阻塞)。管道是由操做系统进行引用计数的,必须在全部进程中关闭管道的相同一端就会能生成EOFError异常。所以,在生产者中关闭通道不会有任何效果,除非消费者也关闭了相同的管道端点。
主进程将管道的两端都传给子进程,子进程和主进程共用管道的两种报错状况,都是在recv接收的时候报错的:
1, 主进程和子进程的管道的相同一端都关闭了,出现EOFError
2, 若是你管道的一端在主进程和子进程中都关闭了,可是你还用这个关闭的一端去接收消息,那么就会出现OSError
因此你关闭管道的时候,就会容易出现问题,须要将全部只有这个管道的进程中的两端所有关闭才行,固然也能够经过异常捕获(try:except EOFerror)来处理。
虽然咱们在主进程和子进程中都打印了conn1一端的对象,发现两个再也不同一个地址,可是子进程中的管道和主进程的管道仍是能够通讯的,由于管道是同一套,系统可以记录
咱们的目的就是关闭全部的通道,那么主进程和子进程进行通讯的时候,能够给子进程传管道的一端就够了,而且用咱们以前学到的,信息发送完后,再发送一个结束信号None,那么你收到的消息为None的时候直接结束接收或者说结束循环,就不用每次都关闭各个进程中管道了。
from multiprocessing import Process,Pipe def consumer(p,name): produce,consume = p produce.close() while 1: try: baozi = consume.recv() print('%s 收到包子:%s'%(name,baozi)) except EOFError: break def producer(seq,p): produce,consume = p consume.close() for i in seq: produce.send(i) if __name__ == '__main__': produce,consume = Pipe() c1 = Process(target=consumer,args=((produce,consume),'c1')) c1.start() seq = (i for i in range(10)) producer(seq,(produce,consume)) produce.close() consume.close() c1.join() print('主进程')
关于管道会形成数据不安全问题的官方解释:
由Pipe方法返回的两个链接对象表示管道的两端。每一个链接对象都有send和recv方法(除其余以外)。注意,若是两个进程(或线程)试图同时从管道的同一端读取或写入数据,那么管道中的数据可能会损坏。固然,在使用管道的不一样端部的过程当中不存在损坏风险
from multiprocessing import Process,Pipe,Lock def consumer(p,name,lock): produce, consume=p produce.close() while True: lock.acquire() baozi=consume.recv() lock.release() if baozi: print('%s 收到包子:%s' %(name,baozi)) else: consume.close() break def producer(p,n): produce, consume=p consume.close() for i in range(n): produce.send(i) produce.send(None) produce.send(None) produce.close() if __name__ == '__main__': produce,consume=Pipe() lock = Lock() c1=Process(target=consumer,args=((produce,consume),'c1',lock)) c2=Process(target=consumer,args=((produce,consume),'c2',lock)) p1=Process(target=producer,args=((produce,consume),10)) c1.start() c2.start() p1.start() produce.close() consume.close() c1.join() c2.join() p1.join() print('主进程')
from multiprocessing import Manager,Process,Lock
def work(d,lock):
with lock:
d['count'] -=1
if __name__ == '__main__':
lock = Lock()
with Manager() as m:
dic = m.dict({'count':100})
p_1 = []
for i in range(100):
p = Process(target=work,args=(dic,lock))
p_1.append(p)
p.start()
for p in p_1:
p.join()
print(dic)
1、 数据共享
进程间应该尽可能避免通讯,即使须要通讯,也应该选择进程安全的工具来避免加锁带来的问题,应该尽可能避免共享数据的方式,之后会使用数据库来解决进程之间的数据共享问题
总结一下,进程之间的通讯:队列,管道,数据共享也算
1、 进程池和mutiprocess.poll
为何要有进程池?进程池的概念
在程序实际处理问题过程当中,忙时会有成千上万的任务须要被执行,闲时可能只有零星任务。那么在成千上万割任务须要被执行的时候,咱们就须要去船舰成千上万个进程吗?首先,建立进程须要消耗时间,销毁进程(空间,变量,文件信息等等内容)也须要消耗时间。第二即使开启了成千上万的进程,操做系统也不能让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还须要进行切换而且记录每一个进程的执行节点,也就是记录上下文,这样反而会影响程序的效率。所以咱们不能无限制的根据任务开启或者开启程序。
定义一个池子,有需求来了,就拿一个池中的进程来处理任务,等处理完毕,进程并不关闭,而是将进程再放回池中继续等待任务。若是有不少任务须要执行,池中的数量不够,任务就要等待以前的进程执行完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程再运行。这样不会增长操做系统的调度难度,还节省了开闭进程的时间,也必定程度上可以实现并发效果