在使用Python处理任务时,限于单线程处理能力有限,须要将任务并行化,分散到多个线程或者是多个进程去执行。python
concurrent.futures就是这样一种库,它可让用户能够很是方便的将任务并行化。这个名字有点长,后面我直接使用词汇concurrent来代替concurrent.futures。数组
concurrent提供了两种并发模型,一个是多线程ThreadPoolExecutor,一个是多进程ProcessPoolExecutor。对于IO密集型任务宜使用多线程模型。对于计算密集型任务应该使用多进程模型。bash
为何要这样选择呢?是由于Python GIL的存在让Python虚拟机在进行运算时没法有效利用多核心。对于纯计算任务,它永远最多只能榨干单个CPU核心。若是要突破这个瓶颈,就必须fork出多个子进程来分担计算任务。而对于IO密集型任务,CPU使用率每每是极低的,使用多线程虽然会加倍CPU使用率,可是还远远到不了饱和(100%)的地步,在单核心能够应付总体计算的前提下,天然是应该选择资源占用少的模式,也就是多线程模式。网络
接下来咱们分别尝试一下两种模式来进行并行计算。多线程
# coding: utf8
# t.py
import time
import fire
import threading
from concurrent.futures import ThreadPoolExecutor, wait
# 分割子任务
def each_task(index):
time.sleep(1) # 睡1s,模拟IO
print "thread %s square %d" % (threading.current_thread().ident, index)
return index * index # 返回结果
def run(thread_num, task_num):
# 实例化线程池,thread_num个线程
executor = ThreadPoolExecutor(thread_num)
start = time.time()
fs = [] # future列表
for i in range(task_num):
fs.append(executor.submit(each_task, i)) # 提交任务
wait(fs) # 等待计算结束
end = time.time()
duration = end - start
s = sum([f.result() for f in fs]) # 求和
print "total result=%s cost: %.2fs" % (s, duration)
executor.shutdown() # 销毁线程池
if __name__ == '__main__':
fire.Fire(run)
复制代码
运行python t.py 2 10
,也就是2个线程跑10个任务,观察输出并发
thread 123145422131200 square 0thread 123145426337792 square 1
thread 123145426337792 square 2
thread 123145422131200 square 3
thread 123145426337792 square 4
thread 123145422131200 square 5
thread 123145426337792 square 6
thread 123145422131200 square 7
thread 123145426337792 square 8
thread 123145422131200 square 9
total result=285 cost: 5.02s
复制代码
咱们看到计算总共花费了大概5s,总共sleep了10s由两个线程分担,因此是5s。读者也许会问,为何输出乱了,这是由于print操做不是原子的,它是两个连续的write操做合成的,第一个write输出内容,第二个write输出换行符,write操做自己是原子的,可是在多线程环境下,这两个write操做会交错执行,因此输出就不整齐了。若是将代码稍做修改,将print改为单个write操做,输出就整齐了(关于write是否绝对原子性还须要进一步深刻讨论)app
# 分割子任务
def each_task(index):
time.sleep(1) # 睡1s,模拟IO
import sys
sys.stdout.write("thread %s square %d\n" % (threading.current_thread().ident, index))
return index * index # 返回结果
复制代码
咱们再跑一下python t.py 2 10
,观察输出框架
thread 123145438244864 square 0
thread 123145442451456 square 1
thread 123145442451456 square 2
thread 123145438244864 square 3
thread 123145438244864 square 4
thread 123145442451456 square 5
thread 123145438244864 square 6
thread 123145442451456 square 7
thread 123145442451456 square 9
thread 123145438244864 square 8
total result=285 cost: 5.02s
复制代码
接下来,咱们改变参数,扩大到10个线程,看看全部任务总共须要多久完成socket
> python t.py 10 10
thread 123145327464448 square 0
thread 123145335877632 square 2
thread 123145331671040 square 1
thread 123145344290816 square 4
thread 123145340084224 square 3
thread 123145348497408 square 5
thread 123145352704000 square 6
thread 123145356910592 square 7
thread 123145365323776 square 9
thread 123145361117184 square 8
total result=285 cost: 1.01s
复制代码
能够看到1s中就完成了全部的任务。这就是多线程的魅力,能够将多个IO操做并行化,减小总体处理时间。分布式
相比多线程适合处理IO密集型任务,多进程适合计算密集型。接下来咱们要模拟一下计算密集型任务。个人我的电脑有2个核心,正好能够体验多核心计算的优点。
那这个密集型计算任务怎么模拟呢,咱们可使用圆周率计算公式。
经过扩大级数的长度n,就能够无限逼近圆周率。当n特别大时,计算会比较缓慢,这时候CPU就会一直处于繁忙状态,这正是咱们所指望的。
好,下面开写多进程并行计算代码
# coding: utf8
# p.py
import os
import sys
import math
import time
import fire
from concurrent.futures import ProcessPoolExecutor, wait
# 分割子任务
def each_task(n):
# 按公式计算圆周率
s = 0.0
for i in range(n):
s += 1.0/(i+1)/(i+1)
pi = math.sqrt(6*s)
# os.getpid能够得到子进程号
sys.stdout.write("process %s n=%d pi=%s\n" % (os.getpid(), n, pi))
return pi
def run(process_num, *ns): # 输入多个n值,分红多个子任务来计算结果
# 实例化进程池,process_num个进程
executor = ProcessPoolExecutor(process_num)
start = time.time()
fs = [] # future列表
for n in ns:
fs.append(executor.submit(each_task, int(n))) # 提交任务
wait(fs) # 等待计算结束
end = time.time()
duration = end - start
print "total cost: %.2fs" % duration
executor.shutdown() # 销毁进程池
if __name__ == '__main__':
fire.Fire(run)
复制代码
经过代码能够看出多进程模式在代码的编写上和多线程没有多大差别,仅仅是换了一个类名,其它都一摸同样。这也是concurrent库的魅力所在,将多线程和多进程模型抽象出了同样的使用接口。
接下来咱们运行一下python p.py 1 5000000 5001000 5002000 5003000
,总共计算4次pi,只用一个进程。观察输出
process 96354 n=5000000 pi=3.1415924626
process 96354 n=5001000 pi=3.14159246264
process 96354 n=5002000 pi=3.14159246268
process 96354 n=5003000 pi=3.14159246272
total cost: 9.45s
复制代码
能够看出来随着n的增大,结果愈来愈逼近圆周率,由于只用了一个进程,因此任务是串行执行,总共花了大约9.5s。
接下来再增长一个进程,观察输出
> python p.py 2 5000000 5001000 5002000 5003000
process 96529 n=5001000 pi=3.14159246264
process 96530 n=5000000 pi=3.1415924626
process 96529 n=5002000 pi=3.14159246268
process 96530 n=5003000 pi=3.14159246272
total cost: 4.98s
复制代码
从耗时上看缩短了接近1半,说明多进程确实起到了计算并行化的效果。此刻若是使用top命令观察进程的CPU使用率,这两个进程的CPU使用率都占到了接近100%。
若是咱们再增长2个进程,是否是还能继续压缩计算时间呢
> python p.py 4 5000000 5001000 5002000 5003000
process 96864 n=5002000 pi=3.14159246268
process 96862 n=5000000 pi=3.1415924626
process 96863 n=5001000 pi=3.14159246264
process 96865 n=5003000 pi=3.14159246272
total cost: 4.86s
复制代码
看来耗时不能继续节约了,由于只有2个计算核心,2个进程已经足以榨干它们了,即便再多加进程也只有2个计算核心可用。
concurrent用的时候很是简单,可是内部实现并非很好理解。在深刻分析内部的结构以前,咱们须要先理解一下Future这个对象。在前面的例子中,executor提交(submit)任务后都会返回一个Future对象,它表示一个结果的坑,在任务刚刚提交时,这个坑是空的,一旦子线程运行任务结束,就会将运行的结果塞到这个坑里,主线程就能够经过Future对象得到这个结果。简单一点说,Future对象是主线程和子线程通讯的媒介。
class Future(object):
def __init__(self):
self._condition = threading.Condition() # 条件变量
self._result = None
def result(self, timeout=None):
self._condition.wait(timeout)
return self._result
def set_result(self, result):
self._result = result
self._condition.notify_all()
复制代码
主线程将任务塞进线程池后获得了这个Future对象,它内部的_result仍是空的。若是主线程调用result()方法获取结果,就会阻塞在条件变量上。若是子线程计算任务完成了就会当即调用set_result()方法将结果填充进future对象,并唤醒阻塞在条件变量上的线程,也就是主线程。这时主线程当即醒过来并正常返回结果。
主线程和子线程交互分为两部分,第一部分是主线程如何将任务传递给子线程,第二部分是子线程如何将结果传递给主线程。第二部分已经讲过了是经过Future对象来完成的。那第一部分是怎么作到的呢?
如上图所示,秘密就在于这个队列,主线程是经过队列将任务传递给多个子线程的。一旦主线程将任务塞进任务队列,子线程们就会开始争抢,最终只有一个线程能抢到这个任务,并当即进行执行,执行完后将结果放进Future对象就完成了这个任务的完整执行过程。
concurrent的线程池有个重大的设计问题,那就是任务队列是无界的。若是队列的生产者任务生产的太快,而线程池消费太慢处理不过来,任务就会堆积。若是堆积一直持续下去,内存就会持续增加直到OOM,任务队列里堆积的全部任务所有完全丢失。用户使用时必定要注意这点,并作好适当的控制。
进程池内部结构复杂,连concurent库的做者本身也以为特别复杂,因此在代码里专门画了一张ascii图来说解模型内部结构
我以为做者的这张图还不够好懂,因此也单独画了一张图,请读者们仔细结合上面两张图,一块儿来过一边完整的任务处理过程。
这个复杂的流程中涉及到3个队列,还有中间附加的管理线程。那为何做者要设计的这么复杂,这样的设计有什么好处?
首先,咱们看这张图的左半边,它和线程池的处理流程没有太多区别,区别仅仅是管理线程只有一个,而线程池的子线程会有多个。这样设计可使得多进程模型和多线程模型的使用方法保持一致,这就是为何两个模型使用起来没有任何区别的缘由所在——经过中间的管理线程隐藏了背后的多进程交互逻辑。
而后咱们再看这张图的右半边,管理线程经过两个队列来和子进程们进行交互,这两个队列都是跨进程队列(multiprocessing.Queue)。CallQueue是单生产者多消费者,ResultQueue是多生产者单消费者。
CallQueue是个有界队列,它的上限在代码里写死了为「子进程数+1」。若是子进程们处理不过来,CallQueue就会变满,管理线程就会中止往里面塞数据。可是这里也遭遇了和线程池同样的问题,TaskQueue是无界队列,它的内容可无论消费者是否在持续(管理线程)消费,TaskQueue会无限制的持续生长,因而最终也会会致使OOM。
进程池模型中的跨进程队列是用multiprocessing.Queue实现的。那这个跨进程队列内部细节是怎样的,它又是用什么高科技来实现的呢
笔者仔细阅读了multiprocessing.Queue的源码发现,它使用无名套接字sockerpair来完成的跨进程通讯,socketpair和socket的区别就在于socketpair不须要端口,不须要走网络协议栈,经过内核的套接字读写缓冲区直接进行跨进程通讯。
当父进程要传递任务给子进程时,先使用pickle将任务对象进行序列化成字节数组,而后将字节数组经过socketpair的写描述符写入内核的buffer中。子进程接下来就能够从buffer中读取到字节数组,而后再使用pickle对字节数组进行反序列化来获得任务对象,这样总算能够执行任务了。一样子进程将结果传递给父进程走的也是同样的流程,只不过这里的socketpair是ResultQueue内部建立的无名套接字。
multiprocessing.Queue是支持双工通讯,数据流向能够是父到子,也能够是子到父,只不过在concurrent的进程池实现中只用到了单工通讯。CallQueue是从父到子,ResultQueue是从子到父。
concurrent.futures框架很是好用,虽然内部实现机制异常复杂,读者也无需彻底理解内部细节就能够直接使用了。可是须要特别注意的是不论是线程池仍是进程池其内部的任务队列都是无界的,必定要避免消费者处理不及时内存持续攀升的状况发生。
今天,做者新书《深刻理解RPC》正式上线,限时优惠9.9元,感兴趣的读者点击下面的链接进行阅读
深刻理解 RPC : 基于 Python 自建分布式高并发 RPC 服务