因为GIL的存在,python中的多线程其实并非真正的多线程,若是想要充分地使用多核CPU的资源,在python中大部分状况须要使用多进程。Python提供了很是好用的多进程包multiprocessing,只须要定义一个函数,Python会完成其余全部事情。借助这个包,能够轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通讯和共享数据、执行不一样形式的同步,提供了Process、Queue、Pipe、Lock等组件。python
multiprocessing包是Python中的多进程管理包。与threading.Thread相似,它能够利用multiprocessing.Process对象来建立一个进程。该进程能够运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象能够像多线程那样,经过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。因此,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。多线程
但在使用这些共享API的时候,咱们要注意如下几点:并发
Process.PID中保存有PID,若是进程尚未start(),则PID为None。app
window系统下,须要注意的是要想启动一个子进程,必须加上那句if __name__ == "main",进程相关的要写在这句下面。异步
有两种使用方法,直接传入要运行的方法或从Process继承并覆盖run():async
from multiprocessing import Process import threading import time def foo(i): print 'say hi', i if __name__ == '__main__': for i in range(10): p = Process(target=foo, args=(i,)) p.start()
say hi 4 say hi 3 say hi 5 say hi 2 say hi 1 say hi 6 say hi 0 say hi 7 say hi 8 say hi 9 Process finished with exit code 0 能够看出多个进程随机顺序执行
from multiprocessing import Process import time class MyProcess(Process): def __init__(self, arg): super(MyProcess, self).__init__() self.arg = arg def run(self): print 'say hi', self.arg time.sleep(1) if __name__ == '__main__': for i in range(10): p = MyProcess(i) p.start()
构造方法:ide
Process([group [, target [, name [, args [, kwargs]]]]])函数
group: 线程组,目前尚未实现,库引用中提示必须是None;
target: 要执行的方法;
name: 进程名;
args/kwargs: 要传入方法的参数。ui
实例方法:spa
is_alive():返回进程是否在运行。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,若是实例进程时未制定传入target,这star执行t默认run()方法。
terminate():无论任务是否完成,当即中止工做进程
属性:
authkey
daemon:和线程的setDeamon功能同样
exitcode(进程在运行时为None、若是为–N,表示被信号N结束)
name:进程名字。
pid:进程号。
例子一:
from multiprocessing import Process import threading import time def foo(i): print 'say hi',i for i in range(10): p = Process(target=foo,args=(i,)) p.start()
say hi 0 say hi 3 say hi 6 say hi 1 say hi 8 say hi 2 say hi 5 say hi 4 say hi 7 say hi 9 Process finished with exit code 0
例子二:
def foo(i): time.sleep(1) print 'say hi', i time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(10): p = Process(target=foo, args=(i,)) p.daemon=True p_list.append(p) for p in p_list: p.start() for p in p_list: p.join() print 'main process end'
say hi 1 say hi 2 say hi 5 say hi 6 say hi 7 say hi 0 say hi 4 say hi 3 say hi 8 say hi 9 main process end Process finished with exit code 0
能够看出join()方法和deamon属性的用法和多线程的基本一致。
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,若是进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。进程池设置最好等于CPU核心数量
构造方法:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes :使用的工做进程的数量,若是processes是None那么使用 os.cpu_count()返回的数量。
initializer: 若是initializer是None,那么每个工做进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工做进程退出以前能够完成的任务数,完成后用一个新的工做进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工做进程就会一直存活。
context: 用在制定工做进程启动时的上下文,通常使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来建立一个池,两种方法都适当的设置了context
实例方法:
apply(func[, args[, kwds]]):同步进程池
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池
close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工做进程会退出。
terminate() : 结束工做进程,不在处理未完成的任务
join() : wait工做线程的退出,在调用join()前,必须调用close() or terminate()。这样是由于被终止的进程须要被父进程调用wait(join等价与wait),不然进程会成为僵尸进程。pool.join()必须使用在
例子一(异步进程池):
pool.close()或者pool.terminate()以后。其中close()跟terminate()的区别在于close()会等待池中的worker进程执行结束再关闭pool,而terminate()则是直接关闭。
# coding:utf-8 from multiprocessing import Pool import time def Foo(i): time.sleep(2) return i + 100 def Bar(arg): print arg if __name__ == '__main__': t_start=time.time() pool = Pool(5) for i in range(10): pool.apply_async(func=Foo, args=(i,), callback=Bar)#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 pool.close() pool.join() # 进程池中进程执行完毕后再关闭,若是注释,那么程序直接关闭。 pool.terminate() t_end=time.time() t=t_end-t_start print 'the program time is :%s' %t
101 100 102 103 104 106 105 107 108 109 the program time is :4.22099995613 Process finished with exit code 0
例子二(同步进程池):
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process, Pool import time def Foo(i): time.sleep(1) print i + 100 if __name__ == '__main__': t_start=time.time() pool = Pool(5) for i in range(10): pool.apply(Foo, (i,)) pool.close() pool.join() # 进程池中进程执行完毕后再关闭,若是注释,那么程序直接关闭。 t_end=time.time() t=t_end-t_start print 'the program time is :%s' %t
100 101 102 103 104 105 106 107 108 109 the program time is :10.2409999371 Process finished with exit code 0 能够看出进程同步顺序执行了,效率下降
例子三:异步进程池使用get()方法得到进程执行结果值(错误使用get()方法获取结果)
def Bar(arg): return arg if __name__ == '__main__': t_start=time.time() pool = Pool(5) for i in range(10): res = pool.apply_async(func=Foo, args=(i,), callback=Bar)#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 print res.get() pool.close() pool.join() # 进程池中进程执行完毕后再关闭,若是注释,那么程序直接关闭。 pool.terminate() t_end=time.time() t=t_end-t_start print 'the program time is :%s' %t
100 101 102 103 104 105 106 107 108 109 the program time is :20.2850000858 Process finished with exit code 0 能够看出因为每一个进程的get()方法,程序变成同步执行了
例子四(正确使用get()方法获取结果)
# coding:utf-8 from multiprocessing import Pool import time def Foo(i): time.sleep(2) return i + 100 def Bar(arg): return arg if __name__ == '__main__': res_list=[] t_start=time.time() pool = Pool(5) for i in range(10): res = pool.apply_async(func=Foo, args=(i,), callback=Bar) res_list.append(res) pool.close() pool.join() for res in res_list: print res.get() t_end=time.time() t=t_end-t_start print 'the program time is :%s' %t
100 101 102 103 104 105 106 107 108 109 the program time is :4.22399997711 Process finished with exit code 0
进程各自持有一份数据,默认没法共享数据
#!/usr/bin/env python # coding:utf-8 from multiprocessing import Process li = [] def foo(i): li.append(i) print 'say hi', li if __name__ == '__main__': for i in range(10): p = Process(target=foo, args=(i,)) p.start() print 'ending', li #指望输出[0到10的随机排列的列表]
say hi [1] say hi [0] say hi [2] say hi [3] say hi [4] say hi [5] ending [] say hi [6] say hi [7] say hi [8] say hi [9] Process finished with exit code 0
方法一(使用Array):
Array(‘i’, range(10))中的‘i’参数C语言中的类型:
‘c’: ctypes.c_char ‘u’: ctypes.c_wchar ‘b’: ctypes.c_byte ‘B’: ctypes.c_ubyte
‘h’: ctypes.c_short ‘H’: ctypes.c_ushort ‘i’: ctypes.c_int ‘I’: ctypes.c_uint
‘l’: ctypes.c_long, ‘L’: ctypes.c_ulong ‘f’: ctypes.c_float ‘d’: ctypes.c_double
from multiprocessing import Process, Array def f(a): for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': arr = Array('i', range(10)) p = Process(target=f, args=(arr,)) p.start() p.join() print(arr[:])
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
方法二(使用Manager):
Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持。
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)
{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] Process finished with exit code 0