线程同步,线程间协同,经过某种技术,让一个线程访问某些数据时,其余线程不能访问这个数据,直到该线程完成对数据的操做为止。python
临界区(critical section 全部碰到的都不能使用,等一个使用完成),互斥量(Mutex一个用一个不能用),信号量(semaphore),事件event数据库
event 事件。是线程间通讯机制中最简单的实现,使用一个内部标记的flag,经过flag的True或False的变化来进行操做。缓存
名称 | 含义 |
---|---|
set() | 标记设置为True,用于后面wait执行和is_set检查 |
clear() | 标记设置为False |
is_set() | 标记是否为True |
wait(timeout=None) | 设置等待标记为True的时长,None为无限等待,等到返回为True,未等到了超时返回为False |
老板雇佣了一个工人,让他生产杯子,老板一直等着工人。直到生成了10个杯子 安全
import logging import threading import time event=threading.Event() FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") def boss(event:threading.Event): logging.info("准备生产") event.wait() logging.info("生产完成") def woker(event:threading.Event,count:int=10): cups=[] while True: logging.info("开始生产杯子") if len(cups) >= count: event.set() break logging.info("生产了一个杯子") cups.append(1) time.sleep(0.5) logging.info("总共生产了:{} 个杯子".format(len(cups))) b=threading.Thread(target=boss,args=(event,),name='boss') w=threading.Thread(target=woker,args=(event,10),name='woker') b.start() w.start()
结果以下 多线程
import logging import threading import datetime event=threading.Event() FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") def do(event:threading.Event,interval:int): while not event.wait(interval): # 此处须要的结果是返回False或True logging.info('do sth.') e=threading.Event() start=datetime.datetime.now() threading.Thread(target=do,args=(e,3)).start() e.wait(10) e.set() print ("总体运行时间为:{}".format((datetime.datetime.now()-start).total_seconds())) print ('main exit')
结果以下 并发
wait 优于sleep,wait 会主动让出时间片,其余线程能够被调度,而sleep会占用时间片不让出。app
import logging import threading import datetime import time event=threading.Event() FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") def add(x:int,y:int): return x+y class Timer: def __init__(self,interval,fn,*args,**kwargs): self.interval=interval self.fn=fn self.args=args self.kwargs=kwargs self.event=threading.Event() def __run(self): start=datetime.datetime.now() logging.info('开始启动步骤') event.wait(self.interval) #在此处等待此时间后返回为False if not self.event.is_set(): # 此处返回为False 为正常 self.fn(*self.args,**self.kwargs) logging.info("函数执行成功,执行时间为{}".format((datetime.datetime.now()-start).total_seconds())) def start(self): threading.Thread(target=self.__run()).start() def cancel(self): self.event.set() t=Timer(3,add,10,20) t.start()
结果以下 dom
使用同一个event对象标记flag
谁wait就是等待flag变为True,或者等到超时返回False,不限制等待的个数。ide
lock: 锁,凡是在共享资源争抢的地方均可以使用,从而保证只有一个使用者能够彻底使用这个资源。一旦线程获取到锁,其余试图获取的锁的线程将被阻塞。函数
名称 | 含义 |
---|---|
acquire(blocking=True,timeout=1) | 默认阻塞,阻塞能够设置超时时间,非阻塞时,timeout禁止设置,成功获取锁后,返回True,不然返回False |
release() | 释放锁,能够从任何线程调用释放。已上锁的锁,会被重置为unlocked,未上锁的锁上调用,抛出RuntimeError异常 |
#!/usr/bin/poython3.6 #conding:utf-8 import threading import time lock=threading.Lock() # 实例化锁对象 lock.acquire() # 加锁处理,默认是阻塞,阻塞时间能够设置,非阻塞时,timeout禁止设置,成功获取锁,返回True,不然返回False print ('get locker 1') lock.release() # 释放锁,能够从任何线程调用释放,已上锁的锁,会被重置为unlocked未上锁的锁上调用,抛出RuntimeError异常。 print ('release Locker') lock.acquire() print ('get locker 2') lock.release() print ('release Locker') lock.acquire() print ('get locker 3') lock.acquire() # 此处未进行相关的释放操做,所以其下面的代码将不能被执行,其会一直阻塞 print ('get locker 4')
结果以下
#!/usr/bin/poython3.6 #conding:utf-8 import threading lock=threading.Lock() lock.acquire() print ('1') lock.release() print ('2') lock.release() # 此处二次调用释放,致使的结果是抛出异常。 print ('3')
结果以下
锁释放后资源必定会出现争抢状况,锁必定要支持上下文,不然全部的线程都将等待。
锁的注意事项是最好不要出现死锁的状况。
解不开的锁就是死锁。
此处是没有退出的状况的
订单要求生成100个杯子,组织10人生产
不加锁的状况下
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") cups=[] def worker(task=100): flag=False while True: count = len(cups) logging.info(len(cups)) if count >= task: flag=True time.sleep(0.001) if not flag: cups.append(1) if flag: break logging.info("共制造{}个容器".format(len(cups))) for i in range(10): #此处起10个线程,表示10个工人 threading.Thread(target=worker,args=(100,),name="woker-{}".format(i)).start()
结果以下
import logging import threading FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") cups=[] def worker(task=100): while True: count = len(cups) logging.info(len(cups)) if count >= task: break cups.append(1) logging.info("{}".format(threading.current_thread().name)) logging.info("共制造{}个容器".format(len(cups))) for i in range(10): #此处起10个线程,表示10个工人 threading.Thread(target=worker,args=(100,),name="woker-{}".format(i)).start()
结果以下
使用上述方式会致使多线程数据同步产生问题,进而致使产生的数据不许确。
import logging import threading FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") cups=[] Lock=threading.Lock() def worker(lock:threading.Lock,task=100): while True: lock.acquire() count = len(cups) logging.info(len(cups)) if count >= task: break # 此处保证每一个线程执行完成会自动退出,不然会阻塞其余线程的继续执行 cups.append(1) lock.release() # 释放锁 logging.info("{}".format(threading.current_thread().name)) logging.info("共制造{}个容器".format(len(cups))) for i in range(10): #此处起10个线程,表示10个工人 threading.Thread(target=worker,args=(Lock,100,),name="woker-{}".format(i)).start()
结果以下
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") class Counter: def __init__(self): self.__x=0 def add(self): self.__x+=1 def sub(self): self.__x-=1 @property def value(self): return self.__x def run(c:Counter,count=100): # 此处的100是执行100次, for _ in range(count): for i in range(-50,50): if i<0: c.sub() else: c.add() c=Counter() c1=1000 c2=10 for i in range(c1): t=threading.Thread(target=run,args=(c,c2,)) t.start() time.sleep(10) print (c.value)
结果以下
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") class Counter: def __init__(self): self.__x=0 def add(self): self.__x+=1 def sub(self): self.__x-=1 @property def value(self): return self.__x def run(c:Counter,count=100): # 此处的100是执行100次, for _ in range(count): for i in range(-50,50): if i<0: c.sub() else: c.add() c=Counter() c1=10 c2=10000 for i in range(c1): t=threading.Thread(target=run,args=(c,c2,)) t.start() time.sleep(10) print (c.value) #此处可能在未执行完成就进行了打印操做,可能形成延迟问题。
结果以下
总结以下:
若是修改线程多少,则效果不明显,由于其函数执行时长和CPU分配的时间片相差较大,所以在时间片的时间内,足够完成相关的计算操做,但如果增长执行的循环次数,则可能会致使一个线程在一个时间片内未执行完成相关的计算,进而致使打印结果错误。
通常来讲加锁后还有一些代码实现,在释放锁以前还可能抛出一些异常,一旦出现异常,锁是没法释放的,可是当前线程可能由于这个异常被终止了,就会产生死锁,可经过上下文对出现异常的锁进行关闭操做。
1 使用try...finally语句保证锁的释放
2 with上下文管理,锁对象支持上下文管理源码以下:
其类中是支持enter和exit的,所以其是能够经过上下文管理来进行相关的锁关闭操做的。
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") class Counter: def __init__(self): self.__x=0 self.__lock=threading.Lock() def add(self): try: self.__lock.acquire() self.__x+=1 finally: self.__lock.release() # 此处不论是否上述异常,此处都会执行 def sub(self): try: self.__lock.acquire() self.__x-=1 finally: self.__lock.release() @property def value(self): return self.__x def run(c:Counter,count=100): # 此处的100是执行100次, for _ in range(count): for i in range(-50,50): if i<0: c.sub() else: c.add() c=Counter() c1=10 c2=1000 for i in range(c1): t=threading.Thread(target=run,args=(c,c2,)) t.start() time.sleep(10) print (c.value)
结果以下
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") class Counter: def __init__(self): self.__x=0 self.__lock=threading.Lock() def add(self): try: self.__lock.acquire() self.__x+=1 finally: self.__lock.release() # 此处不论是否上述异常,此处都会执行 def sub(self): try: self.__lock.acquire() self.__x-=1 finally: self.__lock.release() @property def value(self): return self.__x def run(c:Counter,count=100): # 此处的100是执行100次, for _ in range(count): for i in range(-50,50): if i<0: c.sub() else: c.add() c=Counter() c1=100 c2=10 for i in range(c1): t=threading.Thread(target=run,args=(c,c2,)) t.start() time.sleep(10) print (c.value)
结果以下
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") class Counter: def __init__(self): self.__x=0 self.__lock=threading.Lock() def add(self): with self.__lock: self.__x+=1 def sub(self): with self.__lock: self.__x-=1 @property def value(self): return self.__x def run(c:Counter,count=100): # 此处的100是执行100次, for _ in range(count): for i in range(-50,50): if i<0: c.sub() else: c.add() c=Counter() c1=100 c2=10 for i in range(c1): t=threading.Thread(target=run,args=(c,c2,)) t.start() time.sleep(10) print (c.value)
结果以下
经过存活线程数进行判断
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") class Counter: def __init__(self): self.__x=0 self.__lock=threading.Lock() def add(self): with self.__lock: self.__x+=1 def sub(self): with self.__lock: self.__x-=1 @property def value(self): return self.__x def run(c:Counter,count=100): # 此处的100是执行100次, for _ in range(count): for i in range(-50,50): if i<0: c.sub() else: c.add() c=Counter() c1=10 c2=1000 for i in range(c1): t=threading.Thread(target=run,args=(c,c2,)) t.start() while True: time.sleep(1) if threading.active_count()==1: print (threading.enumerate()) print (c.value) break else: print (threading.enumerate())
结果以下
不阻塞,timeout没啥用,False表示不使用锁
非阻塞锁能提升效率,但可能致使数据不一致
#!/usr/bin/poython3.6 #conding:utf-8 import threading lock=threading.Lock() lock.acquire() print ('1') ret=lock.acquire(blocking=False) print (ret)
结果以下
import logging import threading import time FORMAT="%(asctime)s %(threadName)s %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S") cups=[] lock=threading.Lock() def worker(lock:threading.Lock,task=100): while True: if lock.acquire(False): # 此处返回为False,则表示未成功获取到锁 count=len(cups) logging.info(count) if count >=task: lock.release() break cups.append(1) lock.release() logging.info("{} make1 ".format(threading.current_thread().name)) logging.info("{}".format(len(cups))) for x in range(20): threading.Thread(target=worker,args=(lock,100)).start()
结果以下
锁适用于访问和修改同一个共享资源的时候,及就是读取同一个资源的时候。
若是所有都是读取同一个资源,则不须要锁,由于读取不会致使其改变,所以不必所用锁的注意事项:
少用锁,必要时用锁,多线程访问被锁定的资源时,就成了穿行访问,要么排队执行,要么争抢执行加锁的时间越短越好,不须要就当即释放锁
必定要避免死锁多线程运行模型(ATM机)
跟锁无关的尽可能排列在后面,和锁区分开
可重入锁,是线程相关的锁,线程A得到可重入锁,并能够屡次成功获取,不会阻塞,最后在线程A 中作和acquire次数相同的release便可。
import threading rlock=threading.RLock() #初始化可重用锁 rlock.acquire() #进行阻塞处理 print ('1') rlock.acquire() print ('2') rlock.acquire(False) # 此处设置为非阻塞 print ('3') rlock.release() print ('4') rlock.release() print ('5') rlock.release() print ('6') rlock.release() # 此处表示不能释放多余的锁,只能释放和加入锁相同次数 print ('7')
结果以下
不一样线程对Rlock操做的结果
import threading rlock=threading.RLock() #初始化可重用锁 def sub(lock:threading.RLock): lock.release() ret=rlock.acquire() print (ret) ret=rlock.acquire(timeout=5) print (ret) ret=rlock.acquire(False) print (ret) ret=rlock.acquire(False) print (ret) threading.Thread(target=sub,args=(rlock,)).start() # 此处是启用另外一个线程来完成对上述的开启的锁的关闭,由于其是基于线程的, #所以其必须在该线程中进行相关的处理操做,而不是在另一个线程中进行解锁操做
结果以下
跨线程的Rlock就没用了,必须使用Lock,Rlock是线程级别的,其余的锁都是能够在当前进程的另外一个线程中进行加锁和解锁的。
构造方法condition(lock=None),可传入一个Lock或Rlock,默认是Rlock。其主要应用于生产者消费者模型,为了解决生产者和消费者速度匹配的问题。
名称 | 含义 |
---|---|
acquire(*args) | 获取锁 |
wait(self,timeout=None) | 等待或超时 |
notify(n=1) | 唤醒至少指定数目个数的等待的线程,没有等待线程就没有任何操做 |
notify_all() | 唤醒全部等待的线程 |
def __init__(self, lock=None): if lock is None: lock = RLock() # 此处默认使用的是Rlock self._lock = lock # Export the lock's acquire() and release() methods self.acquire = lock.acquire # 进行相关处理 self.release = lock.release # If the lock defines _release_save() and/or _acquire_restore(), # these override the default implementations (which just call # release() and acquire() on the lock). Ditto for _is_owned(). try: self._release_save = lock._release_save except AttributeError: pass try: self._acquire_restore = lock._acquire_restore except AttributeError: pass try: self._is_owned = lock._is_owned except AttributeError: pass self._waiters = _deque() def __enter__(self): # 此处定义了上下文管理的内容 return self._lock.__enter__() def __exit__(self, *args): # 关闭锁操做 return self._lock.__exit__(*args) def __repr__(self): # 此处实现了可视化相关的操做 return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))
其内部存储使用了_waiter 进行相关的处理,来对线程进行集中的放置操做。
def wait(self, timeout=None): if not self._is_owned(): raise RuntimeError("cannot wait on un-acquired lock") waiter = _allocate_lock() waiter.acquire() self._waiters.append(waiter) # 此处使用此方式存储锁 saved_state = self._release_save() gotit = False try: # restore state no matter what (e.g., KeyboardInterrupt) if timeout is None: waiter.acquire() gotit = True else: if timeout > 0: gotit = waiter.acquire(True, timeout) else: gotit = waiter.acquire(False) return gotit finally: self._acquire_restore(saved_state) if not gotit: try: self._waiters.remove(waiter) except Value
唤醒一个release
def notify(self, n=1): if not self._is_owned(): # 此处是用于判断是否有锁 raise RuntimeError("cannot notify on un-acquired lock") all_waiters = self._waiters waiters_to_notify = _deque(_islice(all_waiters, n)) if not waiters_to_notify: return for waiter in waiters_to_notify: waiter.release() try: all_waiters.remove(waiter) except ValueError: pass
唤醒全部的所等待
def notify_all(self): """Wake up all threads waiting on this condition. If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised. """ self.notify(len(self._waiters)) notifyAll = notify_all
import threading import random import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") class Dispather: def __init__(self,x): self.data=x self.event=threading.Event() def produce(self):# 生产者 for i in range(10): data=random.randint(1,100) self.data=data # 产生数据 self.event.wait(1) #此处用于一秒产生一个数据 def custom(self): # 消费者,消费者可能有多个 while True: logging.info(self.data) # 获取生产者生产的数据 self.event.wait(0.5) # 此处用于等待0.5s产生一个数据 d=Dispather(0) p=threading.Thread(target=d.produce,name='produce') c=threading.Thread(target=d.custom,name='custom') p.start() c.start()
此处会使得产生的数据只有一个,而消费者拿到的数据却有两份,此处是由消费者来控制其拿出的步骤的。
import threading import random import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") class Dispather: def __init__(self,x): self.data=x self.event=threading.Event() self.conition=threading.Condition() def produce(self):# 生产者 for i in range(10): data=random.randint(1,100) with self.conition: #此处用于先进行上锁处理,而后最后释放锁 self.data=data # 产生数据 self.conition.notify_all() #通知,此处表示有等待线程就通知处理 self.event.wait(1) #此处用于一秒产生一个数据 def custom(self): # 消费者,消费者可能有多个 while True: with self.conition: self.conition.wait() # 此处用于等待notify产生的数据 logging.info(self.data) # 获取生产者生产的数据 self.event.wait(0.5) # 此处用于等待0.5s产生一个数据 d=Dispather(0) p=threading.Thread(target=d.produce,name='produce') c=threading.Thread(target=d.custom,name='custom') p.start() c.start()
此处是由生产者产生数据,通知给消费者,而后消费者再进行拿取,
有时候可能须要多一点的消费者,来保证生产者无库存
import threading import random import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") class Dispather: def __init__(self,x): self.data=x self.event=threading.Event() self.conition=threading.Condition() def produce(self):# 生产者 for i in range(10): data=random.randint(1,100) with self.conition: #此处用于先进行上锁处理,而后最后释放锁 self.data=data # 产生数据 self.conition.notify_all() #通知,通知处理产生的数据 self.event.wait(1) #此处用于一秒产生一个数据 def custom(self): # 消费者,消费者可能有多个 while True: with self.conition: self.conition.wait() # 此处用于等待notify产生的数据 logging.info(self.data) # 获取生产者生产的数据 self.event.wait(0.5) # 此处用于等待0.5s产生一个数据 d=Dispather(0) p=threading.Thread(target=d.produce,name='produce') c1=threading.Thread(target=d.custom,name='custom-1') c2=threading.Thread(target=d.custom,name='custom-2') p.start() c1.start() c2.start()
结果以下
由于此默认是基于线程的锁,所以其产生另外一个消费者并不会影响当前消费者的操做,所以能够拿到两份生产获得的数据。
import threading import random import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") class Dispather: def __init__(self,x): self.data=x self.event=threading.Event() self.conition=threading.Condition() def produce(self):# 生产者 for i in range(10): data=random.randint(1,100) with self.conition: #此处用于先进行上锁处理,而后最后释放锁 self.data=data # 产生数据 self.conition.notify(2) #通知两个线程来处理数据 self.event.wait(1) #此处用于一秒产生一个数据 def custom(self): # 消费者,消费者可能有多个 while True: with self.conition: self.conition.wait() # 此处用于等待notify产生的数据 logging.info(self.data) # 获取生产者生产的数据 self.event.wait(0.5) # 此处用于等待0.5s产生一个数据 d=Dispather(0) p=threading.Thread(target=d.produce,name='produce') p.start() for i in range(5): # 此处用于配置5个消费者, threading.Thread(target=d.custom,name="c-{}".format(i)).start()
import threading import random import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") class Dispather: def __init__(self,x): self.data=x self.event=threading.Event() self.conition=threading.Condition() def produce(self):# 生产者 for i in range(10): data=random.randint(1,100) with self.conition: #此处用于先进行上锁处理,而后最后释放锁 self.data=data # 产生数据 self.conition.notify(5) #通知所有线程来处理数据 self.event.wait(1) #此处用于一秒产生一个数据 def custom(self): # 消费者,消费者可能有多个 while True: with self.conition: self.conition.wait() # 此处用于等待notify产生的数据 logging.info(self.data) # 获取生产者生产的数据 self.event.wait(0.5) # 此处用于等待0.5s产生一个数据 d=Dispather(0) p=threading.Thread(target=d.produce,name='produce') p.start() for i in range(5): # 此处用于配置5个消费者, threading.Thread(target=d.custom,name="c-{}".format(i)).start()
结果以下
注: 上述实例中。程序自己不是线程安全的,程序逻辑有不少瑕疵,可是能够很好的帮助理解condition的使用,和生产者消费者模式
轮循太消耗CPU时间了
condition 用于生产者消费者模型中,解决生产者消费者速度匹配的问题
采用了通知机制,很是有效率
使用方式
使用condition,必须先acquire,用完了要release,由于内部使用了锁,默认是Rlock,最好的方式使用with上下文。消费者wait,等待通知
生产者生产好消息,对消费者发送消息,可使用notifiy或者notify_all方法。
赛马模式,并行初始化,多线程并行初始化
有人翻译为栅栏,有人称为屏障,能够想象为路障,道闸
python3.2 中引入的新功能
名称 | 含义 |
---|---|
Barrier(parties,action=None,timeout=None) | 构建 barrier对象,指定参与方数目,timeout是wait方法未指定超时的默认值 |
n_waiting | 当前在屏障中等待的线程数 |
parties | 各方数,须要多少等待 |
wait(timeout=None) | 等待经过屏障,返回0到线程-1的整数,每一个线程返回不一样,若是wait方法设置了超时,并超时发送,屏障将处于broken状态 |
import threading import random import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") def worker(barrier:threading.Barrier): logging.info("当前等待线程数量为:{}".format(barrier.n_waiting)) # 此处一旦到了第三个线程,则其会直接向下执行,而可能不是须要从新等待第一个等待的线程顺序执行 try: bid=barrier.wait() # 此处只有3个线程都存在的状况下才会直接执行下面的,不然会一直等待 logging.info("after barrier:{}".format(bid)) except threading.BrokenBarrierError: logging.info("Broken Barrier in {}".format(threading.current_thread().name)) barrier=threading.Barrier(parties=3) # 三个线程释放一次 for x in range(3): # 此处表示产生3个线程 threading.Event().wait(2) threading.Thread(target=worker,args=(barrier,),name="c-{}".format(x)).start()
结果以下
产生的线程不是等待线程的倍数
import threading import random import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") def worker(barrier:threading.Barrier): logging.info("当前等待线程数量为:{}".format(barrier.n_waiting)) # 此处一旦到了第三个线程,则其会直接向下执行,而可能不是须要从新等待第一个等待的线程顺序执行 try: bid=barrier.wait() # 此处只有3个线程都存在的状况下才会直接执行下面的,不然会一直等待 logging.info("after barrier:{}".format(bid)) except threading.BrokenBarrierError: logging.info("Broken Barrier in {}".format(threading.current_thread().name)) barrier=threading.Barrier(parties=3) # 三个线程释放一次 for x in range(4): # 此处表示产生4个线程,则会有一个一直等待 threading.Event().wait(2) threading.Thread(target=worker,args=(barrier,),name="c-{}".format(x)).start()
其第4个线程会一直等待下去,直到有3个线程在等待的同时才进行下一步操做。
从运行结果来看,全部线程冲到了barrier前等待,直到parties的数目,屏障将会打开,全部线程中止等待,继续执行
再有wait,屏障就就绪等待达到参数数目时再放行
参数 | 含义 |
---|---|
broken | 若是屏障处于打破状态,则返回True |
abort() | 将屏障处于broken状态,等待中的线程或调用等待方法的线程都会抛出BrokenbarrierError异常,直到reset方法来恢复屏障 |
reset() | 恢复屏障,从新开始拦截 |
import threading import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") def worker(barrier:threading.Barrier): logging.info("当前等待线程数量为:{}".format(barrier.n_waiting)) # 此处一旦到了第三个线程,则其会直接向下执行,而可能不是须要从新等待第一个等待的线程顺序执行 try: bid=barrier.wait() # 此处只有3个线程都存在的状况下才会直接执行下面的,不然会一直等待 logging.info("after barrier:{}".format(bid)) except threading.BrokenBarrierError: logging.info("Broken Barrier in {}".format(threading.current_thread().name)) barrier=threading.Barrier(parties=3) # 三个线程释放一次 for x in range(5): # 此处表示产生5个线程 threading.Event().wait(2) threading.Thread(target=worker,args=(barrier,),name="c-{}".format(x)).start() if x==4: barrier.abort() # 打破屏障,前三个没问题,后两个会致使屏障打破一块儿走出
结果以下
import threading import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") def worker(barrier:threading.Barrier): logging.info("当前等待线程数量为:{}".format(barrier.n_waiting)) # 此处一旦到了第三个线程,则其会直接向下执行,而可能不是须要从新等待第一个等待的线程顺序执行 try: bid=barrier.wait() # 此处只有3个线程都存在的状况下才会直接执行下面的,不然会一直等待 logging.info("after barrier:{}".format(bid)) except threading.BrokenBarrierError: logging.info("Broken Barrier in {}".format(threading.current_thread().name)) barrier=threading.Barrier(parties=3) # 三个线程释放一次 for x in range(9): # 此处表示产生5个线程 if x==2: #此处第一个和第二个等到,等到了第三个直接打破,前两个和第三个一块儿都是打破输出 barrier.abort() # 打破屏障,前三个没问题,后两个会致使屏障打破一块儿走出 elif x==6: #x=6表示第7个,直到第6个,到第7个,第8个,第9个,恰好3个直接栅栏退出 barrier.reset() threading.Event().wait(2) threading.Thread(target=worker,args=(barrier,)).start()
结果以下
并发初始化
全部的线程都必须初始化完成后,才能继续工做,例如运行加载数据,检查,若是这些工做没有完成,就开始运行,则不能正常工做
10个线程作10种不一样的工做准备,每一个线程负责一种工做,只有这10个线程都完成后,才能继续工做,先完成的要等待后完成的线程。
如 启动了一个线程,须要先加载磁盘,缓存预热,初始化连接池等工做,这些工做能够齐头并进,只不过只有都知足了,程序才能继续向后执行,假设数据库连接失败,则初始化工做就会失败,就要about,屏蔽broken,全部线程收到异常后直接退出。
和Lock 很像,信号量对象内部维护一个倒计数器,每一次acquire都会减1,当acquire方法发现计数为0时就会阻塞请求的线程,直到其余线程对信号量release后,计数大于0,恢复阻塞的线程。
名称 | 含义 |
---|---|
Semaphore(value=1) | 构造方法,value小于0,抛出ValueError异常 |
acquire(blocking=True,timeout=None) | 获取信号量,计数器减1,获取成功返回为True |
release() | 释放信号量,计数器加1 |
semaphore 默认值是1,表示只能去一个后就等待着,其至关于初始化一个值。
计数器中的数字永远不可能低于0
import threading import logging import time logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") def woker(sem:threading.Semaphore): logging.info("in sub thread") logging.info(sem.acquire()) logging.info("sub thread over") s=threading.Semaphore(3) # 初始化3个信号量 logging.info(s.acquire()) # 取出三个信号量 logging.info(s.acquire()) logging.info(s.acquire()) threading.Thread(target=woker,args=(s,)).start() # 此处若再想取出,则不能成功,则会阻塞 print ('----------------------') logging.info(s.acquire(False)) #此处表示不阻塞 print ('+++++++++++++++++++++++') time.sleep(2) logging.info(s.acquire(timeout=3)) # 此处表示阻塞超时3秒后释放 print ('%%%%%%%%%%%%%%%%%%%%%') s.release() # 此处用于对上述线程中的调用的函数中的内容进行处理
结果以下
都是针对同一个对象进行的处理
import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") class Name: def __init__(self,name): self.name=name class Pool: def __init__(self,count=3): self.count=count self.pool=[ Name("conn-{}".format(i)) for i in range(3)] # 初始化连接 def get_conn(self): if len(self.pool)>0: data=self.pool.pop() # 从尾部拿出来一个 logging.info(data) else: return None def return_conn(self,name:Name): # 此处添加一个 self.pool.append(name) pool=Pool(3) pool.get_conn() pool.get_conn() pool.get_conn() pool.return_conn(Name('aaa')) pool.get_conn()
结果以下
import logging import threading import random logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") class Name: def __init__(self,name): self.name=name class Pool: def __init__(self,count=3): self.count=count self.sem=threading.Semaphore(count) self.event=threading.Event() self.pool=[ Name("conn-{}".format(i)) for i in range(count)] def get_conn(self): self.sem.acquire() data=self.pool.pop() return data def return_conn(self,name:Name): # 此处添加一个 self.pool.append(name) self.sem.release() def woker(pool:Pool): conn=pool.get_conn() logging.info(conn) threading.Event().wait(random.randint(1,4)) pool.return_conn(conn) pool=Pool(3) for i in range(8): threading.Thread(target=woker,name="worker-{}".format(i),args=(pool,)).start()
结果以下
上述实例中,使用信号量解决资源有限的问题,若是池中有资源,请求者获取资源时信号量减1,请求者只能等待,当使用者彻底归资源后信号量加1,等待线程就能够唤醒拿走资源。
有界信号量,不容许使用release超出初始值范围,不然,抛出ValueError异常,这个用有界信号修改源代码,保证若是多return_conn 就会抛出异常,保证了归还连接抛出异常。
信号量一直release会一直向上加,其会将信号量和连接池都扩容了此处便产生了BoundedSemaphore
import logging import threading import random logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S") s=threading.BoundedSemaphore(3) # 边界 s.acquire() # 此处须要拿取,不然不能直接向其中加 print (1) s.release() print (2) s.release() print (3)
结果以下
应用以下
import logging import threading import time logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s ") class Conn: def __init__(self,name): self.name=name class Pool: def __init__(self,count=3): self.count=count # 初始化连接池 self.sema=threading.BoundedSemaphore(count) self.pool=[Conn("conn-{}".format(i)) for i in range(count)] # 初始化连接 def get_conn(self): self.sema.acquire() # 当拿取的时候,减一 data=self.pool.pop() # 从尾部拿出一个 print (data) def return_conn(self,conn:Conn): #此处返回一个链接池 self.pool.append(conn) # 必须保证其在拿的时候有 # 使用try 能够进行处理,下面的必须执行,加成功了,下面的必定要成功的, self.sema.release() pool=Pool(3) con=Conn('a') conn=pool.get_conn() conn=pool.get_conn() conn=pool.get_conn()
结果以下
若是使用了信号量,仍是没有用完
self.pool.append(conn)
self.sem.release()
一种极端的状况下,计数器还差1就满了,有3个线程A,B,C都执行了第一句,都没有来得release,这时候轮到线程A release,正常的release,而后轮到线程C先release,必定出现问题,超界了,必定出现问题。
不少线程用完了信号量没有获取信号量的线程都会阻塞,没有线程和归还的线程争抢,当append后才release,这时候才能等待的线程被唤醒,才能Pop,也就是没有获取信号量就不能pop,这是安全的。