*****
模型: 就是解决某个问题套路
生产者: 指的是产生数据的一方 (一段代码)
消费者: 指的是处理数据的一方 (一段代码)python
生活中处处都是这种模型dom
例如:饭店 厨师就是生产者 吃饭的人就是消费者函数
例如: 先爬取网页数据(生产) 在解析网页数据 (消费)网站
# 消费任务 def eat(food): for i in range(10): # 要消费 time.sleep(random.randint(0, 2)) print(food,"吃完了!") # 生产任务 def make_rose(): for i in range(10): # 再生产 time.sleep(random.randint(0, 2)) print("第%s盘青椒肉丝制做完成!" % i) rose = "第%s盘青椒肉丝" % i eat(rose) # 直接调用消费任务 # 开启任务 make_rose()
生产者和消费,处理速度不平衡,一方快一方慢,致使一方须要等待另外一方 总体效率低下线程
本来,双方是耦合 在一块儿,消费必须等待生产者生成完毕在开始处理, 反过来翻译
若是消费消费速度太慢,生产者必须等待其处理完毕才能开始生成下一个数据code
1.将双方分开来.一方专门负责生成,一方专门负责处理队列
这样一来数据就不能直接传递了 由于消费者可能尚未处理完成,为了使生产者能够不断的生成,则须要一个共同的容器进程
2.生产者完成后放入容器,消费者从容器中取出数据ip
这样就解决了双方能力不平衡的问题,作的快的一方能够继续作,不须要等待另外一方
案例:
def eat(q): for i in range(10): # 要消费 rose = q.get() time.sleep(random.randint(0, 2)) print(rose,"吃完了!") # 生产任务 def make_rose(q): for i in range(10): # 再生产 time.sleep(random.randint(0, 2)) print("第%s盘青椒肉丝制做完成!" % i) rose = "第%s盘青椒肉丝" % i # 将生成完成的数据放入队列中 q.put(rose) if __name__ == '__main__': # 建立一个共享队列 q = Queue() make_p = Process(target=make_rose,args=(q,)) eat_p = Process(target=eat,args=(q,)) make_p.start() eat_p.start()
可翻译:为可join的队列
该队列相比普通的Queue的区别在于该对列额外增长的了join函数
该函数为阻塞函数,会阻塞直到等待队列中全部数据都被处理完毕。
q = JoinableQueue() q.put(1) q.get() q.join() #阻塞 等待队列中全部数据都被处理完毕 print("over")
执行以上函数,将致使进程没法结束,注释掉join调用就正常,发现join的确有阻塞的效果,
可是队列中一共就一个数据,明明已经调用get取出了,为何join依然阻塞呢?
这是由于get仅仅是取出数据,而join是等待数据处理完毕,也就是说:
取出数据还不算完,你处理完之后必须告知队列处理完毕,经过task_done
q = JoinableQueue() q.put(1) q.get() q.task_done() # 数据处理完毕 q.join() #阻塞 等待队列中全部数据都被处理完毕 print("over") #输出: # over
须要注意的时,task_done的调用次数必大于等于队列中的数据个数,join才能正常结束阻塞
q = JoinableQueue() q.put(1) q.put(1) q.get() q.task_done() # 数据处理完毕 q.join() #阻塞 等待队列中全部数据都被处理完毕 print("over") #输出: # over
主进程能够明确知道队列中的数据什么时候被处理完毕
回顾以前的生产者消费者模型中,生产者与消费者都明确要处理的数据数量,可是实际开发中不少状况是没法提早明确的,例如:要爬去一个网站上的全部页面,页面数量数不固定的
from multiprocessing import Process,JoinableQueue,Queue import time,random def producter(name,q): for i in range(5): time.sleep(random.randint(1,2)) print("\033[46m%s生产了 热狗%s\033[0m" % (name,i)) q.put("%s的 热狗%s" % (name,i)) def customer(name,q): while True: time.sleep(random.randint(1, 2)) hot_dog = q.get() print("\033[47m%s 吃掉了 %s \033[0m" % (name,hot_dog)) if __name__ == '__main__': q = Queue() p1 = Process(target=producter,args=("北京🌭店",q)) p2 = Process(target=producter,args=("上海🌭店",q)) p3 = Process(target=producter, args=("深圳🌭店", q)) p1.start() p2.start() p3.start() c1 = Process(target=customer,args=("王思聪",q)) c1.start()
上述代码没法正常运行结束,是由于消费者进程中不清楚处理是否处理完成,因此一直在循环等待数据。
此时咱们就可使用joinablequeue队列来让主进程获取生成者进程是否生成完毕的信号从而结束子进程
from multiprocessing import Process,JoinableQueue,Queue # q = JoinableQueue() # # q.put(1) # q.put(1) # # q.get() # q.task_done() # # # q.join() #阻塞 等待队列中全部数据都被处理完毕 # print("over") import time,random def producter(name,q): for i in range(5): time.sleep(random.randint(1,3)) print("\033[46m%s生产了 热狗%s\033[0m" % (name,i)) q.put("%s的 热狗%s" % (name,i)) def customer(name,q): while True: time.sleep(random.randint(1, 2)) hot_dog = q.get() print("\033[47m%s 吃掉了 %s \033[0m" % (name,hot_dog)) # 一个数据处理完毕 q.task_done() if __name__ == '__main__': # q = Queue() q = JoinableQueue() p1 = Process(target=producter,args=("北京🌭店",q)) p2 = Process(target=producter,args=("上海🌭店",q)) p3 = Process(target=producter, args=("深圳🌭店", q)) p1.start() p2.start() p3.start() c1 = Process(target=customer,args=("王思聪",q)) c1.daemon = True # 使子进程跟随主进程结束 c1.start() # 等待生产者进程所有生成完毕 p1.join() p2.join() p3.join() # 等待全部数据所有处理完毕 q.join() # 终止子进程 也能够开启子进程前将子进程设置为守护进程来结束子进程 # c1.terminate()
进程池:
进程池与线程池使用方法彻底一致,放到线程池一块儿讲