并发编程之线程进阶

1、互斥锁算法

多进程中数据不安全,因此要加锁。数据库

多线程虽然有GIL锁,可是因为GIL锁轮转的策略(多线程之间时间片的轮转),仍存在数据不安全的状况,可是相对概率较低。安全

GIL锁轮转的策略:早期执行700条指令(不是700行,+= 操做至关于4条指令),如今是执行一个时间片时间,当前线程会让出cpu给其余线程使用。多线程

dis模块中的方法能够查看某个操做对应的cpu指令并发

 

解决线程之间的数据安全的问题:app

①多线程中,不在线程中操做全局变量dom

②涉及+=,-=,lis[0]+1,相似的操做必定要加锁异步

③列表、字典自带的方法都是线程安全的分布式

④队列也是数据安全的ide

 

线程不安全的案例

from threading import Thread count = 0 def fun_add(): global count for i in range(100000): count += 1

def fun_sub(): global count for i in range(100000): count -= 1 t_lis = [] for i in range(10): t1 = Thread(target=fun_add) t1.start() t_lis.append(t1) t2 = Thread(target=fun_sub) t2.start() t_lis.append(t2) for t in t_lis: t.join() print(count)            # -98445

 

使用互斥锁解决线程安全问题(操做的指令都加上锁)

from threading import Thread,Lock count = 0 def fun_add(lock): global count for i in range(100000): lock.acquire() count += 1 lock.release() def fun_sub(lock): global count for i in range(100000): lock.acquire() count -= 1 lock.release() t_lis = [] lock = Lock()                   # 建立锁对象
for i in range(10): t1 = Thread(target=fun_add,args=(lock,)) t1.start() t_lis.append(t1) t2 = Thread(target=fun_sub,args=(lock,)) t2.start() t_lis.append(t2) for t in t_lis: t.join() # 等待全部的子线程执行完
print(count)                    # 0

 

2、递归锁

当在并发的状况下使用两把锁,会形成死锁的现象。一个线程抢占到一把锁,另外一个线程抢占到另外一把锁,而操做须要同时抢占两把锁才能执行操做。

解决方案:递归锁

多少个acquire上锁,就要有多少个release释放锁,一个线程先acquire后,其余的线程只能等着。这个锁比如像一串钥匙。

 

递归锁和互斥锁的区别:

互斥锁是两把锁多个线程抢占,而递归锁是一把锁多个线程抢占

在一个线程里,用多个锁的时候,用递归锁实例化一个锁,acquire屡次

在一个线程里,只用一个所的时候,用互斥锁为了提升效率,在锁多个资源的时候,应该酌情选用互斥锁,用完一个资源应该立刻释放

 

递归锁可以快速的解决死锁问题,可是递归锁并非一个好的解决方案,死锁现象的发生不是互斥锁的问题,而是代码的逻辑问题,递归锁只是临时快速解决死锁的有效方案,解决时只需将递归锁替换互斥锁。后续须要将递归锁从新替换成互斥锁,完善代码的逻辑,而且提升代码的效率

多线程之间,用完一个资源再用另外一个资源,应该先释放一个资源再去获取一个资源的锁

 

经典死锁案例:科学家吃面(互斥锁)

from threading import Thread,Lock import time noodles_lock = Lock() fork_lock = Lock() def eat1(name,i,): fork_lock.acquire() print('%s%s拿到叉子'%(name,i)) noodles_lock.acquire() print('%s%s拿到面条'%(name,i)) print('%s%s吃面'%(name,i)) time.sleep(0.5) fork_lock.release() print('%s%s放下叉子' % (name, i)) noodles_lock.release() print('%s%s放下面条' % (name, i)) def eat2(name,i,): noodles_lock.acquire() print('%s%s拿到面条'%(name,i)) fork_lock.acquire() print('%s%s拿到叉子' % (name, i)) print('%s%s吃面'%(name,i)) time.sleep(0.5) noodles_lock.release() print('%s%s放下面条' % (name, i)) fork_lock.release() print('%s%s放下叉子' % (name, i)) for i in range(2):Thread(target=eat1,args=('科学家',i+1)).start() for i in range(3,5):Thread(target=eat2,args=('科学家',i)).start() ''' 科学家1拿到叉子 科学家1拿到面条 科学家1吃面 科学家1放下叉子 科学家1放下面条 科学家3拿到面条 科学家2拿到叉子 '''

 

递归锁解决方案

from threading import Thread,RLock import time fork_lock = noodles_lock = RLock()          # 递归锁
def eat1(name,i,): fork_lock.acquire() print('%s%s拿到叉子'%(name,i)) noodles_lock.acquire() print('%s%s拿到面条'%(name,i)) print('%s%s吃面'%(name,i)) time.sleep(0.5) fork_lock.release() print('%s%s放下叉子' % (name, i)) noodles_lock.release() print('%s%s放下面条' % (name, i)) def eat2(name,i,): noodles_lock.acquire() print('%s%s拿到面条'%(name,i)) fork_lock.acquire() print('%s%s拿到叉子' % (name, i)) print('%s%s吃面'%(name,i)) time.sleep(0.5) noodles_lock.release() print('%s%s放下面条' % (name, i)) fork_lock.release() print('%s%s放下叉子' % (name, i)) for i in range(2):Thread(target=eat1,args=('科学家',i+1)).start() for i in range(3,5):Thread(target=eat2,args=('科学家',i)).start() ''' 科学家1拿到叉子 科学家1拿到面条 科学家1吃面 科学家1放下叉子 科学家1放下面条 科学家2拿到叉子 科学家2拿到面条 科学家2吃面 科学家2放下叉子 科学家2放下面条 科学家3拿到面条 科学家3拿到叉子 科学家3吃面 科学家3放下面条 科学家3放下叉子 科学家4拿到面条 科学家4拿到叉子 科学家4吃面 科学家4放下面条 科学家4放下叉子 '''

 

3、信号量

信号量是基于锁+计数器实现的,使用方式跟进程的信号量同样使用

from threading import Semaphore,Thread

from threading import Semaphore,Thread import time def func(index,sem): sem.acquire() print(index) time.sleep(2) sem.release() sem = Semaphore(4) for i in range(12): Thread(target=func,args=(i,sem)).start()

 

4、事件

事件的应用:检测数据库链接

from threading import Event,Thread

方法:

wait()    # 能够设置阻塞的时间

set()     # 将信号设置为true

clear()       # 将信号设置为False

is_set()   # 查看信号的状态

事件的默认状态时False

# 检测数据库链接

from threading import Event,Thread import time def check(e): time.sleep(2) e.set() # 将信号设置为True

def connect(e): for i in range(3): e.wait(1)       # 阻塞一秒
        if e.is_set():  # 查看信号的状态 
            print('链接成功') break
    else:print('链接失败') e = Event() Thread(target=check,args=(e,)).start() Thread(target=connect,args=(e,)).start()

 

5、条件

方法:

notify()      # 控制流量,通知多少个能够经过,有参数。

wait()        # 阻塞全部进程

notify_all()      # 所有放行,通常配合notify()使用

这两个方法都是线程不安全的,每一个方法使用的先后都须要加锁,条件里面有锁的方法。

# 条件

from threading import Condition,Thread import time def get_through(name,c): print('%s在等待'%name) c.acquire() c.wait() # 阻塞,等待经过线程的命令
    print('%s经过'%name) c.release() name_list = ['刘一','陈二','张三','李四','王五','赵六','孙七','周八'] c = Condition() for i in name_list: t = Thread(target=get_through,args=(i,c)) t.start() for k in range(4): c.acquire() c.notify(2)         # 设置每次经过的线程数
 c.release() time.sleep(5)

 

6、定时器

 使用场景:定时任务

Timer(n,函数)    实例化时接收两个参数,(执行的m秒数,执行的函数)

不影响主线程

# 定时器

from threading import Timer def func(): print('action') t = Timer(5,func)           # 建立子线程,而且设置开启子线程的时间
t.start()

 

7、队列

qps概念:每秒钟接收到的请求数

队列的线程是安全的,队列用于作排队相关的逻辑,帮助维持相应的顺序

特色:先进先出

方法:

get()

put()

get_nowait()

put_nowait()

import queue q = queue.Queue() q.put(1) print(q.get())

 

8、新的队列

from queue import LifoQueue

相似于栈,特色是后进先出,而且不容许插队

应用:算法的完成,有点相似分布式的思想,例如:三级菜单

from queue import LifoQueue q = LifoQueue() for i in range(1,6): q.put(i) for i in range(1,6): print(q.get(),end=' ')      # 5 4 3 2 1 

 

9、优先级队列

只能放同一种相似的值

应用场景:会员服务

 

①若是是数值,按照数值从小到大取值

from queue import PriorityQueue q = PriorityQueue() q.put(10) q.put(5) q.put(20) for i in range(3): print(q.get(),end=' ')      # 5 10 20 

 

②若是是字符串,按照ASICC编码来取值

from queue import PriorityQueue q = PriorityQueue() q.put('c') q.put('a') q.put('b') for i in range(3): print(q.get(), end=' ')     # a b c 

 

③若是是数字、字母组成的元组,按第一个元素来取值,从小到大取值

from queue import PriorityQueue q = PriorityQueue() q.put((3,'zxc')) q.put((3,'abc')) q.put((1,'asd')) q.put((2,'qwe')) for i in range(4): print(q.get(),end=' ')      # (1, 'asd') (2, 'qwe') (3, 'abc') (3, 'zxc') 

 

10、线程池

concurrent.futures 模块不只提供线程池,还提供进程池。

from concurrent.futures import ThreadPoolExecutor         # 线程池
from concurrent.futures import ProcessPoolExecutor            # 进程池

实例化的线程池数量 = 5 * cpu_count

 

方法:

submit(函数,参数)    异步提交任务,只能按位置传参,不用加args=

ret = submit()         获取返回值,须要经过result()方法取值

ret.result()        获取值

map(函数,iterable)     取代for循环submit操做

shutdown()         等于进程池的close()和join()方法,阻塞直到任务完成

 

① 有返回值

from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print('子线程号:',currentThread().ident)        # 打印子线程的线程号
    return i * '*' tp = ThreadPoolExecutor(5)          # 建立线程池,建立5个线程
ret_lis = [] for i in range(15): ret = tp.submit(func,i)         # 异步提交任务
    ret_lis.append(ret)             # 将返回值存到列表
for ret in ret_lis: print(ret.result())               # 经过result()方法获取返回值的值
print('主线程',currentThread().ident)   # 打印主线程的线程号

 

 

② 无返回值

from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(): print('子进程',currentThread().ident)      # 打印子线程的线程号
 tp = ThreadPoolExecutor(3)                    # 建立线程池,开启3个线程
for i in range(9): tp.submit(func) # 异步提交任务
tp.shutdown()                                 # 阻塞主线程,待全部的子线程运行完
print('主线程',currentThread().ident)

 

③ map方法

使用may方法必须传入参数

from concurrent.futures import ThreadPoolExecutor from threading import currentThread import time def func(n):                            # 使用map必须有一个参数
    print('子线程号:',currentThread().ident) time.sleep(1) tp = ThreadPoolExecutor(3) ret = tp.map(func,range(15))            # map函数会传入一个参数 # for i in range(15): # tp.submit(func,i) # 异步提交任务
print('主线程:',currentThread().ident)

 

 

 11、回调函数

线程池和进程池的回调函数经过submit实现的,

add_done_callback调用回调函数,不须要传参,回调函数须要经过result()取值

 

线程池的回调函数由子线程完成

from concurrent.futures import ThreadPoolExecutor from threading import currentThread import time def func(i): print('子线程:',currentThread().ident)         # 获取子线程的线程号
    time.sleep(1) return i def call_back(ret): print('ret>',ret.result())                      # 经过result()方法取值
    print('callback线程号:',currentThread().ident)   # 获取回调函数的线程号
 tp = ThreadPoolExecutor(3) for i in range(9): tp.submit(func,(i+1)).add_done_callback(call_back)  # add_done_callback回调函数的方法,函数不须要传入参数
tp.shutdown()                                           # 阻塞主线程,等待全部子线程执行完
print('主线程号:',currentThread().ident)

 

进程池的回调函数由主进程完成

from concurrent.futures import ProcessPoolExecutor import time,os def func(i): print('子进程:',os.getpid())         # 获取子进程的进程号
    time.sleep(1) return i def call_back(ret): print('ret>',ret.result())            # 经过result()方法取值
    print('callback进程号:',os.getpid())   # 获取回调函数的进程号

if __name__ == '__main__': tp = ProcessPoolExecutor(3) for i in range(9): tp.submit(func,(i+1)).add_done_callback(call_back)  # add_done_callback回调函数的方法,函数不须要传入参数
    tp.shutdown()                           # 阻塞主进程
    print('主线程号:',os.getpid())

 

12、local模块

from threading import local

不一样线程的ID存储的值和取到的值是不一样的

多个线程之间使用threading.local对象,能够实现多个线程之间的数据隔离

import time
import random
from threading import local,Thread

loc = local()
def func2():
    global loc
    print(loc.name,loc.age)

def func1(name,age):
    global loc
    loc.name = name
    loc.age = age
    time.sleep(random.random())
    func2()

Thread(target=func1,args=('xiaobai',20)).start()
Thread(target=func1,args=('xiaohei',25)).start()

'''
xiaobai 20
xiaohei 25
'''
相关文章
相关标签/搜索