Python | 感知线程状态的解决方案,Event与信号量

本文始发于我的公众号:TechFlow,原创不易,求个关注web


今天是Python专题的第21篇文章,咱们继续多线程的话题。多线程

上周的文章当中咱们简单介绍了线程和进程的概念,以及在Python当中如何在主线程以外建立其余线程,而且还了解了用户级线程和后台线程的区别以及使用方法。今天咱们来看看线程的其余使用,好比如何中止一个线程,线程之间的Event用法等等。并发

中止线程

利用Threading库咱们能够很方便地建立线程,让它按照咱们的想法执行咱们想让它执行的事情,从而加快程序运行的效率。然而有一点坑爹的是,线程建立以后,就交给了操做系统执行,咱们没法直接结束一个线程,也没法给它发送信号,没法调整它的调度,也没有其余高级操做。若是想要相关的功能,只能本身开发。socket

怎么开发呢?编辑器

咱们建立线程的时候指定了target等于一个咱们想让它执行的函数,这个函数并不必定是全局函数,实际上也能够是一个对象中的函数。若是是对象中的函数,那么咱们就能够在这个函数当中获取到对象中的其余信息,咱们能够利用这一点来实现手动控制线程的中止。函数

提及来好像不太好理解,可是看下代码真的很是简单:工具

import time
from threading import Thread  class TaskWithSwitch:  def __init__(self):  self._running = True   def terminate(self):  self._running = False   def run(self, n):  while self._running and n > 0:  print('Running {}'.format(n))  n -= 1  time.sleep(1)  c = TaskWithSwitch() t = Thread(target=c.run, args=(10, )) t.start() c.terminate() t.join() 复制代码

若是你运行这段代码,会发现屏幕上只输出了10,由于咱们将_running这个字段置为False以后,下次循环的时候再也不知足循环条件,它就会本身退出了。flex

若是咱们想要用多线程来读取IO,因为IO可能存在堵塞,因此可能会出现线程一直没法返回的状况。也就是说咱们在循环内部卡死了,这个时候单纯用_running来判断仍是不够的,咱们须要在线程内部设置计时器,防止循环内部的卡死。ui

class IOTask:
 def __init__(self):  self._running = True   def terminate(self):  self._running = False   def run(self, sock):  # 在socket中设置计时器  sock.settimeout(10)  while self._running:  try:  # 因为设置了计时器,因此这里不会永久等待  data = sock.recv(1024)  break  except socket.timeout:  continue  return 复制代码

线程信号的传递

咱们之因此如此费劲才能控制线程的运行,主要缘由是线程的状态是不可知的,而且咱们没法直接操做它,由于它是被操做系统管理的。咱们运行的主线程和建立出来的线程是独立的,二者之间并无从属关系,因此想要实现对线程的状态进行控制,每每须要咱们经过其余手段来实现。url

咱们来思考一个场景,假设咱们有一个任务,须要在另一个线程运行结束以后才能开始执行。要想要实现这一点,就必须对线程的状态有所感知,须要其余线程传递出信号来才行。咱们可使用threading中的Event工具来实现这一点。Event工具就是能够用来传递信号的,就好像是一个开关,当一个线程执行完成以后,会去启动这个开关。而这个开关控制着另一段逻辑的运行。

咱们来看下样例代码:

import time
from threading import Thread, Event  def run_in_thread():  time.sleep(1)  print('Thread is running')  t = Thread(target=run_in_thread) t.start()  print('Main thread print')  复制代码

咱们在线程里面就只作了输出一行提示符,没有其余任何逻辑。因为咱们在run_in_thread函数当中沉睡了1s,因此必定是先输出Main thread print再输出的Thread is running。假设这个线程是一个很重要的任务,咱们但愿主线程可以等待它运行到一个阶段再往下执行,咱们应该怎么办呢?

注意,这里说的是运行到一个阶段,并非运行结束。运行结束咱们很好处理,能够经过join来完成。但若是不是运行结束,而是运行完成了某一个阶段,固然经过join也能够,可是会损害总体的效率。这个时候咱们就必需要用上Event了。加上Event以后,咱们再来看下代码:

import time
from threading import Thread, Event  def run_in_thread(event):  time.sleep(1)  print('Thread is running')  # set一下event,这样外面wait的部分就会被启动  event.set()  # 初始化Event event = Event() t = Thread(target=run_in_thread, args=(event, )) t.start()  # event等待set event.wait() print('Main thread print') 复制代码

总体的逻辑没有太多的修改,主要的是增长了几行关于Event的使用代码。

咱们若是要用到Event,最好在代码当中只使用一次。固然经过Event中的clear方法咱们能够重置Event的值,但问题是咱们没办法保证重置的这个逻辑会在wait以前执行。若是是在以后执行的,那么就会问题,而且在debug的时候会异常痛苦,由于bug不是必现的,而是有时候会出现有时候不会出现。这种状况每每都是由于多线程的使用问题。

因此若是要屡次使用开关和信号的话,不要使用Event,可使用信号量。

信号量

Event的问题在于若是多个线程在等待Event的发生,当它一旦被set的时候,那么这些线程都会同时执行。但有时候咱们并不但愿这样,咱们但愿能够控制这些线程一个一个地运行。若是想要作到这一点,Event就没法知足了,而须要使用信号量。

信号量和Event的使用方法相似,不一样的是,信号量能够保证每次只会启动一个线程。由于这二者的底层逻辑不太一致,对于Event来讲,它更像是一个开关。一旦开关启动,全部和这个开关关联的逻辑都会同时执行。而信号量则像是许可证,只有拿到许可证的线程才能执行工做,而且许可证一次只发一张。

想要使用信号量并不须要本身开发,thread库当中为咱们提供了现成的工具——Semaphore,咱们来看它的使用代码:

# 工做线程
def worker(n, sema):  # 等待信号量  sema.acquire()  print('Working', n)  # 初始化 sema = threading.Semaphore(0) nworkers = 10 for n in range(nworkers):  t = threading.Thread(target=worker, args=(n, sema,))  t.start() 复制代码

在上面的代码当中咱们建立了10个线程,虽然这些线程都被启动了,可是都不会执行逻辑,由于sema.acquire是一个阻塞方法,没有监听到信号量是会一直挂起等待。

当咱们释放信号量以后,线程被启动,才开始了执行。咱们每释放一个信号,则会多启动一个线程。这里面的逻辑应该不难理解。

总结

在并发场景当中,多线程的使用毫不是多启动几个线程作不一样的任务而已,咱们须要线程间协做,须要同步、获取它们的状态,这是很是不容易的。一不当心就会出现幽灵bug,时显时隐,这也是并发问题让人头疼的主要缘由。

这篇文章当中咱们只是简单介绍了线程间通讯的基本方法,针对这个问题,还有更好的解决方案。咱们将在后续的文章当中继续讨论这个问题,敬请期待。

今天的文章到这里就结束了,若是喜欢本文的话,请来一波素质三连,给我一点支持吧(关注、转发、点赞)。

本文使用 mdnice 排版

相关文章
相关标签/搜索