PS:咱们知道现代操做系统好比Mac OS X,UNIX,Linux,Windows等,都是支持“多任务”的操做系统。多任务的实现共有3种方式:多进程模式;多线程模式;多进程+多线程模式。Python既支持多进程又支持多线程,下面咱们将会讨论如何编写这两种多任务程序。
为了让Python程序实现多进程(multiprocessing),咱们先来了解操做系统在这方面的相关知识。python
Unix/Linx操做系统提供了一个fork()系统调用,它很是特殊,不一样于普通的函数(调用一次,返回一次),fork()调用一次,返回两次。这是由于操做系统自动把当前进程(父进程)复制了一份(子进程),而后分别在父进程和子进程中返回。数据库
在子进程中永远返回0,而在父进程中返回子进程的ID。这样作是由于一个父进程能够fork出不少子进程,因此父进程要记下每一个子进程的ID,而子进程只须要调用getppid()就能够拿到父进程的ID。编程
在Python中的os模块中封装了常见的系统调用,其中就包括了fork,能够在Python程序中轻松建立出子进程:windows
import os print('Process (%s) start...' % os.getpid()) # Only works on Unix/Linux/Mac: pid = os.fork() if pid == 0: print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())) else: print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
结果:服务器
Process (4423) start... I (4423) just created a child process (4424). I am child process (4424) and my parent is 4423.
注意:windows没有fork调用。有了fork调用一个进程在接到新任务时就能够复制出一个子进程来处理新任务。
既然Windows没有fork调用,那怎么在Windows上用Python编写多进程的程序?由于Python是跨平台的,其中的multiprocessing模块就是跨平台版本的多进程模块。网络
在multiprocessing模块中提供了一个Process类来表明一个进程对象。下面的例子演示了启动一个子进程并等待其结束:多线程
from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print('Run child process %s (%s)...' % (name, os.getpid())) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Process(target=run_proc, args=('test',)) print('Child process will start.') p.start() p.join() print('Child process end.') ''' Parent process 20280. Child process will start. Run child process test (5772)... Child process end. '''
Tips:用multiprocessing建立子进程时,Process类表明一个进程,只需传入子进程需执行的函数和参数,用start方法启动子进程,join()方法能够等待子进程结束后再往下运行(一般用于进程间的同步)。
上面的方法都是启动一个子进程,可是当咱们要启动大量的子进程时,怎么办呢?能够用进程池的方式批量建立子进程:并发
from multiprocessing import Pool import os, time, random def long_time_task(name): print('Run task %s (%s)...' % (name,os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__ == '__main__': print('Parent process %s.' % os.getpid()) p = Pool(4) for i in range(5): p.apply_async(long_time_task, args=(i,)) print('Waiting for all subprocess done...') p.close() p.join() print('All subprocesses done.') ''' Parent process 896. Waiting for all subprocess done... Run task 0 (9728)... Run task 1 (22216)... Run task 2 (20572)... Run task 3 (6844)... Task 0 runs 0.36 seconds. Run task 4 (9728)... Task 4 runs 0.43 seconds. Task 3 runs 0.94 seconds. Task 1 runs 1.19 seconds. Task 2 runs 2.72 seconds. All subprocesses done. '''
注意apply_async是异步的,就是说子进程执行的同时,主进程继续向下执行。因此“Waiting for all subprocesses done...”先打印出来,close方法意味着不能再添加新的Process了。对Pool对象调用join()方法,会暂停主进程,等待全部的子进程执行完,因此“All subprocesses done.”最后打印。app
Tips:task4最后执行,是由于Pool的默认大小是4(CPU的核数),因此最多执行4个进程。固然这是Pool有意设计的限制,并非操做系统的限制,你也能够本身改变它的默认大小,就能够跑不止4个进程。
上面的子进程的代码实现都是在主进程内部的,然而不少时候,子进程都是一个外部进程,咱们须要控制子进程的输入和输出。
subprocess(能够在当前程序中执行其余程序或命令)模块可让咱们很是方便地启动一个外部子进程,而后控制其输入和输出:
import subprocess print('$ nslookup www.python.org') r = subprocess.call(['nslookup', 'www.python.org']) print('Exit cod:', r) ''' $ nslookup www.python.org 服务器: ns.sc.cninfo.net Address: 61.139.2.69 非权威应答: 名称: dualstack.python.map.fastly.net Addresses: 2a04:4e42:36::223 151.101.72.223 Aliases: www.python.org Exit cod: 0 '''
上面的运行效果至关于在命令行直接输入“nslookup www.python.org”(至关于开了一个进程)。
若是子进程还须要经过手动输入一些参数,那么能够经过communicate()方法输入:
import subprocess print('$ nslookup') p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b'set q=mx\npython.org\nexit\n') print(output.decode('gbk')) print('Exit code:', p.returncode) ''' $ nslookup 默认服务器: ns.sc.cninfo.net Address: 61.139.2.69 > > 服务器: ns.sc.cninfo.net Address: 61.139.2.69 python.org MX preference = 50, mail exchanger = mail.python.org mail.python.org internet address = 188.166.95.178 mail.python.org AAAA IPv6 address = 2a03:b0c0:2:d0::71:1 > Exit code: 0 '''
上面的代码至关于在命令行直接输入nslookup,而后手动输入:
set q=mx
python.org
exit
Process之间确定是须要通讯的,操做系统提供了不少机制来实现进程间的通讯。Python的multiprocessing模块包装了底层的机制,提供了Queue(队列)、Pipes(管道)等多种方式来交换数据。下面咱们就以Queue为例,在父进程中建立两个子进程,一个往Queue里写数据,一个从Queue中读数据:
from multiprocessing import Process, Queue import os, time, random #写数据进程执行的代码: def write(q): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) #读数据进程执行的代码: def read(q): print('Process to read: %s' % os.getpid()) while True: value = q.get(True) print('Get %s from queue.' % value) if __name__=='__main__': #父进程建立出Queue,并传给各个子进程 q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) #启动子进程pw,写入队列 pw.start() #启动子进程pr,读取队列 pr.start() #等待pw进程结束 pw.join() #pr进程死循环,没法等待,只能强行终止; pr.terminate() ''' Process to write: 8768 Put A to queue... Process to read: 19700 Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue. '''
在Linux/Unix下,multiprocessing模块分装了fork调用,而因为Windows没有fork调用,所以multiprocessing要想模拟出fork的效果。父进程中的全部Python对象都必须经过pickle序列化到子进程中,所以若是multiprocessing在Windows下调用失败了,要先考虑是否是pickle失败了。
前面已经说过了多个任务既能够用多进程来实现,又能够用多线程来实现。那么多线程与多进程相比,有什么优势呢?线程共享相同的内存空间,不一样的线程能够读取内存中的同一变量(每一个进程都有各自独立的空间)。线程带来的开销要比进程小。
因为线程是操做系统直接支持的执行单元,所以许多高级语言都内置了多线程的支持,Python也不例外,Python中的线程是真正的Posix Thread而不是模拟出来的线程。
要实现多线程,Python的标准库提供了两个模块:_thread和threading,前者是低级模块,后者是高级模块,后者分装了前者。绝大多数状况下,咱们只须要使用threading这个高级的模块。
启动一个线程就是把一个函数传入并建立Thread实例,而后调用start()执行:
import time, threading #新线程执行的代码 def loop(): print('thread %s is runnging...' % threading.current_thread().name) n = 0 while n < 5: n = n + 1 print('thread %s >>> %s' % (threading.current_thread().name, n)) time.sleep(1) print('thread %s ended.' % threading.current_thread().name) print('thread %s is running...' % threading.current_thread().name) t = threading.Thread(target=loop, name='LoopTread') t.start() t.join() print('Thread %s ended.' % threading.current_thread().name) '''thread MainThread is running... thread LoopTread is runnging... thread LoopTread >>> 1 thread LoopTread >>> 2 thread LoopTread >>> 3 thread LoopTread >>> 4 thread LoopTread >>> 5 thread LoopTread ended. Thread MainThread ended. '''
因为任何进程都会默认启动一个线程,咱们就把这个线程称为主线程,主线程又能够启动新的线程。上面的current_thread()函数返回当前线程的实例,主线程的名字就MainThread,子线程的名字是在建立时咱们指定的。
使用多线程仍是有风险的,由于在多线程全部变量被全部线程共享,此时可能会出现多个线程同时改变一个变量,致使出现错误。为了不这个错误的出现,咱们应该加锁lock。
咱们先不使用lock,来看一个错误的实例:
import time, threading #假定这是你的银行存款 balance = 0 def change_it(n):#先存后取结果应该为0 global balance #共享变量 balance = balance + n balance = balance - n def run_thread(n): for n in range(100000): change_it(n) t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance)
咱们启动了连个线程,先存后取,理论上结果应该为0,可是线程对的调度也是由操做系统决定,因此,当t1 和 t2交替执行,循环次数够多,结果就不必定是0了。由于高级语言的一条语句在CPU执行时是若干条语句。
因此若是咱们要保证balance的计算正确,就应该就上一把锁,使该变量同一时刻只能被一个线程操做。在这里咱们就能够给change_it()加上一把锁:
balance = 0 lock = threading.Lock() def run_thread(n): for i in range(100000): # 先要获取锁: lock.acquire() try: # 放心地改吧: change_it(n) finally: # 改完了必定要释放锁: lock.release()
当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁,而后继续执行下面的代码,其余线程就只能等待直到或取到锁为止。因此获取到锁的线程在用完后必定要释放锁,不然等待锁开启的线程,将永远等待,因此咱们用try...finally来确保锁必定会被释放。
Tips:锁的坏处就是阻止了多线程的并发执行,效率大大地降低了。当不一样的线程持有不一样的锁,并试图获取对方的锁时,可能会形成死锁。
小结:多线程编程,模型复杂,容易发生冲突,必须加锁以隔离,同时又要当心死锁的发生。Python解释器因为设计时有GIL全局锁。致使了多线程没法利用多核,这就是模拟出来的并发(线程数量大于处理器数量)。
咱们已经知道多线中变量是能够共享的,在多线程的环境下,每一个线程都有本身的数据。那么每个线程应该也能够拥有本身的局部变量,线程使用本身的局部变量比使用全局变量好,由于局部变量只能本身使用,不会影响其余的线程,而使用全局变量的话则必须加锁。
那么具体怎么在Python中使用线程的局部变量呢?那就是使用ThreadLocal,先来看一个例子:
import threading #建立全局ThreadLocal对象: local_school = threading.local() def process_student(): #获取当前线程关联的student: std = local_school.student print('Hello, %s (in %s)' % (std, threading.current_thread().name)) def process_thread(name): #绑定当前线程关联的student: local_school.student = name process_student() t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A') t2 = threading.Thread(target=process_thread,args=('Bob',), name='Thread-B') t1.start() t2.start() t1.join() t2.join() ''' Hello, Alice (in Thread-A) Hello, Bob (in Thread-B) '''
全局变量local_school就是一个ThreadLocal对象,每一个线程对她均可以读写student属性,但互不影响。你能够把local_school当作全局变量,但每一个属性如local_school.student都是线程的局部变量,能够任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。
ThreaLocal最经常使用的地方就是为每一个线程绑定一个数据库链接,HTTP请求用户信息身份等。这样一个线程的全部调用到的处理函数均可以很是方便地访问这些资源。
Tip:一个ThreadLocal变量虽然是全局变量,但每一个线程都只能读写本身线程的独立副本,互不干扰。ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。
前面咱们已经介绍了多进程和多线程,这是实现多任务最经常使用的两种方式。如今,咱们来讨论下这两种方式的优缺点。
首先,要实现多任务,一般咱们会设计Master-Worker模式,Master负责分配任务,Worker负责执行任务,所以,在多任务环境下,一般是一个Master,多个Worker。
若是咱们用多进程实现Master-Worker,主进程就是Master,其余进程就是Worker。若是用多线程实现Master-Worker,主线程就是Master,其余线程就是Worker。
其中多进程模式最大的优势就是稳定性高,这是由于一个子进程崩溃了,不会影响主进程和其余子进程(固然主进程crash了,全部的进程就crash了,可是几率很低毕竟Master进程只负责分配任务),著名的Apache最先采用的就是多进程模式。可是多进程的缺点就是建立进程的代价大,在Unix/Linux系统下,用fork调用还行,可是在Windows下建立进程的开销巨大。另外,操做系统能同时运行的进程也是有限的,在CPU和内存的限制下,若是有几千个进程同时运行,那么操做系统连调度都会成问题。
而多线程模式一般比多进程模式快一点,但也快不到哪去。并且,多线程模式致命的缺点就是由于任何一个线程crash了均可能形成整个进程crash,由于全部线程共享进程的内存。
Tips: 在Windows下,多线程的效率比多进程要高,因此微软的IIS服务器默认采用多线程的模式。因为多线程存在稳定性问题,IIS的稳定性就不如Apache。可是如今为了平衡,IIS和Apache如今又有了多进程+多线程的混合模式。
咱们须要考虑任务的类型,咱们能够把任务分为计算密集型和IO密集型。
顾名思义,计算密集型任务的特色就是要进行大量的计算,消耗大量的CPU资源,如计算圆周率,对视频进行高清解码等,全靠CPU的运算能力。这种计算密集型任务最好不要用多任务完成,由于这样会切换不少次才能执行完,切换任务花费的时间就很长了,就会致使CPU的效率低下。
Tips:因为计算密集型任务主要消耗CPU资源,所以代码运行效率就很是重要了。由于Python这样的脚本语言运行效率很低,因此对于计算密集型任务,最好用C语言编写。
再来讲IO密集型任务,涉及到网络、磁盘的IO任务都是IO密集型任务,特色是CPU消耗不多,任务的大部分时间都在等待IO操做的完成。对于IO密集型任务,任务越多,CPU效率越高(但仍是有一个限度)。
Tips:常见的大部分任务都是IO密集型任务,好比Web应用,对于IO密集型任务,最适合的语言就是开发效率最高(代码量最少)的语言,因此脚本语言是首选,C语言最差。
考虑到CPU和IO之间巨大的速度差别,单进程单线程模式会致使别的任务没法执行,所以咱们才须要多进程或多线程的模型来支持多任务并发。异步文件IO方式中,线程发送一个IO请求到内核,而后继续处理其余的事情,内核完成IO请求后,将会通知线程IO操做完成了。
若是充分利用操做系统提供的异步IO支持,就能够利用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型。使用异步IO编程模型来实现多任务是一个主要的趋势。
在Python中,单进程单线程的异步编程模型称为协程,有了协程的支持就能够基于事件驱动编写高效的多任务程序了。
Process能够分布到多台机器上,而Thread最多只能分布到同一台机器上上的多个CPU中。
咱们已经知道Python的multiprocessing模块支持多进程,其中的managers子模块还支持把多进程分布到多台机器上。
例子:若是咱们已经有一个经过Queue通讯的多进程程序在同一台机器上运行,如今,因为处理任务的进程任务繁重,但愿把服务进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?
咱们先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,而后往Queue里面写入任务:
import random, time, queue from multiprocessing.managers import BaseManager # 发送任务的队列: task_queue = queue.Queue() # 接收结果的队列: result_queue = queue.Queue() # 从BaseManager继承的QueueManager: class QueueManager(BaseManager): pass # 把两个Queue都注册到网络上, callable参数关联了Queue对象: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 绑定端口5000, 设置验证码'abc': manager = QueueManager(address=('', 5000), authkey='abc') # 启动Queue: manager.start() # 得到经过网络访问的Queue对象: task = manager.get_task_queue() result = manager.get_result_queue() # 放几个任务进去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 从result队列读取结果: print('Try get results...') for i in range(10): r = result.get(timeout=10) #暂停10秒等待分布式进程处理结果并返回 print('Result: %s' % r) # 关闭: manager.shutdown() print('master exit.')
接着在另外一台机器上启动任务进程也能够是本机:
# task_worker.py import time, sys, queue from multiprocessing.managers import BaseManager # 建立相似的QueueManager: class QueueManager(BaseManager): pass # 因为这个QueueManager只从网络上获取Queue,因此注册时只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 链接到服务器,也就是运行task_master.py的机器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和验证码注意保持与task_master.py设置的彻底一致: m = QueueManager(address=(server_addr, 5000), authkey='abc') # 从网络链接: m.connect() # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.') # 处理结束: print('worker exit.')
注意:先启动master进程,完成两个队列的网上注册,接着发出请求队列task,等待result队列的结果;此时启动worker进程对task队列进行操做,而后写入到result队列中;master获得响应结果,打印出result。
Tips:Python的分布式进程的接口简单,封装良好,适合须要把繁重任务分布到多台机器的环境下。注意Queue的做用是用来传递任务和接收结果,每一个任务的描述数据量要尽可能小。好比发送一个处理日志文件的任务,就不要发送几百兆的日志文件自己,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。