python 线程、进程、协程---线程的用法

python做为动态脚本解释性语言,在编译速度上面一直是被诟病的地方,不少时候在群里面你们对于python的性能都有着不少的质疑, 其实经过一些优化仍是可让py程序变得很高效。python

一. 从线程开始讲起

1. 介绍

  在python中若是提及多线程确定是会被认为是伪多线程,由于python在1989年圣诞节诞生的时候计算机硬件并无那么发达,计算机也都是单核存在,因此天然而然都是以单线程去运行程序, 等到多核的出现,python开始支持多线程了, 那么解决多线程之间的数据完整性和状态同步最简单的方法就是加锁, 因此GIL(Global Interperter Lock, 全局解释器锁)出现了,顾名思义它并非在python语言特性上的东西, 而是在python解释器上, GIL给线程加锁, 在同一个进程中,同一时间只容许一个线程运行,当线程切换的时候同时释放又从新加锁,这么一想反而多线程变得更加的麻烦,那么python的多线程真的是鸡肋吗?算法


2. 实际分析

2.1 经常使用方法

  • 在python中threading库能够用来实现线程, 请看下面的🌰:编程

    import time
    def countdown(n):
    	while n > 0:
    	print('T-mins', n)
    	n -= 1
    	time.sleep(1)
    
    from threading import Thread
    t = Thread(target= countdown, args=(10, )) # 此时建立了一个线程实例对象t
    t.start() # 调用start()方法线程才开始被执行
    
    t.is_alive()  # 查看该线程是否属于活跃状态
    
    t.join()  # join()执行会等待该线程结束
    
    
    th = Thread(target= countdown, args=(10, ), daemon=True) # 守护线程
    
    复制代码
  • 上述例子中线程实例会在它们本身所属的系统级线程(即,POSIX 线程或者Windows线程)中执行, 这些线程彻底由操做系统来管理,一旦被启动后线程就开始独立运行,直到目标函数返回。当设置为守护线程时,该线程是没法被链接的,可是当主线程结束后他们就会自动销毁掉.安全

  • 以上就是标准库提供给咱们的一些经常使用的方法,其他的一些操做好比说,终止进程给线程发信号调整线程调度属性以及执行其余的高级操做都尚未,须要本身去构建,好比想要终止线程,这个线程就必需要在某个时间点轮询退出状态,那么能够将线程放到下面这样的类中:bash

    class CountdownTask(object):
    	def __init__(self):
    		self._running = True
    	
    	def terminate(self):
    		self._running = False
    	
    	def run(self, n):
    		while self._running and n > 0:
    			print('T-minus', n)
    			n -= 1
    			time.sleep(5)
    
    c = CountdownTask()
    t = Thread(target=c.run, args=(10, ))
    t.start()
    
    c.terminate()
    t.join()
    
    复制代码
  • 对于上面的例子其实适用于CPU密集型的, 并不会有太多的阻塞性操做,可是若是是一些I/O操做的操做实现同步就会变得很棘手,若是一个任务一直被阻塞,那对于其余线程来说并不能及时终止,因此须要当心的为线程加上超时循环, 例以下面一段代码:网络

    class IOTask(object):
    	def terminate(self):
    		self._running = False
    	
    	def run(self, sock):
    		# sock is a socket
    		sock.settimeout(5)
    		while self._running:
    			try:
    				data = sock.recv(8192)
    				break
    			except socket.timout:
    				continue 
    		
    		return
    
    复制代码

2.2 判断线程是否启动和线程间同步问题

  • 问题: 当建立一个线程的时候,可是想知道它实际是在何时运行的,首先咱们来看线程的核心特征是可以以非肯定性的方式(即 什么时候开始执行、什么时候被打断、什么时候恢复执行彻底由操做系统调度管理)独立运行的数据结构

  • 解决方案: 咱们能够用threading库中的Event对象, 它容许线程等待某个时间发生。 初始状态时事件被设置为0, 若是事件没有被设置而线程正在等待该事件, 那么线程就会被阻塞, 这话听起来有点绕口,就是若是被标记的线程没有设置,以后等待的全部线程都会一直等待。具体示例请看下面代码:多线程

    from threading import Thread, Event
    import time
    
    def countdown(n, start_evt):
    	print('starting') 
    	start_evt.set() # 设置
    	while n > 0:
    		print('T-minus', n)
    		n -= 1
    		time.sleep(5)
    		
    start_evt = Event()
    t = Thread(ttarget=countdown, args=(10, start_evt))
    t.start()
    
    started_evt.wait()  # 等待 
    print('running') # running 总于 starting 后打印
    
    复制代码
  • 注: Event最好只用于一次性的事件, 也就是咱们建立一个事件, 让线程等待事件被设置, 而后一旦完成设置了Event对象就被丢弃, 尽管可使用clear()方法来清除事件, 可是要安全地清除事件并等待它被再次设置这个过程很难同步协调,可能会形成事件丢失、死锁或者其余的问题,好比咱们没法保证事件再次发起的时候是否被清除若是须要反复通知某个事件, 那最好使用Condition对象来处理, 好比实现一个定时器, 每次超时的时候其余线程能够感知到超时时间, 代码以下:app

    import threading
    import time
    
    class PeriodicTimer(object):
    	def __init__(self, interval):
    		self._interval = interval
    		self._flag = 0
    		self._cv = threading.Condition()
    	
    	def start(self):
    		t = threading.Thread(target=self.run)
    		t.daemon = True
    		t.start()
    	
    	def run(self):
    		''' 启动定时器而且通知其余等待线程 '''
    		while True:
    			time.sleep(self._interval)
    			with self._cv:
    				self._flag ^= 1
    				self._cv.notify_all()
    
    	def wait_for_tick(self):
    		''' Wait for the next tick of the timer '''
    		with self._cv:
    			last_flag = self._flag
    			while last_flag == self._flag:
    				self._cv.wait()
    	# 示例 使用定时器
    	ptimer = PeriodicTimer(5)
    	ptimer.start()
    	
    	
    	def countdown(nticks):
    		while nticks > 0:
    			ptimer.wait_for_tick()
    			nticks -= 1
    	
    	def countup(last):
    		m = 0
    		while n < last:
    			ptimer.wait_for_tick()
    			n += 1
    
    threading.Thread(target=countdown, args(10, )).start()
    threading.Thread(target=countup, args(10, )).start()
    
    复制代码

2.3 线程间通讯

  • 要实现线程间通讯最安全的作法仍是队列,就是使用queue模块中的Queue了,首先先建立一个Queue实例,它会被全部的线程共享,以后就可使用put()和get()操做来给队列添加或者移除元素,示例以下:socket

    from queue import Queue
    from threading import Thread
    
    def producer(out_q):
    	while True:
    		out_q.put(data)
    
    def consumer(in_q):
    	while True:
    		data = in_q.get()
    
    q = Queue(20)  # size=20能够设置队列大小或者不写
    t1 = Thread(target= producer, args=(q,))
    t2 = Thread(target= consumer, args=(q,))
    t1.start()
    t2.start()
    复制代码
  • Queue实例已经拥有了全部所需的锁, 所以能够在线程间通讯,同时也能够在进程间共享,当使用队列时如何对生产者和消费者的关闭过程进行同步协调,须要使用一些技巧使得消费者在必定的条件下退出,示例以下:

    from queue import Queue
    from threading import Thread
    _sentinel = object()
    
    def producer(out_q):
    	while True:
    		out_q.put(data)
    	out_q.put(_sentinel)
    
    def consumer(in_q):
    	while True:
    		data = in_q.get()
    		if data in _sentinel:
    			in_q.put(_sentinel)
    			break
    
    q = Queue(20)  # size=20能够设置队列大小或者不写
    t1 = Thread(target= producer, args=(q,))
    t2 = Thread(target= consumer, args=(q,))
    t1.start()
    t2.start()
    复制代码
  • 当消费者接收到这个特殊的终止符以后会当即将其放回队列中,那么其余监听同一队列的线程也会接收到终止符,所以能够一次关掉线程,尽管队列是线程通讯的最多见的机制,可是只要添加了所须要的锁和同步功能, 就能够构建本身的线程安全型的数据结构,最多见的作法就是将数据结构和条件变量打包在一块儿,用headq实现优先级队列,示例代码以下:

    import headq
    import threading
    
    class PriorityQueue(object):
    	
    	def __init(self):
    		self._queue = []
    		self._count = 0
    		self._cv = threading.Condition()
    	
    	def put(self, item, priority):
    		with self._cv:
    			headq.heappush(self._queue, (-priority, self._count, item))
    			self._count += 1
    			self._cv.notify()
    	
    	def get(self):
    		with self._cv:
    			while len(self._queue) == 0:
    				self._cv.wait()
    			
    			return headq.heappop(self._queue)[-1]
    			
    复制代码
  • 可是队列通讯是一种单方向且不肯定的过程。通常来讲咱们没法得知接受线程什么时候会收到消息开始工做,Queue对象的确提供了一些基本的事件完成功能,能够经过task_done()和join()实现。 当消费者已经处理了数据以后生产者须要对此当即感知的话,那么能够将发送的数据和一个Event事件绑定(元祖的形式绑定),这样就容许生产者能够监视这个过程了, 代示例以下:

    from queue importt Queue
    from threading immport Thread, Event
    
    def porducer(out_q):
    	while running:
    		evt = Event()
    		out_q.put((data, evt))
    		evt.wait()
    
    
    def consumer(in_q):
    	while True:
    		data, evt = in_q.get() # 拆包
    		evt.set()
    
    复制代码
  • 用队列机制实现多线程能够避免不少锁机制以及底层同步原理了,并且队列进行通讯易于拓展,在不一样线程中要传递对象引用的话若是须要关心共享状态,那么只传递不可变类型的数据时能够对队列进行深拷贝。代码示例:

    from queue import Queue
    from threading import Thread
    import copy
    
    def producer(out_q):
    	while True:
    		out_q.out(copy.deepcopy(data))
    
    def consumer(in_q):
    	while True:
    		data = in_q.get()
    		
    复制代码

    此外在队列中还有不少内置方法,好比empty(), full(), size().

2.4 下面咱们来看一下python的多线程的性能, 请看下面一段代码

  • 注:建立mutex互斥锁防止资源竞争

示例一

```
	[output]:  多线程消耗的时间: 6s
	[output]:  单线程消耗的时间: 1s
	
	```
复制代码

结果发如今进行cpu运算的时候反而是单线程的耗时较短,好像确实多线程起到了拔苗助长的做用

2.5 请再看一段代码

  • 注:一样我建立了task四、五、6用于完成对百度首页的100次请求

    • 咱们发现多线程确实是比单线程快了不少,这样看来在网络请求当中python的多线程仍是有用的

2.6. 分析

  • 那为何会出现俩种不同的结果呢?对! 使用类型不同, 前者是cpu密集型的后者为io密集型,第一段出现的缘由是cpu的运算速度比python线程切换的时间还快, 第二段网络请求速度和线程切换速度比前者是耗时比较长的,因此多线程仍是起到了做用,针对于如今的网络应用场景,基本上是为io密集型, 若是碰到一些大数据运算, 那么python还有不少办法, 好比引用第三方库 numpy,何况在如今多核cpu的优点下, 彻底能够用多进程来解决这个问题,一般状况让进程数和核数相同便可。

3. 实际场景

  • 虽然pyhton弱化了类的引用, 可是实际运用的过程当中面向对象编程的思想仍是很实用的, 因此在建立线程能够这样,能够继承Thread类。

  • 刚才介绍了一些python的多线程适用于IO密集型的操做, 可是是否是用的越多效率越高呢?会不会出现什么问题呢? 刚才在 请看下面示例

  • 我建立了task一、task2分别用两个线程去运行, 结果会出现什么呢?实际发现运行代码以后一直 属于阻塞状态,咱们把这种状态叫作**死锁**。举个实际场景中的例子,多线程爬虫咱们启用线程A去抓取网页内容,线程B作页面解析,线程C作文件保存处理,这个时候资源不得不共享。那么有没有实际解决的方法呢?

4. 死锁的解决方案

4.1 死锁解决方案

  • 添加超时时间

  • 设置加锁机制:给每一个锁分配惟一id,按照升序规则使用多个锁

  • 可重入锁 RLock

  • 银行家算法

    • 添加超时时间: 在加锁的时候mutex.acquire(timeout=10)便可

    • 设置加锁机制:分配id而且按照升序可使用上下文管理器实现,代码实例以下

    • 分析:表面上thread-1的先获取锁x,再获取锁y,而thread-2是先获取锁y,再获取x。 可是实际上,acquire函数,已经对x,y两个锁进行了排序。因此thread-1,thread-2都是以同一顺序来获取锁的,是不会形成死锁的。

    • 可重入锁: 针对于同一个线程中咱们能够用 lock = threading.RLock()

    • 银行家算法: 当一个进程申请使用资源的时候,银行家算法经过先 试探 分配给该进程资源,而后经过安全性算法判断分配后的系统是否处于安全状态,若不安全则试探分配做废,让该进程继续等待。

      分析:首先是银行家算法中的进程: 包含进程Pi的需求资源数量(也是最大需求资源数量,MAX) 已分配给该进程的资源A(Allocation) 还须要的资源数量N(Need=M-A) Available为空闲资源数量,即资源池(注意:资源池的剩余资源数量+已分配给全部进程的资源数量=系统中的资源总量) 假设资源P1申请资源,银行家算法先试探的分配给它 (固然先要看看当前资源池中的资源数量够不够),若申请的资源数量小于等于Available,而后接着判断分配给P1后剩余的资源,能不能使进程队列的某个进程执行完毕,若没有进程可执行完毕,则系统处于不安全状态(即此时没有一个进程可以完成并释放资源,随时间推移,系统终将处于死锁状态)。 如有进程可执行完毕,则假设回收已分配给它的资源(剩余资源数量增长),把这个进程标记为可完成,并继续判断队列中的其它进程,若全部进程均可执行完毕,则系统处于安全状态,并根据可完成进程的分配顺序生成安全序列(如{P0,P3,P2,P1}表示将申请后的剩余资源Work先分配给P0–>回收(Work+已分配给P0的A0=Work)–>分配给P3–>回收(Work+A3=Work)–>分配给P2–>······知足全部进程)。

相关文章
相关标签/搜索