这一篇是Python并发的第四篇,主要介绍进程和线程的定义,Python线程和全局解释器锁以及Python如何使用thread模块处理并发html
考虑一下这个场景,咱们有10000条数据须要处理,处理每条数据须要花费1秒,但读取数据只须要0.1秒,每条数据互不干扰。该如何执行才能花费时间最短呢?python
在多线程(MT)编程出现以前,电脑程序的运行由一个执行序列组成,执行序列按顺序在主机的中央处理器(CPU)中运行。不管是任务自己要求顺序执行仍是整个程序是由多个子任务组成,程序都是按这种方式执行的。即便子任务相互独立,互相无关(即,一个子任务的结果不影响其它子 任务的结果)时也是这样。linux
HUGOMORE42git
对于上边的问题,若是使用一个执行序列来完成,咱们大约须要花费 10000*0.1 + 10000 = 11000 秒。这个时间显然是太长了。程序员
那咱们有没有可能在执行计算的同时取数据呢?或者是同时处理几条数据呢?若是能够,这样就能大幅提升任务的效率。这就是多线程编程的目的。github
对于本质上就是异步的, 须要有多个并发事务,各个事务的运行顺序能够是不肯定的,随机的,不可预测的问题,多线程是最理想的解决方案。这样的任务能够被分红多个执行流,每一个流都有一个要完成的目标,而后将获得的结果合并,获得最终的结果。算法
进程(有时被称为重量级进程)是程序的一次 执行。每一个进程都有本身的地址空间,内存,数据栈以及其它记录其运行轨迹的辅助数据。操做系 统管理在其上运行的全部进程,并为这些进程公平地分配时间。进程也能够经过 fork 和 spawn 操做 来完成其它的任务。不过各个进程有本身的内存空间,数据栈等,因此只能使用进程间通信(IPC), 而不能直接共享信息。编程
线程(有时被称为轻量级进程)跟进程有些类似,不一样的是,全部的线程运行在同一个进程中, 共享相同的运行环境。它们能够想像成是在主进程或“主线程”中并行运行的“迷你进程”。安全
线程状态如图服务器
线程有开始,顺序执行和结束三部分。它有一个本身的指令指针,记录本身运行到什么地方。 线程的运行可能被抢占(中断),或暂时的被挂起(也叫睡眠),让其它的线程运行,这叫作让步。 一个进程中的各个线程之间共享同一片数据空间,因此线程之间能够比进程之间更方便地共享数据以及相互通信。
固然,这样的共享并非彻底没有危险的。若是多个线程共同访问同一片数据,则因为数据访 问的顺序不同,有可能致使数据结果的不一致的问题。这叫作竞态条件(race condition)。
线程通常都是并发执行的,不过在单 CPU 的系统中,真正的并发是不可能的,每一个线程会被安排成每次只运行一小会,而后就把 CPU 让出来,让其它的线程去运行。因为有的函数会在完成以前阻塞住,在没有特别为多线程作修改的情 况下,这种“贪婪”的函数会让 CPU 的时间分配有所倾斜。致使各个线程分配到的运行时间可能不 尽相同,不尽公平。
首先须要明确的一点是GIL并非Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就比如C++是一套语言(语法)标准,可是能够用不一样的编译器来编译成可执行代码。一样一段代码能够经过CPython,PyPy,Psyco等不一样的Python执行环境来执行(其中的JPython就没有GIL)。
那么CPython实现中的GIL又是什么呢?GIL全称Global Interpreter Lock为了不误导,咱们仍是来看一下官方给出的解释:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
尽管Python彻底支持多线程编程, 可是解释器的C语言实现部分在彻底并行执行时并非线程安全的。 实际上,解释器被一个全局解释器锁保护着,它确保任什么时候候都只有一个Python线程执行。
在多线程环境中,Python 虚拟机按如下方式执行:
对全部面向 I/O 的(会调用内建的操做系统 C 代码的)程序来讲,GIL 会在这个 I/O 调用之 前被释放,以容许其它的线程在这个线程等待 I/O 的时候运行。若是某线程并未使用不少 I/O 操做, 它会在本身的时间片内一直占用处理器(和 GIL)。也就是说,I/O 密集型的 Python 程序比计算密集 型的程序更能充分利用多线程环境的好处。
当一个线程结束计算,它就退出了。线程能够调用 thread.exit()之类的退出函数,也可使用 Python 退出进程的标准方法,如 sys.exit()或抛出一个 SystemExit 异常等。不过,你不能够直接 “杀掉”("kill")一个线程。
在 Win32 和 Linux, Solaris, MacOS, *BSD 等大多数类 Unix 系统上运行时,Python 支持多线程 编程。Python 使用 POSIX 兼容的线程,即 pthreads。
默认状况下,只要在解释器中
>> import thread复制代码
若是没有报错,则说明线程可用。
Python 供了几个用于多线程编程的模块,包括 thread, threading 和 Queue 等。thread 和 threading 模块容许程序员建立和管理线程。thread 模块 供了基本的线程和锁的支持,而 threading 供了更高级别,功能更强的线程管理的功能。Queue 模块容许用户建立一个能够用于多个线程之间 共享数据的队列数据结构。
出于如下几点考虑,咱们不建议您使用 thread 模块。
除了产生线程外,thread 模块也提供了基本的同步数 据结构锁对象(lock object,也叫原语锁,简单锁,互斥锁,互斥量,二值信号量)。
thread 模块函数
下面是一个使用 thread 的例子:
import thread
from time import sleep, time
def loop(num):
print('start loop at:', time())
sleep(num)
print('loop done at:', time())
def loop1(num):
print('start loop 1 at:', time())
sleep(num)
print('loop 1 done at:', time())
def main():
print('starting at:', time())
thread.start_new_thread(loop, (4,))
thread.start_new_thread(loop1, (5,))
sleep(6)
print('all DONE at:', time())
if __name__ == '__main__':
main()
('starting at:', 1489387024.886667)
('start loop at:', 1489387024.88705)
('start loop 1 at:', 1489387024.887277)
('loop done at:', 1489387028.888182)
('loop 1 done at:', 1489387029.888904)
('all DONE at:', 1489387030.889918)复制代码
start_new_thread()要求必定要有前两个参数。因此,就算咱们想要运行的函数不要参数,也要传一个空的元组。
为何要加上sleep(6)这一句呢? 由于,若是咱们没有让主线程停下来,那主线程就会运行下一条语句,显示 “all done”,而后就关闭运行着 loop()和 loop1()的两个线程,退出了。
咱们有没有更好的办法替换使用sleep() 这种不靠谱的同步方式呢?答案是使用锁,使用了锁,咱们就能够在两个线程都退出以后立刻退出。
#! -*- coding: utf-8 -*-
import thread
from time import sleep, time
loops = [4, 2]
def loop(nloop, nsec, lock):
print('start loop %s at: %s' % (nloop, time()))
sleep(nsec)
print('loop %s done at: %s' % (nloop, time()))
# 每一个线程都会被分配一个事先已经得到的锁,在 sleep()的时间到了以后就释放 相应的锁以通知主线程,这个线程已经结束了。
lock.release()
def main():
print('starting at:', time())
locks = []
nloops = range(len(loops))
for i in nloops:
# 调用 thread.allocate_lock()函数建立一个锁的列表
lock = thread.allocate_lock()
# 分别调用各个锁的 acquire()函数得到, 得到锁表示“把锁锁上”
lock.acquire()
locks.append(lock)
for i in nloops:
# 建立线程,每一个线程都用各自的循环号,睡眠时间和锁为参数去调用 loop()函数
thread.start_new_thread(loop, (i, loops[i], locks[i]))
for i in nloops:
# 在线程结束的时候,线程要本身去作解锁操做
# 当前循环只是坐在那一直等(达到暂停主 线程的目的),直到两个锁都被解锁为止才继续运行。
while locks[i].locked(): pass
print('all DONE at:', time())
if __name__ == '__main__':
main()复制代码
为何咱们不在建立锁的循环里建立线程呢?有如下几个缘由:
threading 模块不只提供了 Thread 类,还提供了各类很是好用的同步机制。
下面是threading 模块里全部的对象:
另外一个避免使用 thread 模块的缘由是,它不支持守护线程。当主线程退出时,全部的子线程不 论它们是否还在工做,都会被强行退出。有时,咱们并不指望这种行为,这时,就引入了守护线程 的概念
threading 模块支持守护线程,它们是这样工做的:守护线程通常是一个等待客户请求的服务器, 若是没有客户 出请求,它就在那等着。若是你设定一个线程为守护线程,就表示你在说这个线程 是不重要的,在进程退出的时候,不用等待这个线程退出。
若是你的主线程要退出的时候,不用等待那些子线程完成,那就设定这些线程的 daemon 属性。 即,在线程开始(调用 thread.start())以前,调用 setDaemon()函数设定线程的 daemon 标志 (thread.setDaemon(True))就表示这个线程“不重要”
若是你想要等待子线程完成再退出,那就什么都不用作,或者显式地调用 thread.setDaemon(False)以保证其 daemon 标志为 False。你能够调用 thread.isDaemon()函数来判 断其 daemon 标志的值。新的子线程会继承其父线程的 daemon 标志。整个 Python 会在全部的非守护 线程退出后才会结束,即进程中没有非守护线程存在的时候才结束。
Thread类提供了如下方法:
用 Thread 类,你能够用多种方法来建立线程。咱们在这里介绍三种比较相像的方法。
下边是三种不一样方式的建立线程的示例:
#! -*- coding: utf-8 -*-
# 建立一个Thread的实例,传给它一个函数
import threading
from time import sleep, time
loops = [4, 2]
def loop(nloop, nsec, lock):
print('start loop %s at: %s' % (nloop, time()))
sleep(nsec)
print('loop %s done at: %s' % (nloop, time()))
# 每一个线程都会被分配一个事先已经得到的锁,在 sleep()的时间到了以后就释放 相应的锁以通知主线程,这个线程已经结束了。
def main():
print('starting at:', time())
threads = []
nloops = range(len(loops))
for i in nloops:
t = threading.Thread(target=loop, args=(i, loops[i]))
threads.append(t)
for i in nloops:
# start threads
threads[i].start()
for i in nloops:
# wait for all
# join()会等到线程结束,或者在给了 timeout 参数的时候,等到超时为止。
# 使用 join()看上去 会比使用一个等待锁释放的无限循环清楚一些(这种锁也被称为"spinlock")
threads[i].join() # threads to finish
print('all DONE at:', time())
if __name__ == '__main__':
main()复制代码
与传一个函数很类似的另外一个方法是在建立线程的时候,传一个可调用的类的实例供线程启动 的时候执行——这是多线程编程的一个更为面向对象的方法。相对于一个或几个函数来讲,因为类 对象里可使用类的强大的功能,能够保存更多的信息,这种方法更为灵活
#! -*- coding: utf-8 -*-
# 建立一个 Thread 的实例,传给它一个可调用的类对象
from threading import Thread
from time import sleep, time
loops = [4, 2]
class ThreadFunc(object):
def __init__(self, func, args, name=""):
self.name = name
self.func = func
self.args = args
def __call__(self):
# 建立新线程的时候,Thread 对象会调用咱们的 ThreadFunc 对象,这时会用到一个特殊函数 __call__()。
self.func(*self.args)
def loop(nloop, nsec):
print('start loop %s at: %s' % (nloop, time()))
sleep(nsec)
print('loop %s done at: %s' % (nloop, time()))
def main():
print('starting at:', time())
threads = []
nloops = range(len(loops))
for i in nloops:
t = Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
threads.append(t)
for i in nloops:
# start threads
threads[i].start()
for i in nloops:
# wait for all
# join()会等到线程结束,或者在给了 timeout 参数的时候,等到超时为止。
# 使用 join()看上去 会比使用一个等待锁释放的无限循环清楚一些(这种锁也被称为"spinlock")
threads[i].join() # threads to finish
print('all DONE at:', time())
if __name__ == '__main__':
main()复制代码
最后一个例子介绍如何子类化 Thread 类,这与上一个例子中的建立一个可调用的类很是像。使用子类化建立线程(第 29-30 行)使代码看上去更清晰明了。
#! -*- coding: utf-8 -*-
# 建立一个 Thread 的实例,传给它一个可调用的类对象
from threading import Thread
from time import sleep, time
loops = [4, 2]
class MyThread(Thread):
def __init__(self, func, args, name=""):
super(MyThread, self).__init__()
self.name = name
self.func = func
self.args = args
def getResult(self):
return self.res
def run(self):
# 建立新线程的时候,Thread 对象会调用咱们的 ThreadFunc 对象,这时会用到一个特殊函数 __call__()。
print 'starting', self.name, 'at:', time()
self.res = self.func(*self.args)
print self.name, 'finished at:', time()
def loop(nloop, nsec):
print('start loop %s at: %s' % (nloop, time()))
sleep(nsec)
print('loop %s done at: %s' % (nloop, time()))
def main():
print('starting at:', time())
threads = []
nloops = range(len(loops))
for i in nloops:
t = MyThread(loop, (i, loops[i]), loop.__name__)
threads.append(t)
for i in nloops:
# start threads
threads[i].start()
for i in nloops:
# wait for all
# join()会等到线程结束,或者在给了 timeout 参数的时候,等到超时为止。
# 使用 join()看上去 会比使用一个等待锁释放的无限循环清楚一些(这种锁也被称为"spinlock")
threads[i].join() # threads to finish
print('all DONE at:', time())
if __name__ == '__main__':
main()复制代码
下面,咱们接咱们以前按以前并发的套路,用实现一下使用 threading 并发下载国旗
# python3
import threading
from threading import Thread
from flags import save_flag, show, main, get_flag
class MyThread(Thread):
def __init__(self, func, args, name=""):
super(MyThread, self).__init__()
self.name = name
self.func = func
self.args = args
def getResult(self):
return self.res
def run(self):
# 建立新线程的时候,Thread 对象会调用咱们的 ThreadFunc 对象,这时会用到一个特殊函数 __call__()。
self.res = self.func(*self.args)
def download_one(cc): # <3>
image = get_flag(cc)
show(cc)
save_flag(image, cc.lower() + '.gif')
return cc
def download_many(cc_list):
threads = []
for cc in cc_list:
thread = MyThread(download_one, (cc, ), download_one.__name__)
threads.append(thread)
for thread in threads:
# 启动线程
thread.start()
for thread in threads:
# wait for all
# join()会等到线程结束,或者在给了 timeout 参数的时候,等到超时为止。
# 使用 join()看上去 会比使用一个等待锁释放的无限循环清楚一些(这种锁也被称为"spinlock")
thread.join()
return len(list(threads)) # <7>
if __name__ == '__main__':
main(download_many)复制代码
执行代码发现和使用协程相比速度基本一致。
除了各类同步对象和线程对象外,threading 模块还 供了一些函数。
原语锁定是一个同步原语,状态是锁定或未锁定。两个方法acquire()和release() 用于加锁和释放锁。
RLock 可重入锁是一个相似于Lock对象的同步原语,但同一个线程能够屡次调用。
Lock 不支持递归加锁,也就是说即使在同 线程中,也必须等待锁释放。一般建议改 RLock, 它会处理 "owning thread" 和 "recursion level" 状态,对于同 线程的屡次请求锁 为,只累加
计数器。每次调 release() 将递减该计数器,直到 0 时释放锁,所以 acquire() 和 release() 必须 要成对出现。
from time import sleep
from threading import current_thread, Thread
lock = Rlock()
def show():
with lock:
print current_thread().name, i
sleep(0.1)
def test():
with lock:
for i in range(3):
show(i)
for i in range(2):
Thread(target=test).start()复制代码
事件用于在线程间通讯。一个线程发出一个信号,其余一个或多个线程等待。
Event 经过经过 个内部标记来协调多线程运 。 法 wait() 阻塞线程执 ,直到标记为 True。 set() 将标记设为 True,clear() 更改标记为 False。isSet() 用于判断标记状态。
from threading import Event
def test_event():
e = Event()
def test():
for i in range(5):
print 'start wait'
e.wait()
e.clear() # 若是不调用clear(),那么标记一直为 True,wait()就不会发生阻塞行为
print i
Thread(target=test).start()
return e
e = test_event()复制代码
条件变量和 Lock 参数同样,也是一个,也是一个同步原语,当须要线程关注特定的状态变化或事件的发生时使用这个锁定。
能够认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于状态图中的等待阻塞状态,直到另外一个线程调用notify()/notifyAll()通知;获得通知后线程进入锁定池等待锁定。
构造方法:
Condition([lock/rlock])
Condition 有如下这些方法:
from threading import Condition, current_thread, Thread
con = Condition()
def tc1():
with con:
for i in range(5):
print current_thread().name, i
sleep(0.3)
if i == 3:
con.wait()
def tc2():
with con:
for i in range(5):
print current_thread().name, i
sleep(0.1)
con.notify()
Thread(target=tc1).start()
Thread(target=tc2).start()
Thread-1 0
Thread-1 1
Thread-1 2
Thread-1 3 # 让出锁
Thread-2 0
Thread-2 1
Thread-2 2
Thread-2 3
Thread-2 4
Thread-1 4 # 从新获取锁,继续执复制代码
只有获取锁的线程才能调用 wait() 和 notify(),所以必须在锁释放前调用。
当 wait() 释放锁后,其余线程也可进入 wait 状态。notifyAll() 激活全部等待线程,让它们去抢锁而后完成后续执行。
如今咱们用一个经典的(生产者消费者)例子来介绍一下 Queue模块。
生产者消费者的场景是: 生产者生产货物,而后把货物放到一个队列之类的数据结构中,生产货物所要花费的时间没法预先肯定。消费者消耗生产者生产的货物的时间也是不肯定的。
经常使用的 Queue 模块的属性:
Queue 模块能够用来进行线程间通信,让各个线程之间共享数据。
如今,咱们建立一个队列,让 生产者(线程)把新生产的货物放进去供消费者(线程)使用。
# python2
#! -*- coding: utf-8 -*-
from Queue import Queue
from random import randint
from time import sleep, time
from threading import Thread
class MyThread(Thread):
def __init__(self, func, args, name=""):
super(MyThread, self).__init__()
self.name = name
self.func = func
self.args = args
def getResult(self):
return self.res
def run(self):
# 建立新线程的时候,Thread 对象会调用咱们的 ThreadFunc 对象,这时会用到一个特殊函数 __call__()。
print 'starting', self.name, 'at:', time()
self.res = self.func(*self.args)
print self.name, 'finished at:', time()
# writeQ()和 readQ()函数分别用来把对象放入队列和消耗队列中的一个对象。在这里咱们使用 字符串'xxx'来表示队列中的对象。
def writeQ(queue):
print 'producing object for Q...'
queue.put('xxx', 1)
print "size now", queue.qsize()
def readQ(queue):
queue.get(1)
print("consumed object from Q... size now", queue.qsize())
def writer(queue, loops):
# writer()函数只作一件事,就是一次往队列中放入一个对象,等待一会,而后再作一样的事
for i in range(loops):
writeQ(queue)
sleep(1)
def reader(queue, loops):
# reader()函数只作一件事,就是一次从队列中取出一个对象,等待一会,而后再作一样的事
for i in range(loops):
readQ(queue)
sleep(randint(2, 5))
# 设置有多少个线程要被运行
funcs = [writer, reader]
nfuncs = range(len(funcs))
def main():
nloops = randint(10, 20)
q = Queue(32)
threads = []
for i in nfuncs:
t = MyThread(funcs[i], (q, nloops), funcs[i].__name__)
threads.append(t)
for i in nfuncs:
threads[i].start()
for i in nfuncs:
threads[i].join()
print threads[i].getResult()
print 'all DONE'
if __name__ == '__main__':
main()复制代码
进程(有时被称为重量级进程)是程序的一次 执行。每一个进程都有本身的地址空间,内存,数据栈以及其它记录其运行轨迹的辅助数据。
线程(有时被称为轻量级进程)跟进程有些类似,不一样的是,全部的线程运行在同一个进程中, 共享相同的运行环境。它们能够想像成是在主进程或“主线程”中并行运行的“迷你进程”。
这篇文章很好的解释了 线程和进程的区别,推荐阅读: www.ruanyifeng.com/blog/2013/0…
因为GIL的缘故,对全部面向 I/O 的(会调用内建的操做系统 C 代码的)程序来讲,GIL 会在这个 I/O 调用之 前被释放,以容许其它的线程在这个线程等待 I/O 的时候运行。若是某线程并未使用不少 I/O 操做, 它会在本身的时间片内一直占用处理器(和 GIL)。也就是说,I/O 密集型的 Python 程序比计算密集 型的程序更能充分利用多线程环境的好处。
Python的线程就是C语言的一个pthread,并经过操做系统调度算法进行调度(例如linux是CFS)。为了让各个线程可以平均利用CPU时间,python会计算当前已执行的微代码数量,达到必定阈值后就强制释放GIL。而这时也会触发一次操做系统的线程调度(固然是否真正进行上下文切换由操做系统自主决定)。
伪代码
while True:
acquire GIL
for i in 1000:
do something
release GIL
/* Give Operating System a chance to do thread scheduling */复制代码
这种模式在只有一个CPU核心的状况下毫无问题。任何一个线程被唤起时都能成功得到到GIL(由于只有释放了GIL才会引起线程调度)。
但当CPU有多个核心的时候,问题就来了。从伪代码能够看到,从release GIL到acquire GIL之间几乎是没有间隙的。因此当其余在其余核心上的线程被唤醒时,大部分状况下主线程已经又再一次获取到GIL了。这个时候被唤醒执行的线程只能白白的浪费CPU时间,看着另外一个线程拿着GIL欢快的执行着。而后达到切换时间后进入待调度状态,再被唤醒,再等待,以此往复恶性循环。
简单的总结下就是:Python的多线程在多核CPU上,只对于IO密集型计算产生正面效果;而当有至少有一个CPU密集型线程存在,那么多线程效率会因为GIL而大幅降低。
>欢迎关注 | >请我喝芬达 |
---|---|
![]() |
![]() |