进程池和线程池详解

进程池和线程池详解

##池
##进程池
##线程池
##为何要有池?
##10000
#池
# 预先的开启固定个数的进程数,当任务来临的时候,直接提交给已经开好的进程
# 让这个进程去执行就能够了
池的优势:
# 节省了进程,线程的开启 关闭 切换都须要时间
# 而且减轻了操做系统调度的负担
#html


一、开启进程池。任务少于进程数。开启进程池,这里也要放到if name == 'main':下

import os,time,random
from concurrent.futures import ProcessPoolExecutor
def func():
print('start',os.getpid())
time.sleep(random.randint(1,3))
print('end',os.getpid())
if name == 'main':
p=ProcessPoolExecutor(5)
p.submit(func)
-----------结果:
start 4516
end 4516多线程

二、开启的进程最多和进程池的大小同样。任务多于进程数

import os,time,random
from concurrent.futures import ProcessPoolExecutor
def func():
print('start',os.getpid())
time.sleep(random.randint(1,3))
print('end',os.getpid())
if name == 'main':
p=ProcessPoolExecutor(2)
for i in range(4):
p.submit(func)
----------------结果:
start 2548
start 5816
end 2548
start 2548
end 5816
start 5816
end 2548
end 5816
#分析:2548执行完任务以后又来了一个任务再次执行了一遍,总共开启的进程最可能是定义的进程池的大小。这里是以2个进程最大效率的执行这提交的4个任务并发

三、池是实现异步的

import os,time,random
from concurrent.futures import ProcessPoolExecutor
def func():
print('start',os.getpid())
time.sleep(random.randint(1,3))
print('end',os.getpid())
if name == 'main':
p=ProcessPoolExecutor(2)
for i in range(4):
p.submit(func)
print('main',os.getpid())
-------------结果:
main 5512
start 5352
start 5376
end 5376
start 5376
end 5352
start 5352
end 5376
end 5352
#先打印的的main。咱们经常须要的是等提交到进程池中的任务都执行完毕后再往下执行主进程中的代码。这时须要阻塞,池的阻塞是池.shutdown()app

四、submit和shutdown配合,进程池中任务都执行完毕再(作某件事)往下执行主进程代码(shutdown后面的)。池.shutdown()

import os,time,random
from concurrent.futures import ProcessPoolExecutor
def func():
print('start',os.getpid())
time.sleep(random.randint(1,3))
print('end',os.getpid())
if name == 'main':
p=ProcessPoolExecutor(2)
for i in range(4):
p.submit(func)
p.shutdown() #关闭池以后就不能继续提交任务,而且会阻塞,直到已经提交的任务完成
print('main',os.getpid())
------------------结果:
start 5780
start 796
end 5780
start 5780
end 5780
start 5780
end 796
end 5780
main 5640
#关闭池以后继续提交任务报错:RuntimeError: cannot schedule new futures after shutdowndom

五、池执行的函数中传参数,submit后面直接传参,可是池函数必须是第一个参数。

from concurrent.futures import ProcessPoolExecutor
def func(i):
print(i)
if name == 'main':
p=ProcessPoolExecutor(2)
for i in range(4):
p.submit(func,i)
p.shutdown()
----------结果:
0
1
2
3异步

#源码:def submit(self, fn, *args, **kwargs):函数

六、

import os,time,random
from concurrent.futures import ProcessPoolExecutor
def func(i):
time.sleep(random.randint(1,3))
return ii
if name == 'main':
p=ProcessPoolExecutor(2)
for i in range(4):
ret=p.submit(func,i)
print(ret,ret.result()) #这里有阻塞,取返回值结果
p.shutdown()
print('main',os.getpid())
-------------结果:
<Future at 0x2913090 state=finished returned int> 0
<Future at 0x293b770 state=finished returned int> 1
<Future at 0x293b810 state=finished returned int> 4
<Future at 0x293b7f0 state=finished returned int> 9
main 5384
#注释:ret后面的返回值是个Future对象,这个对象.result()返回池中任务函数的返回值。这个返回值在for循环中打印,那么执行完一个任务打印出结果才会提交下一个任务,这样就不是异步了,而是同步了,不是咱们用池实现异步的效果。
七、既能实现并发又能得到返回值。得到返回值的时候不在提交任务的循环中,得到一个对象就放到列表中,从列表中取返回值,不影响它任务的提交。
import os,time,random
from concurrent.futures import ProcessPoolExecutor
def func(i):
time.sleep(random.randint(1,3))
return i
i
if name == 'main':
p=ProcessPoolExecutor(2)
li=[]
for i in range(4):
ret=p.submit(func,i)
li.append(ret)
for ret in li:print('ret-->',ret.result()) #ret这里是同步阻塞
p.shutdown()
print('main',os.getpid())
--------------结果:
ret--> 0
ret--> 1
ret--> 4
ret--> 9
main 5928
八、进程池的特色:
#开销大
#一个池中的任务个数限制了咱们程序的并发个数url

九、线程池提交一个任务加传参

#从并发.将来导入线程程池执行者,实例化线程池执行者并指定线程个数,对象.提交(任务函数,参数(多个参数均可以))
import os,time,random
from concurrent.futures import ThreadPoolExecutor
def func(i):
print('start', os.getpid(),' arg:',i)
time.sleep(random.randint(1,3))
tp=ThreadPoolExecutor(20) #线程个数能开不少,通常开启cpu个数乘以4或5
tp.submit(func,1)
----------------结果:
start 2844 arg: 1spa

十、提交多个线程池任务

#for 循环提交多个任务
import os,time,random
from concurrent.futures import ThreadPoolExecutor
def func(i):
print('start', os.getpid(),' arg:',i)
time.sleep(random.randint(1,3))
tp=ThreadPoolExecutor(2)
for i in range(5):
tp.submit(func,i)
------------结果:
start 2100 arg: 0
start 2100 arg: 1
start 2100 arg: 2
start 2100 arg: 3
start 2100 arg: 4
#这个建立的顺序和执行的顺序好像差很少,可是是实现并发的。操作系统

十一、线程池获取线程任务函数的返回值

#接收提交,追加到列表,循环列表中对象,对象.结果()
import os,time,random
from concurrent.futures import ThreadPoolExecutor
def func(i):
print('start', os.getpid(),' arg:',i)
time.sleep(random.randint(1,3))
return i**2
tp=ThreadPoolExecutor(2)
ret_li=[]
for i in range(5):
ret=tp.submit(func,i)
ret_li.append(ret)
for ret in ret_li:print('ret-->',ret.result())
----------------结果:
start 2856 arg: 0
start 2856 arg: 1
start 2856 arg: 2
ret--> 0
start 2856 arg: 3
ret--> 1
start 2856 arg: 4
ret--> 4
ret--> 9
ret--> 16

十二、线程池中任务都结束才能再执行的代码

#tp.shutdown()线程池对象.关闭(),池中任务都结束才能执行关闭以后的代码
import os,time,random
from concurrent.futures import ThreadPoolExecutor
def func(i):
print('start', os.getpid(),' arg:',i)
time.sleep(random.randint(1,3))
tp=ThreadPoolExecutor(2)
for i in range(4):
ret=tp.submit(func,i)
tp.shutdown()
print('main')
------------------结果:
start 1468 arg: 0
start 1468 arg: 1
start 1468 arg: 2
start 1468 arg: 3
main

1三、线程池其它方法map(省代码量批量提交任务)

#批量建立线程池任务和建立好后获取每一个任务函数返回值更简便
#任务返回值列表变量=线程池对象.map(任务函数,传参可迭代对象)。建立任务数是可迭代对象个数
import os,time,random
from concurrent.futures import ThreadPoolExecutor
def func(i):
print('start', os.getpid(),' arg:',i)
time.sleep(random.randint(1,3))
return i**2
tp=ThreadPoolExecutor(2)
ret=tp.map(func,range(5))
for i in ret:print(i)
---------------结果:
start 2976 arg: 0
start 2976 arg: 1
start 2976 arg: 2
start 2976 arg: 3
0
1
start 2976 arg: 4
4

print(ret,type(ret))
--------结果:
<generator object Executor.map. .result_iterator at 0x02938300> <class 'generator'>

9
16

1四、多线程爬取网页,获取到结果后使用回调函数分析网页

import requests
from concurrent.futures import ThreadPoolExecutor
def get_page(url):
res = requests.get(url)
return {'url': url, 'content': res.text} #ret的结果会做为参数返回给绑定的函数
def parserpage(ret):
dic = ret.result()
print(dic['url'])
tp = ThreadPoolExecutor(5)
url_lst = [
'http://www.baidu.com', # 3
'http://www.cnblogs.com', # 1
'http://www.douban.com', # 1
'http://www.tencent.com',
'http://www.cnblogs.com/Eva-J/articles/8306047.html',
'http://www.cnblogs.com/Eva-J/articles/7206498.html',
]
#方法一:回调函数,使用回调函数简便并且效率高
for url in url_lst:
ret = tp.submit(get_page, url)
ret.add_done_callback(parserpage) #绑定的函数中的内容#谁先执行完谁先使用回调函数
-------------------结果:
http://www.baidu.com
http://www.cnblogs.com/Eva-J/articles/7206498.html
http://www.tencent.com
http://www.cnblogs.com/Eva-J/articles/8306047.html
http://www.douban.com
http://www.cnblogs.com

#方法二:非回调函数。
ret_l = []
for url in url_lst:
ret = tp.submit(get_page, url)
ret_l.append(ret)
for ret in ret_l:
parserpage(ret)
print("ret_l:", ret_l)
--------------------结果:
http://www.baidu.com
http://www.cnblogs.com
http://www.douban.com
http://www.tencent.com
http://www.cnblogs.com/Eva-J/articles/8306047.html
http://www.cnblogs.com/Eva-J/articles/7206498.html
ret_l: [<Future at 0x2db7150 state=finished returned dict>, <Future at 0x2e152d0 state=finished re

1五、回调函数的使用

import time
from concurrent.futures import ThreadPoolExecutor
def son():
print(123)
time.sleep(3)
return 123
def call_back(num):
print(num.result())
t = ThreadPoolExecutor(20)
obj = t.submit(son)
print('main : ',obj)
obj.add_done_callback(call_back)
----------结果;
123
main : <Future at 0x39f5b0 state=running>
123

#代码分析:
导入类:从并发.将来导入线程池执行者
建立池对象:线程池(大小)
获取返回值:用变量接收池对象.提交(任务函数),即将来对象
执行回调函数:将来对象.添加完成回调(回调函数)
回调函数接收一个参数,这个参数是obj将来对象,回调函数中取任务函数返回值就是这个参数.result()

1六、线程池小结

建立一个池子

tp = ThreadPoolExcutor(池中线程/进程的个数)

异步提交任务

ret = tp.submit(函数,参数1,参数2....)

获取返回值

ret.result()

在异步的执行完全部任务以后,主线程/主进程才开始执行的代码

tp.shutdown() 阻塞 直到全部的任务都执行完毕

map方法

ret = tp.map(func,iterable) 迭代获取iterable中的内容,做为func的参数,让子线程来执行对应的任务

for i in ret: 每个都是任务的返回值

回调函数

ret.add_done_callback(函数名)

要在ret对应的任务执行完毕以后,直接继续执行add_done_callback绑定的函数中的内容,而且ret的结果会做为参数返回给绑定的函数

5个进程

20个线程

5*20 = 100个并发

相关文章
相关标签/搜索