前面咱们已经将线程并发编程与进程并行编程所有摸了个透,其实我第一次学习他们的时候感受很是困难甚至是吃力。由于概念实在是太多了,各类锁,数据共享同步,各类方法等等让人十分头痛。因此这边要告诉你一个好消息,前面的全部学习的知识点其实都是为本章知识点作铺垫,在学习了本章节的内容后关于如何使用多线程并发与多进程并行就采起本章节中介绍的方式便可。html
这里要介绍一点与以前内容不一样的地方,即若是使用队列进行由进程池建立的进程之间数据共享的话不论是multiprocessing
模块下的Queue
仍是queue
模块下的Queue
都不能为进程池中所建立的进程进行数据共享,咱们须要用到另外一个队列即multiprocessing.Manager()
中的Queue
。固然这个我也会在下面介绍到。那么开始学习吧!python
官方文档编程
最先期的Python2中是没有线程池这一律念的,只有进程池。直到Python3的出现才引入了线程池,其实关于他们的使用都是很是简单,并且接口也是高度统一甚至说如出一辙的。而线程池与进程池的做用便是为了让咱们可以更加便捷的管理线程或进程。多线程
咱们先说一下,若是须要使用线程池或进程池,须要导入模块concurrent.futures
。并发
from concurrent.futures import ThreadPoolExecutor
# 线程池执行器app
from concurrent.futures import ProcessPoolExecutor
# 进程池执行器框架
这里介绍一下,关于线程池或者进程池建立出的线程与进程与咱们使用multiprocessing
模块或者threading
模块中建立的线程或进程有什么区别。咱们以多线程为例:异步
import threading def task(): ident = threading.get_ident() print(ident) # 销毁当前执行任务的线程 if __name__ == '__main__': for i in range(10): t1 = threading.Thread(target=task,) # 领任务 t1.start() # 等待CPU调度,而不是当即执行 # 执行 # ==== 执行结果 ==== Ps:能够看到每一个线程的id号都不同,这也印证了图上说的。 """ 10392 12068 5708 13864 2604 7196 7324 9728 9664 472 """
import threading from concurrent.futures import ThreadPoolExecutor # 线程池执行器 def task(): ident = threading.get_ident() print(ident) # 结束任务,不销毁当前执行任务的线程,直到全部任务都执行完毕。 if __name__ == '__main__': pool = ThreadPoolExecutor(max_workers=2) # 这里表明有2个线程能够领取任务 for i in range(10): pool.submit(task) # 执行器启动任务,将这些任务给2我的分配,也就是说task这个任务会被这2个线程不断的执行,直到执行完毕后这2个线程才会死亡 # ==== 执行结果 ==== Ps:能够看到这里都让这2个线程把任务接了,内存开销相比于上面的要小。 """ 7272 7272 7272 7272 11596 7272 11596 11596 11596 11596 """
执行器方法大全 | |
---|---|
submit(fn, *args, **kwargs) | 调度可调用对象 fn ,以 fn(*args **kwargs) 方式执行并返回 |
map(func, *iterables, timeout=None, chunksize=1) | 相似于 |
shutdown(wait=True) | 等待,相似join() 方法,而且在全部的任务完成后关闭执行器。wait=True 为关闭,为False 则是不关闭执行器的意思。 |
Ps:其实对于线程池或进程池来讲,他们的池都有一个官方的名称叫作执行器,接口都是同样的。那么接下来我就将线程池进程池这样的名字换作执行器了,也是方便理解。 |
其实关于执行器的使用,咱们有两种方式,一种是依赖于with
语句,一种是不依赖于with
语句,那么我在这里推荐使用依赖于wait语句的执行器。ide
不依赖于with
语句的执行器使用:函数
import threading from concurrent.futures import ThreadPoolExecutor # 线程池执行器 def task(): print("执行了") if __name__ == '__main__': pool = ThreadPoolExecutor(max_workers=2) # 这里表明有2个线程能够领取任务 , 对于线程池来说它是默认值是CPU核心数+4,对于进程池来说最大开启的进程数是CPU核心数。 for i in range(10): pool.submit(task) # 执行器启动任务,将这些任务给2我的分配,也就是说task这个任务会被这2个线程不断的执行,直到执行完毕后这2个线程才会死亡 # ==== 执行结果 ==== Ps:能够看到这里都让这2个线程把任务接了,内存开销相比于上面的要小。 """ 执行了 执行了 执行了 执行了 执行了 执行了 执行了 执行了 执行了 执行了 """
依赖于with
语句的执行器使用:
import threading from concurrent.futures import ThreadPoolExecutor # 线程池执行器 def task(): print("执行了") # 销毁 if __name__ == '__main__': with ThreadPoolExecutor(max_workers=2) as pool: # 这里表明有2个线程能够领取任务 , 对于线程池来说它是默认值是CPU核心数+4,对于进程池来说最大开启的进程数是CPU核心数。 for i in range(10): pool.submit(task) # 执行器启动任务,将这些任务给2我的分配,也就是说task这个任务会被这2个线程不断的执行,直到执行完毕后这2个线程才会死亡 # ==== 执行结果 ==== Ps:能够看到这里都让这2个线程把任务接了,内存开销相比于上面的要小。 """ 执行了 执行了 执行了 执行了 执行了 执行了 执行了 执行了 执行了 执行了 """
期程对象(由执行器执行的任务的返回结果)方法大全 | |
---|---|
方法/属性名称 | 功能描述 |
cancel() | 尝试取消调用。 若是调用正在执行或已结束运行不能被取消则该方法将返回 False ,不然调用会被取消而且该方法将返回 True 。 |
cancelled() | 若是调用成功取消返回 True 。 |
running() | 若是调用正在执行并且不能被取消那么返回 True 。 |
done() | 若是调用已被取消或正常结束那么返回 True 。 |
result(timeout=None) | 即获取任务的返回结果,最大等待timeout秒,如不设置则死等,超时触发CancelledError 异常。 |
add_done_callback(fn) | 增长回调函数fn ,这个fn 应该至少有一个形参来接收当前期程对象。 |
exception(timeout=None) | 返回由调用引起的异常。若是调用还没完成那么这个方法将等待 timeout 秒。若是在 timeout 秒内没有执行完成,concurrent.futures.TimeoutError 将会被触发。timeout 能够是整数或浮点数。若是 timeout 没有指定或为 None ,那么等待时间就没有限制。 |
Ps:还有一些期程对象的方法没有举例出来。详情参见文档 |
咱们能够看到,咱们上面的函数并无返回值,若是有返回值的话怎么办呢?
import threading from concurrent.futures import ThreadPoolExecutor # 线程池执行器 def task(): print("执行了") return "玫瑰花" # 销毁 if __name__ == '__main__': with ThreadPoolExecutor(max_workers=2) as pool: res = pool.submit(task) print(res) # <Future at 0x2539ea97850 state=finished returned str> 这个就是期程对象,能够看到他里面还有当前任务的执行状态。 finished = 执行完了的意思 print(res.result()) # 经过该方法就能够拿到任务的返回结果 # ==== 执行结果 ==== """ 执行了 <Future at 0x2539ea97850 state=finished returned str> 玫瑰花 """
期程对象,也被称为将来对象,是一个很是重要的概念。这里能够记一笔,在Django
框架中也有些地方采起了期程对象这样的设定,这是后话,后面再聊。
咱们尝试着将它的任务数量增多,发现使用期程对象直接获取任务结果会致使阻塞,怎么解决?
import time import threading from concurrent.futures import ThreadPoolExecutor # 线程池执行器 def task(x): print("执行了,这是第%s个任务"%x) time.sleep(3) return "玫瑰花" # 销毁 if __name__ == '__main__': with ThreadPoolExecutor(max_workers=2) as pool: for i in range(10): res = pool.submit(task,i) print(res.result()) # 每次获取结果的时候都是阻塞,怎么办?这个速率就变得很是的Low逼了。 # ==== 执行结果 ==== """ 执行了,这是第0个任务 玫瑰花 执行了,这是第1个任务 玫瑰花 执行了,这是第2个任务 玫瑰花 执行了,这是第3个任务 玫瑰花 执行了,这是第4个任务 玫瑰花 执行了,这是第5个任务 玫瑰花 执行了,这是第6个任务 玫瑰花 执行了,这是第7个任务 玫瑰花 执行了,这是第8个任务 玫瑰花 执行了,这是第9个任务 玫瑰花 """
我这里有一个办法,能够值得尝试一下。就是执行器自己有个方法shutdown(wait=True)
,它会致使当前主线程的阻塞。那么咱们就能够这样操做,主程序阻塞住,再将启程对象所有放到一个列表中,当全部任务处理完毕后阻塞通行,这个时候咱们再循环这个列表拿出其中的结果。
import time import threading from concurrent.futures import ThreadPoolExecutor # 线程池执行器 def task(x): print("执行了,这是第%s个任务"%x) time.sleep(3) return "玫瑰花" # 销毁 if __name__ == '__main__': res_list = [] # 用于存放全部期程对象 with ThreadPoolExecutor(max_workers=2) as pool: for i in range(10): res = pool.submit(task,i) res_list.append(res) # 将期程对象放入列表 pool.shutdown(wait=True) # 表明必须将全部子线程的任务跑完再继续向下执行主线程。 for i in res_list: print(i.result()) # ==== 执行结果 ==== """ 执行了,这是第0个任务 执行了,这是第1个任务 执行了,这是第2个任务 执行了,这是第3个任务 执行了,这是第4个任务 执行了,这是第5个任务 执行了,这是第6个任务 执行了,这是第7个任务 执行了,这是第8个任务 执行了,这是第9个任务 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 """
若是你以为这种方法很赞,我只能送你两个字,太low了。咱们注意执行器的submit()
方法,这玩意儿是异步提交。异步提交的结果须要用到回调函数来进行调用,咱们来看一下它有多牛逼。
import time import threading from concurrent.futures import ThreadPoolExecutor # 线程池执行器 def task(x): print("执行了,这是第%s个任务"%x) time.sleep(3) return "玫瑰花" # 销毁 def callback(res): # 必须有一个形参,来接收期程对象 print(res.result()) # 打印结果,即task任务的返回结果 if __name__ == '__main__': with ThreadPoolExecutor(max_workers=2) as pool: for i in range(10): res = pool.submit(task,i) res.add_done_callback(callback) # <--- 增长回调函数,当期程对象中的任务处理状态完毕后将自动调用回调函数 # ==== 执行结果 ==== # 异步提交牛逼不?只要任务返回了咱们立马就能够获取到结果进行处理。 """ 执行了,这是第0个任务 执行了,这是第1个任务 玫瑰花 玫瑰花 执行了,这是第2个任务 执行了,这是第3个任务 玫瑰花 玫瑰花 执行了,这是第4个任务 执行了,这是第5个任务 玫瑰花 玫瑰花 执行了,这是第6个任务 执行了,这是第7个任务 玫瑰花 玫瑰花 执行了,这是第8个任务 执行了,这是第9个任务 玫瑰花 玫瑰花 """
当咱们使用进程池执行器启动多进程执行任务时,若是想用数据共享,单纯multiprocessing.Queue
进程队列并不支持。
import multiprocessing from concurrent.futures import ProcessPoolExecutor # 进程池执行器 def task_1(q): q.put("玫瑰花") print("放完了...") def task_2(q): print(q.get()) print("取到了") if __name__ == '__main__': q = multiprocessing.Queue() with ProcessPoolExecutor(max_workers=2) as pool: pool.submit(task_1,q) pool.submit(task_2,q) # ==== 执行结果 ==== # 阻塞住 """ """
这个时候咱们须要用到multiprocessing
中的Manager()
中的Queue
。
from multiprocessing import Manager from concurrent.futures import ProcessPoolExecutor # 进程池执行器 def task_1(q): q.put("玫瑰花") print("放完了...") def task_2(q): print(q.get()) print("取到了") if __name__ == '__main__': q = Manager().Queue() with ProcessPoolExecutor(max_workers=2) as pool: pool.submit(task_1,q) pool.submit(task_2,q) # ==== 执行结果 ==== # 成功 """ 放完了... 玫瑰花 取到了 """