Event 事件 是线程间通讯的最简单方法之一,主要用于线程同步。html
定义一个全局内置标志Flag,若是Flag为False,执行到 event.wait 时程序就会阻塞,若是Flag为True,event.wait 便不会阻塞多线程
【注意若是处于阻塞状态,无论在哪使得Flag为true,wait都会继续执行】app
set() 将标志设置为True,并通知全部处于阻塞状态的线程恢复运行ui
clear() 将标志设置为Falseurl
isSet() 获取内置标志的状态,返回 True 或者 Falsespa
wait(timeout) 若是标志为False,将使得线程阻塞,若是为True,继续运行,默认为False线程
示例代码--等通知代理
import threading import time event = threading.Event() def chihuoguo(name): # 等待事件,进入等待阻塞状态 print '%s 已经启动' % threading.currentThread().getName() print '小伙伴 %s 已经进入就餐状态!'%name time.sleep(1) event.wait() # 收到事件后进入运行状态 print '%s 收到通知了.' % threading.currentThread().getName() print '小伙伴 %s 开始吃咯!'%name threads = [] thread1 = threading.Thread(target=chihuoguo, args=("a", )) thread2 = threading.Thread(target=chihuoguo, args=("b", )) threads.append(thread1) threads.append(thread2) for thread in threads: thread.start() time.sleep(0.1) # 发送事件通知 print '主线程通知小伙伴开吃咯!' event.set()
示例代码--互相通知code
import threading import time def producer(): print u'等人来买包子....' event.wait() #event.clear() print event.isSet() print u'chef:sb is coming for baozi...' print u'chef:making a baozi for sb...' time.sleep(5) print u'chef:你的包子好了...' event.set() def consumer(): print u'chenchao:去买包子....' event.set() time.sleep(2) print 'chenchao:waiting for baozi to be ready...' print event.wait() print u'chenchao:哎呀真好吃....' event = threading.Event() p = threading.Thread(target=producer,args=()) c = threading.Thread(target=consumer,args=()) p.start() c.start()
输出server
等人来买包子.... chenchao:去买包子.... True chef:sb is coming for baozi... chef:making a baozi for sb... chenchao:waiting for baozi to be ready... True chenchao:哎呀真好吃.... chef:你的包子好了...
上面实现了一个生产者-消费者模式,显然有错误,包子还没作好就吃上了。
稍微细心的缕下思路就会发现,消费者中的wait并无阻塞线程,由于Flag此时为True
解决方法:
1. 用另外一个 event2 来阻塞线程
2. 在生产者得到set时及时把Flag设置为False 【取消生产者中 event.clear() 的注释便可】
注意点1
import threading event = threading.Event() print(1) print(event.wait()) # 打印也会使线程阻塞 print(2)
注意点2
import time import threading def myfunc(): while 1: time.sleep(1) print(1) event = threading.Event() ts = [] if len(ts) > 2: event.wait() # 此时阻塞,已经开启的线程将继续运行 for i in range(12): t = threading.Thread(target=myfunc) t.start() ts.append(t)
多线程验证代理ip的有效性
问题:计算机并不能无休止的增长线程,每台计算机都有本身的上限
### 计算机可以执行的最大线程数 def myfunc(): time.sleep(2) count = 0 while 1: count += 1 print(count) t = threading.Thread(target=myfunc) t.start()
超过上限,就会报错
thread.error: can't start new thread
思路:设置最大线程数,当启动的线程超过最大限制时,阻塞,再也不生成新线程,而且持续跟踪线程数,一旦减少或者小于某个阈值,就取消阻塞,继续生成线程
class MyTestProxy(object): def __init__(self): self.sFile = 'ip.txt' self.dFile = 'alive.txt' self.url = 'https://www.qiushibaike.com/text/' self.threadmax = 500 # 最大线程数 self.threadmin = 400 # 最低线程数 self.timeout = 3 self.regex = re.compile('qiushibaike.com') self.aliveList = [] self.event = threading.Event() self.event2 = threading.Event() self.lock = threading.Lock() self.run() def run(self): with open(self.sFile, 'rb') as fp: lines = fp.readlines() self.ts = 0 # 初始化线程数 while lines: if self.ts > self.threadmax: self.event.clear() self.event.wait() # 超过设定线程就阻塞 line = lines.pop() t = threading.Thread(target=self.linkWithProxy, args=(line, )) t.start() self.lock.acquire() self.ts += 1 # 启动一个就加1,ts 被其余线程一直在更新,因此加锁 self.lock.release() self.event2.wait() # 处理完毕后统一存储 with open(self.dFile, 'w') as fp: for i in range(len(self.aliveList)): fp.write(self.aliveList[i]) def act(self): # 执行完一个线程就减1,由于同时执行,要加锁 self.lock.acquire() self.ts -= 1 self.lock.release() print(self.ts) if self.ts < self.threadmin: self.event.set() # 小于最低线程取消阻塞 if self.ts == 0: self.event2.set() def linkWithProxy(self, line): # 爬虫 server = line.strip() # print(server) protocol = line.split(':')[0] opener = urllib2.build_opener(urllib2.ProxyHandler({protocol:server})) urllib2.install_opener(opener) try: response = urllib2.urlopen(self.url, timeout=self.timeout) except: return else: try: str = response.read() if self.regex.search(str): # print(str) print('%s connect success'%server) print(response.geturl()) self.aliveList.append(line) except: return finally: self.act() if __name__ == '__main__': time.clock() tp = MyTestProxy() print(time.clock())
效率仍是不错的
参考资料:
http://www.cnblogs.com/huxi/archive/2010/06/26/1765808.html