最近会开始继续 Python 的进阶系列文章,这是该系列的第一篇文章,介绍进程和线程的知识,恰好上一篇文章就介绍了采用 concurrent.futures
模块实现多进程和多线程的操做,本文则介绍下进程和线程的概念,多进程和多线程各自的实现方法和优缺点,以及分别在哪些状况采用多进程,或者是多线程。html
并发编程就是实现让程序同时执行多个任务,而如何实现并发编程呢,这里就涉及到进程和线程这两个概念。python
对于操做系统来讲,一个任务(或者程序)就是一个进程(Process),好比打开一个浏览器是开启一个浏览器进程,打开微信就启动了一个微信的进程,打开两个记事本,就启动两个记事本进程。git
进程的特色有:github
fork
或者 spawn
方式建立新的进程来执行其余任务一个进程还能够同时作多件事情,好比在 Word 里面同时进行打字、拼音检查、打印等事情,也就是一个任务分为多个子任务同时进行,这些进程内的子任务被称为线程(Thread)。算法
由于每一个进程至少须要完成一件事情,也就是一个进程至少有一个线程。当要实现并发编程,也就是同时执行多任务时,有如下三种解决方案:数据库
注意:真正的并行执行多任务只有在多核 CPU 上才能够实现,单核 CPU 系统中,真正的并发是不可能的,由于在某个时刻可以得到CPU的只有惟一的一个线程,多个线程共享了CPU的执行时间。编程
Python 是同时支持多进程和多线程的,下面就分别介绍多进程和多线程。windows
在 Unix/Linux
系统中,提供了一个 fork()
系统调用,它是一个特殊的函数,普通函数调用是调用一次,返回一次,但 fork
函数调用一次,返回两次,由于调用该函数的是父进程,而后复制出一份子进程了,最后同时在父进程和子进程内返回,因此会返回两次。浏览器
子进程返回的永远是 0
,而父进程会返回子进程的 ID,由于父进程能够复制多个子进程,因此须要记录每一个子进程的 ID,而子进程能够经过调用 getpid()
获取父进程的 ID。bash
Python 中 os
模块封装了常见的系统调用,这就包括了 fork
,代码示例以下:
import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
复制代码
运行结果:
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
复制代码
因为 windows 系统中是不存在 fork
,因此上述函数没法调用,但 Python 是跨平台的,因此也仍是有其余模块能够实现多进程的功能,好比 multiprocessing
模块。
multiprocessing
模块中提供了 Process
类来表明一个进程对象,接下来用一个下载文件的例子来讲明采用多进程和不用多进程的差异。
首先是不采用多进程的例子:
def download_task(filename):
'''模拟下载文件'''
print('开始下载%s...' % filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))
def download_without_multiprocess():
'''不采用多进程'''
start = time()
download_task('Python.pdf')
download_task('nazha.mkv')
end = time()
print('总共耗费了%.2f秒.' % (end - start))
if __name__ == '__main__':
download_without_multiprocess()
复制代码
运行结果以下,这里用 randint
函数来随机输出当前下载文件的耗时,从结果看,程序运行时间等于两个下载文件的任务时间总和。
开始下载Python.pdf...
Python.pdf下载完成! 耗费了9秒
开始下载nazha.mkv...
nazha.mkv下载完成! 耗费了9秒
总共耗费了18.00秒.
复制代码
若是是采用多进程,例子以下所示:
def download_task(filename):
'''模拟下载文件'''
print('开始下载%s...' % filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))
def download_multiprocess():
'''采用多进程'''
start = time()
p1 = Process(target=download_task, args=('Python.pdf',))
p1.start()
p2 = Process(target=download_task, args=('nazha.mkv',))
p2.start()
p1.join()
p2.join()
end = time()
print('总共耗费了%.2f秒.' % (end - start))
if __name__ == '__main__':
download_multiprocess()
复制代码
这里多进程例子中,咱们经过 Process
类建立了进程对象,经过 target
参数传入一个函数表示进程须要执行的任务,args
是一个元组,表示传递给函数的参数,而后采用 start
来启动进程,而 join
方法表示等待进程执行结束。
运行结果以下所示,耗时就不是两个任务执行时间总和,速度上也是大大的提高了。
开始下载Python.pdf...
开始下载nazha.mkv...
Python.pdf下载完成! 耗费了5秒
nazha.mkv下载完成! 耗费了9秒
总共耗费了9.36秒.
复制代码
上述例子是开启了两个进程,但若是须要开启大量的子进程,上述代码的写法就不合适了,应该采用进程池的方式批量建立子进程,仍是用下载文件的例子,但执行下部分的代码以下所示:
import os
from multiprocessing import Process, Pool
from random import randint
from time import time, sleep
def download_multiprocess_pool():
'''采用多进程,并用 pool 管理进程池'''
start = time()
filenames = ['Python.pdf', 'nazha.mkv', 'something.mp4', 'lena.png', 'lol.avi']
# 进程池
p = Pool(5)
for i in range(5):
p.apply_async(download_task, args=(filenames[i], ))
print('Waiting for all subprocesses done...')
# 关闭进程池
p.close()
# 等待全部进程完成任务
p.join()
end = time()
print('总共耗费了%.2f秒.' % (end - start))
if __name__ == '__main__':
download_multiprocess_pool()
复制代码
代码中 Pool
对象先建立了 5 个进程,而后 apply_async
方法就是并行启动进程执行任务了,调用 join()
方法以前必须先调用 close
() ,close
() 主要是关闭进程池,因此执行该方法后就不能再添加新的进程对象了。而后 join()
就是等待全部进程执行完任务。
运行结果以下所示:
Waiting for all subprocesses done...
开始下载Python.pdf...
开始下载nazha.mkv...
开始下载something.mp4...
开始下载lena.png...
开始下载lol.avi...
nazha.mkv下载完成! 耗费了5秒
lena.png下载完成! 耗费了6秒
something.mp4下载完成! 耗费了7秒
Python.pdf下载完成! 耗费了8秒
lol.avi下载完成! 耗费了9秒
总共耗费了9.80秒.
复制代码
大多数状况,子进程是一个外部进程,而非自身。在建立子进程后,咱们还须要控制子进程的输入和输出。
subprocess
模块可让咱们很好地开启子进程以及管理子进程的输入和输出。
下面是演示如何用 Python 演示命令 nslookup www.python.org
,代码以下所示:
import subprocess
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
复制代码
运行结果:
$ nslookup www.python.org
Server: 192.168.19.4
Address: 192.168.19.4#53
Non-authoritative answer:
www.python.org canonical name = python.map.fastly.net.
Name: python.map.fastly.net
Address: 199.27.79.223
Exit code: 0
复制代码
若是子进程须要输入,能够经过 communicate()
进行输入,代码以下所示:
import subprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
复制代码
这段代码就是执行命令 nslookup
时,输入:
set q=mx
python.org
exit
复制代码
运行结果:
$ nslookup
Server: 192.168.19.4
Address: 192.168.19.4#53
Non-authoritative answer:
python.org mail exchanger = 50 mail.python.org.
Authoritative answers can be found from:
mail.python.org internet address = 82.94.164.166
mail.python.org has AAAA address 2001:888:2000:d::a6
Exit code: 0
复制代码
进程之间是须要通讯的,multiprocess
模块中也提供了 Queue
、Pipes
等多种方式来交换数据。
这里以 Queue
为例,在父进程建立两个子进程,一个往 Queue
写入数据,另外一个从 Queue
读取数据。代码以下:
import os
from multiprocessing import Process, Queue
import random
from time import time, sleep
# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
sleep(random.random())
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
def ipc_queue():
''' 采用 Queue 实现进程间通讯 :return: '''
# 父进程建立Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,没法等待其结束,只能强行终止:
pr.terminate()
if __name__ == '__main__':
ipc_queue()
复制代码
运行结果以下所示:
Process to write: 24992
Put A to queue...
Process to read: 22836
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
复制代码
前面也提到了一个进程至少包含一个线程,其实进程就是由若干个线程组成的。线程是操做系统直接支持的执行单元,所以高级语言一般都内置多线程的支持,Python 也不例外,并且 Python 的线程是真正的 Posix Thread
,而不是模拟出来的线程。
多线程的运行有以下优势:
线程能够分为:
Python 的标准库提供了两个模块:_thread
和 threading
,前者是低级模块,后者是高级模块,对 _thread
进行了封装。大多数状况只须要采用 threading
模块便可,而且也推荐采用这个模块。
这里再次如下载文件做为例子,用多线程的方式来实现一遍:
from random import randint
from threading import Thread, current_thread
from time import time, sleep
def download(filename):
print('thread %s is running...' % current_thread().name)
print('开始下载%s...' % filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))
def download_multi_threading():
print('thread %s is running...' % current_thread().name)
start = time()
t1 = Thread(target=download, args=('Python.pdf',), name='subthread-1')
t1.start()
t2 = Thread(target=download, args=('nazha.mkv',), name='subthread-2')
t2.start()
t1.join()
t2.join()
end = time()
print('总共耗费了%.3f秒' % (end - start))
print('thread %s is running...' % current_thread().name)
if __name__ == '__main__':
download_multi_threading()
复制代码
实现多线程的方式和多进程相似,也是经过 Thread
类建立线程对象,target
参数表示传入须要执行的函数,args
参数是表示传给函数的参数,而后 name
是给当前线程进行命名,默认命名是如 Thread-
一、Thread-2
等等。
此外,任何进程默认会启动一个线程,咱们将它称为主线程,主线程又能够启动新的线程,在 threading
模块中有一个函数 current_thread()
,能够返回当前线程的实例。主线程实例的名字叫 MainThread
,子线程的名字是在建立的时候指定,也就是 name
参数。
运行结果:
thread MainThread is running...
thread subthread-1 is running...
开始下载Python.pdf...
thread subthread-2 is running...
开始下载nazha.mkv...
nazha.mkv下载完成! 耗费了5秒
Python.pdf下载完成! 耗费了7秒
总共耗费了7.001秒
thread MainThread is running...
复制代码
多线程和多进程最大的不一样在于,多进程中,同一个变量,各自有一份拷贝存在于每一个进程中,互不影响,而多线程中,全部变量都由全部线程共享,因此,任何一个变量均可以被任何一个线程修改,所以,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。
下面是一个例子,演示了多线程同时操做一个变量,如何把内存给改乱了:
from threading import Thread
from time import time, sleep
# 假定这是你的银行存款:
balance = 0
def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(100000):
change_it(n)
def nolock_multi_thread():
t1 = Thread(target=run_thread, args=(5,))
t2 = Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
if __name__ == '__main__':
nolock_multi_thread()
复制代码
运行结果:
-8
复制代码
代码中定义了一个共享变量 balance
,而后启动两个线程,先存后取,理论上结果应该是 0
。可是,因为线程的调度是由操做系统决定的,当 t一、t2 交替执行时,只要循环次数足够多,balance
的结果就不必定是0
了。
缘由就是下面这条语句:
balance = balance + n
复制代码
这条语句的执行分为两步的:
balance + n
,保存结果到一个临时变量balance
也就是能够当作:
x = balance+n
balance=x
复制代码
正常运行以下所示:
初始值 balance = 0
t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0
t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8
t2: x2 = balance - 8 # x2 = 8 - 8 = 0
t2: balance = x2 # balance = 0
结果 balance = 0
复制代码
但实际上两个线程是交替运行的,也就是:
初始值 balance = 0
t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8
t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0
t2: x2 = balance - 8 # x2 = 0 - 8 = -8
t2: balance = x2 # balance = -8
结果 balance = -8
复制代码
简单说,就是由于对 balance
的修改须要多条语句,而执行这几条语句的时候,线程可能中断,致使多个线程把同个对象的内容该乱了。
要保证计算正确,须要给 change_it()
添加一个锁,添加锁后,其余线程就必须等待当前线程执行完并释放锁,才能够执行该函数。而且锁是只有一个,不管多少线程,同一时刻最多只有一个线程持有该锁。经过 threading
模块的 Lock
实现。
所以代码修改成:
from threading import Thread, Lock
from time import time, sleep
# 假定这是你的银行存款:
balance = 0
lock = Lock()
def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n
def run_thread_lock(n):
for i in range(100000):
# 先要获取锁:
lock.acquire()
try:
# 放心地改吧:
change_it(n)
finally:
# 改完了必定要释放锁:
lock.release()
def nolock_multi_thread():
t1 = Thread(target=run_thread_lock, args=(5,))
t2 = Thread(target=run_thread_lock, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
if __name__ == '__main__':
nolock_multi_thread()
复制代码
但遗憾的是 Python 并不能彻底发挥多线程的做用,这里能够经过写一个死循环,而后经过任务管理器查看进程的 CPU 使用率。
正常来讲,若是有两个死循环线程,在多核CPU中,能够监控到会占用200%的CPU,也就是占用两个CPU核心。
要想把 N 核CPU的核心所有跑满,就必须启动 N 个死循环线程。
死循环代码以下所示:
import threading, multiprocessing
def loop():
x = 0
while True:
x = x ^ 1
for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()
复制代码
在 4 核CPU上能够监控到 CPU 占用率仅有102%,也就是仅使用了一核。
可是用其余编程语言,好比C、C++或 Java来改写相同的死循环,直接能够把所有核心跑满,4核就跑到400%,8核就跑到800%,为何Python不行呢?
由于 Python 的线程虽然是真正的线程,但解释器执行代码时,有一个 GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先得到GIL锁,而后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个 GIL 全局锁实际上把全部线程的执行代码都给上了锁,因此,多线程在Python中只能交替执行,即便100个线程跑在100核CPU上,也只能用到1个核。
GIL是 Python 解释器设计的历史遗留问题,一般咱们用的解释器是官方实现的 CPython,要真正利用多核,除非重写一个不带GIL的解释器。
尽管多线程不能彻底利用多核,但对于程序的运行效率提高仍是很大的,若是想实现多核任务,能够经过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。
采用多线程的时候,一个线程采用本身的局部变量会比全局变量更好,缘由前面也介绍了,若是不加锁,多个线程可能会乱改某个全局变量的数值,而局部变量是只有每一个线程本身可见,不会影响其余线程。
不过,局部变量的使用也有问题,就是函数调用时候,传递起来会比较麻烦,即以下所示:
def process_student(name):
std = Student(name)
# std是局部变量,可是每一个函数都要用它,所以必须传进去:
do_task_1(std)
do_task_2(std)
def do_task_1(std):
do_subtask_1(std)
do_subtask_2(std)
def do_task_2(std):
do_subtask_2(std)
do_subtask_2(std)
复制代码
局部变量须要一层层传递给每一个函数,比较麻烦,有没有更好的办法呢?
一个思路是用一个全局的 dict
,而后用每一个线程做为 key
,代码例子以下所示:
global_dict = {}
def std_thread(name):
std = Student(name)
# 把std放到全局变量global_dict中:
global_dict[threading.current_thread()] = std
do_task_1()
do_task_2()
def do_task_1():
# 不传入std,而是根据当前线程查找:
std = global_dict[threading.current_thread()]
...
def do_task_2():
# 任何函数均可以查找出当前线程的std变量:
std = global_dict[threading.current_thread()]
复制代码
这种方式理论上是可行的,它能够避免局部变量在每层函数中传递,只是获取局部变量的代码不够优雅,在 threading
模块中提供了 local
函数,能够自动完成这件事情,代码以下所示:
import threading
# 建立全局ThreadLocal对象:
local_school = threading.local()
def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
复制代码
运行结果:
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
复制代码
在代码中定义了一个全局变量 local_school
,它是一个 ThreadLocal
对象,每一个线程均可以对它读写 student
属性,但又不会互相影响,也不须要管理锁的问题,这是 ThreadLocal
内部会处理。
ThreadLocal
最经常使用的是为每一个线程绑定一个数据库链接,HTTP 请求,用户身份信息等,这样一个线程的全部调用到的处理函数均可以很是方便地访问这些资源。
咱们已经分别介绍了多进程和多线程的实现方式,那么究竟应该选择哪一种方法来实现并发编程呢,这二者有什么优缺点呢?
一般多任务的实现,咱们都是设计 Master-Worker
,Master
负责分配任务,Worker
负责执行任务,所以多任务环境下,一般是一个 Master
和多个 Worker
。
若是用多进程实现 Master-Worker
,主进程就是 Master
,其余进程就是 Worker
。
若是用多线程实现 Master-Worker
,主线程就是 Master
,其余线程就是 Worker
。
对于多进程,最大的优势就是稳定性高,由于一个子进程挂了,不会影响主进程和其余子进程。固然主进程挂了,全部进程天然也就挂,但主进程只是负责分配任务,挂掉几率很是低。著名的 Apache 最先就是采用多进程模式。
缺点有:
Unix/ Linux
系统由于能够调用 fork()
,因此开销还行;对于多线程,一般会快过多进程,但也不会快太多;缺点就是稳定性很差,由于全部线程共享进程的内存,一个线程挂断均可能直接形成整个进程崩溃。好比在Windows上,若是一个线程执行的代码出了问题,你常常能够看到这样的提示:“该程序执行了非法操做,即将关闭”,其实每每是某个线程出了问题,可是操做系统会强制结束整个进程。
是否采用多任务模式,第一点须要注意的就是,一旦任务数量过多,效率确定上不去,这主要是切换进程或者线程是有代价的。
操做系统在切换进程或者线程时的流程是这样的:
这个切换过程虽然很快,可是也须要耗费时间,若是任务数量有上千个,操做系统可能就忙着切换任务,而没有时间执行任务,这种状况最多见的就是硬盘狂响,点窗口无反应,系统处于假死状态。
采用多任务的第二个考虑就是任务的类型,能够将任务分为计算密集型和 I/O 密集型。
计算密集型任务的特色是要进行大量的计算,消耗CPU资源,好比对视频进行编码解码或者格式转换等等,这种任务全靠 CPU 的运算能力,虽然也能够用多任务完成,可是任务越多,花在任务切换的时间就越多,CPU 执行任务的效率就越低。计算密集型任务因为主要消耗CPU资源,这类任务用 Python这样的脚本语言去执行效率一般很低,最能胜任这类任务的是C语言,咱们以前提到了 Python 中有嵌入 C/C++ 代码的机制。不过,若是必须用 Python 来处理,那最佳的就是采用多进程,并且任务数量最好是等同于 CPU 的核心数。
除了计算密集型任务,其余的涉及到网络、存储介质 I/O 的任务均可以视为 I/O 密集型任务,这类任务的特色是 CPU 消耗不多,任务的大部分时间都在等待 I/O 操做完成(由于 I/O 的速度远远低于 CPU 和内存的速度)。对于 I/O 密集型任务,若是启动多任务,就能够减小 I/O 等待时间从而让 CPU 高效率的运转。通常会采用多线程来处理 I/O 密集型任务。
现代操做系统对 I/O 操做的改进中最为重要的就是支持异步 I/O。若是充分利用操做系统提供的异步 I/O 支持,就能够用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型。Nginx 就是支持异步 I/O的 Web 服务器,它在单核 CPU 上采用单进程模型就能够高效地支持多任务。在多核 CPU 上,能够运行多个进程(数量与CPU核心数相同),充分利用多核 CPU。用 Node.js 开发的服务器端程序也使用了这种工做模式,这也是当下实现多任务编程的一种趋势。
在 Python 中,单线程+异步 I/O 的编程模型称为协程,有了协程的支持,就能够基于事件驱动编写高效的多任务程序。协程最大的优点就是极高的执行效率,由于子程序切换不是线程切换,而是由程序自身控制,所以,没有线程切换的开销。协程的第二个优点就是不须要多线程的锁机制,由于只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不用加锁,只须要判断状态就行了,因此执行效率比多线程高不少。若是想要充分利用CPU的多核特性,最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可得到极高的性能。
参考
以上就是本次教程的全部内容,代码已经上传到:
欢迎关注个人微信公众号--算法猿的成长,或者扫描下方的二维码,你们一块儿交流,学习和进步!