Python Multiprocessing

To implement multiprocessing in a Python program, we first need a little operating-system background.

Unix/Linux operating systems provide a fork() system call, which is quite special. An ordinary function call is called once and returns once, but fork() is called once and returns twice: the operating system automatically copies the current process (called the parent process) into a new one (called the child process), and then returns in both the parent and the child.

The child process always receives a return value of 0, while the parent receives the child's PID. The reasoning: a parent can fork many children, so it needs to record each child's ID, whereas a child only needs to call getppid() to get its parent's ID.

Python's os module wraps common system calls, including fork, so creating a child process from Python is easy:

import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

The code above fails on Windows, because Windows has no fork call.

multiprocessing

Python provides the multiprocessing module for working with multiple processes. It offers a Process class to represent a process object, used in the same way as threading.Thread:

from multiprocessing import Process
import os

# Code to run in the child process
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

# Output:
Parent process 8768.
Child process will start.
Run child process test (8572)...
Child process end.

To create a child process, you only need to pass in a target function and its arguments: create a Process instance and start it with the start() method.

The join() method waits for the child process to finish before continuing; it is typically used for synchronization between processes.
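A minimal sketch of start()/join() semantics (the function name greet and the argument 'test' are illustrative, not from the original example):

```python
import os
from multiprocessing import Process

def greet(name):
    # This runs in the child process
    print('Hello, %s (pid %s)' % (name, os.getpid()))

def run():
    p = Process(target=greet, args=('test',))
    p.start()
    p.join()           # block here until the child exits
    return p.exitcode  # 0 means the child exited cleanly

if __name__ == '__main__':
    print(run())
```

After join() returns, the child's exitcode is available, which is a convenient way to check that it finished without error.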

Pool

If you need to launch a large number of child processes, you can create them in batches with a process pool:

from multiprocessing import Pool, current_process
import os, time, random

def long_time_task(name):
    print('Run task %s %s (%s)...' % (name, current_process().name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s %s runs %0.2f seconds.' % (name, current_process().name, (end - start)))
    return name

def done(name):
    print("Task %s %s is done" % (name, current_process().name))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,), callback=done)
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

# Output:
Parent process 10040.
Run task 0 PoolWorker-4 (5528)...
Waiting for all subprocesses done...
Run task 1 PoolWorker-1 (4844)...
Run task 2 PoolWorker-2 (4892)...
Run task 3 PoolWorker-3 (7492)...
Task 0 PoolWorker-4 runs 1.70 seconds.
Run task 4 PoolWorker-4 (5528)...
Task 0 MainProcess is done
Task 2 PoolWorker-2 runs 1.94 seconds.
Task 2 MainProcess is done
Task 1 PoolWorker-1 runs 2.26 seconds.
Task 1 MainProcess is done
Task 3 PoolWorker-3 runs 2.27 seconds.
Task 3 MainProcess is done
Task 4 PoolWorker-4 runs 1.83 seconds.
Task 4 MainProcess is done
All subprocesses done.

# With the blocking p.apply(long_time_task, args=(i,)) instead (no callback parameter), the output is:
Parent process 7624.
Run task 0 PoolWorker-3 (2128)...
Task 0 PoolWorker-3 runs 2.98 seconds.
Run task 1 PoolWorker-4 (5460)...
Task 1 PoolWorker-4 runs 1.51 seconds.
Run task 2 PoolWorker-2 (8780)...
Task 2 PoolWorker-2 runs 0.66 seconds.
Run task 3 PoolWorker-1 (7044)...
Task 3 PoolWorker-1 runs 1.13 seconds.
Run task 4 PoolWorker-3 (2128)...
Task 4 PoolWorker-3 runs 2.94 seconds.
Waiting for all subprocesses done...
All subprocesses done.

Calling join() on a Pool object waits for all worker processes to finish. You must call close() before join(), and after close() no new tasks can be submitted to the pool.

Here Pool(4) means at most 4 worker processes run concurrently; if the argument is omitted, it defaults to the number of CPUs (os.cpu_count()).

In the code above, p.apply_async() is a variant of apply(): apply_async() is the parallel, non-blocking version, while apply() is the blocking version, so with apply() the main process waits until the child task has finished. (Pool.apply() shares its name with the old Python 2 built-in apply(), but the built-in simply calls a function in the current process; the two are not equivalent.)

apply_async() can take a callback function, whose argument is the return value of the child-process function (this lets you build various workflows, e.g. passing back a task id and updating the task's status when it finishes).

apply() takes no callback, because it blocks anyway: it waits for and directly returns the child function's return value.
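apply_async() also returns an AsyncResult object, so even without a callback you can collect the return values later with get(). A minimal sketch (square and run_pool are illustrative names, not part of the original example):

```python
from multiprocessing import Pool

def square(x):
    # Worker function: runs in a pool process
    return x * x

def run_pool(n=5):
    with Pool(2) as pool:
        # apply_async returns an AsyncResult immediately;
        # .get() blocks until that task's result is ready
        results = [pool.apply_async(square, args=(i,)) for i in range(n)]
        return [r.get() for r in results]

if __name__ == '__main__':
    print(run_pool())  # [0, 1, 4, 9, 16]
```

Calling get() on each AsyncResult in submission order also preserves the order of the results, regardless of which worker finished first.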

Inter-Process Communication

For inter-process communication, multiprocessing provides several ways to exchange data, including Queue, Pipe, and Value.

Taking Queue as an example: the parent process creates two children, one writing data into the Queue and the other reading from it:

# -*- coding: utf-8 -*-

from multiprocessing import Process, Queue
import os, time, random

# Code run by the writer process:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# Code run by the reader process:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # The parent process creates the Queue and passes it to each child:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # Start child process pw (writer):
    pw.start()
    # Start child process pr (reader):
    pr.start()
    # Wait for pw to finish:
    pw.join()
    # pr runs an infinite loop, so we cannot wait for it to finish; terminate it instead:
    pr.terminate()

# Output:
Process to read: 8868
Process to write: 6832
Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

On Unix/Linux, the multiprocessing module wraps the fork() call, so we do not have to deal with its details. Since Windows has no fork, multiprocessing has to "simulate" it: every Python object in the parent that the child needs must be serialized with pickle and sent to the child process. Therefore, if multiprocessing fails on Windows, first consider whether pickling failed.
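Besides Queue, a Pipe gives you a pair of connected endpoints, which is handy for two-way communication between exactly two processes. A sketch of the idea (the worker function and the 'ping' message are illustrative assumptions):

```python
from multiprocessing import Process, Pipe

def worker(conn):
    # Child: receive one message, send back a reply, then close its end
    msg = conn.recv()
    conn.send(msg.upper())
    conn.close()

def run():
    parent_conn, child_conn = Pipe()  # a duplex connection pair by default
    p = Process(target=worker, args=(child_conn,))
    p.start()
    parent_conn.send('ping')
    reply = parent_conn.recv()  # blocks until the child replies
    p.join()
    return reply

if __name__ == '__main__':
    print(run())  # PING
```

Pipe is lighter than Queue but only connects two endpoints; with more than two processes writing concurrently, the data can become corrupted, so Queue is the safer default.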

Process Locks

When multiple child processes operate on the same object, things can become inconsistent. For example, start two processes, one adding 1 and the other adding 3 to a shared variable:

# -*- coding: utf-8 -*-

import time
from multiprocessing import Process, Value


# Modify the shared value
def write(v, n):
    for i in range(10):
        time.sleep(0.1)
        v.value += n
        print(v.value)


if __name__ == '__main__':
    """
    typecode_to_type = {
    'c': ctypes.c_char,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double
    }
    """
    # An int value initialized to 0; see the typecodes above for other types
    v = Value("i", 0)
    p1 = Process(target=write, args=(v, 1))
    p2 = Process(target=write, args=(v, 3))
    # Start child process p1, which adds 1 to v:
    p1.start()
    # Start child process p2, which adds 3 to v:
    p2.start()
    # Wait for both to finish:
    p1.join()
    p2.join()

# Output:
1
4
5
8
9
12
13
16
17
20
21
24
25
28
29
32
33
36
37
40

As you can see, the output is interleaved rather than the expected process 1 printing 1, 2, 3, 4, 5... and process 2 printing 3, 6, 9.... This is where an inter-process lock is needed:

# -*- coding: utf-8 -*-

import time
from multiprocessing import Process, Value, Lock


def write(v, n, lock):
    with lock:
        for i in range(10):
            time.sleep(0.1)
            v.value += n
            print(v.value)


if __name__ == '__main__':
    """
    typecode_to_type = {
    'c': ctypes.c_char,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double
    }
    """
    lock = Lock()
    # An int value initialized to 0; see the typecodes above for other types
    v = Value("i", 0)
    p1 = Process(target=write, args=(v, 1, lock))
    p2 = Process(target=write, args=(v, 3, lock))
    # Start child process p1, which adds 1 to v:
    p1.start()
    # Start child process p2, which adds 3 to v:
    p2.start()
    # Wait for both to finish:
    p1.join()
    p2.join()

# Output:
1
2
3
4
5
6
7
8
9
10
13
16
19
22
25
28
31
34
37
40

Now the result is correct, but because the lock is held for the entire loop, the two processes run one after the other, which costs some efficiency.
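If you only need the final total to be correct, you can hold the lock just around each read-modify-write instead of around the whole loop; the processes then interleave their iterations, but the sum stays deterministic. A sketch under that assumption (the function name add is illustrative):

```python
from multiprocessing import Process, Value, Lock

def add(v, n, lock):
    for _ in range(10):
        # Acquire the lock only for the non-atomic read-modify-write,
        # so the two processes can still interleave between iterations
        with lock:
            v.value += n

def run():
    lock = Lock()
    v = Value('i', 0)
    ps = [Process(target=add, args=(v, n, lock)) for n in (1, 3)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()
    return v.value

if __name__ == '__main__':
    print(run())  # 40
```

The print order would no longer be predictable, but the final value is always 10*1 + 10*3 = 40, because every += happens under the lock.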

Summary

On Unix/Linux, you can use the fork() call to implement multiprocessing.

For cross-platform multiprocessing, use the multiprocessing module.

Inter-process communication is implemented via Queue, Pipe, and the like.
