一 .进程池(multiprocess.Pool)css
1.进程池概念html
为何要有进程池?进程池的概念。
在程序实际处理问题过程当中,忙时会有成千上万的任务须要被执行,闲时可能只有零星任务。那么在成千上万个任务须要被执行的时候,咱们就须要去建立成千上万个进程么?首先
,建立进程须要消耗时间,销毁进程也须要消耗时间。第二即使开启了成千上万的进程,操做系统也不能让他们同时执行,这样反而会影响程序的效率。所以咱们不能无限制的根据
任务开启或者结束进程。那么咱们要怎么作呢?
在这里,要给你们介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等处处理完毕,进程并不关闭,而是将
进程再放回进程池中继续等待任务。若是有不少任务须要执行,池中的进程数量不够,任务就要等待以前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,
池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增长操做系统的调度难度,还节省了开闭进程的时间,也必定程度上可以实现并发效果。
Pool([numprocess [,initializer [, initargs]]]):建立进程池 1 numprocess:要建立的进程数,若是省略,将默认使用cpu_count()的值 2 initializer:是每一个工做进程启动时要执行的可调用对象,默认为None 3 initargs:是要传给initializer的参数组
import os
import time
import random
from multiprocessing import Pool
多进程和进程
from multiprocessing import Pool,Proces def run(n): for i in range(10): print(n+1) if __name__ == '__main__':
进程池 p=Pool(5) # 参数表示5个进程 (相似于5个cppu) p.map(run, range(50)) # 50任务 # 等同于上面只是时间上的差别 进程池更快 下面这个要慢带你 多进程 for i in range(50): Process(target=run,args=(i,)).start()
进程池和多进程效率对比
注意:map : 这个map在这里自带了join方法 不须要人为添加
def run(n): for i in range(10): print(n+1)
if __name__ == '__main__':
进程池 str1=time.time() p=Pool(5) # 参数表示5个进程 (相似于5个cppu) p.map(run, range(50)) # 50任务 这个map在这里自带了join方法 不须要人为添加 t1=time.time()-str1 # 0.26728272438049316 进程池执行时间 多进程 str2 = time.time() ret=[] for i in range(50): p1=Process(target=run,args=(i,)) ret.append(p1) p1.start() for i in ret: i.join() t2=time.time()-str2 # 2.7745769023895264 没有使用进程池执行时间 print(t1,t2)
map进程池传参数
# map : 这个map在这里自带了join方法 不须要人为添加 def run(n): for i in range(10): print(n+1) def run1(n): print(n) # ('aa', 100) print(n[1]) # 100 if __name__ == '__main__': p=Pool(5) # 参数表示5个进程 (相似于5个cppu) p.map(run, range(5)) # 50任务 这个map在这里自带了join方法 不须要人为添加 p.map(run1,[("aa",100)])
2.进程同步(apply)python
# 同步 import os,time from multiprocessing import Pool def work(n): print("开始 run%s"%n,os.getpid()) time.sleep(1) print("结束 run%s" % n, os.getpid()) if __name__ == '__main__': p=Pool(3) #进程池中从无到有建立三个进程,之后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程当中可能有阻塞也可能没有阻塞 # 但无论该任务是否存在阻塞,同步调用都会在原地等着 print(res_l, "空") # # 执行结果: # 开始 run0 14036 # 结束 run0 14036 # 开始 run1 18312 # 结束 run1 18312 # 开始 run2 12744 # 结束 run2 12744 # 开始 run3 14036 # 结束 run3 14036 # 开始 run4 18312 # 结束 run4 18312 # 开始 run5 12744 # 结束 run5 12744 # 开始 run6 14036 # 结束 run6 14036 # 开始 run7 18312 # 结束 run7 18312 # 开始 run8 12744 # 结束 run8 12744 # 开始 run9 14036 # 结束 run9 14036 # [] 空 # 进程已结束,退出代码 0
3.进程异步( apply_async)git
# 异步 带问题的 import os import time import random from multiprocessing import Pool def work(n): print("开始 run%s" % n, os.getpid()) time.sleep(1) print("结束 run%s" % n, os.getpid()) if __name__ == '__main__': print("主进程") p=Pool(3) #进程池中从无到有建立三个进程,之后一直是这三个进程在执行任务 for i in range(10): res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行 # 返回结果以后,将结果放入列表,归还进程,以后再执行新的任务 # 注意这里是一个真异步 就是主进程不会等子进程结束 而是主进程执行完了不会管子进程 由于没有发感知进程池结束 # 执行结果 # 主进程 # 进程已结束,退出代码 0
# 异步 import os import time import random from multiprocessing import Pool def work(n): print("开始 run%s" % n, os.getpid()) time.sleep(1) print("结束 run%s" % n, os.getpid()) if __name__ == '__main__': p=Pool(3) #进程池中从无到有建立三个进程,之后一直是这三个进程在执行任务 for i in range(10): res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行 # 返回结果以后,将结果放入列表,归还进程,以后再执行新的任务 p.close() # 结束进程池接收任务 p.join() # 感知进程池中的任务结束 # 执行结果 # 开始 run0 12284 # 开始 run1 4864 # 开始 run2 18452 # 结束 run0 12284 # 开始 run3 12284 # 结束 run1 4864 # 开始 run4 4864 # 结束 run2 18452 # 开始 run5 18452 # 结束 run3 12284 # 开始 run6 12284 # 结束 run4 4864 # 开始 run7 4864 # 结束 run5 18452 # 开始 run8 18452 # 结束 run6 12284 # 开始 run9 12284 # 结束 run7 4864 # 结束 run8 18452 # 结束 run9 12284 # 进程已结束,退出代码 0
import os import time import random from multiprocessing import Pool def work(n): print("开始 run%s" % n, os.getpid()) time.sleep(1) print("结束 run%s" % n, os.getpid()) if __name__ == '__main__': print("主进程") p=Pool(3) #进程池中从无到有建立三个进程,之后一直是这三个进程在执行任务 res_l=[] for i in range(5): res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行 # 返回结果以后,将结果放入列表,归还进程,以后再执行新的任务 # 须要注意的是,进程池中的三个进程不会同时开启或者同时结束 # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。 res_l.append(res) # 异步apply_async用法:若是使用异步提交的任务,主进程须要使用jion,等待进程池内任务都处理完,而后能够用get收集结果 # 不然,主进程结束,进程池可能还没来得及执行,也就跟着一块儿结束了 p.close() p.join() # print(res_l,"》》》》》》》》》》") for res in res_l: print(res.get()) #使用get来获取apply_aync的结果,若是是apply,则没有get方法,由于apply是同步执行,马上获取结果,也根本无需get # 执行结果 主进程 开始 run0 14684 开始 run1 15180 开始 run2 18996 结束 run0 14684 开始 run3 14684 结束 run1 15180 开始 run4 15180 结束 run2 18996 结束 run3 14684 结束 run4 15180 None None None None None 进程已结束,退出代码 0
from multiprocessing import Pool import time,random,os def work(n): time.sleep(0.5) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待进程池中全部进程执行完毕 for aa in res_l: print(aa.get()) # 获取进程池中全部数据 # 0 # 1 # 4 # 9 # 16 # 25 # 36 # 49 # 64 # 81
# 若是在主进程中等待进程池中全部任务都执行完毕后,再统一处理结果,则无需回调函数。 from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待进程池中全部进程执行完毕 nums=[] for res in res_l: nums.append(res.get()) #拿到全部结果 print(nums) #主进程拿到全部的处理结果,能够在主进程中进行统一进行处理 # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4.进程池版 socketgithub
server
#Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count()) #开启6个客户端,会发现2个客户端处于等待状态 #在每一个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程 from socket import * from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): print('进程pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p=Pool(4) while True: conn,*_=server.accept() p.apply_async(talk,args=(conn,)) # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
client
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
5. 进程池回调函数(主要用于爬虫)json
须要回调函数的场景:进程池中任何一个任务一旦处理完了,就当即告知主进程: 我好了额,你能够处理个人结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
# 先执行fun1函数 fun1函数的返回值 做为回调函数的参数 而后去执行回调函数
import os import time import random from multiprocessing import Pool # 先执行fun1函数 fun1函数的返回值 做为回调函数的参数 而后去执行回调函数 def fun1(n): print("这是fun1函数",os.getpid()) return n*n def fun2(aa): # 参数只能回调哪一个参数 print("这是fun2函数",os.getpid()) print(aa) if __name__ == '__main__': print("主进程",os.getpid()) p=Pool(5) p.apply_async(fun1,args=(10,),callback=fun2) p.close() p.join() # 说明fun2回调函数 回到 主进程中调用 执行结果 主进程 19268 这是fun1函数 12740 这是fun2函数 19268 100 进程已结束,退出代码 0
import os import time import random from multiprocessing import Pool # 先执行fun1函数 fun1函数的返回值 做为回调函数的参数 而后去执行回调函数 def fun1(n): print("这是fun1函数",os.getpid()) return n*n def fun2(aa): # 参数只能回调哪一个参数 print("这是fun2函数",os.getpid()) print(aa) if __name__ == '__main__': print("主进程",os.getpid()) p=Pool(5) p.apply_async(fun1,args=(5,),callback=fun2) p.close() p.join() # 说明fun2回调函数 回到 主进程中调用 执行结果 主进程 7164 这是fun1函数 3272 这是fun2函数 7164 25 进程已结束,退出代码 0
爬虫案例数组
import requests from multiprocessing import Pool aa=requests.get("https://www.baidu.com") print(aa) print(aa.status_code) print(aa.text) print("11111",aa.content)
print(aa.__dict__) 查看里面属性
print("**************************************")
执行结果
<Response [200]>
200
<!DOCTYPE html>
<!--STATUS OK--><html> <head><meta http-equiv=content-type content=text/html;charset=utf-8><meta http-equiv=X-UA-Compatible content=IE=Edge><meta content=always name=referrer><link rel=stylesheet type=text/css href=https://ss1.bdstatic.com/5eN1bjq8AAUYm2zgoY3K/r/www/cache/bdorz/baidu.min.css><title>ç¾åº¦ä¸ä¸ï¼ä½ å°±ç¥é</title></head> <body link=#0000cc> <div id=wrapper> <div id=head> <div class="head_wrapper"> <div class="s_form"> <div class="s_form_wrapper"> <div id=lg> <img hidefocus=true src=//www.baidu.com/img/bd_logo1.png width=270 height=129> </div> <form id=form name=f action=//www.baidu.com/s class="fm"> <input type=hidden name=bdorz_come value=1> <input type=hidden name=ie value=utf-8> <input type=hidden name=f value=8> <input type=hidden name=rsv_bp value=1> <input type=hidden name=rsv_idx value=1> <input type=hidden name=tn value=baidu><span class="bg s_ipt_wr"><input id=kw name=wd class="s_ipt" value maxlength=255 autocomplete=off autofocus=autofocus></span><span class="bg s_btn_wr"><input type=submit id=su value=ç¾åº¦ä¸ä¸ class="bg s_btn" autofocus></span> </form> </div> </div> <div id=u1> <a href=http://news.baidu.com name=tj_trnews class="mnav">æ°é»</a> <a href=https://www.hao123.com name=tj_trhao123 class="mnav">hao123</a> <a href=http://map.baidu.com name=tj_trmap class="mnav">å°å¾</a> <a href=http://v.baidu.com name=tj_trvideo class="mnav">è§é¢</a> <a href=http://tieba.baidu.com name=tj_trtieba class="mnav">è´´å§</a> <noscript> <a href=http://www.baidu.com/bdorz/login.gif?login&tpl=mn&u=http%3A%2F%2Fwww.baidu.com%2f%3fbdorz_come%3d1 name=tj_login class="lb">ç»å½</a> </noscript> <script>document.write('<a href="http://www.baidu.com/bdorz/login.gif?login&tpl=mn&u='+ encodeURIComponent(window.location.href+ (window.location.search === "" ? "?" : "&")+ "bdorz_come=1")+ '" name="tj_login" class="lb">ç»å½</a>');
</script> <a href=//www.baidu.com/more/ name=tj_briicon class=bri style="display: block;">æ´å¤äº§å</a> </div> </div> </div> <div id=ftCon> <div id=ftConw> <p id=lh> <a href=http://home.baidu.com>å ³äºç¾åº¦</a> <a href=http://ir.baidu.com>About Baidu</a> </p> <p id=cp>©2017 Baidu <a href=http://www.baidu.com/duty/>使ç¨ç¾åº¦åå¿ è¯»</a> <a href=http://jianyi.baidu.com/ class=cp-feedback>æè§åé¦</a> 京ICPè¯030173å· <img src=//www.baidu.com/img/gs.gif> </p> </div> </div> </div> </body> </html>
11111 b'<!DOCTYPE html>\r\n<!--STATUS OK--><html> <head><meta http-equiv=content-type content=text/html;charset=utf-8><meta' \
b' http-equiv=X-UA-Compatible content=IE=Edge><meta content=always name=referrer><link rel=stylesheet type=text/css ' \
b'href=https://ss1.bdstatic.com/5eN1bjq8AAUYm2zgoY3K/r/www/cache/bdorz/baidu.min.css><title>\xe7\x99\xbe\xe5\xba\xa6\
xe4\xb8\x80\xe4\xb8\x8b\xef\xbc\x8c\xe4\xbd\xa0\xe5\xb0\xb1\xe7\x9f\xa5\xe9\x81\x93</title></head> <body link=#0000cc> ' \
b'<div id=wrapper> <div id=head> <div class="head_wrapper"> <div class="s_form"> <div class="s_form_wrapper"> <div id=lg> <img' \
b'hidefocus=true src=//www.baidu.com/img/bd_logo1.png width=270 height=129> </div> <form id=form name=f action=//www.baid' \
b'u.com/s class="fm"> <input type=hidden name=bdorz_come value=1> <input type=hidden name=ie value=utf-8> <input type=hidde' \
b'n name=f value=8> <input type=hidden name=rsv_bp value=1> <input type=hidden name=rsv_idx value=1> <input type=hidden n' \
b'ame=tn value=baidu><span class="bg s_ipt_wr"><input id=kw name=wd class="s_ipt" value maxlength=255 autocomplete=off auto' \
b'focus=autofocus></span><span class="bg s_btn_wr"><input type=submit id=su value=\xe7\x99\xbe\xe5\xba\xa6\xe4\xb8\x80\xe' \
b'4\xb8\x8b class="bg s_btn" autofocus></span> </form> </div> </div> <div id=u1> <a href=http://news.baidu.com name=tj_trn' \
b'ews class="mnav">\xe6\x96\xb0\xe9\x97\xbb</a> <a href=https://www.hao123.com name=tj_trhao123 class="mnav">hao123</a> <a hre' \
b'f=http://map.baidu.com name=tj_trmap class="mnav">\xe5\x9c\xb0\xe5\x9b\xbe</a> <a href=http://v.baidu.com name=tj_trvideo ' \
\xe9\xa6\x88</a> \xe4\xba\xacICP\xe8\xaf\x81030173\xe5\x8f\xb7 ' \
b'<img src=//www.baidu.com/img/gs.gif> </p> </div> </div> </div> </body> </html>\r\n'
def get_ali(url): # print(url) # res=requests.get(url) # print(res) # print(res.status_code) # print(res.text) res=requests.get(url) if res.status_code==200: return res.content.decode("utf-8"),url,res.text def show(args): cont,url,text=args print(cont,text) print(url,len(cont)) if __name__=="__main__": ret=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(5) for url in ret: p.apply_async(get_ali,args=(url,),callback=show) p.close() p.join()
使用多进程请求多个url来减小网络等待浪费的时间
from multiprocessing import Pool import requests import json import os def get_page(url): print('<进程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<进程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) #拿到的是get_page的结果,其实彻底不必拿该结果,该结果已经传给回调函数处理了 ''' 打印结果: <进程3388> get https://www.baidu.com <进程3389> get https://www.python.org <进程3390> get https://www.openstack.org <进程3388> get https://help.github.com/ <进程3387> parse https://www.baidu.com <进程3389> get http://www.sina.com.cn/ <进程3387> parse https://www.python.org <进程3387> parse https://help.github.com/ <进程3387> parse http://www.sina.com.cn/ <进程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] '''
import re from urllib.request import urlopen from multiprocessing import Pool def get_page(url,pattern): response=urlopen(url).read().decode('utf-8') return pattern,response def parse_page(info): pattern,page_content=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0].strip(), 'title':item[1].strip(), 'actor':item[2].strip(), 'time':item[3].strip(), } print(dic) if __name__ == '__main__': regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>' pattern1=re.compile(regex,re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get() {'index': '1', 'title': '绝杀慕尼黑', 'actor': '主演:弗拉基米尔·马什科夫,约翰·萨维奇,伊万·科列斯尼科夫', 'time': '上映时间:2019-06-13'} {'index': '2', 'title': '千与千寻', 'actor': '主演:柊瑠美,周冬雨,入野自由', 'time': '上映时间:2019-06-21'} {'index': '3', 'title': '命运之夜——天之杯II :迷失之蝶', 'actor': '主演:杉山纪彰,下屋则子,神谷浩史', 'time': '上映时间:2019-07-12'} {'index': '4', 'title': '玩具总动员4', 'actor': '主演:汤姆·汉克斯,蒂姆·艾伦,安妮·波茨', 'time': '上映时间:2019-06-21'} {'index': '5', 'title': '扫毒2天地对决', 'actor': '主演:刘德华,古天乐,苗侨伟', 'time': '上映时间:2019-07-05'} {'index': '6', 'title': '蜘蛛侠:英雄远征', 'actor': '主演:汤姆·赫兰德,杰克·吉伦哈尔,塞缪尔·杰克逊', 'time': '上映时间:2019-06-28'} {'index': '7', 'title': '爱宠大机密2', 'actor': '主演:帕顿·奥斯瓦尔特,冯绍峰,凯文·哈特', 'time': '上映时间:2019-07-05'} {'index': '8', 'title': '狮子王', 'actor': '主演:唐纳德·格洛弗,塞斯·罗根,詹姆斯·厄尔·琼斯', 'time': '上映时间:2019-07-12'} {'index': '9', 'title': '机动战士高达NT', 'actor': '主演:榎木淳弥,村中知,松浦爱弓', 'time': '上映时间:2019-07-12'} {'index': '10', 'title': '最好的咱们', 'actor': '主演:陈飞宇,何蓝逗,惠英红', 'time': '上映时间:2019-06-06'}
6. 进程池返回值网络
异步获取返回值
def fun (i): # time.sleep(1) return i*i if __name__ == '__main__': p=Pool(5) for i in range(6): res=p.apply_async(fun,args=(i,)) # # print(res) print(res.get()) #等待计算结果 阻塞获取结果 等待下次循环 这里至关于join # print(res.get())获取到值可是和同步同样 可是咱们这里是异步 因此异步get阻塞
解决方案
import os import time import random from multiprocessing import Pool # 异步获取返回值 表示五个五个返回数据 def fun (i): time.sleep(1) return i*i if __name__ == '__main__': p=Pool(5) ret=[] for i in range(22): res=p.apply_async(fun,args=(i,)) # ret.append(res) #等待计算结果 阻塞获取结果 等待下次循环 这里至关于join for s in ret: print( s.get()) # 异步获取返回值 表示五个五个返回数据
异步获取返回值 map并发
# map 自带join方法 close 方法 map 会先把每个计算结果 添加到列表中 在返回 列表 def fun (i): time.sleep(1) return i*i if __name__ == '__main__': p=Pool(5) res=p.map(fun,range(5)) # print(res) # [0, 1, 4, 9, 16]
同步获取返回值
def fun (i): return i*i if __name__ == '__main__': p=Pool(5) for i in range(5): res=p.apply(fun,args=(i,)) # apply结果就是fun的返回值 print(res) # 0 # 1 # 4 # 9 # 16