3七、进程之间的通讯

一、生产者消费者模型Queue:队列(管道+锁,双向通讯)安全

    为了解决数据供需不平衡的状况(即生产者生产的多或者消费者消费的多形成的失衡),有一种解决的方法:子进程生产数据,子进程处理数据。app

生产者消费者模型:dom

  一、消费者要处理多少数据是不肯定的ide

  二、因此只能用while循环来处理数据,可是while循环没法结束ui

  三、须要生产者发送信号spa

  四、有多少个消费者,就须要发送多少个信号操作系统

  五、可是发送的信号数量须要根据生产者和消费者的数量进行计算,因此很是不方便。   code

import time
import random
from multiprocessing import Process
from multiprocessing import Queue

def producer(q,food):
    for i in range(5):
        q.put('%s-%s'%(food,i))
        print('生产了%s'%(food))
        time.sleep(random.random())
    q.put(None)
    q.put(None)
    q.put(None)

def consumer(q,name):
    while True:
        food=q.get()
        if food == None:break
        print('%s吃了%s'%(name,food))


if __name__ =='__main__':
    q=Queue()
    p1=Process(target=producer,args=(q,'骨头'))
    p1.start()
    p2=Process(target=producer,args=(q,'泔水'))
    p2.start()
    p3 = Process(target=consumer, args=(q, 'alex'))
    p3.start()
    p4 = Process(target=consumer, args=(q, 'egon'))
    p4.start()
    p5 = Process(target=consumer, args=(q, 'jin'))
    p5.start()
    

二、joinableQueue:顺序为:生产者生产的数据所有被消费------->生产者进程结束------->主进程执行代码结束—-->消费者守护进程结束。blog

  put和get的一个计数机制,每次get数据以后发送task_done,put端接收到计数-1,直到计数为0就能感知到。队列

import time
import random
from multiprocessing import Process
from multiprocessing import JoinableQueue
# put   q.join
# get  处理数据 task_done 消费完了
def producer(q,food):
    for i in range(5):
        q.put('%s-%s'%(food,i))
        print('生产了%s'%food)
        # time.sleep(random.randint(1,3))
        time.sleep(random.random())
    q.join()  # 等待 消费者 把全部的数据都处理完

def consumer(q,name):
    while True:
        food = q.get()   # 生产者不生产仍是生产的慢
        print('%s 吃了 %s'%(name,food))
        q.task_done()

if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer,args=(q,'泔水'))
    p1.start()
    p2 = Process(target=producer, args=(q, '骨头'))
    p2.start()
    c1 = Process(target=consumer, args=(q, 'alex'))
    c1.daemon = True
    c1.start()
    c2 = Process(target=consumer, args=(q, 'egon'))
    c2.daemon = True
    c2.start()
    c3 = Process(target=consumer, args=(q, 'jin'))
    c3.daemon = True
    c3.start()

    p1.join()  # 等待p1执行完毕
    p2.join()  # 等待p2执行完毕

三、ipc(inter-process-commnication内部进程交流)——

  pipe(管道):支持双向通讯,数据不安全。

  首先来看管道是怎么用的

  

from multiprocessing import  Pipe
p1,p2=Pipe()
p1.send('hello')        #一个发送,一个接收
print(p2.recv())
print('————————————————————————')
p2.send('hihihi')
p2.close()              #关闭p2发送端
print(p1.recv())
#print(p1.recv())     #放开的话,继续接收,会报错,报错类型:EOFError

管道在多进程中的做用:主进程中发送到管道,而后在子进程中接收。

应该特别注意管道端点的正确管理问题。若是是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为什么在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。若是忘记执行这些步骤,程序可能在消费者中的recv()操做上挂起。管道是由操做系统进行引用计数的,必须在全部进程中关闭管道后才能生成EOFError异常。所以,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。 

  

from multiprocessing import Process
from multiprocessing import Pipe,Lock
def func(p):
    foo,son=p
    foo.close()             #只用son,就把foo给关了
    while True:
        try:
            print(son.recv())
        except EOFError:        #异常处理
            break

if __name__ =='__main__':
    foo,son=Pipe()
    p=Process(target=func,args=((foo,son),))
    p.start()
    son.close()                #只用foo,把son给关了
    foo.send('hello')
    foo.send('hello')
    foo.send('hello')
    foo.send('heo')
    foo.close()

pipe实现消费者生产者模型:

  

  

 1 from multiprocessing import Process,Pipe,Lock
 2 
 3 def consumer(p,name,lock):
 4     produce, consume=p
 5     produce.close()
 6     while True:
 7         # lock.acquire()
 8         baozi=consume.recv()
 9         # lock.release()
10         if baozi:
11             print('%s 收到包子:%s' %(name,baozi))
12         else:
13             consume.close()
14             break
15 
16 def producer(p,n):
17     produce, consume=p
18     consume.close()
19     for i in range(n):
20         produce.send(i)
21     produce.send(None)
22     produce.send(None)
23     produce.close()
24 
25 if __name__ == '__main__':
26     produce,consume=Pipe()
27     lock = Lock()
28     c1=Process(target=consumer,args=((produce,consume),'c1',lock))
29     c2=Process(target=consumer,args=((produce,consume),'c2',lock))
30     p1=Process(target=producer,args=((produce,consume),10))
31     c1.start()
32     c2.start()
33     p1.start()
34 
35     produce.close()
36     consume.close()
37 
38     c1.join()
39     c2.join()
40     p1.join()
41     print('主进程')
pipe实现消费者生产者模型

 四、Manager:是一个类,提供了能够进行数据共享的一个机制,提供了不少数据类型dict 、list、pipe(这些并不提供数据安全的支持)

from multiprocessing import Process,Manager,Lock
def work(dict,lock):
    lock.acquire()
    dict['count']-=1
    lock.release()

if __name__ =='__main__':
    lock=Lock()
    m=Manager()        #要用Manager定义dict
    dict=m.dict({'count':100})
    l=[]
    for i in range(100):
        p=Process(target=work,args=(dict,lock))
        p.start()
        l.append(p)
    [p.join() for p in l]
    print(dict)

 五、进程池:相似与有一个池子(工做间),把进程(员工)放进去,进行工做。接下来是发任务,而后循环对任务进行工做。(进程池这个功能已经被实现了)。

import os
import time
import random
from multiprocessing import Pool
from multiprocessing import Process
def func(i):
    i+=1

if __name__=='__main__':
    # p=Pool(5)                #一、带#的是另外一种方法:用进程池
    # start=time.time()
    # p.map(func,range(1000))   #至关于target=func,args=next(iterable)
    # p.close()                 #是不容许再向进程池中添加任务
    # p.join()
    # print(time.time()-start)
    p = Process()                 #二、起进程的办法,太占内存,费时间:起进程的时候要分配资源,分配内存,耗费时间
    start=time.time()
    l=[]
    for i in range(100):
        p=Process(target=func,args=(i,))
        p.start()
        l.append(p)
    [i.join() for i in l]
    print(time.time()-start)
相关文章
相关标签/搜索