线程相关 GIL queue event 死锁与递归锁 信号量l

#1、GIL全局解释器锁

GIl是一个互斥锁:保证数据的安全(以牺牲效率来换取数据的安全)
阻止同一个进程内多个线程同时执行(不能并行但能实现并发)
并发:看起来像同时进行的)
GIL全局解释器存在的缘由是由于Cpython解释器的内存管理不是线程安全的

垃圾回收机制:  (能够做为一种线程)
    一、引用计数
    二、标记清除
    三、分带回收

同一个进程下的多个线程不能实现并行可是可以实现并发,多个进程下的线程可以实现并行

一、存在四个任务:计算密集型的任务 每一个耗时10s
    单核状况下:多线程好一点,消耗的资源较少
    多核状况下:
        开四个进程:10s多一些
        一个进程下开启四个线程:40多秒
二、存在四个任务:IO密集型的任务 每一个任务IO 10s
    单核状况下:多线程好一些
    多核状况下:多线程好一些

  

#计算密集与IO密集状况下线程与进程的耗时比较

#计算密集型

from multiprocessing import Process
from threading import Thread
import os,time

def work():
    res = 0
    for i in range(12345678):
        res*=i

if __name__ == '__main__':
    l = []
    print(os.cpu_count())  #查看cpu核数
    start_time = time.time()
    for i in range(4):   #4个进程或者4个线程去计算
        # p = Process(target=work) #run time is 3.224184513092041
        p = Thread(target=work)   #run time is 6.93839693069458
        l.append(p)
        p.start()

    for p in l:
        p.join()
    stop_time = time.time()
    print('run time is %s'%(stop_time-start_time))


#IO密集型

from multiprocessing import Process
from threading import Thread
import time

def work():
    time.sleep(2)

if __name__ == '__main__':
    l = []
    start_time = time.time()
    for i in range(4):
        # p = Process(target=work)  #2.416138172149658
        p = Thread(target=work) #2.0021145343780518
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop_time = time.time()
    print(stop_time-start_time)


#结果验证正确

  

#2、GIL与普通锁的对比

#不加其余锁

from threading import Thread
import time

n = 100
def task():
    global n
    tmp = n
    time.sleep(0.1)
    n = tmp - 1

t_list = []
for i in range(100):
    t = Thread(target=task)
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
print(n)  #99
#解释:当启动线程后,因为GIL的存在,一次只能运行一个线程,在其拿到n=100的值后,遇到IO,进入阻塞态,这时
#系统强制结束该线程,换其余线程来执行,如此循环,致使全部线程拿到的都是n=100的值,最后一个线程结束后,返回的值是
# n=100-1,则为99。 这个结果没有达到咱们的初衷,所以须要依靠其余锁来保证数据的安全


#加上其余锁

from threading import Thread,Lock
import time

n=100
mutex = Lock()
def task():
    global n
    mutex.acquire()
    tmp = n
    time.sleep(0.1)
    n = tmp - 1
    mutex.release()

t_list = []
for i in range(100):
    t = Thread(target=task)
    t_list.append(t)
    t.start()
for t in t_list:
    t.join()
print(n) #0

#解释:加上了锁之后,第一个线程先拿到了锁,拿到了n=100这个数据,而后遇到了IO阻塞,系统强制结束该进程,其余线程开始运行,可是
#发现数据被加锁了,没法进一步运行,到时间了,系统又让其强制结束,如此往复,直到第一个进程再次拿到了执行权限,此时的阻塞已通过去,
# 该进程进行相关运算,再释放锁,交给其余线程去竞争,如此循环往复,最后获得n = 0



所以对于不一样的数据,要想保证安全,须要加上不一样的锁去处理
GIL并不能保证数据的安全,他是对Cpython解释器加锁,针对的是线程
保证的是在同一个进程中多个线程一个时间内只能运行一个线程

  

#3、死锁与递归锁

#死锁

from threading import Thread,Lock
import time

mutex1 = Lock()
mutex2 = Lock()

class MyThead(Thread):
    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):
        mutex1.acquire()
        print('%s 抢到A锁'%self.name)
        mutex2.acquire()
        print('%s 抢到了B锁'%self.name)
        mutex2.release()
        print('%s释放了B锁'%self.name)
        mutex1.release()
        print('%s 释放了A锁'%self.name)

    def fun2(self):
        mutex2.acquire()
        print('%s抢到了B锁'%self.name)
        time.sleep(1)
        mutex1.acquire()
        print('%s 抢到了A锁'%self.name)
        mutex1.release()
        print('%s 释放了A锁'%self.name)
        mutex2.release()
        print('%s 释放了B锁'%self.name)

for i in range(100):
    t = MyThead()
    t.start()

#结果:

Thread-1 抢到A锁
Thread-1 抢到了B锁
Thread-1释放了B锁
Thread-1 释放了A锁
Thread-1抢到了B锁
Thread-2 抢到A锁
而后程序就会卡着,此时就陷入了死锁 
由于线程1拿到了B锁,线程2拿到了A锁,此时彼此拿着对方的命脉,不给对方活路,除非有外力,否则就会一直这样


#递归锁

from threading import RLock,Thread
import time

mutexA = mutexB =RLock()

class MyThead(Thread):
    def run(self):
        self.fun1()
        self.fun2()
    def fun1(self):
        mutexA.acquire()
        print('%s抢到了A锁'%self.name)
        mutexB.acquire()
        print('%s抢到了B锁'%self.name)
        mutexB.release()
        print('%s释放了B锁'%self.name)
        mutexA.release()
        print('%s释放了A锁'%self.name)
    def fun2(self):
        mutexB.acquire()
        print('%s抢到了B锁'%self.name)
        time.sleep(1)
        mutexA.acquire()
        print('%s抢到了A锁'%self.name)
        mutexA.release()
        print('%s释放了A锁'%self.name)
        mutexB.release()
        print('%s释放了B锁'%self.name)

for i in range(10):
    t = MyThead()
    t.start()


#总结

自定义锁一次acquire必须对应一次release,不能连续acquire
递归锁Rlock:这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源能够被屡次require。
直到一个线程全部的acquire都被release,其余的线程才能得到资源。
mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的状况,则counter继续加1,
这期间全部其余线程都只能等待,等待该线程释放全部锁,即counter递减到0为止

  

#4、信号量

from threading import Thread,Semaphore
import time,random

sm = Semaphore(5)  #至关于厕全部5个茅坑

def task(name):
    sm.acquire()
    print('%s正在蹲坑'%name)
    time.sleep(random.randint(1,5)) #模拟蹲坑耗时
    sm.release()

for i in range(20):
    t = Thread(target=task,args=('巴豆%s号'%i,))
    t.start()
#和普通的互斥锁区别在于,普通的是独立卫生间,全部人抢一个坑位
#信号量 是公共卫生间,有多个茅坑,全部人抢多个坑位

  

#5、线程 queue   使用import queue ,用法与进程的Queue同样
#先进先出 queue.Queue
#先进后出 queue.LifoQueue
#优先级   queue.PriorityQueue


import queue

q = queue.Queue(3) #队列里最多放置的数据个数
q.put(1)
q.put(2)
print(q.get()) #1
print(q.get()) #2

q = queue.LifoQueue(3)
q.put(1)
q.put(2)
print(q.get()) #2
print(q.get()) #1

q = queue.PriorityQueue(3)
q.put((10,'a'))
q.put((-1,'a'))
q.put((0,'a'))
print(q.get()) #(-1, 'a')
print(q.get()) #(0, 'a')
print(q.get()) #(10, 'a')
#对于优先级,元组里的第一个元素一般是数字,也能够是非数字之间去比较大小
#比较的结果中,该元素越小,优先级越高

  

#6、event事件

线程的一个关键特性是每一个线程都是独立运行且状态不可预测。若是程序中的其 他线程须要经过判断某个线程的状态来肯定本身下一步
的操做,这时线程同步问题就会变得很是棘手。为了解决这些问题,咱们须要使用threading库中的Event对象。 对象包含一个可由线程
设置的信号标志,它容许线程等待某些事件的发生。在 初始状况下,Event对象中的信号标志被设置为假。若是有线程等待一个Event对象
, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程若是将一个Event对象的信号标志设置为真,
它将唤醒全部等待这个Event对象的线程。
若是一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

event.isSet():返回event的状态值;

event.wait():若是 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,全部阻塞池的线程激活进入就绪状态, 等待操做系统调度;

event.clear():恢复event的状态值为False。

#多线程尝试链接MySQL
from threading import Thread,Event
import threading
import time,random

# event = Event()
def conn_mysql():
    count = 1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('连接超时')
        print('<%s>第%s尝试连接'%(threading.current_thread().getName(),count))
        event.wait(0.5)
        count += 1
    print('<%s>连接成功'%threading.current_thread().getName())

def check_mysql():
    print('\033[45m[%s]正在检查mysql\033[0m'%threading.current_thread().getName())
    time.sleep(random.random())
    event.set()
event = Event()
conn1 = Thread(target=conn_mysql)
conn2 = Thread(target=conn_mysql)
check = Thread(target=check_mysql)

conn1.start()
conn2.start()
check.start()