GIL stands for Global Interpreter Lock.
The GIL means that at any given moment only one Python thread executes bytecode on one CPU; threads cannot be mapped onto multiple CPUs, so Python threads cannot exploit multiple cores.
The GIL is released based on the number of bytecode instructions executed and on time slices, and it is also released voluntarily when a thread performs an I/O operation.
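In CPython 3 the time-slice part of this is configurable: sys.getswitchinterval() reports how often the interpreter asks the running thread to drop the GIL (0.005 s by default), and sys.setswitchinterval() changes it. A minimal check:

```python
import sys

# Default GIL switch interval in CPython 3 is 5 milliseconds;
# the running thread is asked to release the GIL this often.
print(sys.getswitchinterval())

sys.setswitchinterval(0.01)  # make switches half as frequent
print(sys.getswitchinterval())
```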
The smallest unit the operating system can schedule is the thread. Scheduling originally worked on processes; because processes are heavy on resources, threads evolved as the lighter unit.
For I/O-bound operations, multithreading and multiprocessing show little performance difference.
import threading
import time

def get_html(url):
    print('get html started')
    time.sleep(2)
    print('get html ended')

def get_url(url):
    print('get url started')
    time.sleep(2)
    print('get url ended')

get_html = threading.Thread(target=get_html, args=('url1',))
get_url = threading.Thread(target=get_url, args=('url2',))

if __name__ == '__main__':
    start_time = time.time()
    get_html.start()
    get_url.start()
    print(time.time() - start_time)

Output:
get html started
get url started
0.0009999275207519531
get html ended
get url ended
Two threads are defined here, but three are actually running (the main thread as well). Because Thread.start() is non-blocking, the elapsed time is printed first and the two worker threads finish afterwards. To make the main thread wait for the two threads to finish before printing the time (i.e. to block), there are two approaches:
① Before starting the threads, add the following (once the main thread exits, the whole program exits and all child threads are killed):

get_html.setDaemon(True)
get_url.setDaemon(True)
② After starting the threads, add the following (execution waits for each thread to finish before continuing):
get_html.join()
get_url.join()
import threading
import time

class GetHtml(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print('get html started')
        time.sleep(2)
        print('get html ended')

class GetUrl(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print('get url started')
        time.sleep(2)
        print('get url ended')

get_html = GetHtml('HTML')
get_url = GetUrl('URL')

if __name__ == '__main__':
    start_time = time.time()
    get_html.start()
    get_url.start()
    get_html.join()
    get_url.join()
    print(time.time() - start_time)

Output:
get html started
get url started
get html ended
get url ended
2.0011143684387207
import time
import threading

url_list = []

def get_html():
    global url_list
    url = url_list.pop()
    print('get html from {} started'.format(url))
    time.sleep(2)
    print('get html from {} ended'.format(url))

def get_url():
    global url_list
    print('get url started')
    time.sleep(2)
    for i in range(20):
        url_list.append('http://www.baidu.com/{id}'.format(id=i))
    print('get url ended')

if __name__ == '__main__':
    thread_url = threading.Thread(target=get_url)
    thread_url.start()
    thread_url.join()  # wait until url_list is filled, otherwise pop() raises IndexError
    for i in range(10):
        thread_html = threading.Thread(target=get_html)
        thread_html.start()
The code above is primitive and inflexible; instead of a global, url_list can be passed into the functions as an argument:
import time
import threading

url_list = []

def get_html(url_list):
    url = url_list.pop()
    print('get html from {} started'.format(url))
    time.sleep(1)
    print('get html from {} ended'.format(url))

def add_url(url_list):
    print('add url started')
    time.sleep(1)
    for i in range(20):
        url_list.append('http://www.baidu.com/{id}'.format(id=i))
    print('add url ended')

if __name__ == '__main__':
    thread_url = threading.Thread(target=add_url, args=(url_list,))
    thread_url.start()
    thread_url.join()
    for i in range(20):
        thread_html = threading.Thread(target=get_html, args=(url_list,))
        thread_html.start()
Another approach is to create a separate .py file that defines the variable, url_list = [], and import it at the top of your code. When there are many variables, this keeps them managed in one place and avoids confusion. But be careful to import the module, not the variable. Say the file is named variables.py and defines url_list = []: use import variables and refer to variables.url_list in the code, rather than from variables import url_list. With the latter form, when another thread rebinds the variable we will not see the change, whereas module-attribute access always does. (Mutating the list in place, e.g. via append(), is visible either way; the difference only appears when the name is rebound.)
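The rebinding pitfall above can be sketched in a single file by building the variables module on the fly (in real code variables.py would be a separate file; the module name and URL value are just the example from the text):

```python
import sys
import types

# Simulate a variables.py module defining url_list = []
variables = types.ModuleType('variables')
variables.url_list = []
sys.modules['variables'] = variables

from variables import url_list   # copies the current binding into this namespace
import variables as vars_module  # keeps a reference to the module itself

# Another thread rebinds the module-level name:
variables.url_list = ['http://www.baidu.com/0']

print(url_list)              # [] - the from-import still points at the old list
print(vars_module.url_list)  # ['http://www.baidu.com/0'] - attribute access sees the rebind
```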
Summary: no matter how a global variable is shared, the operation is not thread-safe. To achieve thread safety you need locks (the Lock mechanism), which makes the code more complex, so Python provides a thread-safe communication channel instead: from queue import Queue.
import time
import threading
from queue import Queue

def get_html(queue):
    url = queue.get()
    print('get html from {} started'.format(url))
    time.sleep(1)
    print('get html from {} ended'.format(url))

def add_url(queue):
    print('add url started')
    time.sleep(1)
    for i in range(20):
        queue.put('http://www.baidu.com/{id}'.format(id=i))
    print('add url ended')

if __name__ == '__main__':
    url_queue = Queue(maxsize=1000)  # maximum number of elements the queue may hold
    thread_url = threading.Thread(target=add_url, args=(url_queue,))
    thread_url.start()
    thread_url.join()
    thread_list = []
    for i in range(20):
        thread_html = threading.Thread(target=get_html, args=(url_queue,))
        thread_list.append(thread_html)
    for t in thread_list:
        t.start()
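Queue also carries its own synchronization helpers, so no explicit Lock is needed: each consumer calls task_done() when it finishes an item, and the producer's join() blocks until every item put on the queue has been marked done. A minimal sketch (the None sentinel for shutdown is just one common convention):

```python
import threading
from queue import Queue

q = Queue()
results = []

def worker():
    while True:
        url = q.get()
        if url is None:      # sentinel: stop the worker
            break
        results.append(url)  # "process" the url
        q.task_done()        # tell the queue this item is handled

t = threading.Thread(target=worker)
t.start()
for i in range(5):
    q.put('http://www.baidu.com/{}'.format(i))
q.join()                     # blocks until all 5 items are task_done()
q.put(None)
t.join()
print(len(results))          # 5
```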
Thread synchronization: while one thread is operating on a memory address, no other thread may operate on that address until the first thread finishes; the other threads stay in a waiting state until then.
Question: since Python has the GIL, shouldn't threads already be safe? Why is thread synchronization still an issue?
Recall the GIL description above: the GIL is released based on the number of bytecode instructions executed and on time slices, and it is also released voluntarily on I/O operations.
Consider a classic example: if the GIL made threads absolutely safe, the result below would always be 0, but in fact it is not.
from threading import Thread

total = 0

def add():
    global total
    for i in range(1000000):
        total += 1

def desc():
    global total
    for i in range(1000000):
        total -= 1

thread1 = Thread(target=add)
thread2 = Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)
One sample run prints 312064; the result varies from run to run and is never reliably 0.
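The instability comes from the fact that total += 1 is not a single bytecode instruction: it compiles to a load, an add, and a store, and the GIL may be released between them, letting the other thread interleave and lose an update. The dis module makes this visible:

```python
import dis

total = 0

def add():
    global total
    total += 1

# The disassembly shows separate LOAD / in-place add / STORE_GLOBAL steps;
# a thread switch between them loses an update.
dis.dis(add)
```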
Note that acquiring and releasing a lock also takes time, so locks have some impact on performance, and careless locking easily causes deadlock. Lock can be replaced with RLock, a reentrant lock that allows the same thread to call acquire() multiple times without blocking itself. Be aware that RLock's reentrancy only works within a single thread, and the number of acquire() calls must equal the number of release() calls.
import threading
from threading import Lock

l = Lock()
a = 0

def add():
    global a
    global l
    l.acquire()
    for i in range(1000000):
        a += i
    l.release()  # remember to release the lock when the section finishes, otherwise other threads block forever

def desc():
    global a
    global l
    l.acquire()
    for i in range(1000000):
        a -= i
    l.release()

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()  # again: with only start() and no join(), execution falls through to print immediately; with join(), print runs only after both threads finish
thread2.join()
print(a)

The output is always 0.
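The reentrancy of RLock mentioned above can be demonstrated directly: the same thread may acquire the lock again without deadlocking, as long as releases eventually match acquires (a plain Lock would hang on the second acquire):

```python
from threading import RLock

lock = RLock()

def outer():
    with lock:        # first acquire
        return inner()

def inner():
    with lock:        # same thread acquires again: allowed with RLock
        return 'ok'

print(outer())        # prints 'ok'; with a plain Lock this would deadlock
```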
For more complex inter-thread communication, the lock mechanism is no longer adequate. For example:
from threading import Condition, Thread, Lock  # Condition is for complex inter-thread synchronization

lock = Lock()

class Tom(Thread):
    def __init__(self, lock):
        self.lock = lock
        super().__init__(name='Tom')

    def run(self):
        self.lock.acquire()
        print('{}: hello, Bob.'.format(self.name))
        self.lock.release()
        self.lock.acquire()
        print("{}: Let's have a chat.".format(self.name))
        self.lock.release()

class Bob(Thread):
    def __init__(self, lock):
        self.lock = lock
        super().__init__(name='Bob')

    def run(self):
        self.lock.acquire()
        print('{}: Hi, Tom.'.format(self.name))
        self.lock.release()
        self.lock.acquire()
        print("{}:Well, I like to talk to you.".format(self.name))
        self.lock.release()

tom = Tom(lock)
bob = Bob(lock)
tom.start()
bob.start()

Output:
Tom: hello, Bob.
Tom: Let's have a chat.
Bob: Hi, Tom.
Bob:Well, I like to talk to you.
Why does this happen? The reason is simple: Tom runs through all of its logic before Bob's start() has a chance to get going. Moreover, the GIL switches threads based on time slices or bytecode counts, so Tom may well finish entirely within its time slice before the switch to Bob. This is why the condition-variable mechanism, Condition, is introduced.
Looking at Condition's source code, it implements the magic methods __enter__ and __exit__, so it can be used in a with statement. Its __enter__ method is:
def __enter__(self):
    return self._lock.__enter__()
So __enter__() simply delegates to acquire(), which in turn calls RLock.acquire(); in other words, Condition is implemented internally on top of an RLock. Likewise, __exit__ calls RLock.release().
Key methods: wait() and notify()
wait() lets a thread wait for a notification on the condition variable, while notify() sends one. The code above can then be rewritten:
from threading import Condition, Thread  # Condition is for complex inter-thread synchronization

class Tom(Thread):
    def __init__(self, condition):
        self.condition = condition
        super().__init__(name='Tom')

    def run(self):
        with self.condition:
            print('{}: hello, Bob.'.format(self.name))
            self.condition.notify()
            self.condition.wait()
            print("{}: Let's have a chat.".format(self.name))
            self.condition.notify()

class Bob(Thread):
    def __init__(self, condition):
        self.condition = condition
        super().__init__(name='Bob')

    def run(self):
        with self.condition:
            self.condition.wait()
            print('{}: Hi, Tom.'.format(self.name))
            self.condition.notify()
            self.condition.wait()
            print("{}:Well, I like to talk to you.".format(self.name))

if __name__ == '__main__':
    condition = Condition()
    tom = Tom(condition)
    bob = Bob(condition)
    bob.start()
    tom.start()
Two things to note about the code above: bob.start() must come before tom.start(), otherwise Tom's first notify() fires before Bob is waiting and Bob blocks forever; and wait() and notify() may only be called after the condition's lock has been acquired (here, via the with statement).
Semaphore: used to control how many threads may enter a section of code. For example, when writing a crawler, to avoid being detected by firing too many requests in a short time, a Semaphore can limit the number of threads making page requests.
from threading import Thread, Semaphore
import time

class GetHtml(Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print('get html successful.')
        self.sem.release()  # remember to release once the work is done

class GetUrl(Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            self.sem.acquire()  # blocks once 3 threads are already inside
            get_html = GetHtml('www.baidu.com/{}'.format(i), self.sem)
            get_html.start()

if __name__ == '__main__':
    sem = Semaphore(3)  # takes one argument: at most 3 threads may enter at once
    get_url = GetUrl(sem)
    get_url.start()
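Semaphore also supports the with statement, which pairs each acquire with its release automatically. The sketch below (the worker count and sleep time are arbitrary choices for illustration) records how many threads were inside the guarded section at once; the semaphore caps it at 3:

```python
import time
from threading import Semaphore, Thread

sem = Semaphore(3)
active = []  # threads currently inside the guarded section
peak = []    # samples of how many were inside at once

def worker(i):
    with sem:  # acquire on entry, release on exit, even on exceptions
        active.append(i)
        peak.append(len(active))
        time.sleep(0.05)
        active.remove(i)

threads = [Thread(target=worker, args=(n,)) for n in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(max(peak))  # never exceeds 3
```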
Thread pools: from concurrent.futures import ThreadPoolExecutor. Besides controlling the number of threads, a pool offers additional features:
import time
from concurrent.futures import ThreadPoolExecutor

def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
task1 = executor.submit(get_html, 3)  # task1 is a Future object; submit() is non-blocking and returns immediately; the second argument is passed to the function
task2 = executor.submit(get_html, 2)
print(task1.done())  # check whether the task has finished

Output:
False
get page2 success
get page3 success
Analysis: because submit() is non-blocking and returns immediately, the print statement does not wait for task1 to finish. If we sleep long enough for task1 to complete, done() returns True:
import time
from concurrent.futures import ThreadPoolExecutor

def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
task1 = executor.submit(get_html, 3)  # task1 is a Future object; submit() is non-blocking and returns immediately
task2 = executor.submit(get_html, 2)
print(task1.done())
time.sleep(4)
print(task1.done())

Output:
False
get page2 success
get page3 success
True
Adding print(task1.result()) at the end retrieves the return value of the thread function; result() blocks until the task has completed.
Calling cancel() on a task cancels it before it starts running, returning True if the cancellation succeeded and False otherwise (a task that is already running or finished cannot be cancelled):
import time
from concurrent.futures import ThreadPoolExecutor

def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times

executor = ThreadPoolExecutor(max_workers=1)  # with a single worker, task2 waits in the queue and can be cancelled
task1 = executor.submit(get_html, 3)
task2 = executor.submit(get_html, 2)
print(task1.done())
print(task2.cancel())
time.sleep(4)
print(task1.done())
print(task1.result())  # result() returns the thread function's return value

Output (note there is no 'get page2 success'):
False
True
get page3 success
True
3
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
all_task = [executor.submit(get_html, url) for url in urls]
for future in as_completed(all_task):
    data = future.result()
    print('get {} page'.format(data))

Output:
get page2 success
get 2 page
get page3 success
get 3 page
get page4 success
get 4 page
Analysis: executor.submit() is non-blocking, and as the output shows, as_completed() yields each task as soon as it completes.
import time
from concurrent.futures import ThreadPoolExecutor

def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
for data in executor.map(get_html, urls):
    print('get {} page'.format(data))

Output:
get page2 success
get page3 success
get 3 page
get 2 page
get page4 success
get 4 page
Note that executor.map() does not yield each result as it completes; it yields results in the order of the argument list, the first argument's result first and so on. The as_completed() approach is therefore usually preferable.
wait() blocks until the given tasks finish before execution continues. Its return_when parameter defaults to ALL_COMPLETED; with FIRST_COMPLETED (which must be imported alongside wait), it returns as soon as the first task finishes.
import time
from concurrent.futures import ThreadPoolExecutor, wait

def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
all_task = [executor.submit(get_html, url) for url in urls]
wait(all_task)
print('main thread done')

Output:
get page2 success
get page3 success
get page4 success
main thread done
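A FIRST_COMPLETED sketch: wait() returns a named pair (done, not_done) as soon as the fastest task finishes (the sleep times here are arbitrary illustration values):

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def get_html(times):
    time.sleep(times)
    return times

executor = ThreadPoolExecutor(max_workers=2)
all_task = [executor.submit(get_html, t) for t in [0.3, 0.1]]

# Returns as soon as the first future completes; the 0.1-second
# task finishes first, so it is in the done set.
done, not_done = wait(all_task, return_when=FIRST_COMPLETED)
print([f.result() for f in done])
executor.shutdown()
```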