上一篇文章介绍了:建立线程的两种方式、Event对象判断线程是否启动、利用信号量控制线程并发。html
博客连接:线程、进程、协程和GIL(二)程序员
这一篇来讲说线程间通讯的那些事儿: 安全
一个线程向另外一个线程发送数据最安全的方式就是使用queue库中的队列了,经过建立一个供多个线程共享的Queue对象,这些线程使用put()和get()操做来向队列中添加数据或者从队列中取出数据,以达到线程间通讯的效果。微信
queue队列基本方法:数据结构
queue.Queue(maxsize = num): FIFO 先进先出队列,若是maxsize小于或等于0 则表明队列长度无线。并发
queue.LifoQueue(maxsize = num): LIFO 后进先出队列(相似于栈),若是maxsize小于或等于0 则表明队列长度无线。app
Queue.qsize(): 返回当前队列中元素的个数函数
Queue.empty() 若是队列为空,返回True,反之False ui
Queue.full() 若是队列满了,返回True,反之Falsespa
Queue.get([block[, timeout]]) 读队列,timeout等待时间
Queue.put(item, [block[, timeout]]) 写队列,timeout等待时间
Queue.queue.clear() 清空队列
使用Queue构造生产者消费者模型来实现线程间的通讯:
import time from queue import Queue,LifoQueue from threading import Thread def producer(in_q): while True: time.sleep(1) data = '包子' if in_q.full() == True: print('蒸笼满了,放不下了') in_q.put(data) # 向队列中塞东西 print('小明蒸%s!' %(data)) def customer(out_q): while True: time.sleep(3) if out_q.empty() == True: print('小红没取到包子,饿死了!') data = out_q.get() # 从队列中取东西 print('小红取 %s ' % (data)) if __name__ == '__main__': q = Queue(maxsize=3) t1 = Thread(target=producer, args=(q,)) t2 = Thread(target=customer, args=(q,)) t1.start() t2.start()
上面的代码实现了一个简单的生产者消费者,小明负责蒸包子,小红负责吃包子。当队列被包子塞满时,小明就再也放不进去了,此时生产者这个线程就会阻塞。当小红将队列中的包子吃完时,消费者这个线程就会阻塞。由于Queue对象已经封装了必要的锁,因此咱们能够在多个线程之间安全的功能共享数据。可是在生产者消费者的关闭问题会有一些麻烦,通用的解决方式就是在队列中放置一个特殊值,当消费者读到这个值时,就终止执行。
不过有个问题须要注意:向队列中添加数据项时,并不会复制此数据项,线程间的通讯其实是在线程间传递对象引用。若是你单线对象的共享状态,那么最好只传递不可修改的数据结构(如:整型、字符串、或者元组)或者一个对象的深拷贝。
给关键部分加锁
线程的不安全:同一进程里线程是共享数据的,当各个线程访问同一个数据资源时会出现竞争状态,即数据可能会同时被多个线程占用,形成数据混乱,这就是线程的不安全。
为了保证线程安全,因此引进了互斥锁,确保某段关键代码、共享数据只能由一个线程从头至尾完整地执行:
显式的加锁:
from threading import Thread, Lock num = 0 lock = Lock() # 定义一个锁 def run(): global num, lock # 获取全局变量 lock.acquire() # 加锁 num += 1 print(num) lock.release() # 释放锁 if __name__ == '__main__': Thread_list = [] for i in range(1000): t = Thread(target=run) Thread_list.append(t) for i in Thread_list: i.start()
死锁:可是加入互斥锁以后有可能会产生一个问题:死锁:若干子线程在系统资源竞争时,都在等待对方对某部分资源解除占用状态,结果谁也不肯意先解锁,互相等着,程序没法执行下去,这就是死锁。
好比:有两个线程1、二,两个共享资源A、B,线程一给资源A加锁,线程二给资源B加锁,而后资源A须要访问资源B,资源B须要调用资源A,线程一二双方都在等待对方释放锁,因此就会形成死锁。
But、当程序员在加锁以后忘记调用release()方法,或者加锁以后程序抛异常致使不能正常释放锁,有可能会形成死锁,为了不这种状况,咱们不须要显式的手动加锁和释放锁,而是使用with语句来进行自动控制:
from threading import Thread, Lock num = 0 lock = Lock() # 定义一个锁 def run(): global num, lock with lock: # 自动的控制加锁和释放锁 num += 1 print(num) if __name__ == '__main__': Thread_list = [] for i in range(1000): t = Thread(target=run) Thread_list.append(t) for i in Thread_list: i.start()
建立一个线程池:
concurrent.futures 函数库有一个 ThreadPoolExecutor 类能够被用来完成这个任务
from concurrent.futures import ThreadPoolExecutor def run(): print('我是子线程') if __name__ == '__main__': pool = ThreadPoolExecutor(max_workers=3) # 建立一个容量为3的线程池 for i in range(3): t = pool.submit(run,) #在线程池中生成三个线程,他们都来调用run方法 print('我是主线程')
想了解更多Python关于爬虫、数据分析的内容,欢迎你们关注个人微信公众号:悟道Python