学习Python多线程的资料不少,吐槽Python多线程的博客也很多。本文主要介绍Python多线程实际应用,且假设读者已经了解多线程的基本概念。若是读者对进程线程概念不甚了解,可参见知名博主 阮一峰 转译的一篇博客:《进程与线程的一个简单解释》。html
Python中多线程主要有两个模块,_thread和threading模块。前者更底层,后者更经常使用,能知足绝大部分编程需求,今天主要围绕threading模块展开介绍。启动一个线程须要用threading模块中的Thread。python
线程的启动须要先建立Thread对象,而后调用该对象的start()方法,参见下例:算法
import time import threading def func(n): while n > 0: print("线程name:", threading.current_thread().name, "参数n:", n) n -= 1 time.sleep(1) t = threading.Thread(target=func, args=(5,)) t.start() print("主线程:", threading.current_thread().name) # 运行结果: # 线程name: Thread-1 参数n: 5 # 主线程: MainThread # 线程name: Thread-1 参数n: 4 # 线程name: Thread-1 参数n: 3 # 线程name: Thread-1 参数n: 2 # 线程name: Thread-1 参数n: 1
上例中,threading.current_thread().name 是获取当前线程的name属性。数据库
Thread中,形参target传入函数名,args传入函数对应的参数,参数必须是可迭代对象,若是是元组且只有一个参数必须写成(参数,)的形式,逗号不能省略。编程
一旦启动一个线程,该线程将由操做系统来全权管理,独立执行直到目标函数返回。通常状况下,线程的操做有如下几种:安全
t.is_alive() # 查询线程对象的状态,返回布尔值 t.join() # 将线程加入到当前线程,并等待其终止 t = Thread(target=countdown, args=(10,), daemon=True) # 后台线程 t.start()
查看线程状态示例:服务器
import time import threading def func(n): while n > 0: print("线程name:", threading.current_thread().name, "参数n:", n) n -= 1 time.sleep(1) t = threading.Thread(target=func, args=(2,)) t.start() print("主线程:", threading.current_thread().name) if t.is_alive(): print("活着的") else: print("未存活") print("主线程结束")
让主线程等待其余线程,就是主线程会在join()处一直等待全部线程都结束以后,再继续运行。参见下例:多线程
import time import threading def func(n): while n > 0: print("线程name:", threading.current_thread().name, "参数n:", n) n -= 1 time.sleep(1) t = threading.Thread(target=func, args=(2,)) t.start() t.join() print("主线程:", threading.current_thread().name) print("主线程结束") # 运行结果: # 线程name: Thread-1 参数n: 2 # 线程name: Thread-1 参数n: 1 # 主线程: MainThread # 主线程结束
后台线程参见下例:app
import time import threading def func(n): while n > 0: print("参数n:", n) n -= 1 time.sleep(1) t = threading.Thread(target=func, args=(10, ), daemon=True) t.start() time.sleep(3) print("主线程结束") # 参数n: 10 # 参数n: 9 # 参数n: 8 # 参数n: 7 # 主线程结束
后台线程没法等待,但主线程终止时后台线程自动销毁。 若是要对线程进行高级操做,如发送信号,终止线程,都须要本身实现。下例经过轮询控制线程退出:异步
import time from threading import Thread class StopThread: def __init__(self): self._flag = True def terminate(self): self._flag = False def run(self, n): while self._flag and n > 0: print('num>>:', n) n -= 1 time.sleep(1) obj = StopThread() t = Thread(target=obj.run, args=(11,)) t.start() time.sleep(5) # 表示do something obj.terminate() # 终止线程 t.join() print("主线程结束")
上例经过类中的_flag控制线程的终止,当主线程执行5秒以后,主动将_flag赋值为False终止线程。经过轮询终止线程存在一个问题,若是while self._flag and n > 0:这句后,某次循环一直阻塞在I/O操做上,根本不会进行下一次循环,天然就没法终止。这该怎么办呢?留一个思考题。
多线程还能够经过继承Thread实现,以下:
import time from threading import Thread class A(Thread): def __init__(self,): super().__init__() def run(self): print("run1..", ) time.sleep(5) print("run2..") obj = A() obj.start() print("主线程结束")
当咱们用多个线程同时修改同一份数据时,怎么保证最终结果是咱们期许的呢?举个例子,当前有一个全局变量a=0,若是有10个线程同时对其加1,这就出现了线程间的竞争,到底应该听谁的呢?这时候,应该用线程锁来解决。也就是当某一个线程A对该数据操做时,对该数据加锁,其余线程只能等着,等待A操做完以后释放了该锁,其余线程才能操做该数据,一旦某个线程得到操做数据的权限,当即又加上锁。如此便能保证数据的安全准确。奇怪的是,在Python3中,即便不加锁,好像也不会发生数据出错的状况。或许这个例子不是很好,也或许是Python3中自动加了锁。但愿有知道的读者赐教一下。这个奇怪的现象就是下例了:
from threading import Thread import time def add_one(a): time.sleep(1) print("in thread a:", a) a[1] += 1 if __name__ == '__main__': array = [0, 1, 4] thread_obj_list = [] for i in range(50): t = Thread(target=add_one, args=(array,)) t.start() thread_obj_list.append(t) for j in thread_obj_list: j.join() print("array result::", array) # array result:: [0, 51, 4]
咱们看到,最后array的第二个元素是51,并无出错,这真是使人费解。好了,言归正传,来看看线程锁的几个方法吧:
lock = threading.Lock() # Lock对象 lock.acquire() # 锁定 lock.release() # 解锁
Lock有“锁定”或“解锁”两种状态之一。它是在解锁状态下建立的。它有两个基本方法,acquire() 和 release()。
当状态为解锁时,acquire()将状态更改成锁定并当即返回。当状态被锁定时,acquire()块直到对另外一个协程中的release()的调用将其改变为解锁,而后acquire()调用将其重置为锁定并返回。
release()方法只应在锁定状态下调用;它将状态更改成已解锁并当即返回。若是尝试释放已解锁的锁,则会引起 RuntimeError。
下面是一个具体的使用例子:
from threading import Thread import time import threading lock = threading.Lock() def add_one(a): time.sleep(1) lock.acquire() a[1] += 1 lock.release() if __name__ == '__main__': array = [0, 1, 4] thread_obj_list = [] for i in range(50): t = Thread(target=add_one, args=(array,)) t.start() thread_obj_list.append(t) for j in thread_obj_list: j.join() print("array result::", array) # array result:: [0, 51, 4]
acquire()和release()方法成对出现。可是这样手动释放有时候可能会遗忘,这时候能够考虑用上下文管理协议。关于上下文管理协议,可参见做者的这篇文章【Python上下文管理器】。
Lock对象支持with语句:
def add_one(a): time.sleep(1) with lock: a[1] += 1
可重入锁(又称递归锁,RLock),就是大锁中包含子锁的状况下使用。在这种状况下,再用Lock时,就会出现死锁现象,此时应该用threading.RLock()对象了,用法同Lock,参见下例:
from threading import Thread import time import threading lock = threading.RLock() def add_one(a): lock.acquire() a[1] += 1 lock.release() def add_two(b): time.sleep(1) lock.acquire() b[1] += 2 add_one(b) lock.release() if __name__ == '__main__': array = [0, 1, 4] thread_obj_list = [] for i in range(50): t = Thread(target=add_two, args=(array,)) t.start() thread_obj_list.append(t) for j in thread_obj_list: j.join() print("array result::", array) # array result:: [0, 151, 4]
上例读者能够试试Lock(),看看什么效果。RLock()还支持上下文管理协议,上例中的两个函数能够改为这样:
def add_one(a): with rlock: a[1] += 1 def add_two(b): time.sleep(1) with rlock: b[1] += 2 add_one(b)
全局解释器锁(英语:Global Interpreter Lock,缩写GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任什么时候刻仅有一个线程在执行。因此不少人说Python的线程是假线程,并能利用多核,并不能真正并行。之因此感受到线程并行,是由于线程上下文不断切换的缘故。Python 3.2开始使用新的GIL。新的GIL实现中用一个固定的超时时间来指示当前的线程放弃全局锁。在当前线程保持这个锁,且其余线程请求这个锁时,当前线程就会在5毫秒后被强制释放该锁。关于全局锁,强调三点:
(1)GIL的存在,同一时刻只能有一个线程在运行。
(2)GIL是CPython的特性,Jython,pypy等并没有GIL。
(3)Cpython的多线程适用于I/O密集型问题,计算密集型问题可以使用多进程编程。
在多线程编程中,有时候某个线程依赖另外一个线程的状态,须要使用threading库中的Event对象。 Event对象包含一个可由线程设置的信号标志,它容许线程等待某些事件的发生。可将线程设置等待Event对象, 直到有其余线程将Event对象设置为真,这些等待Event对象的线程将开始执行。Event()对象的经常使用方法:
event = threading.Event() # 建立threading.Event()对象 event.is_set() # 获取event的设置值,默认为False event.set() # 设置event的值为True event.clear() # 设置event的值为False event.wait() # 等到event的值被设为True就执行
下面经过“交通讯号灯”问题示范event的使用:
import threading import time def traffic_light(event): count = 0 event.set() while True: # 若是计数器[0, 5)之间, 红灯,event=False if 0 <= count < 5: event.clear() print("light is Red") # 若是计数器[5, 10)之间, 绿灯,event=True elif 5 <= count < 10: event.set() print("light is Green") # 若是计数器大于10,红灯,将event设置为False,计数器置为0 else: event.clear() count = 0 time.sleep(1) count += 1 def car(name, event): while True: if not event.is_set(): # event为False, 表示红灯, 车只能等待 print("RED, the %s is waiting..." % name) # 此处会阻塞住,直到event被设置为True在执行 event.wait() print("Green, The %s going...." % name) e = threading.Event() light = threading.Thread(target=traffic_light, args=(e,)) light.start() car1 = threading.Thread(target=car, args=("Tesla", e, )) car1.start()
交通讯号灯有红灯和绿灯两种状态,每5秒切换一次状态,而car()函数中,只要灯变绿就放car通行。运行试试看。
event对象的一个重要特色是当它被设置为真时会唤醒全部等待它的线程。若是你只想唤醒单个或者必定数目的线程,最好是使用信号量或者 Condition
对象来替代。
Condition对象
condition对象老是与锁关联,能够手动传入锁对象,也能够不传入使用默认值。当有多个线程须要等待某个变量改变时,才开始执行。这种状况能够用condition对象实现。condition对象的主要方法有:
condition = threading.Condition(lock=None) # 建立Condition对象 参数能够不传 condition.acquire() # 加锁 condition.release() # 解锁 condition.wait(timeout=None) # 阻塞,直到有调用notify(),或者notify_all()时再触发 condition.wait_for(predicate, timeout=None) # 阻塞,等待predicate条件为真时执行 condition.notify(n=1) # 通知n个wait()的线程执行, n默认为1 condition.notify_all() # 通知全部wait着的线程执行 with condition: # 支持with语法,没必要每次手动调用acquire()/release()
看一个例子不是很优雅的例子:
import threading import time condition = threading.Condition() # 建立condition对象 def func(): condition.acquire() # 若是没有with语句,必写这句,否者报错 condition.wait() # 阻塞,等待其余线程调用notify() print("in func..") condition.release() # 与acquire()成对出现 # 启10个线程 for i in range(10): t = threading.Thread(target=func, args=()) t.start() time.sleep(5) condition.acquire() condition.notify(2) # 通知两个线程执行 condition.release() # in func.. # in func.. # 其余8个线程会继续等待...
上例中,咱们看到启动的10个线程会等待5秒钟而且调用了notify(2)以后,才会通知两个线程继续运行。且这两个线程执行完毕以后,其余8个线程仍然会阻塞在condition.wait() 处。
频繁写acquire() / release()很繁琐,下面是优雅的写法:
import threading import time condition = threading.Condition() # 建立condition对象 def func(n): with condition: # with更优雅 condition.wait() # 阻塞,等待其余线程调用notify() print("in func..", n) # 启10个线程 for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start() time.sleep(5) with condition: condition.notify_all() # 通知全部线程执行
运行下,是否是等待5秒以后,全部线程都继续执行了?
信号量一般用于防范容量有限的资源,例如数据库服务器。通常而言信号量能够控制释放固定量的线程。好比启动100个线程,信号量的控制值设为5,那么前5个线程拿到信号量以后,其他线程只能阻塞,等到这5个线程释放信号量锁以后才能去拿锁。参见下例:
import threading import time def func(n): # semaphore.acquire() with semaphore: time.sleep(2) print("Thread::", n) # semaphore.release() semaphore = threading.BoundedSemaphore(5) # 信号量, 每次释放5个线程 thread_list = [] for i in range(23): t = threading.Thread(target=func, args=(i,)) thread_list.append(t) t.start() for j in thread_list: j.join() print("all threads done")
上例中,能够看到线程是每5个一组进行释放的。
Barriers字面意思是“屏障”,是Python线程(或进程)同步原语。每一个线程中都调用wait()方法,当其中一个线程执行到wait方法处会立阻塞;一直等到全部线程都执行到wait方法处,全部线程再继续执行。参见下例:
import time import threading bar = threading.Barrier(3) # 建立barrier对象,指定知足3个线程 def worker1(): print("worker1") time.sleep(1) bar.wait() print("worker1 end") def worker2(): print("worker2") time.sleep(2) bar.wait() print("worker2 end") def worker3(): print("worker3") time.sleep(5) bar.wait() print("worker3 end") thread_list = [] t1 = threading.Thread(target=worker1) t2 = threading.Thread(target=worker2) t3 = threading.Thread(target=worker3) thread_list.append(t1) thread_list.append(t2) thread_list.append(t3) for t in thread_list: t.start() # 每一个线程中都调用了wait()方法,在全部(此处设置为3)线程调用wait方法以前是阻塞的。 # 也就是说,只有等到3个线程都执行到了wait方法这句时,全部线程才继续执行。
上例中,能够看到,全部线程会先各自运行wait()方法以前的代码,到wait()处阻塞。等待最后一个线程执行到wait()处,也就是5秒以后,全部线程恢复执行。
两个或多个线程之间相互发送数据最安全的方式可能就是使用 queue 库中的队列了。建立一个线程共享的 Queue 对象,线程经过使用 put()和 get()操做来向队列中添加或者删除元素。Queue对象已经内置了锁机制,编程时没必要手动操做锁。下例producer()函数表明包子铺,生产包子放入队列中;consumer()函数表明吃包子的人,不断从队列中取出包子吃掉;以此演示线程间经过队列通讯。
from queue import Queue import threading import time q = Queue(10) def producer(): n = 0 while True: q.put("包子%s" % n) print("包子铺生产 包子%s" % n) n += 1 time.sleep(2) def consumer(): while True: r = q.get() print("bucker 吃掉 %s" % r) time.sleep(1) t1 = threading.Thread(target=producer) t1.start() t2 = threading.Thread(target=consumer) t2.start()
形如上例的编程模型,又叫生产者-消费者模型。它下降了程序以前的耦合,使得队列的上游只关注生产数据,队列的下游只关注消费数据。在票务系统,或者资源有限的状况中可用此模型。补充两点:
(1)get() 和 put() 方法都支持非阻塞方式和设定超时。
(2)q.qsize() , q.full() , q.empty() 等能够获取一个队列的当前大小和状态。但它们不是线程安全的,尽可能别用。
Python3.2开始,增长了标准库concurrent.futures,该库中的ThreadPoolExecutor是自带的线程池。简单使用:
from concurrent.futures import ThreadPoolExecutor import time def tell(i): print("this is tread {}.".format(i)) time.sleep(1) if __name__ == '__main__': future = ThreadPoolExecutor(10) a = "ddd" for i in range(100): future.submit(tell, (i,)) # 添加一个线程到线程池 future.shutdown(wait=True) # 此函数用于释放异步执行操做后的系统资源。
其中,submit()方法第一个参数为函数名,第二个为函数的参数。shutdown(wait=True)用于释放异步执行操做后的系统资源。ThreadPoolExecutor还有一个优势就是:任务提交者更方便的从被调用函数中获取返回值。参见下例:
import concurrent.futures import requests URLS = ['http://www.cnblogs.com/zingp/p/5878330.html', 'http://www.cnblogs.com/zingp/', 'https://docs.python.org/'] # 爬取网页内容 def load_url(url, timeout): with requests.get(url, timeout=timeout) as conn: return conn.text with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # 建立future对象和对应的url的字典 future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as err: print('url:%s -- err: %s' % (url, err)) else: print(url, len(data)) # http://www.cnblogs.com/zingp/ 12391 # http://www.cnblogs.com/zingp/p/5878330.html 90029 # https://docs.python.org/ 9980
上例建立一个大小为3的线程池,用了很多with语句,并用future.result() 获取函数返回值。最终,咱们看到爬取了三个网页,并得到网页内容。future.result()操做会阻塞,直到对应的函数执行完成并返回一个结果。
此外,ThreadPoolExecutor还提供了异步回调的功能,大大简化了多线程编程中处理线程返回结果的难度,参见下例:
from concurrent.futures import ThreadPoolExecutor import time import threading def tell(i): print("this is tread {}.".format(i)) time.sleep(1) return [i, threading.get_ident()] # 必须有返回,经过.result()拿到返回值 def callback(obj): # obj 至关于传过来的future独享,且回调函数必须有这个参数 result = obj.result() # 线程函数的返回值 print(result) if __name__ == '__main__': future = ThreadPoolExecutor(10) a = "ddd" for i in range(100): # 线程运行结束后将future对象传给回调函数callback(obj) future.submit(tell, i,).add_done_callback(callback) future.shutdown(wait=True) # 此函数用于释放异步执行操做后的系统资源。
Python3.2之前并无自带线程池,那时每每采用自定义线程池。下面一个就是自定义线程池的例子,看看是否可以看得懂:
import queue import threading import contextlib StopEvent = object() class ThreadPool(object): """定义一个线程池类。""" def __init__(self, max_num, max_task_num=None): if max_task_num: self.q = queue.Queue(max_task_num) else: self.q = queue.Queue() self.max_num = max_num self.cancel = False self.terminal = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 线程池执行一个任务。 :param func: 任务函数; :param args: 任务函数所需参数; :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数一、任务函数执行状态; 二、任务函数返回值(默认为None,即:不执行回调函数); :return: 若是线程池已经终止,则返回True不然None。 """ if self.cancel: return if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 建立一个线程。 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循环去获取任务函数并执行任务函数。 """ current_thread = threading.currentThread() self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) success = True except Exception as e: success = False result = None if callback is not None: try: callback(success, result) except Exception as e: pass with self.worker_state(self.free_list, current_thread): if self.terminal: event = StopEvent else: event = self.q.get() else: self.generate_list.remove(current_thread) def close(self): """ 执行完全部的任务后,全部线程中止。 """ self.cancel = True full_size = len(self.generate_list) while full_size: self.q.put(StopEvent) full_size -= 1 def terminate(self): """ 不管是否还有任务,终止线程。 """ self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.queue.clear() @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用于记录线程中正在等待的线程数。 """ state_list.append(worker_thread) try: # 遇到yield就返回回去执行with中的语句,执行完了回来。 yield finally: state_list.remove(worker_thread)
建立大的线程池的一个可能须要关注的问题是内存的使用。 例如,若是你在OS X系统上面建立2000个线程,系统显示Python进程使用了超过9GB的虚拟内存。 不过,这个计算一般是有偏差的。当建立一个线程时,操做系统会预留一个虚拟内存区域来 放置线程的执行栈(一般是8MB大小)。可是这个内存只有一小片断被实际映射到真实内存中。 所以,Python进程使用到的真实内存其实很小 (好比,对于2000个线程来说,只使用到了70MB的真实内存,而不是9GB)。若是担忧虚拟内存大小,可使用 threading.stack_size() 函数来下降它。
import threading threading.stack_size(65536)
若是加上这条语句并再次运行前面的建立2000个线程试验, 会发现Python进程只使用到了大概210MB的虚拟内存,而真实内存使用量没有变。 注意线程栈大小必须至少为32768字节,一般是系统内存页大小(409六、8192等)的整数倍。
同步的定义是:在发出一个功能调用时,在没有获得结果以前,该调用就不返回,同时其它线程也不能调用这个方法。按照这个定义,其实绝大多数函数都是同步调用。
可是一般说进程、线程同步,每每特指多进程、线程编程时,多个进程、线程之间协同步调,按预约的前后次序进行运行。好比线程A和线程B一块儿配合,A执行到必定程度依赖B的某个结果,因而停下来示意B运行,B开始执行,执行完将结果返回给A,A接着执行。这里的“同”应该理解为协同、协助、互相配合。
在多线程编程里面,一些敏感数据不容许被多个线程同时访问,此时就使用同步访问技术,保证数据在任什么时候刻,最多有一个线程访问,以保证数据的完整性。
原语:前文说起原语,不少同窗可能不了解这个名词的意思。内核或微核提供核外调用的过程或函数称为原语(primitive)。操做系统用语范畴。是由若干多机器指令构成的完成某种特定功能的一段程序,具备不可分割性。即原语的执行必须是连续的,在执行过程当中不容许被中断。不一样层次之间对话的语言称为原语,即不一样层之间经过原语来实现信息交换。
(1)Python多线程编程经常使用threading模块。启动一个多线程须要建立一个Thread对象,调用star()方法启动线程。注意is_alive() /join()方法和daemon参数的使用。
(2)python多线程锁有Lock / Rlock, 全局锁GIL。GIL是CPython特性,同一时刻只能运行一个线程,不能利用多核资源。
(3)线程同步原语有Event / Condition / Semaphore / Barrier。Event用于经常使用语通知所有线程,condition和Semapher经常使用于通知必定数量的线程, Barrier用于多个线程必须完成某些步骤再一块儿执行。
(4)Lock / Rlock / Event / Condition / Semaphore 支持上下文管理协议(with语句,好用)。
(5)线程间通讯能够用queue模块中的Queue队列,get()和put()已加锁,是线程安全的。qsize()/full()/empty() 等能够获取一个队列的当前大小和状态, 不是线程安全的,尽可能别用。
(6)concurrent.futures中的ThreadPoolExecutor是Python3.2以后自带的线程池模块,十分好用,支持with语句,经过future.result()获取线程返回值。
(7)Python多线程适用于I/O密集型问题,CPU密集型问题能够用C代码优化底层算法提高性能,需注意一个写的很差的C语言扩展会致使这个问题更加严重;也能够用pypy或者多进程。
以上是本篇所有内容,欢迎读者批评指正。
参考资料: