进程池也是经过事先划分一块系统资源区域,这组资源区域在服务器启动时就已经建立和初始化,用户若是想建立新的进程,能够直接取得资源,从而避免了动态分配资源(这是很耗时的)。html
线程池内子进程的数目通常在3~10个之间,当有新的任务来到时,主进程将经过某种方式选择进程池中的某一个子进程来为之服务。相比于动态建立子进程,选择一个已经存在的子进程的代价显得小得多(进程开启过多,效率反而会降低,开启进程是须要占用系统资源的,并且开启多余核数目的进程也没法作到并行)。python
Pool能够提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,若是池尚未满,那么就会建立一个新的进程用来执行该请求;但若是池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。正则表达式
Pool([numprocess [,initializer [, initargs]]]):建立进程池 numprocess:要建立的进程数,若是省略,将默认使用cpu_count()的值,若是指定numprocess为3,则进程池会从无到有建立三个进程,而后自始至终使用这三个进程去执行全部任务,不会开启其余进程 initializer:是每一个工做进程启动时要执行的可调用对象,默认为None initargs:是要传给initializer的参数组
p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。 p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时, 将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。''' p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成 P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用 map():Pool类中的map方法,与内置的map函数用法行为基本一致,即针对每一个参数都进行func()处理,它会使进程阻塞直到结果一块儿总体返回。 注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。 terminal():结束工做进程,不在处理未处理的任务。 join():主进程阻塞等待子进程的退出,join方法必须在close或terminate以后使用。
例子1数组
from multiprocessing import Pool import time import os def func1(n): print("子进程%s开始%s"%(n,os.getpid())) time.sleep(2) print("子进程%s结束%s"%(n,os.getpid())) return n def func2(n): for i in range(10): print(n+2) if __name__ == "__main__": p = Pool(5) #5个进程,默认是cpu的核心数 res = p.map(func1,range(10)) #10个任务,参数必须是可迭代的 print(res) #返回值是带全部子进程的结果的列表 # 默认异步的执行任务,且自带close和joi # # p.map(funcname,iterable) 默认异步的执行任务,且自带close和join
例子2服务器
from multiprocessing import Pool import time import os def func(n): print("进程池中的进程开始%s"%n,os.getpid()) time.sleep(2) print("进程池中的进程结束%s"%n,os.getpid()) if __name__ == "__main__": pool = Pool(5) for i in range(20): # pool.apply(func,args=(i,)) #同步的执行 pool.apply_async(func,args=(i,)) #异步的执行 pool.close() #结束进程池提交任务 pool.join() # 感知进程池中的任务执行结束
方法apply_async()和map_async()的返回值是AsyncResul的实例obj对象。网络
实例具备如下方法app
obj.get():返回结果,若是有必要则等待结果到达。timeout是可选的。若是在指定时间内尚未到达,将引起异常。若是远程操做中引起了异常,它将在调用此方法时再次被引起。异步
obj.ready():若是调用完成,返回Trueasync
obj.successful():若是调用完成且没有引起异常,返回True,若是在结果就绪以前调用此方法,引起异常函数
obj.wait([timeout]):等待结果变为可用。
obj.terminate():当即终止全部工做进程,同时不执行任何清理或结束任何挂起工做。若是p被垃圾回收,将自动调用此函数
例子1
from multiprocessing import Pool import time def func(n): print("进程%s"%n) time.sleep(2) return n*n if __name__ == "__main__": p = Pool(5) #5个进程 res_l = [] for i in range(10): # # res = p.apply(func,args=(i,)) #同步 # print(res) # res_l.append(res) res = p.apply_async(func,args=(i,)) #异步 # print(res.get()) #注意异步时提交返回值须要用get()方法获取,get()方法自带join()效果, #若是在这里打印则会出现和同步时同样的效果 res_l.append(res) # 同步 # print(res_l) # 异步 for i in res_l: print(i.get())
执行结果:同步时是一个接一个的慢慢返回结果,异步时很快的返回整个结果,可能会出现顺序乱的状况
例子2
import time from multiprocessing import Pool def func(i): time.sleep(0.5) return i*i if __name__ == '__main__': p = Pool(5) ret = p.map(func,range(10)) print(ret)
执行结果:总体一块儿输出,map()自带join()效果,所以会阻塞,而后总体一块儿输出结果
能够为进程池或线程池内的每一个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值看成参数,该函数称为回调函数。
简单的说就是进程池中任何一个任务一旦处理完了,就当即告知主进程:我好了,你能够处理个人结果了。主进程则调用一个函数去处理该结果,该函数即回调函数。
咱们能够把耗时间(阻塞)的任务放到进程池中,而后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
简单例子
from multiprocessing import Pool import time import os def func1(n): print("子进程",os.getpid()) time.sleep(1) return n*n def func2(m): print("回调函数",os.getpid()) print("回调函数",m) if __name__ == "__main__": p = Pool(5) for i in range(10): p.apply_async(func1,args=(i,),callback=func2) p.close() p.join() print("主进程",os.getpid()) # 从执行结果能够看出执行回调函数的进程是主进程
回调函数最多的在爬虫程序过程当中使用的较多,爬取网页用子进程,处理数据用回调函数。爬虫:即网络爬虫,能够简单的理解为爬取网页源码的程序,在python中能够使用requests模块实现。
例子1
import requests from multiprocessing import Pool def get(url): responds = requests.get(url) if responds.status_code == 200: return url,responds.content.decode("utf-8") def call_back(args): url,content = args print(url,len(content)) if __name__ == "__main__": url_li = ["https://www.bitmain.com/", "https://www.feixiaohao.com/", "https://www.jinse.com/", "http://www.avalonminer.shop/", "http://www.innosilicon.com/html/product/index.html" ] p = Pool(4) for url in url_li: p.apply_async(get,args=(url,),callback=call_back) p.close() p.join() # 在爬虫爬取网页过程当中,主要耗时间的是爬取的过程, # 假设这里的call_back()是放在子进程中执行,则耗费了更多的时间, # 与此同时主进程一直是空闲的,这里的call_back()放在主进程执行,节省了程序执行的时间
例子2
import re from urllib.request import urlopen import requests from multiprocessing import Pool def get_page(url,pattern): response=urlopen(url).read().decode('utf-8') #这里拿到的是有格式的网页源码 return pattern,response # 正则表达式编译结果 网页内容 def get(url,pattern): headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/51.0.2704.63 Safari/537.36'} responds = requests.get(url,headers=headers,timeout=30) res = responds.content.decode("utf-8") #这里拿到的是简化版的无格式的网页源码 return pattern,res def parse_page(info): pattern,page_content=info print("网页内容长度",len(page_content)) # print(page_content) res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0].strip(), 'title':item[1].strip(), 'actor':item[2].strip(), 'time':item[3].strip(), } print(dic) if __name__ == '__main__': regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>' pattern1=re.compile(regex,re.S) url_dic={'http://maoyan.com/board/7':pattern1} p=Pool() res_l=[] for url,pattern in url_dic.items(): # res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res=p.apply_async(get,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get()