并行计算是使用并行计算机来减小单个计算问题所须要的时间,咱们能够经过利用编程语言显式的说明计算中的不一样部分如何再不一样的处理器上同时执行来设计咱们的并行程序,最终达到大幅度提高程序效率的目的。html
众所周知,Python中的GIL限制了Python多线程并行对多核CPU的利用,可是咱们仍然能够经过各类其余的方式来让Python真正利用多核资源, 例如经过C/C++扩展来实现多线程/多进程, 以及直接利用Python的多进程模块multiprocessing来进行多进程编程。python
本文主要尝试仅仅经过python内置的multiprocessing模块对本身的动力学计算程序来进行优化和效率提高,其中:编程
本文并非对Python的multiprocessing模块的接口进行翻译介绍,须要熟悉multiprocessing的童鞋能够参考官方文档https://docs.python.org/2/library/multiprocessing.html。网络
最近想用本身的微观动力学程序进行一系列的求解并将结果绘制成二维Map图进行可视化,这样就须要对二维图上的多个点进行计算并将结果收集起来并进行绘制,因为每一个点都须要进行一次ODE积分以及牛顿法求解方程组,所以要串行地绘制整张图可能会遇到极低的效率问题尤为是对参数进行测试的时候,每画一张图都须要等好久的时间。其中绘制的二维图中每一个点都是独立计算的,因而很天然而然的想到了进行并行化处理。多线程
因为脚本比较长,并且实现均为本身的程序,脚本的大体结构以下, 本质是一个二重循环,循环的变量分别为反应物气体(O2 和 CO)的分压的值:app
1编程语言 2分布式 3函数 4工具 5 6 7 8 9 10 11 12 13 14 15 16 17 |
import time import numpy as np # 省略若干... pCOs = np.linspace(1e-5, 0.5, 10) pO2s = np.linspace(1e-5, 0.5, 10) if "__main__" == __name__: try: start = time.time() for i, pO2 in enumerate(pO2s): # ... for j, pCO in enumerate(pCOs): # 针对当前的分压值 pCO, pO2进行动力学求解 # 具体代码略... end = time.time() t = end - start finally: # 收集计算的结果并进行处理绘图 |
总体过程就这么简单,我须要作的就是使用multiprocessing的接口来对这个二重循环进行并行化。
使用单核串行绘制100个点所须要的时间以下, 总共花了240.76秒:
python学习交流群:923414804,群内天天分享干货,包括最新的企业级案例学习资料和零基础入门教程,欢迎小伙伴入群学习。
二维map图绘制的效果以下:
multiprocessing模块
multiprocessing模块提供了相似threading模块的接口,并对进程的各类操做进行了良好的封装,提供了各类进程间通讯的接口例如Pipe
, Queue
等等,能够帮助咱们实现进程间的通讯,同步等操做。
使用Process
类来动态建立进程实现并行
multiprocessing模块提供了Process
能让咱们经过建立进程对象并执行该进程对象的start
方法来建立一个真正的进程来执行任务,该接口相似threading
模块中的线程类Thread
.
可是当被操做对象数目不大的时候可使用Process
动态生成多个进程,可是若是须要的进程数一旦不少的时候,手动限制进程的数量以及处理不一样进程返回值会变得异常的繁琐,所以这个时候咱们须要使用进程池来简化操做。
使用进程池来管理进程
multiprocessing模块提供了一个进程池Pool
类,负责建立进程池对象,并提供了一些方法来说运算任务offload到不一样的子进程中执行,并很方便的获取返回值。例如咱们如今要进行的循环并行便很容易的将其实现。
对于这里的单指令多数据流的并行,咱们能够直接使用Pool.map()
来将函数映射到参数列表中。Pool.map
实际上是map函数的并行版本,此函数将会阻塞直到全部进程所有结束,并且此函数返回的结果顺序仍然不变。
首先,我先把针对每对分压数据的处理过程封装成一个函数,这样能够将函数对象传递给子进程执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
import time from multiprocessing import Pool import numpy as np # 省略若干... pCOs = np.linspace(1e-5, 0.5, 10) pO2s = np.linspace(1e-5, 0.5, 10) def task(pO2): '''接受一个O2分压,根据当前的CO分压进行动力学求解''' # 代码细节省略... if "__main__" == __name__: try: start = time.time() pool = Pool() # 建立进程池对象,进程数与multiprocessing.cpu_count()相同 tofs = pool.map(task, pCOs) # 并行执行函数 end = time.time() t = end - start finally: # 收集计算的结果并进行处理绘图 |
使用两个核心进行计算,计算时间从240.76s降到了148.61秒, 加速比为1.62
对不一样核心的加速效果进行测试
为了查看使用不一样核心数对程序效率的改善,我对不一样的核心数和加速比进行了测试绘图,效果以下:
运行核心数与程序运行时间:
运行核心数与加速比:
可见,因为我外层循环只循环了10次所以使用的核心数超过10之后核心数的增长并不能对程序进行加速,也就是多余的核心都浪费掉了。
前面使用了multiprocessing包提供的接口咱们使用了再一台机器上进行多核心计算的并行处理,可是multiprocessing的用处还有更多,经过multiprocessing.managers模块,咱们能够实现简单的多机分布式并行计算,将计算任务分布到不一样的计算机中运行。
Managers提供了另外的多进程通讯工具,他提供了在多台计算机之间共享数据的接口和数据对象,这些数据对象所有都是经过代理类实现的,好比ListProxy
和DictProxy
等等,他们都实现了与原生list
和dict
相同的接口,可是他们能够经过网络在不一样计算机中的进程中进行共享。
关于managers模块的接口的详细使用能够参考官方文档:https://docs.python.org/2/library/multiprocessing.html#managers
好了如今咱们开始尝试将绘图程序改形成能够在多台计算机中分布式并行的程序。改造的主要思想是:
大体可总结为下图:
服务进程
首先服务端须要一个manager对象来管理共享对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
def get_manager(): '''建立服务端manager对象. '''
# 自定义manager类 class JobManager(BaseManager): pass
# 建立任务队列,并将此数据对象共享在网络中 jobid_queue = Queue() JobManager.register('get_jobid_queue', callable=lambda: jobid_queue)
# 建立列表代理类,并将其共享再网络中 tofs = [None]*N JobManager.register('get_tofs_list', callable=lambda: tofs, proxytype=ListProxy)
# 将分压参数共享到网络中 JobManager.register('get_pCOs', callable=lambda: pCOs, proxytype=ListProxy) JobManager.register('get_pO2s', callable=lambda: pCOs, proxytype=ListProxy)
# 建立manager对象并返回 manager = JobManager(address=(ADDR, PORT), authkey=AUTHKEY)
return manager |
1 |
JobManager.register('get_jobid_queue', callable=lambda: jobid_queue) |
我就将一个返回任务队列的函数对象同manager对象绑定并共享到网络中,这样在网络中的进程就能够经过本身的manager对象的get_jobid_queue
方法获得相同的队列,这样便实现了数据的共享.
2. 建立manager对象的时候须要两个参数,
192.168.0.1
地址的5000
端口进行监听,那么此参数能够是('192.169.0.1
, 5000)`进行任务分配
上面咱们将一个任务队列绑定到了manager对象中,如今我须要将队列进行填充,这样才能将任务发放到不一样的客户端来进行并行执行。
1 2 3 4 5 6 7 8 9 10 |
def fill_jobid_queue(manager, nclient): indices = range(N) interval = N/nclient jobid_queue = manager.get_jobid_queue() start = 0 for i in range(nclient): jobid_queue.put(indices[start: start+interval]) start += interval if N % nclient > 0: jobid_queue.put(indices[start:]) |
这里所谓的任务其实就是相应参数在list中的index值,这样不一样计算机中获得的结果能够按照相应的index将结果填入到结果列表中,这样服务端就能在共享的网络中收集各个计算机计算的结果。
启动服务端进行监听
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
def run_server(): # 获取manager manager = get_manager() print "Start manager at {}:{}...".format(ADDR, PORT) # 建立一个子进程来启动manager manager.start() # 填充任务队列 fill_jobid_queue(manager, NNODE) shared_job_queue = manager.get_jobid_queue() shared_tofs_list = manager.get_tofs_list()
queue_size = shared_job_queue.qsize()
# 循环进行监听,直到结果列表被填满 while None in shared_tofs_list: if shared_job_queue.qsize() < queue_size: queue_size = shared_job_queue.qsize() print "Job picked..."
return manager |
任务进程
服务进程负责进行简单的任务分配和调度,任务进程则只负责获取任务并进行计算处理。
在任务进程(客户端)中基本代码与咱们上面单机中的多核运行的脚本基本相同(由于都是同一个函数处理不一样的数据),可是咱们也须要为客户端建立一个manager来进行任务的获取和返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
def get_manager(): class WorkManager(BaseManager): pass
# 因为只是从共享网络中获取,所以只须要注册名字便可 WorkManager.register('get_jobid_queue') WorkManager.register('get_tofs_list') WorkManager.register('get_pCOs') WorkManager.register('get_pO2s')
# 这里的地址和验证码要与服务端相同才能够进行数据共享 manager = WorkManager(address=(ADDR, PORT), authkey=AUTHKEY)
return manager |
在客户端咱们仍然能够多进程利用多核资源来加速计算。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
if "__main__" == __name__:
manager = get_manager() print "work manager connect to {}:{}...".format(ADDR, PORT)
# 将客户端本地的manager链接到相应的服务端manager manager.connect()
# 获取共享的结果收集列表 shared_tofs_list = manager.get_tofs_list()
# 获取共享的任务队列 shared_jobid_queue = manager.get_jobid_queue()
# 从服务端获取计算参数 pCOs = manager.get_pCOs() shared_pO2s = manager.get_pO2s()
# 建立进程池在本地计算机进行多核并行 pool = Pool()
while 1: try: indices = shared_jobid_queue.get_nowait() pO2s = [shared_pO2s[i] for i in indices] print "Run {}".format(str(pO2s)) tofs_2d = pool.map(task, pO2s)
# Update shared tofs list. for idx, tofs_1d in zip(indices, tofs_2d): shared_tofs_list[idx] = tofs_1d # 直到将任务队列中的任务所有取完,结束任务进程 except Queue.Empty: break |
下面我将在3台在同一局域网中的电脑来进行简单的分布式计算测试,
10.10.10.245
1 |
python server.py |
2. 在两个客户端运行任务脚原本获取任务队列中的任务并执行
1 |
python worker.py |
当任务队列为空且任务完成时,任务进程终止; 当结果列表中的结果收集完毕时,服务进程也会终止。
执行过程如图:
执行结果以下图:
上面的panel为服务端监听,左下为本身的笔记本运行结果,右下panel为集群中的其中一个节点。
可见运行时间为56.86s,无奈,是个人本子脱了后腿(-_-!)
本文经过python内置模块multiprocessing实现了单机内多核并行以及简单的多台计算机的分布式并行计算,multiprocessing为咱们提供了封装良好而且友好的接口来使咱们的Python程序更方面利用多核资源加速本身的计算程序,但愿能对使用python实现并行话的童鞋有所帮助。