管道可用于具备亲缘关系进程间的通讯,有名管道克服了管道没有名字的限制,所以,除具备管道所具备的功能外,它还容许无亲缘关系进程间的通讯.html
先画一幅图帮助你们理解下管道的基本原理python
现有2个进程A和B,他们都在内存中开辟了空间,那么咱们在内存中再开辟一个空间C,做用是链接这两个进程的。对于进程来讲内存空间是能够共享的(任何一个进程均可以使用内存,内存当中的空间是用地址来标记的,咱们经过查找某一个地址就能找到这个内存)A进程能够不断的向C空间输送东西,B进程能够不断的从C空间读取东西,这就是进程间的通讯 .segmentfault
管道在信息传输上是以流的方式传输, 也就是你从A进程不断的写入,B进程源源不断的读出,A进程先写入的就会被B进程先读出,后写进来的就会被后读出,安全
Pipe仅仅适用于只有两个进程一读一写的半双工状况,也就是说信息是只向一个方向流动。单项通讯叫作半双工,双向叫作全双工.markdown
单工:简单的说就是一方只能发信息,另外一方则只能收信息,通讯是单向的。数据结构
半双工:比单工先进一点,就是双方都能发信息,但同一时间则只能一方发信息。app
全双工:比半双工再先进一点,就是双方不只都能发信息,并且可以同时发送。dom
管道是由内核管理的一个缓冲区,至关于咱们放入内存中的一个纸条。管道的一端链接一个进程的输出。这个进程会向管道中放入信息。管道的另外一端链接一个进程的输入,这个进程取出被放入管道的信息。一个缓冲区不须要很大,它被设计成为环形的数据结构,以便管道能够被循环利用。当管道中没有信息的话,从管道中读取的进程会等待,直到另外一端的进程放入信息。当管道被放满信息的时候,尝试放入信息的进程会等待,直到另外一端的进程取出信息。当两个进程都终结的时候,管道也自动消失。函数
管道是单向的、先进先出的、无结构的字节流,它把一个进程的输出和另外一个进程的输入链接在一块儿。ui
写进程在管道的尾端写入数据,读进程在管道的首端读出数据。数据读出后将从管道中移走,其它读进程都不能再读到这些数据。
管道提供了简单的流控制机制。进程试图读一个空管道时,在数据写入管道前,进程将一直阻塞。一样,管道已经满时,进程再试图写管道,在其它进程从管道中读走数据以前,写进程将一直阻塞。
只能用于具备亲缘关系的进程之间的通讯(也就是父子进程或者兄弟进程之间)。
一种半双工的通讯模式,具备固定的读端和写端。
LINUX把管道看做是一种文件,采用文件管理的方法对管道进行管理,对于它的读写也可使用普通的read()和write()等函数。可是它不是普通的文件,并不属于其余任何文件系统,只存在于内核的内存空间中。
#建立管道的类:
Pipe([duplex]):在进程之间建立一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的链接对象,强调一点:必须在产生Process对象以前产生管道
#参数介绍:
dumplex:默认管道是半双工的,若是将duplex射成False,conn1只能用于接收,conn2只能用于发送。
#主要方法:
conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接收,recv方法会一直阻塞。若是链接的另一端已经关闭,那么recv方法会抛出EOFError。
conn1.send(obj):经过链接发送对象。obj是与序列化兼容的任意对象
#其余方法:
conn1.close():关闭链接。若是conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回链接使用的整数文件描述符
conn1.poll([timeout]):若是链接上的数据可用,返回True。timeout指定等待的最长时限。若是省略此参数,方法将当即返回结果。若是将timeout射成None,操做将无限期地等待数据到达。
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。若是进入的消息,超过了这个最大值,将引起IOError异常,而且在链接上没法进行进一步读取。若是链接的另一端已经关闭,不再存在任何数据,将引起EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):经过链接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,而后调用c.recv_bytes()函数进行接收
conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或相似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。若是消息长度大于可用的缓冲区空间,将引起BufferTooShort异常。
例子
# 主进程写,子进程读 from multiprocessing import Pipe,Process def func(out_pipe, in_pipe): in_pipe.close() # 关闭复制过来的管道的输入端 while True: try : msg = out_pipe.recv() #子进程的管道端口接收主进程的数据 print(msg) except EOFError: out_pipe.close() break if __name__ == '__main__': out_pipe, in_pipe = Pipe() Process(target=func,args = (out_pipe, in_pipe)).start() #启动子进程 out_pipe.close() #关闭主进程的输出管道端口 for i in range(20): in_pipe.send('hello world!') #经过管道的端口向子进程写入 in_pipe.close()
例子2
# 出现EOF错误的状况 # 当pipe的输入端被关闭,且没法接收到输入的值,那么就会抛出EOFError。 from multiprocessing import Pipe, Process def func(out_pipe, in_pipe): in_pipe.close() # 关闭复制过来的管道的输入端 while True: msg = out_pipe.recv() # 子进程的管道端口接收主进程的数据 print(msg) if __name__ == '__main__': out_pipe, in_pipe = Pipe() Process(target=func, args=(out_pipe, in_pipe)).start() # 启动子进程 out_pipe.close() # 关闭主进程的输出管道端口 for i in range(20): in_pipe.send('hello world!') # 经过管道的端口向子进程写入 in_pipe.close()
from multiprocessing import Process,Pipe import time,random def consumer(p,name): in_pipe,out_pipe=p out_pipe.close() while True: try: # time.sleep(random.uniform(0,1)) baozi=in_pipe.recv() print('%s 收到包子:%s' %(name,baozi)) except EOFError: in_pipe.close() break def producer(p,name): in_pipe,out_pipe=p in_pipe.close() for i in range(10): # print(i) str ='%s生产的包子%s'%(name,i) out_pipe.send(str) # time.sleep(1) else: out_pipe.close() if __name__ == '__main__': in_pipe,out_pipe=Pipe() p = Process(target=producer,args=((in_pipe,out_pipe),'jack')) c1=Process(target=consumer,args=((in_pipe,out_pipe),'c1')) c2=Process(target=consumer,args=((in_pipe,out_pipe),'c2')) c1.start() c2.start() p.start() in_pipe.close() out_pipe.close() c1.join() c2.join() print('主进程') # 基于管道实现进程间通讯(与队列的方式是相似的,队列就是管道加锁实现的) ## 加锁来控制操做管道的行为,来避免进程之间争抢数据形成的数据不安全现象
这里须要加锁来解决数据不安全的状况
from multiprocessing import Process,Pipe,Lock def consumer(produce, consume,name,lock): produce.close() while True: lock.acquire() baozi=consume.recv() lock.release() if baozi: print('%s 收到包子:%s' %(name,baozi)) else: consume.close() break def producer(produce, consume,n): consume.close() for i in range(n): produce.send(i) produce.send(None) produce.send(None) produce.close() if __name__ == '__main__': produce,consume=Pipe() lock = Lock() c1=Process(target=consumer,args=(produce,consume,'c1',lock)) c2=Process(target=consumer,args=(produce,consume,'c2',lock)) p1=Process(target=producer,args=(produce,consume,30)) c1.start() c2.start() p1.start() produce.close() consume.close()
使用Manager能够方便的进行多进程数据共享,事实上Manager的功能远不止于此 。Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
但与管道相似,这里的数据也是不安全的。须要用锁来解决。
from multiprocessing import Manager,Process def main(dic): dic['count'] -= 1 # print(dic) if __name__ == '__main__': m = Manager()#为这个manager类注册存储容器,也就是经过这个manager类实现的共享的变量 dic=m.dict({'count':100}) p_lst = [] for i in range(50): p = Process(target=main, args=(dic,)) p_lst.append(p) p.start() for p in p_lst: p.join() print("主进程",dic['count'])
分析:多运行几回能够看到,每次输出的结果都基本是不一样的,所以这里仍是须要用锁来解决。
from multiprocessing import Manager,Process,Lock def main(dic,lock): # with lock:能够这样写,也能够写成下面的样子 lock.acquire() dic['count'] -= 1 lock.release() if __name__ == '__main__': m = Manager() l = Lock() dic=m.dict({'count':100}) p_lst = [] for i in range(50): p = Process(target=main,args=(dic,l)) p.start() p_lst.append(p) for i in p_lst: i.join() print('主进程',dic)
[1]http://www.javashuo.com/article/p-xgtlqpws-gh.html
[2]http://www.th7.cn/system/lin/201605/165994.shtml
[3]https://blog.csdn.net/weixin_39859512/article/details/80898340