介绍如何使用python的multiprocess和threading模块进行多线程和多进程编程。html
Python的多进程编程与multiprocess模块python
python的多进程编程主要依靠multiprocess模块。咱们先对比两段代码,看看多进程编程的优点。咱们模拟了一个很是耗时的任务,计算8的20次方,为了使这个任务显得更耗时,咱们还让它sleep 2秒。第一段代码是单进程计算(代码以下所示),咱们按顺序执行代码,重复计算2次,并打印出总共耗时。linux
import time import os def long_time_task(): print('当前进程: {}'.format(os.getpid())) time.sleep(2) print("结果: {}".format(8 ** 20)) if __name__ == "__main__": print('当前母进程: {}'.format(os.getpid())) start = time.time() for i in range(2): long_time_task() end = time.time() print("用时{}秒".format((end-start)))
输出结果以下,总共耗时4秒,至始至终只有一个进程14236。看来电脑计算8的20次方基本不费时。git
当前母进程: 14236
当前进程: 14236
结果: 1152921504606846976
当前进程: 14236
结果: 1152921504606846976
用时4.01080060005188秒
第2段代码是多进程计算代码。咱们利用multiprocess模块的Process方法建立了两个新的进程p1和p2来进行并行计算。Process方法接收两个参数, 第一个是target,通常指向函数名,第二个时args,须要向函数传递的参数。对于建立的新进程,调用start()方法便可让其开始。咱们可使用os.getpid()打印出当前进程的名字。程序员
from multiprocessing import Process import os import time def long_time_task(i): print('子进程: {} - 任务{}'.format(os.getpid(), i)) time.sleep(2) print("结果: {}".format(8 ** 20)) if __name__=='__main__': print('当前母进程: {}'.format(os.getpid())) start = time.time() p1 = Process(target=long_time_task, args=(1,)) p2 = Process(target=long_time_task, args=(2,)) print('等待全部子进程完成。') p1.start() p2.start() p1.join() p2.join() end = time.time() print("总共用时{}秒".format((end - start)))
输出结果以下所示,耗时变为2秒,时间减了一半,可见并发执行的时间明显比顺序执行要快不少。你还能够看到尽管咱们只建立了两个进程,可实际运行中却包含里1个母进程和2个子进程。之因此咱们使用join()方法就是为了让母进程阻塞,等待子进程都完成后才打印出总共耗时,不然输出时间只是母进程执行的时间。github
当前母进程: 6920
等待全部子进程完成。
子进程: 17020 - 任务1
子进程: 5904 - 任务2
结果: 1152921504606846976
结果: 1152921504606846976
总共用时2.131091356277466秒
知识点:算法
新建立的进程与进程的切换都是要耗资源的,因此平时工做中进程数不能开太大。编程
同时能够运行的进程数通常受制于CPU的核数。缓存
除了使用Process方法,咱们还可使用Pool类建立多进程。安全
利用multiprocess模块的Pool类建立多进程
不少时候系统都须要建立多个进程以提升CPU的利用率,当数量较少时,能够手动生成一个个Process实例。当进程数量不少时,或许能够利用循环,可是这须要程序员手动管理系统中并发进程的数量,有时会很麻烦。这时进程池Pool就能够发挥其功效了。能够经过传递参数限制并发进程的数量,默认值为CPU的核数。
Pool类能够提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,若是进程池尚未满,就会建立一个新的进程来执行请求。若是池满,请求就会告知先等待,直到池中有进程结束,才会建立新的进程来执行这些请求。
下面介绍一下multiprocessing 模块下的Pool类的几个方法:
1.apply_async
函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])
其做用是向进程池提交须要执行的函数及参数, 各个进程采用非阻塞(异步)的调用方式,即每一个子进程只管运行本身的,无论其它进程是否已经完成。
2.map()
函数原型:map(func, iterable[, chunksize=None])
Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回。 注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。
3.map_async()
函数原型:map_async(func, iterable[, chunksize[, callback]])
与map用法一致,可是它是非阻塞的。其有关事项见apply_async。
4.close()
关闭进程池(pool),使其不在接受新的任务。
5. terminate()
结束工做进程,不在处理未处理的任务。
6.join()
主进程阻塞等待子进程的退出, join方法要在close或terminate以后使用。
下例是一个简单的multiprocessing.Pool类的实例。由于小编个人CPU是4核的,一次最多能够同时运行4个进程,因此我开启了一个容量为4的进程池。4个进程须要计算5次,你能够想象4个进程并行4次计算任务后,还剩一次计算任务(任务4)没有完成,系统会等待4个进程完成后从新安排一个进程来计算。
from multiprocessing import Pool, cpu_count import os import time def long_time_task(i): print('子进程: {} - 任务{}'.format(os.getpid(), i)) time.sleep(2) print("结果: {}".format(8 ** 20)) if __name__=='__main__': print("CPU内核数:{}".format(cpu_count())) print('当前母进程: {}'.format(os.getpid())) start = time.time() p = Pool(4) for i in range(5): p.apply_async(long_time_task, args=(i,)) print('等待全部子进程完成。') p.close() p.join() end = time.time() print("总共用时{}秒".format((end - start)))
知识点:
对Pool对象调用join()方法会等待全部子进程执行完毕,调用join()以前必须先调用close()或terminate()方法,让其再也不接受新的Process了。
输出结果以下所示,5个任务(每一个任务大约耗时2秒)使用多进程并行计算只需4.37秒,, 耗时减小了60%。
CPU内核数:4
当前母进程: 2556
等待全部子进程完成。
子进程: 16480 - 任务0
子进程: 15216 - 任务1
子进程: 15764 - 任务2
子进程: 10176 - 任务3
结果: 1152921504606846976
结果: 1152921504606846976
子进程: 15216 - 任务4
结果: 1152921504606846976
结果: 1152921504606846976
结果: 1152921504606846976
总共用时4.377134561538696秒
相信你们都知道python解释器中存在GIL(全局解释器锁), 它的做用就是保证同一时刻只有一个线程能够执行代码。因为GIL的存在,不少人认为python中的多线程其实并非真正的多线程,若是想要充分地使用多核CPU的资源,在python中大部分状况须要使用多进程。然而这并意味着python多线程编程没有意义哦,请继续阅读下文。
多进程间的数据共享与通讯
一般,进程之间是相互独立的,每一个进程都有独立的内存。经过共享内存(nmap模块),进程之间能够共享对象,使多个进程能够访问同一个变量(地址相同,变量名可能不一样)。多进程共享资源必然会致使进程间相互竞争,因此应该尽最大可能防止使用共享状态。还有一种方式就是使用队列queue来实现不一样进程间的通讯或数据共享,这一点和多线程编程相似。
from multiprocessing import Process, Queue import os, time, random # 写数据进程执行的代码: def write(q): print('Process to write: {}'.format(os.getpid())) for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) # 读数据进程执行的代码: def read(q): print('Process to read:{}'.format(os.getpid())) while True: value = q.get(True) print('Get %s from queue.' % value) if __name__=='__main__': # 父进程建立Queue,并传给各个子进程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 启动子进程pw,写入: pw.start() # 启动子进程pr,读取: pr.start() # 等待pw结束: pw.join() # pr进程里是死循环,没法等待其结束,只能强行终止: pr.terminate()
下例这段代码中中建立了2个独立进程,一个负责写(pw), 一个负责读(pr), 实现了共享一个队列queue。
输出结果以下所示:
Process to write: 3036
Put A to queue...
Process to read:9408
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
Python的多线程编程与threading模块
python 3中的多进程编程主要依靠threading模块。建立新线程与建立新进程的方法很是相似。threading.Thread方法能够接收两个参数, 第一个是target,通常指向函数名,第二个时args,须要向函数传递的参数。对于建立的新线程,调用start()方法便可让其开始。咱们还可使用current_thread().name打印出当前线程的名字。 下例中咱们使用多线程技术重构以前的计算代码。
import threading import time def long_time_task(i): print('当前子线程: {} - 任务{}'.format(threading.current_thread().name, i)) time.sleep(2) print("结果: {}".format(8 ** 20)) if __name__=='__main__': start = time.time() print('这是主线程:{}'.format(threading.current_thread().name)) t1 = threading.Thread(target=long_time_task, args=(1,)) t2 = threading.Thread(target=long_time_task, args=(2,)) t1.start() t2.start() end = time.time() print("总共用时{}秒".format((end - start)))
下面是输出结果。为何总耗时竟然是0秒? 咱们能够明显看到主线程和子线程实际上是独立运行的,主线程根本没有等子线程完成,而是本身结束后就打印了消耗时间。主线程结束后,子线程仍在独立运行,这显然不是咱们想要的。
这是主线程:MainThread
当前子线程: Thread-1 - 任务1
当前子线程: Thread-2 - 任务2
总共用时0.0017192363739013672秒
结果: 1152921504606846976
结果: 1152921504606846976
若是要实现主线程和子线程的同步,咱们必需使用join方法(代码以下所示)。
import threading import time def long_time_task(i): print('当前子线程: {} 任务{}'.format(threading.current_thread().name, i)) time.sleep(2) print("结果: {}".format(8 ** 20)) if __name__=='__main__': start = time.time() print('这是主线程:{}'.format(threading.current_thread().name)) thread_list = [] for i in range(1, 3): t = threading.Thread(target=long_time_task, args=(i, )) thread_list.append(t) for t in thread_list: t.start() for t in thread_list: t.join() end = time.time() print("总共用时{}秒".format((end - start)))
修改代码后的输出以下所示。这时你能够看到主线程在等子线程完成后才答应出总消耗时间(2秒),比正常顺序执行代码(4秒)仍是节省了很多时间。
这是主线程:MainThread
当前子线程: Thread - 1 任务1
当前子线程: Thread - 2 任务2
结果: 1152921504606846976
结果: 1152921504606846976
总共用时2.0166890621185303秒
当咱们设置多线程时,主线程会建立多个子线程,在python中,默认状况下主线程和子线程独立运行互不干涉。若是但愿让主线程等待子线程实现线程的同步,咱们须要使用join()方法。若是咱们但愿一个主线程结束时再也不执行子线程,咱们应该怎么办呢? 咱们可使用t.setDaemon(True),代码以下所示。
import threading import time def long_time_task(): print('当子线程: {}'.format(threading.current_thread().name)) time.sleep(2) print("结果: {}".format(8 ** 20)) if __name__=='__main__': start = time.time() print('这是主线程:{}'.format(threading.current_thread().name)) for i in range(5): t = threading.Thread(target=long_time_task, args=()) t.setDaemon(True) t.start() end = time.time() print("总共用时{}秒".format((end - start)))
经过继承Thread类重写run方法建立新线程
除了使用Thread()方法建立新的线程外,咱们还能够经过继承Thread类重写run方法建立新的线程,这种方法更灵活。下例中咱们自定义的类为MyThread, 随后咱们经过该类的实例化建立了2个子线程。
#-*- encoding:utf-8 -*- import threading import time def long_time_task(i): time.sleep(2) return 8**20 class MyThread(threading.Thread): def __init__(self, func, args , name='', ): threading.Thread.__init__(self) self.func = func self.args = args self.name = name self.result = None def run(self): print('开始子进程{}'.format(self.name)) self.result = self.func(self.args[0],) print("结果: {}".format(self.result)) print('结束子进程{}'.format(self.name)) if __name__=='__main__': start = time.time() threads = [] for i in range(1, 3): t = MyThread(long_time_task, (i,), str(i)) threads.append(t) for t in threads: t.start() for t in threads: t.join() end = time.time() print("总共用时{}秒".format((end - start)))
输出结果以下所示:
开始子进程1
开始子进程2
结果: 1152921504606846976
结果: 1152921504606846976
结束子进程1
结束子进程2
总共用时2.005445718765259秒
不一样线程间的数据共享
一个进程所含的不一样线程间共享内存,这就意味着任何一个变量均可以被任何一个线程修改,所以线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。若是不一样线程间有共享的变量,其中一个方法就是在修改前给其上一把锁lock,确保一次只有一个线程能修改它。threading.lock()方法能够轻易实现对一个共享变量的锁定,修改完后release供其它线程使用。好比下例中帐户余额balance是一个共享变量,使用lock可使其不被改乱。
# -*- coding: utf-8 -* import threading class Account: def __init__(self): self.balance = 0 def add(self, lock): # 得到锁 lock.acquire() for i in range(0, 100000): self.balance += 1 # 释放锁 lock.release() def delete(self, lock): # 得到锁 lock.acquire() for i in range(0, 100000): self.balance -= 1 # 释放锁 lock.release() if __name__ == "__main__": account = Account() lock = threading.Lock() # 建立线程 thread_add = threading.Thread(target=account.add, args=(lock,), name='Add') thread_delete = threading.Thread(target=account.delete, args=(lock,), name='Delete') # 启动线程 thread_add.start() thread_delete.start() # 等待线程结束 thread_add.join() thread_delete.join() print('The final balance is: {}'.format(account.balance))
另外一种实现不一样线程间数据共享的方法就是使用消息队列queue。不像列表,queue是线程安全的,能够放心使用,见下文。
使用queue队列通讯-经典的生产者和消费者模型
下例中建立了两个线程,一个负责生成,一个负责消费,所生成的产品存放在queue里,实现了不一样线程间沟通。
from queue import Queue import random, threading, time # 生产者类 class Producer(threading.Thread): def __init__(self, name, queue): threading.Thread.__init__(self, name=name) self.queue = queue def run(self): for i in range(1, 5): print("{} is producing {} to the queue!".format(self.getName(), i)) self.queue.put(i) time.sleep(random.randrange(10) / 5) print("%s finished!" % self.getName()) # 消费者类 class Consumer(threading.Thread): def __init__(self, name, queue): threading.Thread.__init__(self, name=name) self.queue = queue def run(self): for i in range(1, 5): val = self.queue.get() print("{} is consuming {} in the queue.".format(self.getName(), val)) time.sleep(random.randrange(10)) print("%s finished!" % self.getName()) def main(): queue = Queue() producer = Producer('Producer', queue) consumer = Consumer('Consumer', queue) producer.start() consumer.start() producer.join() consumer.join() print('All threads finished!') if __name__ == '__main__': main()
队列queue的put方法能够将一个对象obj放入队列中。若是队列已满,此方法将阻塞至队列有空间可用为止。queue的get方法一次返回队列中的一个成员。若是队列为空,此方法将阻塞至队列中有成员可用为止。queue同时还自带emtpy(), full()等方法来判断一个队列是否为空或已满,可是这些方法并不可靠,由于多线程和多进程,在返回结果和使用结果之间,队列中可能添加/删除了成员。
Python多进程和多线程哪一个快?
因为GIL的存在,不少人认为Python多进程编程更快,针对多核CPU,理论上来讲也是采用多进程更能有效利用资源。网上不少人已作过比较,我直接告诉你结论吧。
对CPU密集型代码(好比循环计算) - 多进程效率更高
对IO密集型代码(好比文件操做,网络爬虫) - 多线程效率更高。
为何是这样呢?其实也不难理解。对于IO密集型操做,大部分消耗时间实际上是等待时间,在等待时间中CPU是不须要工做的,那你在此期间提供双CPU资源也是利用不上的,相反对于CPU密集型代码,2个CPU干活确定比一个CPU快不少。那么为何多线程会对IO密集型代码有用呢?这因是为python碰到等待会释放GIL供新的线程使用,实现了线程间的切换。
GIL是什么
首先须要明确的一点是GIL并非Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就比如C++是一套语言(语法)标准,可是能够用不一样的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也同样,一样一段代码能够经过CPython,PyPy,Psyco等不一样的Python执行环境来执行。像其中的JPython就没有GIL。然而由于CPython是大部分环境下默认的Python执行环境。因此在不少人的概念里CPython就是Python,也就想固然的把GIL归结为Python语言的缺陷。因此这里要先明确一点:GIL并非Python的特性,Python彻底能够不依赖于GIL。
GIL: 一个防止多线程并发执行机器码的一个Mutex,乍一看就是个BUG般存在的全局锁嘛!别急,咱们下面慢慢的分析。
为何会有GIL
因为物理上得限制,各CPU厂商在核心频率上的比赛已经被多核所取代。为了更有效的利用多核处理器的性能,就出现了多线程的编程方式,而随之带来的就是线程间数据一致性和状态同步的困难。即便在CPU内部的Cache也不例外,为了有效解决多份缓存之间的数据同步时各厂商花费了很多心思,也不可避免的带来了必定的性能损失。
Python固然也逃不开,为了利用多核,Python开始支持多线程。而解决多线程之间数据完整性和状态同步的最简单方法天然就是加锁。 因而有了GIL这把超级大锁,而当愈来愈多的代码库开发者接受了这种设定后,他们开始大量依赖这种特性(即默认python内部对象是thread-safe的,无需在实现时考虑额外的内存锁和同步操做)。
慢慢的这种实现方式被发现是蛋疼且低效的。但当你们试图去拆分和去除GIL的时候,发现大量库代码开发者已经重度依赖GIL而很是难以去除了。有多难?作个类比,像MySQL这样的“小项目”为了把Buffer Pool Mutex这把大锁拆分红各个小锁也花了从5.5到5.6再到5.7多个大版为期近5年的时间,本且仍在继续。MySQL这个背后有公司支持且有固定开发团队的产品走的如此艰难,那又更况且Python这样核心开发和代码贡献者高度社区化的团队呢?
因此简单的说GIL的存在更多的是历史缘由。若是推到重来,多线程的问题依然仍是要面对,可是至少会比目前GIL这种方式会更优雅。
GIL的影响
从上文的介绍和官方的定义来看,GIL无疑就是一把全局排他锁。毫无疑问全局锁的存在会对多线程的效率有不小影响。甚至就几乎等于Python是个单线程的程序。
那么读者就会说了,全局锁只要释放的勤快效率也不会差啊。只要在进行耗时的IO操做的时候,能释放GIL,这样也仍是能够提高运行效率的嘛。或者说再差也不会比单线程的效率差吧。理论上是这样,而实际上呢?Python比你想的更糟。
下面咱们就对比下Python在多线程和单线程下得效率对比。测试方法很简单,一个循环1亿次的计数器函数。一个经过单线程执行两次,一个多线程执行。最后比较执行总时间。测试环境为双核的Mac pro。注:为了减小线程库自己性能损耗对测试结果带来的影响,这里单线程的代码一样使用了线程。只是顺序的执行两次,模拟单线程。
顺序执行的单线程(single_thread.py)