目录python
线程同步,线程间协同,经过某种技术,让一个线程访问某些数据时,其余线程不能访问这些数据,直到该线程完成对数据的操做后。不一样的操做系统有多种实现方式。好比临界区(Critical Section)、互斥锁(Mutex)、信号量(Semaphore)、事件(Event)等。多线程
Event是线程间通信机制最简单的实现,使用一个内部标记flag,主要提供了三个方法wait、clear、set,经过操做flag来控制线程的执行。app
Event对象在全局定义了一个'Flag',若是'Flag'值为 False,那么当程序执行 Event对象的wait方法时就会阻塞,若是'Flag'值为True,那已经阻塞的wait方法会继续执行。dom
在使用threading.Event 实现线程间通讯时:使用threading.Event可使一个线程等待其余线程的通知,咱们把这个Event传递到线程对象中,Event默认内置了一个标志,初始值为False。一旦该线程经过wait()方法进入等待状态,直到另外一个线程调用该Event的set()方法将内置标志设置为True时,该Event会通知全部等待状态的线程恢复运行。性能
有下面代码,大欣负责吃包子,厨师负责作包子,只有厨师作好了,大欣才能开始吃。ui
import time import random event = threading.Event() def consumer(): current = threading.current_thread() print('{} 等着吃包子...'.format(current.name)) event.wait() print('包子来了,我正在吃') for i in range(1,5): time.sleep(random.randrange(1,3)) print('{} 吃 包子-{}。'.format(current.name,i)) def chef(): current = threading.current_thread() print('{} 正在作包子'.format(current.name)) time.sleep(3) print('{} 包子作好了,下班'.format(current.name)) event.set() chef = threading.Thread(target=chef,name='大厨师') chef.start() consumer = threading.Thread(target=consumer,name='大欣') consumer.start()
运行consumer时,包子还没作,因此只能等着,等chef作完了之后,设置了event为True,这时consumer就开始吃了。wait还能够指定等待时间,好比chef作的太慢了,consumer不吃了。操作系统
import time import random event = threading.Event() def consumer(): current = threading.current_thread() print('{} 等着吃包子...'.format(current.name)) if not event.wait(2): print('太慢了,不吃了') else: print('包子来了,我正在吃') for i in range(1,5): time.sleep(random.randrange(1,3)) print('{} 吃 包子-{}。'.format(current.name,i)) def chef(): current = threading.current_thread() print('{} 正在作包子'.format(current.name)) time.sleep(8) print('{} 包子作好了,下班'.format(current.name)) event.set() chef = threading.Thread(target=chef,name='大厨师') chef.start() consumer = threading.Thread(target=consumer,name='大欣') consumer.start()
当Event被set后,wait的返回值就是True,若是wait(2),在2秒内,Event没有被set,那么返回值是False。线程
因为线程间的数据是共享的,当咱们多个线程操做一个相同的用户的数据时,有可能形成混乱,以下例子:code
import threading import time n = 10 def work(): global n while n > 0: time.sleep(0.01) n -= 1 if __name__ == '__main__': t_l = [] for i in range(10): t = threading.Thread(target=work) t_l.append(t) t.start() for t in t_l: t.join() print(n)
咱们认为结果应该是0,可是结果可能不如人意,由于进程间共享数据的问题,多个进程同时修改共享数据时,因为GIL的存在同一时刻只有1个线程在运行。当n的值为1的时候,10个子线程颇有可能同时判断成功,再要修改的时候被挂起(时间片用完),等到真正回来修改的时候,n已经被其余线程改过来!因此若是要保持数据的正确性,那么就须要牺牲性能,即便用锁机制。orm
import threading mutex = threading.Lock()
# 调用方式一 mutex.acquire() # 加锁 '''code''' mutex.release() # 解锁 # 调用方式二 with mutex: '''code''' # 离开wit代码段,自动解锁
名称 | 含义 |
---|---|
acquire(blocking=True,timeout=-1) | 获取锁并加锁 blocking:表示是否阻塞,默认为True表示阻塞。 timeout:表示阻塞超时时间。 当blocking为非阻塞时,timeout不能进行设置 |
release() | 释放锁,能够从任何线程上调用释放. 已上锁,调用时会释放锁,即重置为unlocked。 未上锁,调用时会抛出RuntimeError异常 |
因此,上面的例子能够有以下修改:
import threading import time mutex = threading.Lock() n = 10 def work(): global n while True: mutex.acquire() if n > 0: time.sleep(1) n -= 1 mutex.release() else: mutex.release() break if __name__ == '__main__': t_l = [] for i in range(10): t = threading.Thread(target=work) t_l.append(t) t.start() for t in t_l: t.join() print(n)
在判断的时候就开始加锁,在修改完毕的时候解锁。这样加锁的状况下咱们发现运行时间变长了,那是由于只有抢到锁的线程才能够工做(穿行执行),
有下面一个计数器类,来看如何加锁
import threading class Counter: def __init__(self): self._value = 0 @property def value(self): return self._value def inc(self): self._value += 1 def dec(self): self._value -= 1 def calc(c:Counter): for _ in range(1000): for i in range(-50,50): if i < 0: c.dec() else: c.inc() c = Counter() lst = [] for i in range(10): t = threading.Thread(target=calc, args=(c,)) lst.append(t) t.start() for t in lst: t.join() print(c.value)
在须要调用和修改的地方加锁,修改完毕后解锁,是锁使用的基本原则,通常来讲,加锁就要解锁,可是加锁和解锁之间会有一些代码要执行,若是出现异常,那么锁是没法释放的,可是当前线程已经终止了,这种状况通常称为死锁,能够添加异常处理,来确保锁必定被释放。
import threading mutex = threading.Lock() class Counter: def __init__(self): self._value = 0 @property def value(self): return self._value def inc(self): try: # 添加异常处理,即使时崩溃也能够释放锁 mutex.acquire() self._value += 1 finally: mutex.release() def dec(self): with mutex: # 上下文管理写法 self._value -= 1 def calc(c:Counter): for _ in range(1000): for i in range(-50,50): if i < 0: c.dec() else: c.inc() c = Counter() lst = [] for i in range(10): t = threading.Thread(target=calc, args=(c,)) lst.append(t) t.start() for t in lst: t.join()
固然这里也能够为每个计数器实例对象初始化一个本身的锁,若是用全局锁,那么不一样的计数器实例,会相互影响(由于多个实例,共享一把锁),由于不一样实例的结果是不一样的,因此建议为每一个实例构建一个本身的锁。
class Counter: def __init__(self): self._value = 0 self._lock = threading.Lock() @property def value(self): return self._value def inc(self): try: self._lock.acquire() self._value += 1 finally: self._lock.release() def dec(self): with self._lock: self._value -= 1
当lock.acquire(False)时,该锁就是非阻塞锁了,调用时,仅获取一次,若是获取到那么返回True,不然返回False。
import time import threading import random import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(threadName)s %(thread)s %(message)s') def worker(tasks:list): for task in tasks: if task.lock.acquire(False): # 抢到任务 time.sleep(random.randrange(1,3)) # 模拟执行任务须要的时间 logging.info('{} I am working for {}'.format(threading.current_thread().name, task)) else: # 没有抢到任务 logging.info('{} not get {}'.format(threading.current_thread().name,task)) class Task: def __init__(self,name): self.task = name self.lock = threading.Lock() def __repr__(self): return self.task __str__ = __repr__ if __name__ == '__main__': task_list = [Task('task{}'.format(x)) for x in range(1,11)] # 构建任务列表 for _ in range(10): threading.Thread(target=worker,args=(task_list,)).start() # 开启多线程执行任务
10个任务,交给10个线程运行,谁抢到哪一个线程,谁就运行。
锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。但若是都是读取的话,就不须要了。使用锁的时候有一下几点须要特别注意: