线程(thread)是操做系统可以进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运做单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中能够并发多个线程,每条线程并行执行不一样的任务。python
因为GIL(Global Interpreter Lock)的存在,python并不能真的实现并行,只能同时容许一个进程运行。GIL是CPython解释器的概念,并非python的缺陷。安全
threading模块多线程
python中经过threading模块实现多线程并发
线程的2种调用方式app
直接调用dom
threading.Thread(target=sayhi, args=(1, )) target的值是函数名,args传入的参数,元组形式函数
1 import threading 2 import time 3 4 def sayhi(num): #定义每一个线程要运行的函数 5 6 print("running on number:%s" %num) 7 8 time.sleep(3) 9 10 if __name__ == '__main__': 11 12 t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例 13 t2 = threading.Thread(target=sayhi,args=(2,)) #生成另外一个线程实例 14 15 t1.start() #启动线程 16 t2.start() #启动另外一个线程 17 18 print(t1.getName()) #获取线程名 19 print(t2.getName()) 20 ####### 结果 ########### 21 running on numbei: 1 22 running on numbei: 2 23 Thread-1 24 Thread-2
继承式调用ui
1 import threading 2 import time 3 4 5 class MyThread(threading.Thread): 6 def __init__(self,num): 7 threading.Thread.__init__(self) 8 self.num = num 9 10 def run(self):#定义每一个线程要运行的函数 11 12 print("running on number:%s" %self.num) 13 14 time.sleep(3) 15 16 if __name__ == '__main__': 17 18 t1 = MyThread(1) 19 t2 = MyThread(2) 20 t1.start() 21 t2.start()
join & Daemonspa
1 import threading 2 from time import ctime, sleep 3 4 5 def music(func): 6 for i in range(2): 7 print("Begin listening to %s. %s" % (func, ctime())) 8 sleep(2) 9 print("end listening %s" % ctime()) 10 11 12 def move(func): 13 for i in range(2): 14 print("Begin watching at the %s! %s" % (func, ctime())) 15 sleep(3) 16 print('end watching %s' % ctime()) 17 18 threads = [] 19 t1 = threading.Thread(target=music, args=('七里香',)) 20 threads.append(t1) 21 t2 = threading.Thread(target=move, args=('阿甘正传',)) 22 threads.append(t2) 23 24 if __name__ == '__main__': 25 26 for t in threads: 27 # t.setDaemon(True) 28 t.start() 29 # t.join() 30 # t1.join() 31 t2.join() # 考虑这三种join位置下的结果? 32 print("all over %s" % ctime())
join():操作系统
在子线程完成以前,这个子线程的父线程将一直被阻塞。
setDaemon(True):
将线程声明为守护线程,必须在start() 方法调用以前设置, 若是不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。当咱们 在程序运行中,执行一个主线程,若是主线程又建立一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。可是有时候咱们须要的是 只要主线程完成了,无论子线程是否完成,都要和主线程一块儿退出,这时就能够 用setDaemon方法啦
join以子线程为主判断,setDaemon(True)以主线程为主判断
thread 模块提供的其余方法: # threading.currentThread(): 返回当前的线程变量。 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。 # 除了使用方法外,线程模块一样提供了Thread类来处理线程,Thread类提供了如下方法: # run(): 用以表示线程活动的方法。 # start():启动线程活动。 # join([time]): 等待至线程停止。这阻塞调用线程直至线程的join() 方法被调用停止-正常退出或者抛出未处理的异常-或者是可选的超时发生。 # isAlive(): 返回线程是否活动的。 # getName(): 返回线程名。 # setName(): 设置线程名。
同步锁
1 import time 2 import threading 3 4 def addNum(): 5 global num #在每一个线程中都获取这个全局变量 6 # num-=1 7 8 temp=num 9 print('--get num:',num ) 10 #time.sleep(0.1) 11 num =temp-1 #对此公共变量进行-1操做 12 13 14 num = 100 #设定一个共享变量 15 thread_list = [] 16 for i in range(100): 17 t = threading.Thread(target=addNum) 18 t.start() 19 thread_list.append(t) 20 21 for t in thread_list: #等待全部线程执行完毕 22 t.join() 23 24 print('final num:', num )
多个线程同时操做同一个共享资源,因此形成了资源破坏,使用join时会把整个线程停住,形成了串行,失去了多线程的意义,因此要把计算的时候串行
使用同步锁
threading.Lock()
1 import time 2 import threading 3 4 def addNum(): 5 global num #在每一个线程中都获取这个全局变量 6 # num-=1 7 lock.acquire() # 锁开始 8 temp=num 9 print('--get num:',num ) 10 #time.sleep(0.1) 11 num =temp-1 #对此公共变量进行-1操做 12 lock.release() # 锁结束 13 14 num = 100 #设定一个共享变量 15 thread_list = [] 16 lock=threading.Lock() 17 18 for i in range(100): 19 t = threading.Thread(target=addNum) 20 t.start() 21 thread_list.append(t) 22 23 for t in thread_list: #等待全部线程执行完毕 24 t.join() 25 26 print('final num:', num )
锁中的内容一次只容许一个进程执行
线程死锁和递归锁
在线程间共享多个资源的时候,若是两个线程分别占有一部分资源而且同时等待对方的资源,就会形成死锁,由于系统判断这部分资源都正在使用,全部这两个线程在无外力做用下将一直等待下去。下面是一个死锁的例子:
1 import threading,time 2 3 class myThread(threading.Thread): 4 def doA(self): 5 lockA.acquire() 6 print(self.name,"gotlockA",time.ctime()) 7 time.sleep(3) 8 lockB.acquire() 9 print(self.name,"gotlockB",time.ctime()) 10 lockB.release() 11 lockA.release() 12 13 def doB(self): 14 lockB.acquire() 15 print(self.name,"gotlockB",time.ctime()) 16 time.sleep(2) 17 lockA.acquire() 18 print(self.name,"gotlockA",time.ctime()) 19 lockA.release() 20 lockB.release() 21 def run(self): 22 self.doA() 23 self.doB() 24 if __name__=="__main__": 25 26 lockA=threading.Lock() 27 lockB=threading.Lock() 28 threads=[] 29 for i in range(5): 30 threads.append(myThread()) 31 for t in threads: 32 t.start() 33 for t in threads: 34 t.join()#等待线程结束,后面再讲。
为了支持在同一进程中屡次请求同一资源,使用“可重入锁”
threading.RLock()
1 import time 2 3 import threading 4 5 class Account: 6 def __init__(self, _id, balance): 7 self.id = _id 8 self.balance = balance 9 self.lock = threading.RLock() 10 11 def withdraw(self, amount): 12 13 with self.lock: 14 self.balance -= amount 15 16 def deposit(self, amount): 17 with self.lock: 18 self.balance += amount 19 20 21 def drawcash(self, amount): # lock.acquire中嵌套lock.acquire的场景 22 23 with self.lock: 24 interest=0.05 25 count=amount+amount*interest 26 27 self.withdraw(count) 28 29 30 def transfer(_from, to, amount): 31 32 # 锁不能够加在这里 由于其余的其它线程执行的其它方法在不加锁的状况下数据一样是不安全的 33 _from.withdraw(amount) 34 35 to.deposit(amount) 36 37 38 39 alex = Account('alex',1000) 40 yuan = Account('yuan',1000) 41 42 t1=threading.Thread(target = transfer, args = (alex,yuan, 100)) 43 t1.start() 44 45 t2=threading.Thread(target = transfer, args = (yuan,alex, 200)) 46 t2.start() 47 48 t1.join() 49 t2.join() 50 51 print('>>>',alex.balance) 52 print('>>>',yuan.balance)
条件变量同步(Condition)
有一些线程须要知足条件后才能继续执行python提供了threading.Condition对象用于条件变量线程的支持
lock_con = threading.Condition([Lock/Rlock]) 锁是可选项,不穿入锁,对象自动建立一个RLock
wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已得到锁定,不然将抛出异常。
notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试得到锁定(进入锁定池);其余线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已得到锁定,不然将抛出异常。激活时从锁的acquire开始执行,而不是从wait开始
notifyAll(): 调用这个方法将通知等待池中全部的线程,这些线程都将进入锁定池尝试得到锁定。调用这个方法不会释放锁定。使用前线程必须已得到锁定,不然将抛出异常。
实例
1 import threading,time 2 from random import randint 3 class Producer(threading.Thread): 4 def run(self): 5 global L 6 while True: 7 val=randint(0,100) 8 print('生产者',self.name,":Append"+str(val),L) 9 if lock_con.acquire(): 10 L.append(val) 11 lock_con.notify() # 通知wait() 12 lock_con.release() 13 time.sleep(3) 14 class Consumer(threading.Thread): 15 def run(self): 16 global L 17 while True: 18 lock_con.acquire() 19 if len(L)==0: 20 lock_con.wait() # 等待notify() 通知 21 print('消费者',self.name,":Delete"+str(L[0]),L) 22 del L[0] 23 lock_con.release() 24 time.sleep(0.25) 25 26 if __name__=="__main__": 27 28 L=[] 29 lock_con=threading.Condition() 30 threads=[] 31 for i in range(5): 32 threads.append(Producer()) 33 threads.append(Consumer()) 34 for t in threads: 35 t.start() 36 for t in threads: 37 t.join()
多线程通讯
同步条件(Event)
条件同步和条件变量同步意思差很少,只是不能加锁
event = threading.Event() 条件环境对象,初始值为false
event.isSet():返回event的状态值; event.wait():若是 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,全部阻塞池的线程激活进入就绪状态, 等待操做系统调度; event.clear():恢复event的状态值为False。
1 import threading,time 2 class Boss(threading.Thread): 3 def run(self): 4 print("BOSS:今晚你们都要加班到22:00。") 5 event.isSet() or event.set() 6 time.sleep(5) 7 print("BOSS:<22:00>能够下班了。") 8 event.isSet() or event.set() 9 class Worker(threading.Thread): 10 def run(self): 11 event.wait() 12 print("Worker:哎……命苦啊!") 13 time.sleep(0.25) 14 event.clear() 15 event.wait() 16 print("Worker:OhYeah!") 17 if __name__=="__main__": 18 event=threading.Event() 19 threads=[] 20 for i in range(5): 21 threads.append(Worker()) 22 threads.append(Boss()) 23 for t in threads: 24 t.start() 25 for t in threads: 26 t.join()
信号量(Semaphore)
信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。
计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其余线程调用release()。(相似于停车位的概念)
BoundedSemaphore与Semaphore的惟一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,若是超过了将抛出一个异常。
1 import threading,time 2 class myThread(threading.Thread): 3 def run(self): 4 if semaphore.acquire(): 5 print(self.name) 6 time.sleep(5) 7 semaphore.release() 8 if __name__=="__main__": 9 semaphore=threading.Semaphore(5) 10 thrs=[] 11 for i in range(100): 12 thrs.append(myThread()) 13 for t in thrs: 14 t.start()
多线程利器(queue)
建立一个“队列”对象 import queue q = queue.Queue(maxsize = 10) queue.Queue类便是一个队列的同步实现。队列长度可为无限或者有限。可经过Queue的构造函数的可选参数maxsize来设定队列长度。若是maxsize小于1就表示队列长度无限。 将一个值放入队列中 q.put(10) 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为 1。若是队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。若是block为0,put方法将引起Full异常。 将一个值从队列中取出 q.get() 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。若是队列为空且block为True,get()就使调用线程暂停,直至有项目可用。若是队列为空且block为False,队列将引起Empty异常。 Python Queue模块有三种队列及构造函数: 一、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize) 二、LIFO相似于堆,即先进后出。 class queue.LifoQueue(maxsize) 三、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize) 此包中的经常使用方法(q = queue.Queue()): q.qsize() 返回队列的大小 q.empty() 若是队列为空,返回True,反之False q.full() 若是队列满了,返回True,反之False q.full 与 maxsize 大小对应 q.get([block[, timeout]]) 获取队列,timeout等待时间 q.get_nowait() 至关q.get(False) 非阻塞 q.put(item) 写入队列,timeout等待时间 q.put_nowait(item) 至关q.put(item, False) q.task_done() 在完成一项工做以后,q.task_done() 函数向任务已经完成的队列发送一个信号 q.join() 实际上意味着等到队列为空,再执行别的操做
相似列表,不过列表在多线程里不安全