Multiprocessing

和 threading 的比较 

多进程 Multiprocessing 和多线程 threading 相似, 他们都是在 python 中用来并行运算的. 不过既然有了 threading, 为何 Python 还要出一个 multiprocessing 呢? 缘由很简单, 就是用来弥补 threading 的一些劣势, 好比在 threading 教程中提到的GIL.python

使用 multiprocessing 也很是简单, 若是对 threading 有必定了解的朋友, 大家的享受时间就到了. 由于 python 把 multiprocessing 和 threading 的使用方法作的几乎差很少. 这样咱们就更容易上手. 也更容易发挥你电脑多核系统的威力了!git

A:添加进程 Process

导入线程进程标准模块 

import multiprocessing as mp
import threading as td

定义一个被线程和进程调用的函数 

def job(a,d):
    print('aaaaa')

建立线程和进程 

t1 = td.Thread(target=job,args=(1,2))
p1 = mp.Process(target=job,args=(1,2))

注意:Thread和Process的首字母都要大写,被调用的函数没有括号,被调用的函数的参数放在args(…)中github

分别启动线程和进程多线程

t1.start()
p1.start()

分别链接线程和进程app

t1.join()
p1.join()

完整的线程和进程建立使用对比代码 

import multiprocessing as mp
import threading as td

def job(a,d):
    print('aaaaa')

t1 = td.Thread(target=job,args=(1,2))
p1 = mp.Process(target=job,args=(1,2))
t1.start()
p1.start()
t1.join()
p1.join()

从上面的使用对比代码能够看出,线程和进程的使用方法类似async

运用 

在运用时须要添加上一个定义main函数的语句函数

if __name__=='__main__':

完整的应用代码:工具

import multiprocessing as mp

def job(a,d):
    print('aaaaa')

if __name__=='__main__':
    p1 = mp.Process(target=job,args=(1,2))
    p1.start()
    p1.join()

运行环境要在terminal环境下,可能其余的编辑工具会出现运行结束后没有打印结果,在terminal中的运行后打印的结果为:学习

aaaaa

B:存储进程输出 Queue

学习资料:spa

Queue的功能是将每一个核或线程的运算结果放在队里中, 等到每一个线程或核运行完毕后再从队列中取出结果, 继续加载运算。缘由很简单, 多线程调用的函数不能有返回值, 因此使用Queue存储多个线程运算的结果

把结果放在 Queue 里 

定义一个被多线程调用的函数,q 就像一个队列,用来保存每次函数运行的结果

#该函数没有返回值!!!
def job(q):
    res=0
    for i in range(1000):
        res+=i+i**2+i**3
    q.put(res)    #queue

主函数 

定义一个多线程队列,用来存储结果

if __name__=='__main__':
    q = mp.Queue()

定义两个线程函数,用来处理同一个任务, args 的参数只要一个值的时候,参数后面须要加一个逗号,表示args是可迭代的,后面可能还有别的参数,不加逗号会出错

p1 = mp.Process(target=job,args=(q,))
p2 = mp.Process(target=job,args=(q,))

分别启动、链接两个线程

p1.start()
p2.start()
p1.join()
p2.join()

上面是分两批处理的,因此这里分两批输出,将结果分别保存

res1 = q.get()
res2 = q.get()

打印最后的运算结果

print(res1+res2)

完整的代码 

import multiprocessing as mp

def job(q):
    res=0
    for i in range(1000):
        res+=i+i**2+i**3
    q.put(res)    #queue

if __name__=='__main__':
    q = mp.Queue()
    p1 = mp.Process(target=job,args=(q,))
    p2 = mp.Process(target=job,args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print(res1+res2)

运行的时候仍是要在terminal中,最后运行结果为

499667166000

C:效率对比 threading & multiprocessing

学习资料:

上篇讲了多进程/多核的运算,此次咱们来对比下多进程,多线程和什么都不作时的消耗时间,看看哪一种方式更有效率。

建立多进程 multiprocessing 

和上节同样,首先import multiprocessing并定义要实现的job(),同时为了容易比较,咱们将计算的次数增长到1000000

import multiprocessing as mp

def job(q):
    res = 0
    for i in range(1000000):
        res += i + i**2 + i**3
    q.put(res) # queue

由于多进程是多核运算,因此咱们将上节的多进程代码命名为multicore()

def multicore():
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print('multicore:',res1 + res2)

建立多线程 multithread 

接下来建立多线程程序,建立多线程和多进程有不少类似的地方。首先import threading而后定义multithread()完成一样的任务

import threading as td

def multithread():
    q = mp.Queue() # thread可放入process一样的queue中
    t1 = td.Thread(target=job, args=(q,))
    t2 = td.Thread(target=job, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print('multithread:', res1 + res2)

建立普通函数 

最后咱们定义最普通的函数。注意,在上面例子中咱们创建了两个进程或线程,均对job()进行了两次运算,因此在normal()中咱们也让它循环两次

def normal():
    res = 0
    for _ in range(2):
        for i in range(1000000):
            res += i + i**2 + i**3
    print('normal:', res)

运行时间 

最后,为了对比各函数运行时间,咱们须要import time, 而后依次运行定义好函数:

import time

if __name__ == '__main__':
    st = time.time()
    normal()
    st1 = time.time()
    print('normal time:', st1 - st)
    multithread()
    st2 = time.time()
    print('multithread time:', st2 - st1)
    multicore()
    print('multicore time:', time.time() - st2)

大功告成,下面咱们来看下实际运行对比。

结果对比 

"""
# range(1000000)
('normal:', 499999666667166666000000L)
('normal time:', 1.1306169033050537)
('thread:', 499999666667166666000000L)
('multithread time:', 1.3054230213165283)
('multicore:', 499999666667166666000000L)
('multicore time:', 0.646507978439331)
"""

普通/多线程/多进程的运行时间分别是1.131.30.64秒。 咱们发现多核/多进程最快,说明在同时间运行了多个任务。 而多线程的运行时间竟然比什么都不作的程序还要慢一点,说明多线程仍是有必定的短板的。 戳这里查看“多线程的短板是什么”。

咱们将运算次数加十倍,再来看看三种方法的运行时间:

"""
# range(10000000)
('normal:', 4999999666666716666660000000L)
('normal time:', 40.041773080825806)
('thread:', 4999999666666716666660000000L)
('multithread time:', 41.777158975601196)
('multicore:', 4999999666666716666660000000L)
('multicore time:', 22.4337899684906)
"""

此次运行时间依然是 多进程 < 普通 < 多线程,由此咱们能够清晰地看出哪一种方法更有效率。

D:进程池 Pool

此次咱们讲进程池Pool。 进程池就是咱们将所要运行的东西,放到池子里,Python会自行解决多进程的问题

首先import multiprocessing和定义job()

import multiprocessing as mp

def job(x):
    return x*x

进程池 Pool() 和 map() 

而后咱们定义一个Pool

pool = mp.Pool()

有了池子以后,就可让池子对应某一个函数,咱们向池子里丢数据,池子就会返回函数返回的值。 Pool和以前的Process的不一样点是丢向Pool的函数有返回值,而Process的没有返回值。

接下来用map()获取结果,在map()中须要放入函数和须要迭代运算的值,而后它会自动分配给CPU核,返回结果

res = pool.map(job, range(10))

让咱们来运行一下

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    
if __name__ == '__main__':
    multicore()

运行结果:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

自定义核数量 

咱们怎么知道Pool是否真的调用了多个核呢?咱们能够把迭代次数增大些,而后打开CPU负载看下CPU运行状况

打开CPU负载(Mac):活动监视器 > CPU > CPU负载(单击一下便可)

Pool默认大小是CPU的核数,咱们也能够经过在Pool中传入processes参数便可自定义须要的核数量,

def multicore():
    pool = mp.Pool(processes=3) # 定义CPU核数量为3
    res = pool.map(job, range(10))
    print(res)

apply_async() 

Pool除了map()外,还有能够返回结果的方式,那就是apply_async().

apply_async()中只能传递一个值,它只会放入一个核进行运算,可是传入值时要注意是可迭代的,因此在传入值后须要加逗号, 同时须要用get()方法获取返回值

def multicore():
    pool = mp.Pool() 
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get得到结果
    print(res.get())

运行结果;

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]  # map()
4 # apply_async()

用 apply_async() 输出多个结果 

那么如何用apply_async()输出多个迭代呢?

咱们在apply_async()中多传入几个值试试

res = pool.apply_async(job, (2,3,4,))

结果会报错:

TypeError: job() takes exactly 1 argument (3 given)

apply_async()只能输入一组参数。

在此咱们将apply_async() 放入迭代器中,定义一个新的multi_res

multi_res = [pool.apply_async(job, (i,)) for i in range(10)]

一样在取出值时须要一个一个取出来

print([res.get() for res in multi_res])

合并代码

def multicore():
    pool = mp.Pool() 
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get得到结果
    print(res.get())
    # 迭代器,i=0时apply一次,i=1时apply一次等等
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    # 从迭代器中取出
    print([res.get() for res in multi_res])

运行结果

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # map()
4 
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # multi_res

能够看出在apply用迭代器的获得的结果和用map获得的结果是同样的

总结 

  1. Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数
  2. map() 放入迭代参数,返回多个结果
  3. apply_async()只能放入一组参数,并返回一个结果,若是想获得map()的效果须要经过迭代

相关文章
相关标签/搜索