multiprocessing
模块就是跨平台版本的多进程模块,提供了一个Process
类来表明一个进程对象,这个对象能够理解为是一个独立的进程,能够执行另外的事情算法
from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print('启动子线程{}{}'.format(name, os.getpid())) if __name__ == '__main__': print('父进程{}'.format(os.getpid())) p = Process(target=run_proc, args=('test',)) print('子进程将要启动') p.start() p.join() print('子线程结束')
from multiprocessing import Process import os import time def run_proc(): """子进程要执行的代码""" print('子进程运行中,pid=%d...' % os.getpid()) # os.getpid获取当前进程的进程号 print('子进程将要结束...') if __name__ == '__main__': print('父进程pid: %d' % os.getpid()) # os.getpid获取当前进程的进程号 p = Process(target=run_proc) p.start() >>> 父进程pid: 3580 >>> 子进程运行中,pid=3581... >>> 子进程将要结束...
Process([group [, target [, name [, args [, kwargs]]]]])多线程
from multiprocessing import Process import os from time import sleep def run_proc(name, age, **kwargs): for i in range(10): print('子进程运行中,name= %s,age=%d ,pid=%d...' % (name, age, os.getpid())) print(kwargs) sleep(0.2) if __name__=='__main__': p = Process(target=run_proc, args=('test',18), kwargs={"m":20}) p.start() sleep(1) # 1秒中以后,当即结束子进程 p.terminate() p.join() >>> 子进程运行中,name= test,age=18 ,pid=3593... >>> {'m': 20} >>> 子进程运行中,name= test,age=18 ,pid=3593... >>> {'m': 20} >>> 子进程运行中,name= test,age=18 ,pid=3593... >>> {'m': 20} >>> 子进程运行中,name= test,age=18 ,pid=3593... >>> {'m': 20} >>> 子进程运行中,name= test,age=18 ,pid=3593... >>> {'m': 20}
from multiprocessing import Process, Queue import os, time, random # 写数据进程执行的代码: def write(q): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) # 读数据进程执行的代码: def read(q): print('Process to read: %s' % os.getpid()) while True: value = q.get(True) print('Get %s from queue.' % value) if __name__ == '__main__': # 父进程建立Queue,并传给各个子进程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 启动子进程pw,写入: pw.start() # 启动子进程pr,读取: pr.start() # 等待pw结束: pw.join() # pr进程里是死循环,没法等待其结束,只能强行终止: pr.terminate()
初始化Queue()对象时(例如:q=Queue()
),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就表明可接受的消息数量没有上限(直到内存的尽头);并发
""" 若是要启动大量的子进程,能够用进程池的方式批量建立子进程: """ from multiprocessing import Pool import os, time, random def long_time_task(name): print('运行任务 %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('任务 %s 运行 %0.2f 秒' % (name, (end - start))) if __name__ == '__main__': print('父进程 %s.' % os.getpid()) p = Pool(4) # 建立进程池中最多存4个子进程 for i in range(5): # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,)) # 每次循环将会用空闲出来的子进程去调用目标 p.apply_async(long_time_task, args=(i,)) print('等待全部子进程完成...') p.close() p.join() print('全部子进程完成.') >>> 等待全部子进程完成... >>> 运行任务 0 (3722)... >>> 运行任务 1 (3723)... >>> 运行任务 2 (3724)... >>> 运行任务 3 (3725)... >>> 任务 3 运行 0.67 秒 >>> 运行任务 4 (3725)... >>> 任务 2 运行 1.29 秒 >>> 任务 0 运行 2.00 秒 >>> 任务 1 运行 2.77 秒 >>> 任务 4 运行 2.31 秒 >>> 全部子进程完成.
对Pool
对象调用join()
方法会等待全部子进程执行完毕,调用join()
以前必须先调用close()
,调用close()
以后就不能继续添加新的Process
了。
请注意输出的结果,task 0,1,2,3是马上执行的,而task 4要等待前面某个task完成后才执行,这是由于Pool的默认大小在个人电脑上是4,所以,最多同时执行4个进程。这是Pool有意设计的限制,并非操做系统的限制。若是改为:app
p = Pool(5)
就能够同时跑5个进程。
因为Pool
的默认大小是CPU的核数,若是你不幸拥有8核CPU,你要提交至少9个子进程才能看到上面的等待效果。
dom
# 修改import中的Queue为Manager from multiprocessing import Manager,Pool import os,time,random def reader(q): print("reader启动(%s),父进程为(%s)" % (os.getpid(), os.getppid())) for i in range(q.qsize()): print("reader从Queue获取到消息:%s" % q.get(True)) def writer(q): print("writer启动(%s),父进程为(%s)" % (os.getpid(), os.getppid())) for i in "itcast": q.put(i) if __name__=="__main__": print("(%s) start" % os.getpid()) q = Manager().Queue() # 使用Manager中的Queue po = Pool() po.apply_async(writer, (q,)) time.sleep(1) # 先让上面的任务向Queue存入数据,而后再让下面的任务开始从中取数据 po.apply_async(reader, (q,)) po.close() po.join() print("(%s) End" % os.getpid()) >>> (4157) start >>> writer启动(4159),父进程为(4157) >>> reader启动(4160),父进程为(4157) >>> reader从Queue获取到消息:i >>> reader从Queue获取到消息:t >>> reader从Queue获取到消息:c >>> reader从Queue获取到消息:a >>> reader从Queue获取到消息:s >>> reader从Queue获取到消息:t >>> (4157) End