多线程 Threading Multiprocessing(Python)

多线程是加速程序计算的有效方式,Python的多线程模块threading上手快速简单,学习莫烦多线程教程动手操做了一遍,这里记录一下。python

1 Threading

1.1 添加线程

import threading
#获取已激活的线程数
print(threading.active_count())  #1

#查看全部线程信息
print(threading.enumerate())  #[<_MainThread(MainThread, started 18496)>]

#查看如今正在运行的线程
print(threading.current_thread()) #<_MainThread(MainThread, started 18496)>


import threading

def thread_job():
    print('This is a thread of %s' % threading.current_thread())

def runMain():
    thread = threading.Thread(target=thread_job,) #定义线程
    thread.start()  #线程开始工做

if __name__ == '__main__':
    runMain()

#输出
This is a thread of <Thread(Thread-1, started 12324)>

1.2 join功能

不加join功能,线程任务还未完成便输出all done。git

import threading
import time

def thread_job():
    print('T1 start.\n')
    for i in range(10):
        time.sleep(0.1)  #任务间隔0.1秒
    print('T1 finish.\n')

add_thread = threading.Thread(target=thread_job, name='T1')
add_thread.start()

print('all done.\n')
#输出
T1 start.
all done.
T1 finish.

若要遵循顺序,在启动线程后调用join , 使用join控制多个线程的执行顺序,效果以下。github

import threading
import time

def thread_job():
    print('T1 start.\n')
    for i in range(10):
        time.sleep(0.1)  #任务间隔0.1秒
    print('T1 finish.\n')

add_thread = threading.Thread(target=thread_job, name='T1')
add_thread.start()
add_thread.join()

print('all done.\n')
#输出
T1 start.
T1 finish.
all done.

1.3 存储进程结果Queue

将数据列表中的数据传入,使用四个线程处理,将结果保存在Queue中,线程执行完后,从Queue中获取存储的结果多线程

#导入线程 队列的标准模块
import threading
import time
from queue import Queue

定义一个被多线程调用的函数:函数的参数时一个列表l和一个队列q,函数的功能是对列表的每一个元素进行平方计算,将结果保存在队列中app

def job(l,q):
    for i in range(len(l)):
        l[i] = l[i]**2
    q.put(l)

定义一个多线程函数:在多线程函数中定义一个Queue ,用来保存返回值 ,代替return ,定义一个多线程列表 ,初始化一个多维数据列表async

def mulithreading():
    q = Queue() #q中存放返回值 代替return的返回值
    threads = []
    data = [[1,2,3],[3,4,5],[4,4,4],[5,5,5]]

在多线程函数中定义四个线程,启动线程,将每一个线程添加到多线程的列表中函数

for i in range(4): #定义四个线程
        t = threading.Thread(target=job,args=(data[i],q))
        t.start()
        threads.append(t) #把每一个线程append到线程列表中

分别join四个线程到主线程学习

for thread in threads:
        thread.join()

定义一个空列表results 将四个线程运行后保存在队列中的结果返回给resultsui

results = []
    for _ in range(4):
        results.append(q.get()) #q.get()按顺序从q中拿出一个值
    print(results)

完整代码:线程

#导入线程 队列的标准模块
import threading
import time
from queue import Queue

#定义一个被多线程调用的函数
def job(l,q):
    for i in range(len(l)):
        l[i] = l[i]**2
    q.put(l)
#定义一个多线程函数
def mulithreading():
    q = Queue() #q中存放返回值 代替return的返回值
    threads = []
    data = [[1,2,3],[3,4,5],[4,4,4],[5,5,5]]
    for i in range(4): #定义四个线程
        t = threading.Thread(target=job,args=(data[i],q))
        t.start()
        threads.append(t) #把每一个线程append到线程列表中
    #分别join四个线程到主线程
    for thread in threads:
        thread.join()
    #定义一个空列表results 将四个线程运行后保存在队列中的结果返回给results
    results = []
    for _ in range(4):
        results.append(q.get()) #q.get()按顺序从q中拿出一个值
    print(results)
if __name__ == '__main__':
    mulithreading()

#输出
[[1, 4, 9], [9, 16, 25], [16, 16, 16], [25, 25, 25]]

1.4 GIL 不必定有效率

python 的多线程 threading 有时候并非特别理想. 最主要的缘由是就是, Python 的设计上, 有一个必要的环节, 就是 Global Interpreter Lock (GIL). 这个东西让 Python 仍是一次性只能处理一个东西.

import threading
from queue import Queue
import copy
import time

def job(l, q):
    res = sum(l)
    q.put(res)

def multithreading(l):
    q = Queue()
    threads = []
    for i in range(4):
        t = threading.Thread(target=job, args=(copy.copy(l), q), name='T%i' % i)
        t.start()
        threads.append(t)
    [t.join() for t in threads]
    total = 0
    for _ in range(4):
        total += q.get()
    print(total)

def normal(l):
    total = sum(l)
    print(total)

if __name__ == '__main__':
    l = list(range(10000000))
    s_t = time.time()
    normal(l*4)
    print('normal: ',time.time()-s_t)
    s_t = time.time()
    multithreading(l)
    print('multithreading: ', time.time()-s_t)
#输出
199999980000000
normal:  1.7343778610229492
199999980000000
multithreading:  2.218825340270996

程序 threading 和 Normal 运行了同样屡次的运算. 可是咱们发现 threading 却没有快多少, 按理来讲, 咱们预期会要快3-4倍, 由于有创建4个线程, 可是并无. 这就是其中的 GIL 在做怪.

1.5 线程锁

不使用锁

import threading

def job1():  #全局变量A的值每次加1,循环10次
    global A
    for i in range(10):
        A += 1
        print('job1',A)

def job2(): #全局变量A的值每次加10,循环10次
    global A
    for i in range(10):
        A += 10
        print('job2',A)

if __name__ == '__main__':
    A = 0
    t1 = threading.Thread(target=job1)
    t2 = threading.Thread(target=job2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

#输出 打印结果很是混乱
job1 1
job1 2
job1 3
job1 4
job2 14
job1 15
job2 25
job1 26
job2 36
job1 37
job2 47
job1 48
job2 58
job1 59
job2 69
job1 70
job2 80
job2 90
job2 100
job2 110

使用锁
lock在不一样线程使用同一共享内存时,可以确保线程之间互不影响,使用lock的方法是, 在每一个线程执行运算修改共享内存以前,执行lock.acquire()将共享内存上锁, 确保当前线程执行时,内存不会被其余线程访问,执行运算完毕后,使用lock.release()将锁打开, 保证其余的线程可使用该共享内存。

import threading

def job1():
    global A;lock = threading.Lock()
    lock.acquire()
    for i in range(10):
        A += 1
        print('job1',A)
    lock.release()

def job2():
    global A;lock = threading.Lock()
    lock.acquire()
    for i in range(10):
        A += 10
        print('job2',A)
    lock.release()

if __name__ == '__main__':
    A = 0
    t1 = threading.Thread(target=job1)
    t2 = threading.Thread(target=job2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

#输出  使用lock后 执行完一个线程后再执行另外一个线程。使用lock和不使用lock,最后打印输出的结果是不一样的。
job1 1
job1 2
job1 3
job1 4
job1 5
job1 6
job1 7
job1 8
job1 9
job1 10
job2 20
job2 30
job2 40
job2 50
job2 60
job2 70
job2 80
job2 90
job2 100
job2 110

2 Multiprocessing

多进程 Multiprocessing 和多线程 threading 相似, 都是在 python 中用来并行运算的。不过既然有了 threading, 为何 Python 还要出一个 multiprocessing 呢? 由于要用来弥补 threading 的一些劣势, 好比在 threading 教程中提到的GIL, python 把 multiprocessing 和 threading 的使用方法作的几乎差很少,使用多线程发挥电脑多核系统的威力。

2.1添加Process

#导入线程进程标准模块
import multiprocessing as mp
import threading as td

#定义一个被线程和进程调用的函数
def job(a,d):
    print('AA')

#建立线程和进程
t1=td.Thread(target=job,args=(1,2))
p1=mp.Process(target=job,args=(1,2))

#启动线程和进程
t1.start()
p1.start()

#链接线程和进程
t1.join()
p1.join()

#能够看出线程和进程的使用方式类似

完整代码

#导入线程进程标准模块
import multiprocessing as mp
import threading as td

#定义一个被进程调用的函数
def job(a,d):
    print('AA')

if __name__ == '__main__':
    p1 = mp.Process(target=job, args=(1, 2)) #建立进程
    p1.start() #启动进程
    p1.join()  #链接进程
#输出
AA

2.2 存储进程输出 Queue

Queue的功能是将每一个核或线程的运算结果放在队里中, 等到每一个线程或核运行完毕后再从队列中取出结果, 继续加载运算。由于多线程调用的函数不能有返回值, 因此使用Queue存储多个线程运算的结果。
定义一个被多线程调用的函数,q 就像一个队列,用来保存每次函数运行的结果

#定义一个多线程调用函数
def job(q): #注:该函数没有返回值
    res = 0
    for i in range(1000):
        res += i+i**2+i**3
    q.put(res) #queue

定义两个线程函数,用来处理同一个任务, args 的参数只要一个值的时候,参数后面须要加一个逗号,表示args是可迭代的,后面可能还有别的参数,不加逗号会出错

p1 = mp.Process(target=job,args=(q,))
p2 = mp.Process(target=job,args=(q,))

完整代码实现

import multiprocessing as mp

#定义一个多线程调用函数
def job(q): #注:该函数没有返回值
    res = 0
    for i in range(1000):
        res += i+i**2+i**3
    q.put(res) #queue
if __name__ == '__main__':
    q=mp.Queue()  #定义一个多线程队列 存储结果
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start() #启动线程  分两批处理
    p2.start()
    p1.join() #链接线程
    p2.join()
    res1=q.get() #分两批输出 将结果分别保存
    res2=q.get()
    print(res1+res2)
#输出
499667166000

2.3效率对比

对比下多进程,多线程和什么都不作时的消耗时间,看看哪一种方式更有效率。

import multiprocessing as mp
def job(q):
    res=0
    for i in range(1000000):
        res += i + i**2 + i**3
    q.put(res)

#因为多进程是多核运算 多进程代码命名为multicore()
def multicore():
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print('multicore:',res1 + res2)

#建立多线程
import threading as td
def multithread():
    q = mp.Queue() # thread可放入process一样的queue中
    t1 = td.Thread(target=job, args=(q,))
    t2 = td.Thread(target=job, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print('multithread:', res1 + res2)

#建立普通函数
def normal():
    res = 0
    for _ in range(2):
        for i in range(1000000):
            res += i + i**2 + i**3
    print('normal:', res)

import time
if __name__ == '__main__':
    st = time.time()
    normal()
    st1 = time.time()
    print('normal time:', st1 - st)
    multithread()
    st2 = time.time()
    print('multithread time:', st2 - st1)
    multicore()
    print('multicore time:', time.time() - st2)

#输出
normal: 499999666667166666000000
normal time: 1.6875250339508057
multithread: 499999666667166666000000
multithread time: 3.1562907695770264
multicore: 499999666667166666000000
multicore time: 1.0937612056732178

此次运行时间依然是:多进程 < 普通 < 多线程。 发现多核/多进程最快,说明在同时间运行了多个任务。 而多线程的运行时间竟然比什么都不作的程序还要慢一点,说明多线程仍是有短板。

2.4 进程池Pool

进程池就是将所要运行的东西,放到池子里,Python会自行解决多进程的问题
2.4.1 进程池Pool()和map()

#定义一个Pool
pool = mp.Pool()

有了池子以后,就可让池子对应某一个函数,向池子里丢数据,池子就会返回函数返回的值。 Pool和以前的Process的不一样点是丢向Pool的函数有返回值,而Process的没有返回值。
接下来用map()获取结果,在map()中须要放入函数和须要迭代运算的值,而后它会自动分配给CPU核,返回结果 res = pool.map(job, range(10))

import multiprocessing as mp

def job(x):
    return x*x

def multicore():
    pool = mp.Pool()
    res = pool.map(job,range(10))
    print(res)

if __name__ == '__main__':
    multicore()

#输出
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

2.4.2 自定义核数量
怎么知道Pool是否真的调用了多个核呢?能够把迭代次数增大些,而后打开CPU负载看下CPU运行状况
打开CPU负载:活动监视器 > CPU > CPU负载(单击一下便可)
Pool默认大小是CPU的核数,咱们也能够经过在Pool中传入processes参数便可自定义须要的核数量,

def multicore():
    pool = mp.Pool(processes=3) # 定义CPU核数量为3
    res = pool.map(job, range(10))
    print(res)

2.4.3 apply_async()
Pool除了map()外,还有能够返回结果的方式,就是apply_async()。apply_async()中只能传递一个值,它只会放入一个核进行运算,可是传入值时要注意是可迭代的,因此在传入值后须要加逗号, 同时须要用get()方法获取返回值

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get得到结果
    print(res.get())

#运行结果
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4

2.4.4 apply_async()输出多个结果
在apply_async()中多传入几个值
res = pool.apply_async(job, (2,3,4,)) #报错 TypeError: job() takes exactly 1 argument (3 given) 即apply_async()只能输入一组参数。
将apply_async() 放入迭代器中,定义一个新的multi_res
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
取出值时须要一个一个取出来
print([res.get() for res in multi_res])
合并代码

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get得到结果
    print(res.get())
    # 迭代器,i=0时apply一次,i=1时apply一次等等
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    # 从迭代器中取出
    print([res.get() for res in multi_res])
#运行结果

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]  # map()
4
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # multi_res
  • Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数
  • map() 放入迭代参数,返回多个结果
  • apply_async()只能放入一组参数,并返回一个结果,若是想获得map()的效果须要经过迭代

2.5 共享内存 shared memory

2.5.1 Shared Value
使用Value数据存储在一个共享的内存表中。

import multiprocessing as mp

value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)

其中d和i参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型
2.5.2 Shared Array
在Python的mutiprocessing中,有还有一个Array类,能够和共享内存交互,来实如今进程之间共享数据
array = mp.Array('i', [1, 2, 3, 4])
这里的Array和numpy中的不一样,它只能是一维的,不能是多维的。一样和Value 同样,须要定义数据形式,不然会报错。

2.6 进程锁

2.6.1 不加锁

import multiprocessing as mp

def job(x):
    return x*x

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get得到结果
    print(res.get())
    # 迭代器,i=0时apply一次,i=1时apply一次等等
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    # 从迭代器中取出
    print([res.get() for res in multi_res])

if __name__ == '__main__':
    multicore()
#输出
1
4
5
8
9
12
13
16
17
20

上面代码中定义了一个共享变量v,两个进程均可以对它进行操做。 在job()中咱们想让v每隔0.1秒输出一次累加num的结果,可是在两个进程p1和p2 中设定了不一样的累加值。因此来看下这两个进程是否会出现冲突。
2.6.2 加锁

import multiprocessing as mp
import time

def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.1)
        v.value += num # 获取共享内存
        print(v.value)
    l.release() # 释放

def multicore():
    l = mp.Lock() # 定义一个进程锁
    v = mp.Value('i', 0) # 定义共享内存
    p1 = mp.Process(target=job, args=(v,1,l)) # 须要将lock传入
    p2 = mp.Process(target=job, args=(v,3,l))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()
#运行一下,看看是否还会出现抢占资源的状况

1
2
3
4
5
8
11
14
17
20

运行结果显示,进程锁保证了进程p1的完整运行,而后才进行了进程p2的运行

相关文章
相关标签/搜索