python之路-day31-守护进程、锁、队列、生产者消费者模型

 

1、守护进程python

    以前咱们讲的子进程是不会随着主进程而结束的,子进程所有执行完以后,程序才结束,那么若是有linux

  一天咱们的需求是个人主进程结束了,由主进程建立的子进程必须跟着结束,怎么办?守护进程就来了!编程

    主进程建立守护进程json

      其一:守护进程会在主进程代码执行结束后就终止windows

      其二:守护进程内没法再开启子进程,不然抛出异常:AssertionError: daemonic processes are not allowed to have children安全

    注意:进程之间是互相独立的,主程序代码运行结束,守护进程随即终止网络

 1 import os  2 import time  3 from multiprocessing import Process  4 
 5 class Myprocess(Process):  6     def __init__(self,person):  7         super().__init__()  8         self.person = person  9     def run(self): 10         print(os.getpid(),self.name) 11         print('%s正在和女主播聊天' %self.person) 12         time.sleep(3) 13 if __name__ == '__main__': 14     p=Myprocess('太白') 15     p.daemon=True #必定要在p.start()前设置,设置p为守护进程,禁止p建立子进程,而且父进程代码执行结束,p即终止运行
16  p.start() 17     # time.sleep(1) # 在sleep时linux下查看进程id对应的进程ps -ef|grep id
18     print('')
守护进程

 

2、进程同步(锁)多线程

    咱们以前实现了进程的异步,让多个任务能够同时在几个进程中并发处理,他们之间的运行时没有顺序的,一旦并发

  开启也不受咱们控制。尽管并发编程让咱们能更加充分的利用IO资源,可是也给咱们带来了新的问题:进程之间数据dom

  不共享。虽然能够共享同一台文件系统,因此访问同一个文件,或者同一个打印红缎,是没有问题的,而共享带来的是

  竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。

 1 import os  2 import time  3 import random  4 from multiprocessing import Process  5 
 6 def work(n):  7     print('%s: %s is running' %(n,os.getpid()))  8  time.sleep(random.random())  9     print('%s:%s is done' %(n,os.getpid())) 10 
11 if __name__ == '__main__': 12     for i in range(5): 13         p=Process(target=work,args=(i,)) 14  p.start() 15 
16 # 看结果:经过结果能够看出两个问题:问题一:每一个进程中work函数的第一个打印就不是按照咱们for循环的0-4的顺序来打印的
17 #问题二:咱们发现,每一个work进程中有两个打印,可是咱们看到全部进程中第一个打印的顺序为0-2-1-4-3,可是第二个打印没有按照这个顺序,变成了2-1-0-3-4,说明咱们一个进程中的程序的执行顺序都混乱了。
18 #问题的解决方法,第二个问题加锁来解决,第一个问题是没有办法解决的,由于进程开到了内核,有操做系统来决定进程的调度,咱们本身控制不了
19 # 0: 9560 is running
20 # 2: 13824 is running
21 # 1: 7476 is running
22 # 4: 11296 is running
23 # 3: 14364 is running
24 
25 # 2:13824 is done
26 # 1:7476 is done
27 # 0:9560 is done
28 # 3:14364 is done
29 # 4:11296 is done
多进程抢占输出资源,致使打印混乱
 1 #由并发变成了串行,牺牲了运行效率,但避免了竞争
 2 from multiprocessing import Process,Lock  3 import os,time  4 def work(n,lock):  5     #加锁,保证每次只有一个进程在执行锁里面的程序,这一段程序对于全部写上这个锁的进程,你们都变成了串行
 6  lock.acquire()  7     print('%s: %s is running' %(n,os.getpid()))  8     time.sleep(1)  9     print('%s:%s is done' %(n,os.getpid())) 10     #解锁,解锁以后其余进程才能去执行本身的程序
11  lock.release() 12 if __name__ == '__main__': 13     lock=Lock() 14     for i in range(5): 15         p=Process(target=work,args=(i,lock)) 16  p.start() 17 
18 #打印结果:
19 # 2: 10968 is running
20 # 2:10968 is done
21 # 0: 7932 is running
22 # 0:7932 is done
23 # 4: 4404 is running
24 # 4:4404 is done
25 # 1: 12852 is running
26 # 1:12852 is done
27 # 3: 980 is running
28 # 3:980 is done
29 
30 #结果分析:(本身去屡次运行一下,看看结果,我拿出其中一个结果来看)经过结果咱们能够看出,多进程刚开始去执行的时候,每次运行,首先打印出来哪一个进程的程序是不固定的,可是咱们解决了上面打印混乱示例代码的第二个问题,那就是同一个进程中的两次打印都是先完成的,而后才切换到下一个进程去,打印下一个进程中的两个打印结果,说明咱们控制住了同一进程中的代码执行顺序,若是涉及到多个进程去操做同一个数据或者文件的时候,就不担忧数据算错或者文件中的内容写入混乱了。
加锁:由并发改为了串行,牺牲了运行效率,但避免了竞争

    上面这种状况虽然用加锁的形式是险恶熟悉怒的执行,可是程序又从新变成串行了,这样确实会浪费了时间

  可是却保证了数据安全

    接下来,模拟抢票为例,来看看数据安全的重要性。

 1 #注意:首先在当前文件目录下建立一个名为db的文件
 2 #文件db的内容为:{"count":1},只有这一行数据,而且注意,每次运行完了以后,文件中的1变成了0,你须要手动将0改成1,而后在去运行代码。
 3 #注意必定要用双引号,否则json没法识别
 4 #并发运行,效率高,但竞争写同一文件,数据写入错乱
 5 from multiprocessing import Process,Lock  6 import time,json,random  7 
 8 #查看剩余票数
 9 def search(): 10     dic=json.load(open('db')) #打开文件,直接load文件中的内容,拿到文件中的包含剩余票数的字典
11     print('\033[43m剩余票数%s\033[0m' %dic['count']) 12 
13 #抢票
14 def get(): 15     dic=json.load(open('db')) 16     time.sleep(0.1)       #模拟读数据的网络延迟,那么进程之间的切换,致使全部人拿到的字典都是{"count": 1},也就是每一个人都拿到了这一票。
17     if dic['count'] >0: 18         dic['count']-=1
19         time.sleep(0.2)   #模拟写数据的网络延迟
20         json.dump(dic,open('db','w')) 21         #最终结果致使,每一个人显示都抢到了票,这就出现了问题~
22         print('\033[43m购票成功\033[0m') 23 
24 def task(): 25  search() 26  get() 27 
28 if __name__ == '__main__': 29     for i in range(3): #模拟并发100个客户端抢票
30         p=Process(target=task) 31  p.start() 32 
33 #看结果分析:因为网络延迟等缘由使得进程切换,致使每一个人都抢到了这最后一张票
34 # 剩余票数1
35 # 剩余票数1
36 # 剩余票数1
37 # 购票成功
38 # 购票成功
39 # 购票成功
并发运行,效率高,可是竞争同一个文件,致使数据混乱
 1 #注意:首先在当前文件目录下建立一个名为db的文件
 2 #文件db的内容为:{"count":1},只有这一行数据,而且注意,每次运行完了以后,文件中的1变成了0,你须要手动将0改成1,而后在去运行代码。
 3 #注意必定要用双引号,否则json没法识别
 4 #加锁保证数据安全,不出现混乱
 5 from multiprocessing import Process,Lock  6 import time,json,random  7 
 8 #查看剩余票数
 9 def search(): 10     dic=json.load(open('db')) #打开文件,直接load文件中的内容,拿到文件中的包含剩余票数的字典
11     print('\033[43m剩余票数%s\033[0m' %dic['count']) 12 
13 #抢票
14 def get(): 15     dic=json.load(open('db')) 16     time.sleep(0.1)       #模拟读数据的网络延迟,那么进程之间的切换,致使全部人拿到的字典都是{"count": 1},也就是每一个人都拿到了这一票。
17     if dic['count'] >0: 18         dic['count']-=1
19         time.sleep(0.2)   #模拟写数据的网络延迟
20         json.dump(dic,open('db','w')) 21         #最终结果致使,每一个人显示都抢到了票,这就出现了问题~
22         print('\033[43m购票成功\033[0m') 23     else: 24         print('sorry,没票了亲!') 25 def task(lock): 26  search() 27     #由于抢票的时候是发生数据变化的时候,全部咱们将锁加加到这里
28  lock.acquire() 29  get() 30  lock.release() 31 if __name__ == '__main__': 32     lock = Lock() #建立一个锁
33     for i in range(3): #模拟并发100个客户端抢票
34         p=Process(target=task,args=(lock,)) #将锁做为参数传给task函数
35  p.start() 36 
37 #看结果分析:只有一我的抢到了票
38 # 剩余票数1
39 # 剩余票数1
40 # 剩余票数1
41 # 购票成功 #幸运的人儿
42 # sorry,没票了亲!
43 # sorry,没票了亲!
加锁:购票行为由并发变成了串行,牺牲了效率,可是保证了数据安全

 

  进程锁的总结:

#加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然能够用文件共享数据实现进程间通讯,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.须要本身加锁处理

#所以咱们最好找寻一种解决方案可以兼顾:一、效率高(多个进程共享一块内存的数据)二、帮咱们处理好锁问题。这就是mutiprocessing模块为咱们提供的基于消息的IPC通讯机制:队列和管道。
队列和管道都是将数据存放于内存中
队列又是基于(管道+锁)实现的,可让咱们从复杂的锁问题中解脱出来,
咱们应该尽可能避免使用共享数据,尽量使用消息传递和队列,避免处理复杂的同步和锁问题,并且在进程数目增多时,每每能够得到更好的可获展性。

IPC通讯机制(了解):IPC是intent-Process Communication的缩写,含义为进程间通讯或者跨进程通讯,是指两个进程之间进行数据交换的过程。IPC不是某个系统所独有的,任何一个操做系统都须要有相应的IPC机制,
好比Windows上能够经过剪贴板、管道和邮槽等来进行进程间通讯,而Linux上能够经过命名共享内容、信号量等来进行进程间通讯。Android它也有本身的进程间通讯方式,Android建构在Linux基础上,继承了一
部分Linux的通讯方式。

  

3、队列(推荐使用)  

    进程彼此之间互相隔离,要实现进程间通讯(IPC),multiprocessing模块支持两种形式:对列和管道,这两种方式都是使用消息传递的。

  队列就像一个特殊的列表,可是能够设置固定长度,而且从前面插入数据,从后面取出数据,先进先出。

Queue([maxsize])建立共享的进程队列
参数:maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。
底层队列使用管道和锁实现。

  queue的方法介绍  

q = Queue([maxsize])
建立共享的进程队列。maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。
底层队列使用管道和锁定实现。另外,还须要运行支持线程以便队列中的数据传输到底层管道中。
Queue的实例q具备如下方法:

q.get([block[,timeout]])
返回q中的一个项目。若是q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True。若是设置为False,将引起Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。若是在指定的时间间隔内没有项目变为可用,将引起Queue.Empty异常

q.put(item [,block[,timeout]])
将item放入队列。若是队列已满,此方法将阻塞至有空间可用为止。block用于控制阻塞行为,默认为True。若是设置为False,将引起Queue.Empty异常(定义在Queue库模块中)
timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引起Queue.Full异常。

q.get_nowait()
同q.get(False)方法。

q.qsize()
返回队列中目前项目的正确数量。此函数的结果并不可靠,由于在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引起NotImplementedError异常。

q.empty() 
若是调用此方法时 q为空,返回True。若是其余进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

q.full() 
若是q已满,返回为True. 因为线程的存在,结果也多是不可靠的(参考q.empty()方法)。

  queue的其余方法(了解)

q.close() 
关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但还没有写入的数据,但将在此方法完成时立刻关闭。若是q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,若是某个使用者正被阻塞在get()操做上,关闭生产者中的队列不会致使get()方法返回错误。

q.cancel_join_thread() 
不会再进程退出时自动链接后台线程。这能够防止join_thread()方法阻塞。

q.join_thread() 
链接队列的后台线程。此方法用于在调用q.close()方法后,等待全部队列项被消耗。默认状况下,此方法由不是q的原始建立者的全部进程调用。调用q.cancel_join_thread()方法能够禁止这种行为。

  queue的简单用法

 1 from multiprocessing import Queue  2 q=Queue(3) #建立一个队列对象,队列长度为3
 3 
 4 #put ,get ,put_nowait,get_nowait,full,empty
 5 q.put(3)   #往队列中添加数据
 6 q.put(2)  7 q.put(1)  8 # q.put(4) # 若是队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
 9            # 若是队列中的数据一直不被取走,程序就会永远停在这里。
10 try: 11     q.put_nowait(4) # 可使用put_nowait,若是队列满了不会阻塞,可是会由于队列满了而报错。
12 except: # 所以咱们能够用一个try语句来处理这个错误。这样程序不会一直阻塞下去,可是会丢掉这个消息。
13     print('队列已经满了') 14 
15 # 所以,咱们再放入数据以前,能够先看一下队列的状态,若是已经满了,就不继续put了。
16 print(q.full()) #查看是否满了,满了返回True,不满返回False
17 
18 print(q.get())  #取出数据
19 print(q.get()) 20 print(q.get()) 21 # print(q.get()) # 同put方法同样,若是队列已经空了,那么继续取就会出现阻塞。
22 try: 23     q.get_nowait(3) # 可使用get_nowait,若是队列满了不会阻塞,可是会由于没取到值而报错。
24 except: # 所以咱们能够用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
25     print('队列已经空了') 26 
27 print(q.empty()) #空了
queue的简单用法

  子进程与父进程经过队列进行通讯

 1 #看下面的队列的时候,按照编号看注释
 2 import time  3 from multiprocessing import Process, Queue  4 
 5 # 8. q = Queue(2) #建立一个Queue对象,若是写在这里,那么在windows还子进程去执行的时候,咱们知道子进程中还会执行这个代码,可是子进程中不可以再次建立了,也就是这个q就是你主进程中建立的那个q,经过咱们下面在主进程中先添加了一个字符串以后,在去开启子进程,你会发现,小鬼这个字符串还在队列中,也就是说,咱们使用的仍是主进程中建立的这个队列。
 6 def f(q):  7     # q = Queue() #9. 咱们在主进程中开启了一个q,若是咱们在子进程中的函数里面再开一个q,那么你下面q.put('姑娘,多少钱~')添加到了新建立的这q里里面了
 8     q.put('姑娘,多少钱~')  #4.调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。
 9     # print(q.qsize()) #6.查看队列中有多少条数据了
10 
11 def f2(q): 12     print('》》》》》》》》') 13     print(q.get())  #5.取数据
14 if __name__ == '__main__': 15     q = Queue() #1.建立一个Queue对象
16     q.put('小鬼') 17 
18     p = Process(target=f, args=(q,)) #2.建立一个进程
19     p2 = Process(target=f2, args=(q,)) #3.建立一个进程
20  p.start() 21  p2.start() 22     time.sleep(1) #7.若是阻塞一点时间,就会出现主进程运行太快,致使咱们在子进程中查看qsize为1个。
23     # print(q.get()) #结果:小鬼
24     print(q.get()) #结果:姑娘,多少钱~
25     p.join()
子进程与父进程经过队列进行通讯

 

4、生产者和消费者模型

 

在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生产线程和消费线程的工做能力来提升程序的总体处理数据的速度。

    为何要使用生产者和消费者模式

      在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。

    什么是生产者消费者模式

      生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力,而且我能够根据生产速度和消费速度来均衡一下多少个生产者能够为多少个消费者提供足够的服务,就能够开多进程等等,而这些进程都是到阻塞队列或者说是缓冲区中去获取或者添加数据。

    通俗的解释:看图说话。。背景有点乱,等我更新~~

 

 

 1 from multiprocessing import Process,Queue  2 import time,random,os  3 def consumer(q):  4     while True:  5         res=q.get()  6         time.sleep(random.randint(1,3))  7         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))  8 
 9 def producer(q): 10     for i in range(10): 11         time.sleep(random.randint(1,3)) 12         res='包子%s' %i 13  q.put(res) 14         print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) 15 
16 if __name__ == '__main__': 17     q=Queue() 18     #生产者们:即厨师们
19     p1=Process(target=producer,args=(q,)) 20 
21     #消费者们:即吃货们
22     c1=Process(target=consumer,args=(q,)) 23 
24     #开始
25  p1.start() 26  c1.start() 27     print('')
基于队列的生产者消费模型
 1 #生产者消费者模型总结
 2 
 3     #程序中有两类角色
 4  一类负责生产数据(生产者)  5  一类负责处理数据(消费者)  6         
 7     #引入生产者消费者模型为了解决的问题是:
 8  平衡生产者与消费者之间的工做能力,从而提升程序总体处理数据的速度  9         
10     #如何实现:
11         生产者<-->队列<——>消费者 12     #生产者消费者模型实现类程序的解耦和
生产者消费者模型总结

    经过上面基于队列的生产者消费者代码示例,咱们发现一个问题:主进程永远不会结束,缘由是:生产者p在生产完后就结束了,可是消费者c在取空了q以后,则一直处于死循环中且卡在q.get()这一步。

    解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就能够break出死循环

 1 from multiprocessing import Process,Queue  2 import time,random,os  3 def consumer(q):  4     while True:  5         res=q.get()  6         if res is None:break #收到结束信号则结束
 7         time.sleep(random.randint(1,3))  8         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))  9 
10 def producer(q): 11     for i in range(5): 12         time.sleep(random.randint(1,3)) 13         res='包子%s' %i 14  q.put(res) 15         print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) 16     q.put(None) #在本身的子进程的最后加入一个结束信号
17 if __name__ == '__main__': 18     q=Queue() 19     #生产者们:即厨师们
20     p1=Process(target=producer,args=(q,)) 21 
22     #消费者们:即吃货们
23     c1=Process(target=consumer,args=(q,)) 24 
25     #开始
26  p1.start() 27  c1.start() 28 
29     print('')
子进程生产者在生产完毕后发送结束信号None

注意:结束信号None,不必定要由生产者发,主进程里一样能够发,但主进程须要等生产者结束后才应该发送该信号

 1 from multiprocessing import Process,Queue  2 import time,random,os  3 def consumer(q):  4     while True:  5         res=q.get()  6         if res is None:break #收到结束信号则结束
 7         time.sleep(random.randint(1,3))  8         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))  9 
10 def producer(q): 11     for i in range(2): 12         time.sleep(random.randint(1,3)) 13         res='包子%s' %i 14  q.put(res) 15         print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) 16 
17 if __name__ == '__main__': 18     q=Queue() 19     #生产者们:即厨师们
20     p1=Process(target=producer,args=(q,)) 21 
22     #消费者们:即吃货们
23     c1=Process(target=consumer,args=(q,)) 24 
25     #开始
26  p1.start() 27  c1.start() 28 
29     p1.join() #等待生产者进程结束
30     q.put(None) #发送结束信号
31     print('')
主进程在生产者生产完后发送结束信号

  

   但上述解决方式,在有多个生产者和多个消费者时,因为队列咱们说了是进程安全的,我一个进程拿走告终束信号,另一个进程就拿不到了,还须要多发送一个结束信号,有几个取数据的进程就要发送几个结束信号,咱们则须要用一个很low的方式去解决

 1 from multiprocessing import Process,Queue  2 import time,random,os  3 def consumer(q):  4     while True:  5         res=q.get()  6         if res is None:break #收到结束信号则结束
 7         time.sleep(random.randint(1,3))  8         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))  9 
10 def producer(name,q): 11     for i in range(2): 12         time.sleep(random.randint(1,3)) 13         res='%s%s' %(name,i) 14  q.put(res) 15         print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) 16 
17 
18 
19 if __name__ == '__main__': 20     q=Queue() 21     #生产者们:即厨师们
22     p1=Process(target=producer,args=('包子',q)) 23     p2=Process(target=producer,args=('骨头',q)) 24     p3=Process(target=producer,args=('泔水',q)) 25 
26     #消费者们:即吃货们
27     c1=Process(target=consumer,args=(q,)) 28     c2=Process(target=consumer,args=(q,)) 29 
30     #开始
31  p1.start() 32  p2.start() 33  p3.start() 34  c1.start() 35 
36     p1.join() #必须保证生产者所有生产完毕,才应该发送结束信号
37  p2.join() 38  p3.join() 39     q.put(None) #有几个消费者就应该发送几回结束信号None
40     q.put(None) #发送结束信号
41     print('')
有多个消费者和生产者的时候须要发送屡次结束信号

 

 

其实咱们的思路无非是发送结束信号而已,有另一种队列提供了这种机制

 JoinableQueue([maxsize]) 

 

#JoinableQueue([maxsize]):这就像是一个Queue对象,但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

   #参数介绍:
    maxsize是队列中容许最大项数,省略则无大小限制。    
  #方法介绍:
    JoinableQueue的实例p除了与Queue对象相同的方法以外还具备:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止,也就是队列中的数据所有被get拿走了。

 

 1 from multiprocessing import Process,JoinableQueue  2 import time,random,os  3 def consumer(q):  4     while True:  5         res=q.get()  6         # time.sleep(random.randint(1,3))
 7  time.sleep(random.random())  8         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))  9         q.task_done() #向q.join()发送一次信号,证实一个数据已经被取走并执行完了
10 
11 def producer(name,q): 12     for i in range(10): 13         # time.sleep(random.randint(1,3))
14  time.sleep(random.random()) 15         res='%s%s' %(name,i) 16  q.put(res) 17         print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) 18     print('%s生产结束'%name) 19     q.join() #生产完毕,使用此方法进行阻塞,直到队列中全部项目均被处理。
20     print('%s生产结束~~~~~~'%name) 21 
22 if __name__ == '__main__': 23     q=JoinableQueue() 24     #生产者们:即厨师们
25     p1=Process(target=producer,args=('包子',q)) 26     p2=Process(target=producer,args=('骨头',q)) 27     p3=Process(target=producer,args=('泔水',q)) 28 
29     #消费者们:即吃货们
30     c1=Process(target=consumer,args=(q,)) 31     c2=Process(target=consumer,args=(q,)) 32     c1.daemon=True #若是不加守护,那么主进程结束不了,可是加了守护以后,必须确保生产者的内容生产完而且被处理完了,全部必须还要在主进程给生产者设置join,才能确保生产者生产的任务被执行完了,而且可以确保守护进程在全部任务执行完成以后才随着主进程的结束而结束。
33     c2.daemon=True 34 
35     #开始
36     p_l=[p1,p2,p3,c1,c2] 37     for p in p_l: 38  p.start() 39 
40     p1.join() #我要确保你的生产者进程结束了,生产者进程的结束标志着你生产的全部的人任务都已经被处理完了
41  p2.join() 42  p3.join() 43     print('') 44     
45     # 主进程等--->p1,p2,p3等---->c1,c2
46     # p1,p2,p3结束了,证实c1,c2确定全都收完了p1,p2,p3发到队列的数据
47     # 于是c1,c2也没有存在的价值了,不须要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,因此设置成守护进程就能够了。
JoinableQueue队列实现生产者消费者模型

 

 

 

 

 

 

 

 

 

 

 

 

 

同一个

相关文章
相关标签/搜索