Py西游攻关之多进程(multiprocessing模块)

多进程

一 多进程的概念

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.html

因为GIL的存在,python中的多线程其实并非真正的多线程,若是想要充分地使用多核CPU的资源,在python中大部分状况须要使用多进程。Python提供了很是好用的多进程包multiprocessing,只须要定义一个函数,Python会完成其余全部事情。借助这个包,能够轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通讯和共享数据、执行不一样形式的同步,提供了Process、Queue、Pipe、Lock等组件。python

  multiprocessing包是Python中的多进程管理包。与threading.Thread相似,它能够利用multiprocessing.Process对象来建立一个进程。该进程能够运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象能够像多线程那样,经过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。因此,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。多线程

但在使用这些共享API的时候,咱们要注意如下几点:并发

  • 在UNIX平台上,当某个进程终结以后,该进程须要被其父进程调用wait,不然进程成为僵尸进程(Zombie)。因此,有必要对每一个Process对象调用join()方法 (实际上等同于wait)。对于多线程来讲,因为只有一个进程,因此不存在此必要性。
  • multiprocessing提供了threading包中没有的IPC(好比Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (由于它们占据的不是用户进程的资源)。
  • 多进程应该避免共享资源。在多线程中,咱们能够比较容易地共享资源,好比使用全局变量或者传递参数。在多进程状况下,因为每一个进程有本身独立的内存空间,以上方法并不合适。此时咱们能够经过共享内存和Manager的方法来共享资源。但这样作提升了程序的复杂度,并由于同步的须要而下降了程序的效率。

Process.PID中保存有PID,若是进程尚未start(),则PID为None。app

window系统下,须要注意的是要想启动一个子进程,必须加上那句if __name__ == "main",进程相关的要写在这句下面。async

实例:     ide

复制代码
from multiprocessing import Process
import time
def f(name):
    time.sleep(1)
    print('hello', name,time.ctime())

if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = Process(target=f, args=('alvin',))
        p_list.append(p)
        p.start()
    for i in p_list:
        p.join()
    print('end')
复制代码

类式调用函数

复制代码
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        #self.name = name

    def run(self):
        time.sleep(1)
        print ('hello', self.name,time.ctime())


if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = MyProcess()
        p.start()
        p_list.append(p)

    for p in p_list:
        p.join()

    print('end')
复制代码

To show the individual process IDs involved, here is an expanded example:ui

复制代码
from multiprocessing import Process
import os
import time
def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())


def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    time.sleep(100)
    p = Process(target=info, args=('bob',))
    p.start()
    p.join()
复制代码

二 Process类

构造方法:this

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前尚未实现,库引用中提示必须是None; 
  target: 要执行的方法; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,若是实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():无论任务是否完成,当即中止工做进程

属性:

  authkey

  daemon:和线程的setDeamon功能同样

  exitcode(进程在运行时为None、若是为–N,表示被信号N结束)

  name:进程名字。

  pid:进程号。

复制代码
import time
from  multiprocessing import Process

def foo(i):
    time.sleep(1)
    print (p.is_alive(),i,p.pid)
    time.sleep(1)

if __name__ == '__main__':
    p_list=[]
    for i in range(10):
        p = Process(target=foo, args=(i,))
        #p.daemon=True
        p_list.append(p)

    for p in p_list:
        p.start()
    # for p in p_list:
    #     p.join()

    print('main process end')
复制代码

二 进程间通信 

不一样进程间内存是不共享的,要想实现两个进程间的数据交换,能够用如下方法:

Queues

使用方法跟threading里的queue相似:

复制代码
from multiprocessing import Process, Queue

def f(q,n):
    q.put([42, n, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p_list=[]
    for i in range(3):
        p = Process(target=f, args=(q,i))
        p_list.append(p)
        p.start()
    print(q.get())
    print(q.get())
    print(q.get())
    for i in p_list:
            i.join()
复制代码

Pipes

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

复制代码
from multiprocessing import Process, Pipe
 
def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()
 
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()
复制代码

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

Managers

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example,

复制代码
from multiprocessing import Process, Manager

def f(d, l,n):
    d[n] = '1'
    d['2'] = 2
    d[0.25] = None
    l.append(n)
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()

        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l,i))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()

        print(d)
        print(l)
复制代码

三 进程同步

Without using the lock output from the different processes is liable to get all mixed up.

复制代码
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()
复制代码

四 进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,若是进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply
  • apply_async
复制代码
from  multiprocessing import Process,Pool
import time
 
def Foo(i):
    time.sleep(2)
    return i+100
 
def Bar(arg):
    print('-->exec done:',arg)
 
pool = Pool(5)
 
for i in range(10):
    pool.apply_async(func=Foo, args=(i,),callback=Bar)
    #pool.apply(func=Foo, args=(i,))
 
print('end')
pool.close()
pool.join()
复制代码

协程

相关文章
相关标签/搜索