Python Multiprocessing in Practice

In an earlier post I covered Python multithreading in detail:

 Processes, Threads, the GIL, Python Multithreading, and the Producer-Consumer Model: What Are They?

But because of the GIL, Python threads cannot make full use of multiple CPU cores. To take advantage of multiple cores, we can use multiple processes instead.

1. Parent and child processes

Wikipedia's definitions of parent process and child process:

a) Parent process

In Unix-like operating systems, every process except process 0 (the swapper) is created when another process executes the fork() system call. The process that invoked fork is the parent process and the newly created process is the child process. Every process (except process 0) has one parent process, but can have many child processes.

In the Linux kernel, in which there is a very slim difference between processes and POSIX threads, there are two kinds of parent processes, namely real parent and parent. Parent is the process that receives the SIGCHLD signal on child's termination, whereas real parent is the thread that actually created this child process in a multithreaded environment. For a normal process, both these two values are same, but for a POSIX thread which acts as a process, these two values may be different.[1]

b) Child process

A child process in computing is a process created by another process (the parent process). This technique pertains to multitasking operating systems, and is sometimes called a subprocess or traditionally a subtask.

There are two major procedures for creating a child process: the fork system call (preferred in Unix-like systems and the POSIX standard) and the spawn (preferred in the modern (NT) kernel of Microsoft Windows, as well as in some historical operating systems).

In other words, Unix/Linux operating systems provide a fork() system call for creating child processes. fork() is special: an ordinary function is called once and returns once, but fork() is called once and returns twice, because the operating system copies the current process (the parent) into a new process (the child) and then returns in both of them. As for the return value, the child always gets 0 while the parent gets the child's PID. The reason is that one parent can fork many children, so the parent needs to record each child's PID.

Python's os module exposes this fork() call:

#!/usr/bin/env python
#coding:utf-8

import os
import time

print('Process %s start...' % os.getpid())
pid = os.fork()
if pid == 0:
    print('i am child process %s and my parent is %s' % (os.getpid(), os.getppid()))
else:
    print('i %s just created a child process %s' % (os.getpid(), pid))

Output:

Process 3522 start...
i 3522 just created a child process 3523
i am child process 3523 and my parent is 3522

Because fork() is called once and returns twice, we get the output above. Note that Windows has no fork call, so the code above cannot run on Windows. With fork, a process that receives a new task can simply fork a child to handle it; the Apache web server, for example, has its parent process listen on the port and fork a child process for each incoming HTTP request.

2. multiprocessing

As noted above, Windows has no fork call, so how do we use multiple processes on Windows?

Through the multiprocessing module. Since Python itself is cross-platform, it naturally provides cross-platform multiprocess support, and multiprocessing is that cross-platform multiprocess module.

The two modules most commonly used to work with processes in Python are subprocess and multiprocessing. subprocess is typically used to run external programs, such as third-party applications, rather than Python code. If you need to call and watch external programs, the third-party psutil module is often a better choice: it covers what subprocess offers and can also monitor the host or the launched programs, for example collecting network, CPU, and memory usage, which makes it more complete for automation and ops work. multiprocessing is Python's multiprocess module; it mainly starts Python processes and runs a target callback to handle tasks.
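To make that difference concrete, here is a minimal sketch (not from the original post; the echo command and the specific psutil calls are just illustrative): subprocess runs an external program, while psutil inspects resource usage on the host.

import subprocess

import psutil  # third-party: pip install psutil

# Run an external program and capture its output.
result = subprocess.run(['echo', 'hello'], capture_output=True, text=True)
print(result.stdout.strip())

# Inspect the current host with psutil.
print('cpu percent:', psutil.cpu_percent(interval=0.1))
print('memory used: %s%%' % psutil.virtual_memory().percent)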

Note: the multiprocessing API mirrors the threading API, so below we only give sample code rather than a detailed walkthrough.

1) Basic usage of multiprocessing

As with threading, there are two ways to use it.

a) Calling Process directly

from multiprocessing import Process, freeze_support
import os

processes = []

def run(item):
    print('-'*50)
    print('child process %s id: %s'%(item, os.getpid()))
    print('child process %s parent id: %s' % (item, os.getppid()))

def main():
    # print the main process's PID
    print('main process id: ', os.getpid())
    # create several child processes
    for item in range(2):
        p = Process(target=run, args=(item, ))
        processes.append(p)
        print('child process %s name: %s' % (item, p.name))
        print('child process %s id: %s' % (item, p.pid))

    for item in processes:
        item.start()

    for item in processes:
        item.join()

if __name__ == '__main__':
    # freeze_support() only matters for frozen Windows executables and should run first
    freeze_support()
    main()

b) Object-oriented style (subclassing Process)

from multiprocessing import Process, freeze_support
import os

processes = []

class MyProcess(Process):
    def __init__(self, func, item):
        super(MyProcess, self).__init__()
        self.__func = func
        self.__item = item

    def run(self):
        self.__func(self.__item)

def proc(item):
    print('-'*50)
    print('child process %s id: %s'%(item, os.getpid()))
    print('child process %s parent id: %s' % (item, os.getppid()))

def main():
    # print the main process's PID
    print('main process id: ', os.getpid())
    # create several child processes
    for item in range(2):
        p = MyProcess(proc, item)
        processes.append(p)
        print('child process %s name: %s' % (item, p.name))
        print('child process %s id: %s' % (item, p.pid))

    for item in processes:
        item.start()

    for item in processes:
        item.join()

if __name__ == '__main__':
    # freeze_support() only matters for frozen Windows executables and should run first
    freeze_support()
    main()

Note: in Python 2.7 you must call freeze_support() inside the if __name__ == '__main__' block; Python 3 does not seem to require it.

Output:

main process id:  10972
child process 0 name: MyProcess-1
child process 0 id: None
child process 1 name: MyProcess-2
child process 1 id: None
--------------------------------------------------
child process 0 id: 10636
child process 0 parent id: 10972
--------------------------------------------------
child process 1 id: 8076
child process 1 parent id: 10972

2) Setting the daemon attribute

from multiprocessing import Process
import time

processes = []

def run(item):
    time.sleep(1)
    print('item: ', item)

def main():
    # create several child processes
    for item in range(2):
        p = Process(target=run, args=(item, ))
        processes.append(p)
        # daemon must be set before start(); daemonic children are killed when the parent exits
        p.daemon = True

    for item in processes:
        item.start()

    print('all done')

if __name__ == '__main__':
    main()

Output:

all done

Note that daemon is handled differently from threading's setDaemon(): here you assign the attribute directly rather than calling a method, and it must be set before start().

3) Process synchronization

Since processes do not share data, why is there still a synchronization problem? What if several processes open the same file, or print to the same screen? Those cases still need synchronization, which is done with a Lock, as in the sketch below.
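The original section has no code here, so this is a minimal sketch of multiprocessing.Lock; like every primitive below, the lock has to be passed to the children as an argument.

from multiprocessing import Process, Lock

def printer(lock, item):
    # hold the lock so this process's two lines are not interleaved with other processes' output
    with lock:
        print('process %s line 1' % item)
        print('process %s line 2' % item)

if __name__ == '__main__':
    lock = Lock()
    processes = [Process(target=printer, args=(lock, i)) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()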

4)Semaphore

Same usage as threading.Semaphore(), except that the Semaphore you create must be passed into the child processes as an argument, because processes do not share resources. A short sketch follows.
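A minimal sketch (not in the original post) that lets at most two children run the critical section at a time:

from multiprocessing import Process, Semaphore
import time

def worker(sem, item):
    # at most 2 workers may hold the semaphore at once
    with sem:
        print('worker %s running' % item)
        time.sleep(1)

if __name__ == '__main__':
    sem = Semaphore(2)
    processes = [Process(target=worker, args=(sem, i)) for i in range(4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()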

5)Event

Same usage as threading.Event(), except that the Event you create must be passed into the child processes as an argument. A short sketch follows.
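Again the original gives no example, so this is a minimal sketch: one child waits on the event, another sets it.

from multiprocessing import Process, Event
import time

def waiter(event):
    print('waiting for the event...')
    event.wait()   # blocks until the event is set
    print('event received, continuing')

def setter(event):
    time.sleep(1)
    print('setting the event')
    event.set()

if __name__ == '__main__':
    ev = Event()
    p1 = Process(target=waiter, args=(ev,))
    p2 = Process(target=setter, args=(ev,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()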

6) Inter-process communication

Because processes do not share resources, let's first prove that with an example:

from multiprocessing import Process

processes = []
data_list = []

def run(lst, item):
    lst.append(item)
    print('%s : %s' % (item, lst))

def main():
    for item in range(4):
        p = Process(target=run, args=(data_list, item))
        processes.append(p)

    for item in processes:
        item.start()

    for item in processes:
        item.join()

    print('final lst: ', data_list)

if __name__ == '__main__':
    main()

Output:

1 : [1]
2 : [2]
0 : [0]
3 : [3]
final lst:  []

So inter-process communication must go through an intermediary. Below are three ways to do it.

a)Queue

Usage is the same as queue.Queue in multithreaded code, except that the queue you create must be passed into the child processes as an argument.

from multiprocessing import Process, Queue
import time

q = Queue(10)

def put(q):
    for i in range(3):
        q.put(i)
    print('queue size after put: %s' % q.qsize())

def get(q):
    print('queue size before get: %s' % q.qsize())
    while not q.empty():
        print('queue get: ', q.get())

def main():
    p_put = Process(target=put, args=(q,))
    p_get = Process(target=get, args=(q,))
    p_put.start()
    time.sleep(1)
    p_get.start()
    p_get.join()
    print('all done')

if __name__ == '__main__':
    main()

Output:

queue size after put: 3
queue size before get: 3
queue get:  0
queue get:  1
queue get:  2
all done

b)Pipe

Pipe() returns (conn1, conn2), the two ends of a pipe. Pipe() takes a duplex argument: if duplex is True (the default), the pipe is full-duplex, meaning both conn1 and conn2 can send and receive; if duplex is False, conn1 can only receive and conn2 can only send.
send and recv are the methods for sending and receiving messages. For example, in full-duplex mode you can call conn1.send to send a message and conn1.recv to receive one. If there is no message to receive, recv blocks; if the pipe has been closed, recv raises EOFError.

import multiprocessing
import time

pipe = multiprocessing.Pipe()

def send(conn):
    for i in range(5):
        print("send: %s" % (i,))
        conn.send(i)
        time.sleep(0.2)

def recv_1(conn):
    while True:
        print("rev_1:", conn.recv())
        time.sleep(1)

def recv_2(conn):
    while True:
        print("rev_2:", conn.recv())
        time.sleep(1)

def main():
    p_send = multiprocessing.Process(target=send, args=(pipe[0],))
    p_recv_1 = multiprocessing.Process(target=recv_1, args=(pipe[1],))
    p_recv_2 = multiprocessing.Process(target=recv_2, args=(pipe[1],))

    p_send.start()
    p_recv_1.start()
    p_recv_2.start()

    p_send.join()
    # the receivers loop on recv() forever, so the joins below never return;
    # stop the example with Ctrl+C once the output has been printed
    p_recv_1.join()
    p_recv_2.join()

if __name__ == "__main__":
    main()

Output:

send: 0
rev_1: 0
send: 1
rev_2: 1
send: 2
send: 3
send: 4
rev_1: 2
rev_2: 3
rev_1: 4

c)Manager

This one is really powerful. The Queue and Pipe above can only pass data around; they cannot share data (let different processes modify the same piece of data). Manager can.

From the official documentation:

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array.

from multiprocessing import Process, Manager

def run(d, l):
    d['name'] = 'winter'
    l.reverse()


def main():
    p = Process(target=run, args=(d, l, ))
    p.start()
    p.join()

    print('final dict: ', d)
    print('final list: ', l)

if __name__ == "__main__":
    mgmt = Manager()
    d = mgmt.dict()
    l = mgmt.list(range(10))
    main()

Note: mgmt = Manager() must go inside the if __name__ == "__main__" block, otherwise you get an error about freeze_support().

Also note this passage:

Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.

So a manager can even share data between different hosts.
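The post stops at that remark, but as a rough sketch (the address, port 50000, and authkey below are made-up placeholders, and the server and client parts would normally be separate scripts on separate hosts), multiprocessing.managers.BaseManager can expose a queue over the network:

from multiprocessing.managers import BaseManager
import queue

shared_q = queue.Queue()

def get_shared_queue():
    # runs inside the manager's server process and returns the queue that lives there
    return shared_q

class ServerManager(BaseManager):
    pass

class ClientManager(BaseManager):
    pass

if __name__ == '__main__':
    # server side: publish shared_q under the name 'get_queue'
    ServerManager.register('get_queue', callable=get_shared_queue)
    server = ServerManager(address=('127.0.0.1', 50000), authkey=b'secret')
    server.start()   # the manager runs in a background process

    # client side: could just as well connect from another host
    ClientManager.register('get_queue')
    client = ClientManager(address=('127.0.0.1', 50000), authkey=b'secret')
    client.connect()
    q = client.get_queue()
    q.put('hello over the network')
    print(q.get())

    server.shutdown()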

7) Process pools: Pool

If you need to start a large number of child processes, you can use a process pool (Pool) to create them in batches. A Pool provides a fixed number of worker processes for the caller. When a new task is submitted to the pool, a worker runs it if the pool is not yet fully busy; if the pool has already reached its maximum size, the task waits until a worker finishes its current job and becomes free.

There are two methods: the blocking Pool.apply() and the non-blocking Pool.apply_async().

a) The blocking method Pool.apply()

import multiprocessing
import time

def func(name):
    print("start: %s" % name)
    time.sleep(2)
    return 'end: %s' % name

if __name__ == "__main__":
    name_list = ['winter', 'elly', 'james', 'yule']
    res_list = []
    # create a pool with 3 worker processes
    pool = multiprocessing.Pool(3)
    for member in name_list:
        # submit the task to the pool and run it; no start() call is needed
        res = pool.apply(func, (member,))
        print(res)
    pool.close()
    # close() must be called before join(), otherwise join() raises an error;
    # after close() no new tasks can be submitted to the pool
    pool.join()
    print("all done...")

Output:

start: winter
end: winter
start: elly
end: elly
start: james
end: james
start: yule
end: yule
all done...

Notice that in blocking mode the tasks run one after another, i.e. still serially, which is why apply() is rarely used.

Two things to note:

1. Tasks run through a pool do not need a start() call;

2. close() must be called before join(); after close() no new tasks can be added to the pool.

b) The non-blocking method Pool.apply_async()

import multiprocessing
import time

def func(name):
    print("start: %s" % name)
    time.sleep(2)
    return 'end: %s' % name

def func_exp(msg):
    print('callback: %s' % msg)

if __name__ == "__main__":
    name_list = ['winter', 'elly', 'james', 'yule']
    res_list = []
    # create a process pool (with no argument, the size defaults to the number of CPU cores)
    pool = multiprocessing.Pool()
    for member in name_list:
        # submit the task to the pool; no start() call is needed
        res = pool.apply_async(func, (member,), callback=func_exp)
        # note: append res itself, not res.get(), otherwise we would block again
        res_list.append(res)
    for res_mem in res_list:
        print(res_mem.get())
    pool.close()
    # close() must be called before join(); after close() no new tasks can be submitted
    pool.join()
    print("all done...")

Output:

start: winter
start: elly
start: james
start: yule
callback: end: winter
end: winter
callback: end: elly
end: elly
callback: end: james
end: james
callback: end: yule
end: yule
all done...

Analysis of the output:

1. In the non-blocking mode the pool makes full use of multiple cores and the tasks run in parallel;

2. apply_async takes a callback argument that can be used for callbacks;

3. Why is apply blocking, where exactly does it block, and what does apply_async do differently?

Look at the source of apply:

def apply(self, func, args=(), kwds={}):
    '''
    Equivalent of `func(*args, **kwds)`.
    '''
    assert self._state == RUN
    return self.apply_async(func, args, kwds).get()

apply ultimately runs self.apply_async(func, args, kwds).get(): it calls apply_async() too, it just also calls get() on the result, and that get() is where the blocking happens.

So if I tweak the code that uses apply_async(), can I make it blocking as well? Let's try:

import multiprocessing
import time

def func(name):
    print("start: %s" % name)
    time.sleep(2)
    return 'end: %s' % name

def func_exp(msg):
    print('callback: %s' % msg)

if __name__ == "__main__":
    name_list = ['winter', 'elly', 'james', 'yule']
    # create a process pool (default size: the number of CPU cores)
    pool = multiprocessing.Pool()
    for member in name_list:
        # submit the task; no start() call is needed
        res = pool.apply_async(func, (member,), callback=func_exp)
        # calling get() here blocks until this task finishes, so submission becomes serial
        print(res.get())
    pool.close()
    # close() must be called before join(); after close() no new tasks can be submitted
    pool.join()
    print("all done...")

The change (highlighted in the original post) is that res.get() is now called inside the submission loop, and sure enough the run becomes blocking:

start: winter
callback: end: winter
end: winter
start: elly
callback: end: elly
end: elly
start: james
callback: end: james
end: james
start: yule
callback: end: yule
end: yule
all done...

c) How many processes should the pool have?

Since multiple processes can exploit multiple cores, is creating more processes always better? No: switching between processes is expensive, so too many processes switching back and forth actually lowers efficiency.

The process count is an empirical value that depends heavily on the machine's hardware resources; the optimal number has to be found by tuning.

When a Pool is created, its default size is the number of logical CPUs (hardware threads).

As a rule of thumb:

A 1:1 ratio of processes to CPU cores works well. For models that support multithreading, a thread-to-core ratio of at least 1.5:1 is usually recommended so that some threads are left for I/O. Python multiprocessing workloads are usually either pure computation, a coroutine model (little or no time spent waiting on I/O), or multithreading inside each process (strongly discouraged; it requires understanding fork semantics), so one process per core is generally enough: process switches are relatively expensive, and too many processes switching back and forth lowers efficiency. One exception is heavy disk I/O, which is synchronous even under coroutines; in that case adding a few more processes may help.
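As a small illustration of the default sizing (not from the original post; the square function is just a placeholder workload): with no argument, Pool creates as many workers as os.cpu_count() reports.

import os
import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    n_workers = os.cpu_count()   # this is also the default Pool size
    print('logical CPUs:', n_workers)
    with multiprocessing.Pool(processes=n_workers) as pool:
        print(pool.map(square, range(8)))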