Python多进程编程详解

本文代码在 Python 3.6 环境下测试经过。python

简介

多进程(multiprocessing)模块是在 Python 2.6 版本中加入的,和多线程(threading)模块相似,都是用来作并行运算的。不过Python既然有了threading,为何还要搞一个multiprocessing呢?这是由于Python内部有一个全局解释锁(GIL),任何一个进程任什么时候候只容许一个线程进行CPU运算,若是一个进程中的某个线程在进行CPU运算时得到GIL,其余线程将没法进行CPU运算只能等待,使得多线程没法利用CPU多核的特性。多进程处理实际上对每一个任务都会生成一个操做系统的进程,而且每个进程都被单独赋予了Python的解释器和GIL,因此程序在运行中有多个GIL存在,每一个运行者的线程都会拿到一个GIL,在不一样的环境下运行,天然也能够被分配到不一样的处理器上。编程

建立进程(Process)

multiprocessing模块提供了一个Process类能够建立进程对象。建立进程有两种方式,第一种经过Process类直接建立,参数target指定子进程要执行的程序。第二种经过继承Process类来实现。安全

咱们先用第一种方式建立子进程,子进程会将传递给它的参数扩大一倍,代码以下:多线程

#-*- coding:utf8 -*-
import os
from multiprocessing import Process, current_process

def doubler(number):
    result = number * 2
    # 获取子进程ID
    proc_id = os.getpid()
    # 获取子进程名称
    proc_name = current_process().name
    print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))

if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []	
    # 父进程ID和名称
    print('parent_proc_id:{0} parent_proc_name:{1}'.format(os.getpid(), current_process().name))
    
    for num in numbers:
    	# 建立子进程
    	proc = Process(target=doubler, args=(num,))
    	procs.append(proc)
    	# 启动子进程
    	proc.start()
    
    # join方法会让父进程等待子进程结束后再执行
    for proc in procs:
    	proc.join()
    
    print("Done.")
复制代码

第二种方式经过继承Process类,并重写run方法:app

class MyProcess(Process):
    def __init__(self, number):
    	# 必须调用父类的init方法
    	super(MyProcess, self).__init__()
    	self.number = number
    
    def run(self):
    	result = self.number * 2
    	# 获取子进程ID
    	# self.pid
    	proc_id = os.getpid()
    	# 获取子进程名称
    	# self.name
    	proc_name = current_process().name
    	print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))

if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []
    # 父进程的ID和名称
    print('parent_proc_id:{0} parent_proc_name:{1}'.format(os.getpid(), current_process().name))
    
    for num in numbers:
    	# 建立子进程
    	proc = MyProcess(num)
    	procs.append(proc)
    	# 启动子进程,启动一个新进程实际就是执行本进程对应的run方法
    	proc.start()
    
    # join方法会让父进程等待子进程结束后再执行
    for proc in procs:
    	proc.join()
    
    print("Done.")
复制代码

进程锁(Lock)

multiprocessing模块和threading模块同样也支持锁。经过acquire获取锁,执行操做后经过release释放锁。dom

#-*- coding:utf8 -*-
from multiprocessing import Process, Lock

def printer(item, lock):
    # 获取锁
    lock.acquire()
    try:
    	print(item)
    except Exception as e:
    	print(e)
    else:
    	print('no exception.')
    finally:
        # 释放锁
    	lock.release()

if __name__ == '__main__':
    # 实例化全局锁
    lock = Lock()
    items = ['PHP', 'Python', 'Java']
    procs = []
    
    for item in items:
    	proc = Process(target=printer, args=(item, lock))
    	procs.append(proc)
    	proc.start()
    
    for proc in procs:
    	proc.join()
    
    print('Done.')
复制代码

进程池(Pool)

Pool类表示工做进程的池子,它能够提供指定数量的进程供用户调用,当有请求提交到进程池时,若是进程池有空闲进程或进程数还没到达指定上限,就会分配一个进程响应请求,不然请求只能等待。Pool类主要在执行目标多且须要控制进程数量的状况下使用,若是目标少且不用控制进程数量可使用Process类。async

进程池能够经过map和apply_async方法来调用执行代码,首先咱们来看map方法:函数

#-*- coding:utf8 -*-
import os
from multiprocessing import Pool, current_process

def doubler(number):
    result = number * 2
    proc_id = os.getpid()
    proc_name = current_process().name
    print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))

if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    pool = Pool(processes=3)
    pool.map(doubler, numbers)
    
    # 关闭pool使其再也不接受新的任务
    pool.close()
    
    # 关闭pool,结束工做进程,不在处理未完成的任务
    # pool.terminate()
    
    # 主进程阻塞,结束工做进程,再也不处理未完成的任务,join方法要在close或terminate以后使用
    pool.join()
    
    print('Done')
复制代码

map只能向处理函数传递一个参数测试

下面来看一下apply/apply_async函数,apply函数是阻塞的,apply_async函数是阻塞的,这里咱们以apply_async函数为例:ui

#-*- coding:utf8 -*-
import os, time
from multiprocessing import Pool, current_process

def doubler(number, parent_proc_id, parent_proc_name):
    result = number * 2
    proc_id = os.getpid()
    proc_name = current_process().name
    # 设置等待时间,能够验证apply和apply_async的阻塞和非阻塞
    time.sleep(2)
    print('parent_proc_id:{0} parent_proc_name:{1} proc_id:{2} proc_name:{3} number:{4} result:{5}'.format(parent_proc_id, parent_proc_name, proc_id, proc_name, number, result))

if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    parent_proc_id = os.getpid()
    parent_proc_name = current_process().name
    pool = Pool(processes=3)
    for num in numbers:
    	# 非阻塞
    	pool.apply_async(doubler, (num, parent_proc_id, parent_proc_name))
    	# 阻塞其它进程
    	# pool.apply_async(doubler, (num, parent_proc_id, parent_proc_name))
    	
    # 关闭pool使其再也不接受新的任务
    pool.close()
    
    # 关闭pool,结束工做进程,不在处理未完成的任务
    # pool.terminate()
    
    # 主进程阻塞,结束工做进程,再也不处理未完成的任务,join方法要在close或terminate以后使用
    pool.join()
    
    print('Done')
复制代码

进程间通讯

进程间通讯的方式通常有管道(Pipe)、信号(Signal)、消息队列(Message)、信号量(Semaphore)、共享内存(Shared Memory)、套接字(Socket)等。这里咱们着重讲一下在Python多进程编程中经常使用的进程方式multiprocessing.Pipe函数和multiprocessing.Queue类。

Pipe

multiprocessing.Pipe()即管道模式,调用Pipe()方法返回管道的两端的Connection。Pipe方法返回(conn1, conn2)表明一个管道的两个端。Pipe方法有duplex参数,若是duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2都可收发;duplex为False,conn1只负责接受消息,conn2只负责发送消息。send()和recv()方法分别是发送和接受消息的方法。一个进程从Pipe某一端输入对象,而后被Pipe另外一端的进程接收,单向管道只容许管道一端的进程输入另外一端的进程接收,不能够反向通讯;而双向管道则容许从两端输入和从两端接收。

#-*- coding:utf8 -*-
import os, time
from multiprocessing import Process, Pipe, current_process

def proc1(pipe, data):
    for msg in range(1, 6):
        print('{0} 发送 {1}'.format(current_process().name, msg))
        pipe.send(msg)
        time.sleep(1)
    pipe.close()

def proc2(pipe, length):
    count = 0
    while True:
        count += 1
        if count == length:
            pipe.close()
        try:
            # 若是没有接收到数据recv会一直阻塞,若是管道被关闭,recv方法会抛出EOFError
            msg = pipe.recv()
            print('{0} 接收到 {1}'.format(current_process().name, msg))
        except Exception as e:
            print(e)
            break

if __name__ == '__main__':
    conn1, conn2 = Pipe(True)
    data = range(0, 6)
    length = len(data)
    proc1 = Process(target=proc1, args=(conn1, data))
    proc2 = Process(target=proc2, args=(conn2, length))
    
    proc1.start()
    proc2.start()

    proc1.join()
    proc2.join()
    
    print('Done.')
复制代码

Queue

Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。Queue的使用主要是一边put(),一边get(),可是Queue能够是多个Process进行put()操做,也能够是多个Process进行get()操做。

  • put方法用来插入数据到队列中,put方法还有两个可选参数:block和timeout。若是block为True(默认值),而且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。若是超时,会抛出Queue.Full异常。若是block为False,但该Queue已满,会当即抛出Queue.Full异常。
  • get方法能够从队列读取而且删除一个元素。一样,get方法有两个可选参数:block和timeout。若是block为True(默认值),而且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。若是block为False,有两种状况存在,若是Queue有一个值可用,则当即返回该值;不然,若是队列为空,则当即抛出Queue.Empty异常。

在父进程中建立两个子进程,一个往Queue里写数据,一个从Queue里读数据:

#-*- coding:utf8 -*-
import os, time, random
from multiprocessing import Process, Queue

def write(q):
    print('Process to write: %s' % os.getpid())
    for val in range(0, 6):
        print('Put %s to queue...' % val)
        q.put(val)
        time.sleep(random.random())

def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        try:
            val = q.get(block=True, timeout=5)
            print('Get %s from queue.' % val)
        except Exception as e:
            if q.empty():
                print('队列消费完毕.')
                break
                
if __name__ == '__main__':
    q = Queue()
    
    proc1 = Process(target=write, args=(q,))
    proc2 = Process(target=read, args=(q,))

    proc1.start()
    proc2.start()

    proc1.join()
    proc2.join()
    
    # 若是proc2不break的话会一直阻塞,不调用join调用terminate方法能够终止进程
    # proc2.terminate()
    
    print('Done.')
复制代码

Pipe的读写效率要高于Queue。那么咱们如何的选择它们呢?

  • 若是你的环境是多生产者和消费者,那么你只能是选择queue队列。
  • 若是两个进程间处理的逻辑简单,可是就是要求绝对的速度,那么pipe是个好选择。
相关文章
相关标签/搜索