1、互斥锁算法
多进程中数据不安全,因此要加锁。数据库
多线程虽然有GIL锁,可是因为GIL锁轮转的策略(多线程之间时间片的轮转),仍存在数据不安全的状况,可是相对概率较低。安全
GIL锁轮转的策略:早期执行700条指令(不是700行,+= 操做至关于4条指令),如今是执行一个时间片时间,当前线程会让出cpu给其余线程使用。多线程
dis模块中的方法能够查看某个操做对应的cpu指令并发
解决线程之间的数据安全的问题:app
①多线程中,不在线程中操做全局变量dom
②涉及+=,-=,lis[0]+1,相似的操做必定要加锁异步
③列表、字典自带的方法都是线程安全的分布式
④队列也是数据安全的ide
线程不安全的案例
from threading import Thread count = 0 def fun_add(): global count for i in range(100000): count += 1
def fun_sub(): global count for i in range(100000): count -= 1 t_lis = [] for i in range(10): t1 = Thread(target=fun_add) t1.start() t_lis.append(t1) t2 = Thread(target=fun_sub) t2.start() t_lis.append(t2) for t in t_lis: t.join() print(count) # -98445
使用互斥锁解决线程安全问题(操做的指令都加上锁)
from threading import Thread,Lock count = 0 def fun_add(lock): global count for i in range(100000): lock.acquire() count += 1 lock.release() def fun_sub(lock): global count for i in range(100000): lock.acquire() count -= 1 lock.release() t_lis = [] lock = Lock() # 建立锁对象
for i in range(10): t1 = Thread(target=fun_add,args=(lock,)) t1.start() t_lis.append(t1) t2 = Thread(target=fun_sub,args=(lock,)) t2.start() t_lis.append(t2) for t in t_lis: t.join() # 等待全部的子线程执行完
print(count) # 0
2、递归锁
当在并发的状况下使用两把锁,会形成死锁的现象。一个线程抢占到一把锁,另外一个线程抢占到另外一把锁,而操做须要同时抢占两把锁才能执行操做。
解决方案:递归锁
多少个acquire上锁,就要有多少个release释放锁,一个线程先acquire后,其余的线程只能等着。这个锁比如像一串钥匙。
递归锁和互斥锁的区别:
互斥锁是两把锁多个线程抢占,而递归锁是一把锁多个线程抢占
在一个线程里,用多个锁的时候,用递归锁实例化一个锁,acquire屡次
在一个线程里,只用一个所的时候,用互斥锁为了提升效率,在锁多个资源的时候,应该酌情选用互斥锁,用完一个资源应该立刻释放
递归锁可以快速的解决死锁问题,可是递归锁并非一个好的解决方案,死锁现象的发生不是互斥锁的问题,而是代码的逻辑问题,递归锁只是临时快速解决死锁的有效方案,解决时只需将递归锁替换互斥锁。后续须要将递归锁从新替换成互斥锁,完善代码的逻辑,而且提升代码的效率
多线程之间,用完一个资源再用另外一个资源,应该先释放一个资源再去获取一个资源的锁
经典死锁案例:科学家吃面(互斥锁)
from threading import Thread,Lock import time noodles_lock = Lock() fork_lock = Lock() def eat1(name,i,): fork_lock.acquire() print('%s%s拿到叉子'%(name,i)) noodles_lock.acquire() print('%s%s拿到面条'%(name,i)) print('%s%s吃面'%(name,i)) time.sleep(0.5) fork_lock.release() print('%s%s放下叉子' % (name, i)) noodles_lock.release() print('%s%s放下面条' % (name, i)) def eat2(name,i,): noodles_lock.acquire() print('%s%s拿到面条'%(name,i)) fork_lock.acquire() print('%s%s拿到叉子' % (name, i)) print('%s%s吃面'%(name,i)) time.sleep(0.5) noodles_lock.release() print('%s%s放下面条' % (name, i)) fork_lock.release() print('%s%s放下叉子' % (name, i)) for i in range(2):Thread(target=eat1,args=('科学家',i+1)).start() for i in range(3,5):Thread(target=eat2,args=('科学家',i)).start() ''' 科学家1拿到叉子 科学家1拿到面条 科学家1吃面 科学家1放下叉子 科学家1放下面条 科学家3拿到面条 科学家2拿到叉子 '''
递归锁解决方案
from threading import Thread,RLock import time fork_lock = noodles_lock = RLock() # 递归锁
def eat1(name,i,): fork_lock.acquire() print('%s%s拿到叉子'%(name,i)) noodles_lock.acquire() print('%s%s拿到面条'%(name,i)) print('%s%s吃面'%(name,i)) time.sleep(0.5) fork_lock.release() print('%s%s放下叉子' % (name, i)) noodles_lock.release() print('%s%s放下面条' % (name, i)) def eat2(name,i,): noodles_lock.acquire() print('%s%s拿到面条'%(name,i)) fork_lock.acquire() print('%s%s拿到叉子' % (name, i)) print('%s%s吃面'%(name,i)) time.sleep(0.5) noodles_lock.release() print('%s%s放下面条' % (name, i)) fork_lock.release() print('%s%s放下叉子' % (name, i)) for i in range(2):Thread(target=eat1,args=('科学家',i+1)).start() for i in range(3,5):Thread(target=eat2,args=('科学家',i)).start() ''' 科学家1拿到叉子 科学家1拿到面条 科学家1吃面 科学家1放下叉子 科学家1放下面条 科学家2拿到叉子 科学家2拿到面条 科学家2吃面 科学家2放下叉子 科学家2放下面条 科学家3拿到面条 科学家3拿到叉子 科学家3吃面 科学家3放下面条 科学家3放下叉子 科学家4拿到面条 科学家4拿到叉子 科学家4吃面 科学家4放下面条 科学家4放下叉子 '''
3、信号量
信号量是基于锁+计数器实现的,使用方式跟进程的信号量同样使用
from threading import Semaphore,Thread
from threading import Semaphore,Thread import time def func(index,sem): sem.acquire() print(index) time.sleep(2) sem.release() sem = Semaphore(4) for i in range(12): Thread(target=func,args=(i,sem)).start()
4、事件
事件的应用:检测数据库链接
from threading import Event,Thread
方法:
wait() # 能够设置阻塞的时间
set() # 将信号设置为true
clear() # 将信号设置为False
is_set() # 查看信号的状态
事件的默认状态时False
# 检测数据库链接
from threading import Event,Thread import time def check(e): time.sleep(2) e.set() # 将信号设置为True
def connect(e): for i in range(3): e.wait(1) # 阻塞一秒
if e.is_set(): # 查看信号的状态
print('链接成功') break
else:print('链接失败') e = Event() Thread(target=check,args=(e,)).start() Thread(target=connect,args=(e,)).start()
5、条件
方法:
notify() # 控制流量,通知多少个能够经过,有参数。
wait() # 阻塞全部进程
notify_all() # 所有放行,通常配合notify()使用
这两个方法都是线程不安全的,每一个方法使用的先后都须要加锁,条件里面有锁的方法。
# 条件
from threading import Condition,Thread import time def get_through(name,c): print('%s在等待'%name) c.acquire() c.wait() # 阻塞,等待经过线程的命令
print('%s经过'%name) c.release() name_list = ['刘一','陈二','张三','李四','王五','赵六','孙七','周八'] c = Condition() for i in name_list: t = Thread(target=get_through,args=(i,c)) t.start() for k in range(4): c.acquire() c.notify(2) # 设置每次经过的线程数
c.release() time.sleep(5)
6、定时器
使用场景:定时任务
Timer(n,函数) 实例化时接收两个参数,(执行的m秒数,执行的函数)
不影响主线程
# 定时器
from threading import Timer def func(): print('action') t = Timer(5,func) # 建立子线程,而且设置开启子线程的时间
t.start()
7、队列
qps概念:每秒钟接收到的请求数
队列的线程是安全的,队列用于作排队相关的逻辑,帮助维持相应的顺序
特色:先进先出
方法:
get()
put()
get_nowait()
put_nowait()
import queue q = queue.Queue() q.put(1) print(q.get())
8、新的队列
from queue import LifoQueue
相似于栈,特色是后进先出,而且不容许插队
应用:算法的完成,有点相似分布式的思想,例如:三级菜单
from queue import LifoQueue q = LifoQueue() for i in range(1,6): q.put(i) for i in range(1,6): print(q.get(),end=' ') # 5 4 3 2 1
9、优先级队列
只能放同一种相似的值
应用场景:会员服务
①若是是数值,按照数值从小到大取值
from queue import PriorityQueue q = PriorityQueue() q.put(10) q.put(5) q.put(20) for i in range(3): print(q.get(),end=' ') # 5 10 20
②若是是字符串,按照ASICC编码来取值
from queue import PriorityQueue q = PriorityQueue() q.put('c') q.put('a') q.put('b') for i in range(3): print(q.get(), end=' ') # a b c
③若是是数字、字母组成的元组,按第一个元素来取值,从小到大取值
from queue import PriorityQueue q = PriorityQueue() q.put((3,'zxc')) q.put((3,'abc')) q.put((1,'asd')) q.put((2,'qwe')) for i in range(4): print(q.get(),end=' ') # (1, 'asd') (2, 'qwe') (3, 'abc') (3, 'zxc')
10、线程池
concurrent.futures 模块不只提供线程池,还提供进程池。
from concurrent.futures import ThreadPoolExecutor # 线程池
from concurrent.futures import ProcessPoolExecutor # 进程池
实例化的线程池数量 = 5 * cpu_count
方法:
submit(函数,参数) 异步提交任务,只能按位置传参,不用加args=
ret = submit() 获取返回值,须要经过result()方法取值
ret.result() 获取值
map(函数,iterable) 取代for循环submit操做
shutdown() 等于进程池的close()和join()方法,阻塞直到任务完成
① 有返回值
from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print('子线程号:',currentThread().ident) # 打印子线程的线程号
return i * '*' tp = ThreadPoolExecutor(5) # 建立线程池,建立5个线程
ret_lis = [] for i in range(15): ret = tp.submit(func,i) # 异步提交任务
ret_lis.append(ret) # 将返回值存到列表
for ret in ret_lis: print(ret.result()) # 经过result()方法获取返回值的值
print('主线程',currentThread().ident) # 打印主线程的线程号
② 无返回值
from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(): print('子进程',currentThread().ident) # 打印子线程的线程号
tp = ThreadPoolExecutor(3) # 建立线程池,开启3个线程
for i in range(9): tp.submit(func) # 异步提交任务
tp.shutdown() # 阻塞主线程,待全部的子线程运行完
print('主线程',currentThread().ident)
③ map方法
使用may方法必须传入参数
from concurrent.futures import ThreadPoolExecutor from threading import currentThread import time def func(n): # 使用map必须有一个参数
print('子线程号:',currentThread().ident) time.sleep(1) tp = ThreadPoolExecutor(3) ret = tp.map(func,range(15)) # map函数会传入一个参数 # for i in range(15): # tp.submit(func,i) # 异步提交任务
print('主线程:',currentThread().ident)
11、回调函数
线程池和进程池的回调函数经过submit实现的,
add_done_callback调用回调函数,不须要传参,回调函数须要经过result()取值
线程池的回调函数由子线程完成
from concurrent.futures import ThreadPoolExecutor from threading import currentThread import time def func(i): print('子线程:',currentThread().ident) # 获取子线程的线程号
time.sleep(1) return i def call_back(ret): print('ret>',ret.result()) # 经过result()方法取值
print('callback线程号:',currentThread().ident) # 获取回调函数的线程号
tp = ThreadPoolExecutor(3) for i in range(9): tp.submit(func,(i+1)).add_done_callback(call_back) # add_done_callback回调函数的方法,函数不须要传入参数
tp.shutdown() # 阻塞主线程,等待全部子线程执行完
print('主线程号:',currentThread().ident)
进程池的回调函数由主进程完成
from concurrent.futures import ProcessPoolExecutor import time,os def func(i): print('子进程:',os.getpid()) # 获取子进程的进程号
time.sleep(1) return i def call_back(ret): print('ret>',ret.result()) # 经过result()方法取值
print('callback进程号:',os.getpid()) # 获取回调函数的进程号
if __name__ == '__main__': tp = ProcessPoolExecutor(3) for i in range(9): tp.submit(func,(i+1)).add_done_callback(call_back) # add_done_callback回调函数的方法,函数不须要传入参数
tp.shutdown() # 阻塞主进程
print('主线程号:',os.getpid())
12、local模块
from threading import local
不一样线程的ID存储的值和取到的值是不一样的
多个线程之间使用threading.local对象,能够实现多个线程之间的数据隔离
import time import random from threading import local,Thread loc = local() def func2(): global loc print(loc.name,loc.age) def func1(name,age): global loc loc.name = name loc.age = age time.sleep(random.random()) func2() Thread(target=func1,args=('xiaobai',20)).start() Thread(target=func1,args=('xiaohei',25)).start() ''' xiaobai 20 xiaohei 25 '''