本文代码在 Python 3.6 环境下测试经过。python
多进程(multiprocessing)模块是在 Python 2.6 版本中加入的,和多线程(threading)模块相似,都是用来作并行运算的。不过Python既然有了threading,为何还要搞一个multiprocessing呢?这是由于Python内部有一个全局解释锁(GIL),任何一个进程任什么时候候只容许一个线程进行CPU运算,若是一个进程中的某个线程在进行CPU运算时得到GIL,其余线程将没法进行CPU运算只能等待,使得多线程没法利用CPU多核的特性。多进程处理实际上对每一个任务都会生成一个操做系统的进程,而且每个进程都被单独赋予了Python的解释器和GIL,因此程序在运行中有多个GIL存在,每一个运行者的线程都会拿到一个GIL,在不一样的环境下运行,天然也能够被分配到不一样的处理器上。编程
multiprocessing模块提供了一个Process类能够建立进程对象。建立进程有两种方式,第一种经过Process类直接建立,参数target指定子进程要执行的程序。第二种经过继承Process类来实现。安全
咱们先用第一种方式建立子进程,子进程会将传递给它的参数扩大一倍,代码以下:多线程
#-*- coding:utf8 -*-
import os
from multiprocessing import Process, current_process
def doubler(number):
result = number * 2
# 获取子进程ID
proc_id = os.getpid()
# 获取子进程名称
proc_name = current_process().name
print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))
if __name__ == '__main__':
numbers = [5, 10, 15, 20, 25]
procs = []
# 父进程ID和名称
print('parent_proc_id:{0} parent_proc_name:{1}'.format(os.getpid(), current_process().name))
for num in numbers:
# 建立子进程
proc = Process(target=doubler, args=(num,))
procs.append(proc)
# 启动子进程
proc.start()
# join方法会让父进程等待子进程结束后再执行
for proc in procs:
proc.join()
print("Done.")
复制代码
第二种方式经过继承Process类,并重写run方法:app
class MyProcess(Process):
def __init__(self, number):
# 必须调用父类的init方法
super(MyProcess, self).__init__()
self.number = number
def run(self):
result = self.number * 2
# 获取子进程ID
# self.pid
proc_id = os.getpid()
# 获取子进程名称
# self.name
proc_name = current_process().name
print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))
if __name__ == '__main__':
numbers = [5, 10, 15, 20, 25]
procs = []
# 父进程的ID和名称
print('parent_proc_id:{0} parent_proc_name:{1}'.format(os.getpid(), current_process().name))
for num in numbers:
# 建立子进程
proc = MyProcess(num)
procs.append(proc)
# 启动子进程,启动一个新进程实际就是执行本进程对应的run方法
proc.start()
# join方法会让父进程等待子进程结束后再执行
for proc in procs:
proc.join()
print("Done.")
复制代码
multiprocessing模块和threading模块同样也支持锁。经过acquire获取锁,执行操做后经过release释放锁。dom
#-*- coding:utf8 -*-
from multiprocessing import Process, Lock
def printer(item, lock):
# 获取锁
lock.acquire()
try:
print(item)
except Exception as e:
print(e)
else:
print('no exception.')
finally:
# 释放锁
lock.release()
if __name__ == '__main__':
# 实例化全局锁
lock = Lock()
items = ['PHP', 'Python', 'Java']
procs = []
for item in items:
proc = Process(target=printer, args=(item, lock))
procs.append(proc)
proc.start()
for proc in procs:
proc.join()
print('Done.')
复制代码
Pool类表示工做进程的池子,它能够提供指定数量的进程供用户调用,当有请求提交到进程池时,若是进程池有空闲进程或进程数还没到达指定上限,就会分配一个进程响应请求,不然请求只能等待。Pool类主要在执行目标多且须要控制进程数量的状况下使用,若是目标少且不用控制进程数量可使用Process类。async
进程池能够经过map和apply_async方法来调用执行代码,首先咱们来看map方法:函数
#-*- coding:utf8 -*-
import os
from multiprocessing import Pool, current_process
def doubler(number):
result = number * 2
proc_id = os.getpid()
proc_name = current_process().name
print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))
if __name__ == '__main__':
numbers = [5, 10, 15, 20, 25]
pool = Pool(processes=3)
pool.map(doubler, numbers)
# 关闭pool使其再也不接受新的任务
pool.close()
# 关闭pool,结束工做进程,不在处理未完成的任务
# pool.terminate()
# 主进程阻塞,结束工做进程,再也不处理未完成的任务,join方法要在close或terminate以后使用
pool.join()
print('Done')
复制代码
map只能向处理函数传递一个参数测试
下面来看一下apply/apply_async函数,apply函数是阻塞的,apply_async函数是阻塞的,这里咱们以apply_async函数为例:ui
#-*- coding:utf8 -*-
import os, time
from multiprocessing import Pool, current_process
def doubler(number, parent_proc_id, parent_proc_name):
result = number * 2
proc_id = os.getpid()
proc_name = current_process().name
# 设置等待时间,能够验证apply和apply_async的阻塞和非阻塞
time.sleep(2)
print('parent_proc_id:{0} parent_proc_name:{1} proc_id:{2} proc_name:{3} number:{4} result:{5}'.format(parent_proc_id, parent_proc_name, proc_id, proc_name, number, result))
if __name__ == '__main__':
numbers = [5, 10, 15, 20, 25]
parent_proc_id = os.getpid()
parent_proc_name = current_process().name
pool = Pool(processes=3)
for num in numbers:
# 非阻塞
pool.apply_async(doubler, (num, parent_proc_id, parent_proc_name))
# 阻塞其它进程
# pool.apply_async(doubler, (num, parent_proc_id, parent_proc_name))
# 关闭pool使其再也不接受新的任务
pool.close()
# 关闭pool,结束工做进程,不在处理未完成的任务
# pool.terminate()
# 主进程阻塞,结束工做进程,再也不处理未完成的任务,join方法要在close或terminate以后使用
pool.join()
print('Done')
复制代码
进程间通讯的方式通常有管道(Pipe)、信号(Signal)、消息队列(Message)、信号量(Semaphore)、共享内存(Shared Memory)、套接字(Socket)等。这里咱们着重讲一下在Python多进程编程中经常使用的进程方式multiprocessing.Pipe函数和multiprocessing.Queue类。
multiprocessing.Pipe()即管道模式,调用Pipe()方法返回管道的两端的Connection。Pipe方法返回(conn1, conn2)表明一个管道的两个端。Pipe方法有duplex参数,若是duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2都可收发;duplex为False,conn1只负责接受消息,conn2只负责发送消息。send()和recv()方法分别是发送和接受消息的方法。一个进程从Pipe某一端输入对象,而后被Pipe另外一端的进程接收,单向管道只容许管道一端的进程输入另外一端的进程接收,不能够反向通讯;而双向管道则容许从两端输入和从两端接收。
#-*- coding:utf8 -*-
import os, time
from multiprocessing import Process, Pipe, current_process
def proc1(pipe, data):
for msg in range(1, 6):
print('{0} 发送 {1}'.format(current_process().name, msg))
pipe.send(msg)
time.sleep(1)
pipe.close()
def proc2(pipe, length):
count = 0
while True:
count += 1
if count == length:
pipe.close()
try:
# 若是没有接收到数据recv会一直阻塞,若是管道被关闭,recv方法会抛出EOFError
msg = pipe.recv()
print('{0} 接收到 {1}'.format(current_process().name, msg))
except Exception as e:
print(e)
break
if __name__ == '__main__':
conn1, conn2 = Pipe(True)
data = range(0, 6)
length = len(data)
proc1 = Process(target=proc1, args=(conn1, data))
proc2 = Process(target=proc2, args=(conn2, length))
proc1.start()
proc2.start()
proc1.join()
proc2.join()
print('Done.')
复制代码
Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。Queue的使用主要是一边put(),一边get(),可是Queue能够是多个Process进行put()操做,也能够是多个Process进行get()操做。
在父进程中建立两个子进程,一个往Queue里写数据,一个从Queue里读数据:
#-*- coding:utf8 -*-
import os, time, random
from multiprocessing import Process, Queue
def write(q):
print('Process to write: %s' % os.getpid())
for val in range(0, 6):
print('Put %s to queue...' % val)
q.put(val)
time.sleep(random.random())
def read(q):
print('Process to read: %s' % os.getpid())
while True:
try:
val = q.get(block=True, timeout=5)
print('Get %s from queue.' % val)
except Exception as e:
if q.empty():
print('队列消费完毕.')
break
if __name__ == '__main__':
q = Queue()
proc1 = Process(target=write, args=(q,))
proc2 = Process(target=read, args=(q,))
proc1.start()
proc2.start()
proc1.join()
proc2.join()
# 若是proc2不break的话会一直阻塞,不调用join调用terminate方法能够终止进程
# proc2.terminate()
print('Done.')
复制代码
Pipe的读写效率要高于Queue。那么咱们如何的选择它们呢?