守护进程html
1)正常的子进程没有执行完的时候主进程要一直等着python
2)守护进程的进程的做用:
守护进程会随着主进程的代码执行结束(此时主进程不必定结束)而结束。
3)守护进程是否结束的判断标准是 :
主进程的代码是否执行完。不是主进程是否结束。
4)主进程的代码执行完后会要等着全部子进程结束以后结束。
5)守护进程不会等待其它子进程结束,和子进程无关。但若是想让守护进程等待某个子程序结束后再结束,能够在主程序后使用p.join()设置阻塞,即主程序会等待该子进程,而等待这个代码是主进程的代码,即主进程代码没有执行完,因此守护进程不会结束。
6)守护进程 要在start以前设置
7)守护进程中 不能再开启子进程编程
1,开一个守护进程json
1)父进程作本身的事,开一个报时器子进程,每过一秒通报一次。可是父进程的代码执行完后子进程并不会结束。
2)设置守护进程后,父进程的代码执行完后,子进程也会结束。数组
import time
from multiprocessing import Process
def cal_time(): # 定义一个报时器
while True:
time.sleep(1)
print('过去了1秒')
if __name__ == '__main__':
p = Process(target=cal_time)
# 必定在开启进程以前设置 # 将接下来开启的一个进程设置成守护进程
# p.daemon = True
p.start()
for i in range(100): # 10s
time.sleep(0.1)
print('*'*i)
2,使守护进程等待一个子进程结束以后在结束安全
import time from multiprocessing import Process def func(): print('--'*10) time.sleep(15) print('--'*10) def cal_time(): while True: time.sleep(1) print('过去了1秒') if __name__ == '__main__': p = Process(target=cal_time) p.daemon = True # 必定在开启进程以前设置 p.start() p2 = Process(target=func) # 15s p2.start() for i in range(100): # 10s time.sleep(0.1) print('*'*i) p2.join() # 使守护进程等待一个子进程结束以后在结束
# 此处阻塞,子进程p2不结束主程序代码就不会执行完
process 中的部分方法网络
3,process 中的方法 p.is_alive() p.terminate()并发
# p.is_alive() # 是否活着 True表明进程还在 False表明进程不在了
# p.terminate() # 结束一个进程,可是这个进程不会马上被杀;强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁app
import time
from multiprocessing import Process
def func():
print('wahaha')
time.sleep(5)
print('qqxing')
if __name__ == '__main__':
p = Process(target=func)
p.start()
print(p.is_alive()) # True
time.sleep(0.1)
p.terminate() # 给操做系统发出结束进程的请求,# 是个异步操做,同步的会等在这。
print(p.is_alive()) # True #不会当即结束进程,会有一点延迟。
time.sleep(1)
print(p.is_alive()) # False # 进程结束。
# 结果:
# True
# wahaha
# True
# False
process 中的部分属性dom
4,process 中的属性
# pid 查看这个进程 进程id
# name 查看这个进程的名字
def func():
print('wahaha')
time.sleep(5)
print('qqxing')
if __name__ == '__main__':
p = Process(target=func)
p.start()
print(p.name,p.pid)
p.name = '哇哈哈哈' # 改进程的名字
print(p.name)
5,如何能在子进程里查看该子进程的name,pid。
class MyProcess(Process):
def run(self):
print('wahaha',self.name,self.pid)
time.sleep(5)
print('qqxing',self.name,self.pid)
if __name__ == '__main__':
p = MyProcess()
p.start()
print(p.pid)
锁
1)为何须要锁?
由于实现了异步进程,各进程互不干扰,当多个进程同时访问同一个文件时会出现问题。例如,文件中记录有10张票,(多我的)多个进程同时访问,拿到的记录是同样的,每一个进程拿到的都是10,每一个进程都执行的是10-1,而后有都写入文件,文件记录为9张票。事实上,咱们知道,不止卖了一张票。为防止此情况的出现,使用了锁的方式。
2)什么是锁?
一个文件设置一个锁,当一个进程访问文件时,其它进程就不能进入,当这个进程进行完该文件的操做。其它进程才能访问。
至关于,一个上锁的房间,门外挂着一把钥匙,要访问的进程都去强这把钥匙,抢到的进程拿着钥匙进入房间并锁上房间,处理完出来后在把钥匙还挂在门外,其它进程在抢。
3)锁 就是在并发编程中 保证数据安全
7,建立锁
from multiprocessing import Lock
lock = Lock() # 建立锁对象
lock.acquire() # 锁 拿钥匙
lock.acquire() # 锁 阻塞 当第一个拿钥匙的释放了钥匙才能拿钥匙
lock.release() # 释放锁 还钥匙
7.1,多进程 实现 并发
import json
import time
import random
from multiprocessing import Lock
from multiprocessing import Process
def search(i):
with open('ticket') as f:
print(i,json.load(f)['count'])
def get(i):
with open('ticket') as f:
ticket_num = json.load(f)['count']
time.sleep(random.random())
if ticket_num > 0:
with open('ticket','w') as f:
json.dump({'count':ticket_num-1},f)
print('%s买到票了'%i)
else:
print('%s没票了'%i)
def task(i,lock):
search(i) # 查看票
lock.acquire()
get(i) # 抢票
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(20): # 20我的同时抢票
p = Process(target=task,args=(i,lock))
p.start()
信号量
就是一个上锁的文件,外面挂了设定数量的钥匙,即房间最多存在设定数量的人数。
from multiprocessing import Semaphore
sem = Semaphore(4) # 设置为四个
sem.acquire() # 须要钥匙
print(0)
sem.acquire() # 须要钥匙
print(1)
sem.acquire() # 须要钥匙
print(2)
sem.acquire() # 须要钥匙
print(3)
sem.release()
sem.acquire() # 须要钥匙
print(4)
8, 实例
import time
import random
from multiprocessing import Semaphore
from multiprocessing import Process
def sing(i,sem):
sem.acquire()
print('%s : 进入 ktv'%i)
time.sleep(random.randint(1,10))
print('%s : 出 ktv'%i)
sem.release()
# 迷你唱吧 20我的,同一时间只能有4我的进去唱歌
if __name__ == '__main__':
sem = Semaphore(4)
for i in range(20):
Process(target=sing,args=(i,sem)).start()
事件
9,
# recv accept input sleep 同步阻塞
# lock锁 是异步阻塞
# 事件——异步阻塞
# 事件是一个通知,标志,能够同时使全部进程 都陷入阻塞
from multiprocessing import Event #事件
e = Event() # 实例化一个事件 标志 /交通讯号灯
e.wait() # 刚实例化出来的一个事件对象,阻塞信号 /是红灯
# 执行到wait,要先看灯,绿灯行红灯停,若是在停的过程当中灯绿了,就变成非阻塞了
e.set() # 将标志变成非阻塞 /交通灯变绿
e.clear() # 将标志又变成阻塞 /交通灯变红
e.is_set() # 判断是否阻塞 True就是绿灯 False就是红灯
10,实现一个红绿灯处随机经过的车辆,每一个数表明一个车辆(进程)
import time
import random
for i in range(100): # 一百辆车
if i%random.randint(3,6) == 0 : #随机生成一个数,用车号除以生成数。
time.sleep(random.randint(1,3)) #知足条件的车,等待随机几秒在经过
print('发生等待:',end=' ') # 标识
print('%s号车经过'%i)
11,实例:红绿灯
import time
import random
from multiprocessing import Process
from multiprocessing import Event
def traffic_light(e):
while True:
if e.is_set():
time.sleep(3)
print('红灯亮')
e.clear() # 绿变红
else:
time.sleep(3)
print('绿灯亮')
e.set() # 红变绿
def car(i,e):
e.wait()
print('%s车经过'%i)
if __name__ == '__main__':
e = Event() # 立一个红灯
tra = Process(target=traffic_light,args=(e,))
tra.start() # 启动一个进程来控制红绿灯
for i in range(100):
if i%6 == 0 :
time.sleep(random.randint(1,3))
car_pro = Process(target=car, args=(i,e))
car_pro.start()
进程间通讯——队列
12,队列
1.进程之间通讯 可使用multiprocessing 的 Queue模块
2.队列有两种建立方式 第一种不传参数 这个队列就没有长度限制 ;传参数,建立一个有最大长度限制的队列
3.提供两个重要方法;put() get()
4.qsize 可能出错
from multiprocessing import Queue
q = Queue(3) # 能够设置长度
q.put(1)
q.put(2)
q.put(3)
q.put(4)
print(q.get())队列
print(q.get())
print(q.get())
print(q.get()) # 若是队列里已经没有值了 就会阻塞等待有一个值
13,进程间通讯
一个进程往队列里放数据,一个进程能拿到队列里的数据
from multiprocessing import Process
from multiprocessing import Queue
def q_put(q):
q.put('hello')
def q_get(q):
print(q.get())
if __name__ =='__main__':
q = Queue()
p = Process(target=q_put,args=(q,))
p.start()
p1 = Process(target=q_get, args=(q,))
p1.start()
14, 生产者消费者模型
我要生产一个数据 而后 给一个函数 让这个函数依赖这个数据进行运算 拿到结果 —— 同步过程
# 但生产者生成数据很快,而消费者处理数据很慢,可使用多个相同的进程处理生产的数据。
# 作包子 和 吃包子
import time
def producer(q): # 生产者
for i in range(100): # 生产一百个包子
q.put('包子%s'%i)
def consumer(q): # 消费者
for i in range(100):
time.sleep(1)
print(q.get())
if __name__ == '__main__':
q = Queue(10) # 托盘 # 限制队列长度十,当队列满了,只有用一个才能在放一个新的。
# 防止占用内存
p = Process(target=producer,args=(q,))
p.start()
c1 = Process(target=consumer, args=(q,))
c2 = Process(target=consumer, args=(q,))
c1.start() #用两个消费者进程,处理数据。
c2.start()
15,解决 数据的供需不平衡
# 同步 生产数据 使用数据
# 异步 主进程 生产数据 子进程使用数据 —— 问题:500个子进程去处理数据 生产过慢
# 异步 子进程 生产数据 子进程处理数据 —— 问题:不知道生产慢仍是处理慢
15.1,
# 3个子进程 生产包子
# 2个子进程 吃包子
# 生产的快 吃的慢 包子溢出
# 生产的慢 吃得快 包子不够吃
# 若是增长生产者 —— 让生产变快
# 减小生产者 —— 找一个容器放 约束容器的容量
import time
import random
from multiprocessing import Queue
from multiprocessing import Process
def producer(q,food):
for i in range(5):
q.put('%s-%s'%(food,i))
print('生产了%s'%food)
# time.sleep(random.randint(1,3))
time.sleep(random.random())
q.put(None) # 有几个消费者进程,就要放入几个
q.put(None)
q.put(None)
def consumer(q,name):
while True:
food = q.get() # 生产者不生产仍是生产的慢
if food == None:break
print('%s 吃了 %s'%(name,food))
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer,args=(q,'包子'))
p1.start()
p2 = Process(target=producer, args=(q, '骨头'))
p2.start()
c1 = Process(target=consumer, args=(q, 'alex'))
c1.start()
c2 = Process(target=consumer, args=(q, 'egon'))
c2.start()
c3 = Process(target=consumer, args=(q, 'jin'))
c3.start()
# 队列不会乱,由于一次出一个,先进先出。
# 如今 经过queue
# 生产者消费者模型
#1.消费者要处理多少数据是不肯定的
#2.因此只能用while循环来处理数据 ,可是while循环没法结束
#3.须要生产者发送信号
#4.有多少个消费者 就须要发送多少个信号
#5.可是发送的信号数量须要根据 生产者和消费者的数量进行计算,因此很是不方便
16,JoinableQueue 解决上面问题
# 使用上面的方式建立的队列对象,有join和task_done两个方法。
# q.task_done() 代表完成队列中一个数据的处理。放到消费者代码后,当消费者处理完这个数据后执行这个代码。
# q.join() 放到生产者代码后,与上一个方法搭配使用,检查到队列中的全部数据都处理完了(taskdone)不在阻塞。生产者代码结束。
# 在主进程后加上每一个生产者进程的阻塞,消费者没处理完数据,生产者就不会结束,主进程中设置的生产者阻塞就不经过,即主进程代码不会执行完。
# 将每一个消费者进程设置成 守护进程,这样当主进程代码执行完,即生产者已经生产了全部数据并处理完毕。守护进程结束,即消费者进程结束。
import time
import random
from multiprocessing import Process
from multiprocessing import JoinableQueue
# def producer(q,food):
# for i in range(5):
# q.put('%s-%s'%(food,i))
# print('生产了%s'%food)
# # time.sleep(random.randint(1,3))
# time.sleep(random.random())
# q.join() # 等待 消费者 把全部的数据都处理完
#
# def consumer(q,name):
# while True:
# food = q.get()
# print('%s 吃了 %s'%(name,food))
# q.task_done() # 这个数据处理完了
#
# if __name__ == '__main__':
# q = JoinableQueue()
# p1 = Process(target=producer,args=(q,'泔水'))
# p1.start()
# p2 = Process(target=producer, args=(q, '骨头'))
# p2.start()
# c1 = Process(target=consumer, args=(q, 'alex'))
# c1.daemon = True # 将消费者设置成守护进程
# c1.start()
# c2 = Process(target=consumer, args=(q, 'egon'))
# c2.daemon = True
# c2.start()
# c3 = Process(target=consumer, args=(q, 'jin'))
# c3.daemon = True
# c3.start()
#
# p1.join() # 等待p1执行完毕
# p2.join() # 等待p2执行完毕
#
# # 生产者生产的数据所有被消费 —— 生产者进程结束 —— 主进程代码执行结束 —— 消费者守护进程结束
管道 pipe
17,pipe与两个重要方法 send() recv()
from multiprocessing import Process
from multiprocessing import Pipe
p1,p2 = Pipe() #支持双向通讯
p1.send('hello')
print(p2.recv())
p2.send('hi')
# p2.close() # p2端关闭
print(p1.recv())
print(p1.recv()) # 若是p2端没有关闭,会发生阻塞
# p2端关闭,管道内没有数据可接收时,报错 EOFError
18,进程中用
from multiprocessing import Process
from multiprocessing import Pipe
def func(son)
print(son.recv()) # 接收到
print(son,recv()) #接收不到报错
if __name__ == '__main__':
foo,son = Pipe()
p = Process(target=func,args=(son,))
p.start()
foo.send('hello')
foo.close()
19,下面的状况关闭后没有报错而是阻塞
from multiprocessing import Process
from multiprocessing import Pipe
def func(p)
foo,son = p # 此处形成了一个管道两端各有两个口
# foo.close() # 关闭子进程的foo,recv没有接收到会报错。
# 可是子进程foo的关闭主进程没关闭,还会阻塞。
print(son.recv()) # 接收到
print(son,recv()) # 阻塞
if __name__ == '__main__':
foo,son = Pipe()
p = Process(target=func,args=((foo,son),))
p.start()
son.close()
foo.send('hello')
foo.close() # 关闭的是主进程的foo ,子进程的foo没有关闭。
# 因此主进程要及时关闭son ,子进程要关闭不用的foo
# 管道不安全,若是向上面同样,一个管道有多个发送端口有多个接收端口,当管道中有一个数据时,若是,两个接收端同时去拿这各数据,会形成这两个接收端都拿到了这个数据。
20,用管道也能实现生产者消费者模型
管道+锁
def func(p,l): foo, son = p foo.close() while True: try : l.acquire() print(son.recv()) # EOFError
l.release() except EOFError: l.release() son.close() break
def func2(p): foo, son = p son.close() for i in range(10): foo.send(i) foo.close() if __name__ == '__main__': foo,son = Pipe() l = Lock() p = Process(target=func,args=((foo,son),l)) p1 = Process(target=func,args=((foo,son),l)) p2 = Process(target=func,args=((foo,son),l)) p.start() p1.start() p2.start() p3 = Process(target=func2, args=((foo, son),)).start() p4 = Process(target=func2, args=((foo, son),)).start() p5 = Process(target=func2, args=((foo, son),)).start() p6 = Process(target=func2, args=((foo, son),)).start() p7 = Process(target=func2, args=((foo, son),)).start() son.close() foo.close()
21,管道或队列的先择
管道:多个机器多个进程间通讯
队列:用在同一台机器的多个进程之间通讯
Manager模块
22,Manager 模块
# Pipe 管道 : 双向通讯 数据不安全
# Queue 管道+锁: 双向通讯 数据安全
# JoinableQueue :数据安全
# put 和 get的一个计数机制 ,每次get数据以后发送task_done,put端接收到计数-1,直到计数为0就能感知到
# Manager是一个类 就提供了能够进行数据共享的一个机制 提供了不少数据类型 dict list
if __name__ == '__main__':
m = Manager()
d = m.dict() # 建立的字典进程间能共享的字典
print(d)
22.1,例
# Manager : dict list pipe ,并不提供数据安全的支持
# def func(dic):
# print(dic)
# # while True:
# # print(dic)
# # time.sleep(3)
# if __name__ == '__main__':
# m = Manager()
# d = m.dict({'count':0})
# print(d)
# # print(d)
# # d['count'] = 0
# # print(d)
# # d = {}
# p = Process(target=func,args=(d,))
# p.start()
# d['count'] = 0
22.2,例
# from multiprocessing import Manager,Process,Lock
# def work(d,lock):
# lock.acquire()
# d['count'] -= 1
# lock.release()
#
# if __name__ == '__main__':
# lock= Lock()
# m = Manager()
# dic=m.dict({'count':100}) # 共享的数据
# l = []
# for i in range(100):
# p=Process(target=work,args=(dic,lock))
# p.start()
# l.append(p)
# [p.join() for p in l]
# print(dic)
进程池
# 一个电脑四个核,无论开几个进程一次最多同时处理四个进程。
# 有100 个任务,要处理,造一个池子, 放四个进程(或五个不限,但多了没啥用)用来完成任务。
# 开启进程关进程,也须要时间,开的进程 多了花费时间就多
# 开的进程再多,一次最多处理四个进程。而进度池,是开四个或五个进程,反复使用完成任务。
概念介绍:
Pool([numprocess [,initializer [, initargs]]]):建立进程池
参数:
numprocess:要建立的进程数,若是省略,将默认使用cpu_count()的值 initializer:是每一个工做进程启动时要执行的可调用对象,默认为None initargs:是要传给initializer的参数组
主要方法:
1) p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。
'''须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async()'''
2) p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。
'''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。'''
3) p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成
4) P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用
其它方法:
方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具备如下方法
1) obj.get():返回结果,若是有必要则等待结果到达。timeout是可选的。若是在指定时间内尚未到达,将引起一场。若是远程操做中引起了异常,它将在调用此方法时再次被引起。
2) obj.ready():若是调用完成,返回True
3) obj.successful():若是调用完成且没有引起异常,返回True,若是在结果就绪以前调用此方法,引起异常
4) obj.wait([timeout]):等待结果变为可用。
5) obj.terminate():当即终止全部工做进程,同时不执行任何清理或结束任何挂起工做。若是p被垃圾回收,将自动调用此函数
23,建立进程池
23.1,
import os
import time
from multiprocessing import Pool
def func(i):
time.sleep(1) # 结果是五个五个的出来,即同时作五个任务。
print(i,os.getpid()) # 每五个进程的pid是同样的。
# 注意其实输出没顺序,各进程是异步的。
if __name__ == '__main__':
p = Pool(5) # 建立能容纳5个进程的进程池对象
p.map(func,range(20)) # 向池中提交任务
23.2,
import os
import time
from multiprocessing import Pool
def func(i):
time.sleep(1) # 结果是五个五个的出来,即同时作五个任务。
print(i,os.getpid()) # 每五个进程的pid是同样的。
# 注意其实输出没顺序,各进程是异步的。
if __name__ == '__main__':
p = Pool(5)
p.map(func,range(20))
p.close() # 是不容许再向进程池中添加任务,不是关闭对象。
p.join() # 必须close以后在join
print('====') # 加上上两句,才保证任务处理完以后,在执行这一句。
# 由于是异步的,不加上两句,这句可能会在,任务没处理完就执行了。
23.3,
import os
import time
import random
from multiprocessing import Pool
from multiprocessing import Process
def func(i):
i += 1
if __name__ == '__main__':
# 设置进度池,完成一百个任务(用时短,占内存小)
# 只开可5个进程
p = Pool(5) # 建立了5个进程
start = time.time()
p.map(func,range(1000)) # target = func args=next(iterable) 必须是可迭代的
#要传多个用元组 [(1,2,3),1,2,3,4]
p.close()
p.join()
print(time.time() - start)
start = time.time()
# 开一百个进程,完成一百个任务(用时长,占内存多)
# 开了一百个进程
l = []
for i in range(1000):
p = Process(target=func,args=(i,)) # 建立了一百个进程
p.start()
l.append(p)
[i.join() for i in l] # 等待全部进程结束
print(time.time() - start)
24,进程池的 apply 方法
import time
from multiprocessing import Pool
def func(i):
time.sleep(1)
i += 1
if __name__ == '__main__':
p = Pool(5)
for i in range(20):
# p.apply(func,args=(i,)) # apply是同步提交任务的机制,执行完一个任务才提交下一个任务。多进程等于没开。
p.apply_async(func,args=(i,)) # apply_async是异步提交任务的机制
p.close()
p.join()
回调函数
25,回调函数
25.1,回调函数的使用
import os
from multiprocessing import Pool
def func(i): # 多进程中的io多,
print('子进程%s:%s'%(i,os.getpid())) # 子进程pid
return i*'*'
def call(arg): # 回调函数是在主进程中完成的,不能传参数,只能接受多进程中函数的返回值
print('回调 :',os.getpid()) # 回调函数pid
print(arg)
if __name__ == '__main__':
print('---->',os.getpid()) # 主进程pid
p = Pool(5)
for i in range(10):
p.apply_async(func,args=(i,),callback=call)
# callback设置回调函数,即执行完func后返回的值不会再返回到调用它的地方,而是放回给回调函数作参数。
p.close()
p.join()
25.2,例子1
# 请求网页
# 网络延时 IO操做
# 单进程
# 10个页面 同时访问多个 —> 用多进程
# 分析页面:——>用回调函数
# from urllib.request import urlopen # 这个模块须要安装
# import requests
# ret = requests.get('http://www.baidu.com')
# print(ret.text) # 整个网页的源代码
# print(ret.status_code) # 访问网页的状态码
25.2,例子2
from urllib.request import urlopen
import requests
from multiprocessing import Pool
def get_url(url):
ret = requests.get(url)
return {'url':url,
'status_code':ret.status_code,
'content':ret.text}
def parser(dic):
print(dic['url'],dic['status_code'],len(dic['content']))
# 把分析结果写到文件里
if __name__ == '__main__':
url_l = [
'http://www.baidu.com',
'http://www.sogou.com',
'http://www.hao123.com',
'http://www.yangxiaoer.cc',
'http://www.python.org'
]
p = Pool(4)
for url in url_l:
p.apply_async(get_url,args=(url,),callback=parser)
p.close()
p.join()
25.3,爬虫实例
import re from urllib.request import urlopen from multiprocessing import Pool def get_page(url,pattern): response=urlopen(url).read().decode('utf-8') return pattern,response def parse_page(info): pattern,page_content=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0].strip(), 'title':item[1].strip(), 'actor':item[2].strip(), 'time':item[3].strip(), } print(dic) if __name__ == '__main__': regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>' pattern1=re.compile(regex,re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get()
25.4,若是在主进程中等待进程池中全部任务都执行完毕后,再统一处理结果,则无需回调函数
from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待进程池中全部进程执行完毕 nums=[] for res in res_l: nums.append(res.get()) #拿到全部结果 print(nums) #主进程拿到全部的处理结果,能够在主进程中进行统一进行处理
进程池的其余实现方式:https://docs.python.org/dev/library/concurrent.futures.html