1、生产者消费者程序员
主要是为解耦(借助队列来实现生产者消费者模型)安全
import queue # 不能进行多进程之间的数据传输app
(1)from multiprocessing import Queue 借助Queue解决生产者消费者模型,队列是安全的。异步
q = Queue(num)async
num :为队列的最大长度函数
q.get() # 阻塞等待获取数据,若是有数据直接获取,若是没有数据,阻塞等待spa
q.put() # 阻塞,若是能够继续往队列中放数据,就直接放,不能放就阻塞等待code
q.get_nowait() # 不阻塞,若是有数据直接获取,没有数据就报错对象
q.put_nowait() # 不阻塞,若是能往队列中放数据直接放,不能够就报错blog
(2)from multiprocessing import JoinableQueue # 可链接的队列
JoinableQueue 是继承Queue ,因此能够使用Queue中的方法
而且JoinableQueue 又多了两个方法
q.join() # 用于生产者。等待q.task_done的返回结果,经过返回结果,生产者就能得到消费者当前消费了多少个数据。
q.task_done() # 用于消费者,是指每消费队列中的一个数据,就给join返回一个标识。
1 from multiprocessing import Queue,Process,Pool,JoinableQueue 2
3
4 def consumer(q,lis): 5 while 1: 6 for i in lis: 7 print(i + '拿走了' + q.get()) 8 q.task_done() # get() 一次就会给生产者的join返回一次数据
9
10
11 def producer(q,name1): 12 for i in range(1,9): 13 q.put(name1 + '第%s号剑'% i) 14 q.join() # 记录了生产者往队列中添加了8个数据,此时会阻塞,等待消费返回8次数据,后生产者进程才会结束
15
16
17 if __name__ == '__main__': 18 q = JoinableQueue() # 实例化一个队列
19 p = Process(target=consumer,args=(q,['盖聂','卫庄','高渐离','胜七','掩日'])) 20 p1 = Process(target=producer,args=(q,'越王八剑')) 21 p.daemon = True # 注意是把消费者设置为守护进程,会随着主进程的结束而结束。
22 p.start() 23 p1.start() 24 p1.join() # 主进程会等待生产者进程结束后才结束,而生产者进程又会等待消费者进程消费完之后才结束。
2、进程之间的共享内存
from multiprocessing import Manager,Value
m = Manager()
num = m.dict({键 :值})
num = m.list([1,2,3])
from multiprocessing import Process,Manager,Value def func(num): for i in num: print(i - 1) # 结果为:0,1,2
if __name__ == '__main__': m = Manager() # 用来进程之间共享数据的
num = m.list([1,2,3]) p = Process(target=func,args=(num,)) p.start() p.join() # 等待func子进程执行完毕后结束
#################Value################
from multiprocessing import Process,Manager,Value def func1(num): print(num) num.value += 1 # 和Manager用法不同
print(num.value) if __name__ == '__main__': num = Value('i',123) # Manager里面不须要传参数
p = Process(target=func1,args=(num,)) p.start() p.join()
3、进程池
进程池:一个池子,里边有固定数量的进程。这些进程一直处于待命状态,一旦有任务来,立刻就去处理。
进程池还会帮程序员去管理池中的进程。
from multiprocessing import Pool
p = Pool(os.cpu_count() + 1)
进程池有三个方法:
map(func,iterable) 有返回值
iterable:可迭代对象,是把可迭代对象中的每一个元素一次传给任务函数当参数
from multiprocessing import Pool def func(num): num += 1
print(num) return num # 返回给map方法
if __name__ == '__main__': p = Pool() res = p.map(func,[i for i in range(10)]) # 参数为目标对象和可迭代对象
p.close() p.join() # 等待子进程结束
print('主进程',res) # res是一个列表
apply(func,args=()) :apply的实现是进程之间是同步的,池中的进程一个一个的去执行。
func :进程池中的进程执行的任务函数。
args :可迭代对象型的参数,是传给任务函数的参数。
同步处理任务时,不须要close和join
同步处理任务时,进程池中的全部进程是普通进程(主进程须要等待其子进程执行结束)
from multiprocessing import Pool def func(num): num += 1
return num if __name__ == '__main__': p = Pool(5) # 实例化5个进程
for i in range(100): res = p.apply(func,args=(i,)) # 这里传的参数是元祖,这里是同步执行
print(res)
apply_async(func,args=(),callback=None) :进城之间是异步的,
func :进程池中的进程执行的任务函数。
args :可迭代对象型的参数,是传给任务函数的参数
from multiprocessing import Pool def func(num): num += 1 return num if __name__ == '__main__': p = Pool(5) # 实例化5个进程 lis = [] for i in range(100): res = p.apply_async(func,args=(i,)) # 异步执行,5个进程同时去调用func lis.append(res) print(res) # 打印结果为 <multiprocessing.pool.ApplyResult object at 0x0347F3D0> p.close() # Pool中用apply_async异步执行时必须关闭进程 p.join() # 由于是异步执行因此须要等待子进程结束 print(lis) # 100个<multiprocessing.pool.ApplyResult object at 0x0347F3D0> 这种存放在列表中 [print(i.get()) for i in lis] # 输出100个数字[1......100]
callback :回调函数,就是说每当进程池中有进程处理完任务,返回的结果能够交给回调函数,由回调函数进行进一步的处理,回调函数只有异步才有,同步是没有的
异步处理任务时,进程池中的全部进程是守护进程(主进程代码执行完毕守护进程就结束)
异步处理任务时,必需要加上close和join
回调函数的使用:
进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的处理操做
回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数。
from multiprocessing import Pool import requests import os def func(ulr): res = requests.get(ulr) print('func进程的pid:%s' % os.getpid(),'父进程的pid:%s' % os.getppid()) if res.status_code == 200: return ulr,res.text def cal_back(sta): # func中返回的值被自动调用,并当成形参传进来
ulr,text = sta print('callback回调函数的pid:%s'% os.getpid(),'父进程的pid:%s' % os.getppid()) # 回调函数的pid和父进程的pid同样
if __name__ == '__main__': p = Pool(5) lis = ['https://www.baidu.com', 'http://www.jd.com', 'http://www.taobao.com', 'http://www.mi.com', 'http://www.cnblogs.com', 'https://www.bilibili.com', ] print('父进程的pid:%s' % os.getpid()) for i in lis: p.apply_async(func,(i,),callback=cal_back) # 异步的执行每个进程,这里的传参和Process不一样,这里必须这样写callback=cal_back
# 异步执行程序func,在每一个任务结束后,在func中return回一个结果,这个结果会自动的被callback函数调用,并当成形参来接收。
p.close() # 进程间异步必须加上close()
p.join() # 等待子进程的结束
4、管道机制
from multiprocessing import Pipe
con1,con2 = Pipe()
管道是不安全的
管道是用于多进程之间通讯的一种方式。
若是在单进程中使用管道,con1发数据,那么就用con2来收数据
con2发数据,那么就用con1来收数据
若是在多进程中使用管道,那么就必须是父进程使用con1收,子进程就必须使用con2发
父进程使用con1发,子进程就必须使用con2收
父进程使用con2收,子进程就必须使用con1发
父进程使用con2发,子进程就必须使用con1收
在管道中有一个著名的错误叫作EOFError。是指,父进程若是关闭了发送端,子进程还继续收数据,那么就会引起EOFError。
# 单进程中管道的应用
from multiprocessing import Pipe con1,con2 = Pipe() # 管道机制
con1.send('123') # con1发送,须要con2来接收 是固定
print(con2.recv()) con2.send('456') # con2发送,须要con1来接收 是固定
print(con1.recv())
# 多进程中管道的应用
from multiprocessing import Process,Pipe def func(con): con1,con2 = con con1.close() # 由于子进程只用con2与父进程通讯,因此关闭了
while 1: try: print(con2.recv()) # 接收父进程con1发来的数据
except EOFError: # 若是父进程的con1发完数据,并关闭管道,子进程的con2继续接收数据,就会报错。
con2.close() # 当接到报错,此时数据已经接收完毕,关闭con2管道。
break # 退出循环
if __name__ == '__main__': con1,con2 = Pipe() p = Process(target=func,args=(con1,con2)) p.start() con2.close() # 由于父进程是用con1来发数据的,con2提早关闭。
for i in range(10): # 生产数据
con1.send('郭%s' % i) # 给子进程的con2发送数据
con1.close() # 生产完数据,关闭父进程的con1管道