前面的一些文章和脚本都是只能作学习多线程的原理使用,实际上什么有用的事情也没有作。接下来进行多线程的实践,看一看在实际项目中是怎么使用多线程的。python
Bookrank.py:程序员
该脚本经过单线程进行下载图书排名信息的调用数据库
1 from atexit import register 2 from re import compile 3 from threading import Thread 4 from time import sleep, ctime 5 import requests 6 7 REGEX = compile('#([\d,]+) in Books') 8 AMZN = 'https://www.amazon.com/dp/' 9 ISBNS = { 10 '0132269937': 'Core Python Programming', 11 '0132356139': 'Python Web Development with Django', 12 '0137143419': 'Python Fundamentals', 13 } 14 15 def getRanking(isbn): 16 url = '%s%s' % (AMZN, isbn) 17 page = requests.get(url) 18 data = page.text 19 return REGEX.findall(data)[0] 20 21 def _showRanking(isbn): 22 print '- %r ranked %s' % ( 23 ISBNS[isbn], getRanking(isbn)) 24 25 def _main(): 26 print 'At', ctime(), 'on Amazon' 27 for isbn in ISBNS: 28 _showRanking(isbn) 29 30 @register 31 def _atexit(): 32 print 'all DONE at:', ctime() 33 34 if __name__ == '__main__': 35 _main() 36
输出结果为:多线程
1 /usr/bin/python ~/Test_Temporary/bookrank.py 2 At Sat Jul 28 17:16:51 2018 on Amazon 3 - 'Core Python Programming' ranked 322,656 4 - 'Python Fundamentals' ranked 4,739,537 5 - 'Python Web Development with Django' ranked 1,430,855 6 all DONE at: Sat Jul 28 17:17:08 2018
上面的例子只是一个单线程程序,下面引入线程,并使用多线程再执行程序对比各自所需的时间。dom
将上面脚本中 _main() 函数的 _showRanking(isbn)
修改如下代码:函数
Thread(target=_showRanking, args=(isbn,)).start()
再次执行查看返回结果:oop
1 /usr/bin/python ~/Test_Temporary/bookrank.py 2 At Sat Jul 28 17:39:16 2018 on Amazon 3 - 'Python Fundamentals' ranked 4,739,537 4 - 'Python Web Development with Django' ranked 1,430,855 5 - 'Core Python Programming' ranked 322,656 6 all DONE at: Sat Jul 28 17:39:19 2018
从两个的输出结果中能够看出,使用单线程时整体完成的时间为 7s ,而使用多线程时,整体完成时间为 3s 。另一个须要注意的是,单线程版本是按照变量的顺序输出,而多线程版本按照完成的顺序输出。学习
通常在多线程代码中,总会有一些特定的函数或代码块不但愿(或不该该)被多个线程同时执行,一般包括修改数据库、更新文件或其它会产生竟态条件的相似状况。这就是须要使用同步的状况。ui
当任意数量的线程能够访问临界区的代码,但给定的时刻只有一个线程能够经过时,就是使用同步的时候了;url
程序员选择适合的同步原语,或者线程控制机制来执行同步;
进程同步有不一样的类型【参见:https://en.wikipedia.org/wiki/Synchronization_(computer_science) 】
同步原语有:锁/互斥、信号量。锁是最简单、最低级的机制,而信号量用于多线程竞争有限资源的状况。
锁有两种状态:锁定和未锁定。并且它也只支持两个函数:得到锁和释放锁。
当多线程争夺锁时,容许第一个得到锁的线程进入临界区,并执行代码;
全部以后到达的线程将被阻塞,直到第一个线程结束退出临界区并释放锁;
锁被释放后,其它等待的线程能够继续争夺锁,并进入临界区;
被阻塞的线程没有顺序,不会先到先得,胜出的线程是不肯定的。
代码示例(mtsleepF.py):
*注:该脚本派生了随机数量的线程,每一个线程执行结束时会进行输出
1 # -*- coding=utf-8 -*- 2 from atexit import register 3 from random import randrange 4 from threading import Thread, currentThread 5 from time import sleep, ctime 6 7 class CleanOutputSet(set): 8 def __str__(self): 9 return ', '.join(x for x in self) 10 11 loops = (randrange(2, 5) for x in range(randrange(3, 7))) 12 remaining = CleanOutputSet() 13 14 def loop(nsec): 15 myname = currentThread().name 16 remaining.add(myname) 17 print('这个是目前线程池中的线程:', remaining) 18 print('[%s] Started %s' % (ctime(), myname)) 19 sleep(nsec) 20 remaining.remove(myname) 21 print('[%s] Completed %s (%d secs)' % (ctime(), myname, nsec)) 22 print(' (remaining: %s)' % (remaining or 'None')) 23 24 def _main(): 25 for pause in loops: 26 Thread(target=loop, args=(pause,)).start() 27 28 @register 29 def _atexit(): 30 print('all DONE at:%s' % ctime()) 31 32 if __name__ == '__main__': 33 _main()
执行后的输出结果:
1 /usr/local/bin/python3.6 /Users/zhenggougou/Project/Test_Temporary/mtsleepF.py 2 这个是目前线程池中的线程: Thread-1 3 [Sat Jul 28 21:09:44 2018] Started Thread-1 4 这个是目前线程池中的线程: Thread-2, Thread-1 5 [Sat Jul 28 21:09:44 2018] Started Thread-2 6 这个是目前线程池中的线程: Thread-3, Thread-2, Thread-1 7 [Sat Jul 28 21:09:44 2018] Started Thread-3 8 这个是目前线程池中的线程: Thread-3, Thread-2, Thread-4, Thread-1 9 [Sat Jul 28 21:09:44 2018] Started Thread-4 10 这个是目前线程池中的线程: Thread-5, Thread-4, Thread-3, Thread-2, Thread-1 11 [Sat Jul 28 21:09:44 2018] Started Thread-5 12 这个是目前线程池中的线程: Thread-5, Thread-6, Thread-4, Thread-3, Thread-2, Thread-1 13 [Sat Jul 28 21:09:44 2018] Started Thread-6 14 [Sat Jul 28 21:09:46 2018] Completed Thread-2 (2 secs) 15 [Sat Jul 28 21:09:46 2018] Completed Thread-1 (2 secs) 16 [Sat Jul 28 21:09:46 2018] Completed Thread-3 (2 secs) 17 (remaining: Thread-5, Thread-6, Thread-4) 18 [Sat Jul 28 21:09:46 2018] Completed Thread-6 (2 secs) 19 (remaining: Thread-5, Thread-4) 20 [Sat Jul 28 21:09:46 2018] Completed Thread-4 (2 secs) 21 (remaining: Thread-5) 22 (remaining: Thread-5) 23 [Sat Jul 28 21:09:46 2018] Completed Thread-5 (2 secs) 24 (remaining: None) 25 (remaining: None) 26 all DONE at:Sat Jul 28 21:09:46 2018
从执行结果中能够看出,有的时候可能会存在多个线程并行执行操做删除 remaining 集合中数据的状况。好比上面结果中,线程一、二、3 就是同时执行去删除集合中数据的。因此为了不这种状况须要加锁,经过引入 Lock (或 RLock),而后建立一个锁对象来保证数据的修改每次只有一个线程能操做。
首先先导入锁类,而后建立锁对象
from threading import Thread, Lock, currentThread
lock = Lock()
而后使用建立的锁,将上面 mtsleepF.py 脚本中 loop() 函数作如下改变:
1 def loop(nsec): 2 myname = currentThread().name 3 lock.acquire() # 获取锁 4 remaining.add(myname) 5 print('这个是目前线程池中的线程:', remaining) 6 print('[%s] Started %s' % (ctime(), myname)) 7 lock.release() # 释放锁 8 sleep(nsec) 9 lock.acquire() # 获取锁 10 remaining.remove(myname) 11 print('[%s] Completed %s (%d secs)' % (ctime(), myname, nsec)) 12 print(' (remaining: %s)' % (remaining or 'None')) 13 lock.release() # 释放锁
在操做变量的先后须要进行获取锁和释放锁的操做,以保证在修改变量时只有一个线程进行。上面的代码有两处修改变量,一是:remaining.add(myname)
,二是:remaining.remove(myname)
。 因此上面代码中有两次获取锁和释放锁的操做。其实还有一种方案能够再也不调用锁的 acquire()
和 release()
方法,二是使用上下文管理,进一步简化代码。代码以下:
1 def loop(nesc): 2 myname = currentThread().name 3 with lock: 4 remaining.add(myname) 5 print('[{0}] Started {1}'.format(ctime(), myname)) 6 sleep(nesc) 7 with lock: 8 remaining.remove(myname) 9 print('[{0}] Completed {1} ({2} secs)'.format(ctime(), myname, nesc)) 10 print(' (remaining: {0})'.format(remaining or 'None'))
锁很是易于理解和实现,也很容易决定什么时候须要它们,然而,若是状况更加复杂,可能须要一个更强大的同步原语来代替锁。
信号量是最古老的同步原语之一。它是一个计数器,当资源消耗时递减,当资源释放时递增。能够认为信号量表明它们的资源可用或不可用。信号量比锁更加灵活,由于能够有多个线程,每一个线程都拥有有限资源的一个实例。
消耗资源使计数器递减的操做习惯上称为 P() —— acquire ;
当一个线程对一个资源完成操做时,该资源须要返回资源池中,这个操做通常称为 V() —— release 。
示例,糖果机和信号量(candy.py):
*注:该脚本使用了锁和信号量来模拟一个糖果机
1 # -*- coding=utf-8 -*- 2 from atexit import register 3 from random import randrange 4 from threading import BoundedSemaphore, Lock, Thread 5 from time import sleep, ctime 6 7 lock = Lock() 8 MAX = 5 9 candytray = BoundedSemaphore(MAX) 10 11 def refill(): 12 lock.acquire() 13 print('Refilling candy') 14 try: 15 candytray.release() # 释放资源 16 except ValueError: 17 print('full, skipping') 18 else: 19 print('OK') 20 lock.release() 21 22 def buy(): 23 lock.acquire() 24 print('Buying candy...') 25 if candytray.acquire(False): # 消耗资源 26 print('OK') 27 else: 28 print('empty, skipping') 29 lock.release() 30 31 def producer(loops): 32 for i in range(loops): 33 refill() 34 sleep(randrange(3)) 35 36 def consumer(loops): 37 for i in range(loops): 38 buy() 39 sleep(randrange(3)) 40 41 def _main(): 42 print('starting at:{0}'.format(ctime())) 43 nloops = randrange(2, 6) 44 print('THE CANDY MACHINE (full with %d bars)!' % MAX) 45 Thread(target=consumer, args=(randrange(nloops, nloops+MAX+2),)).start() 46 Thread(target=producer, args=(nloops,)).start() 47 48 @register 49 def _atexit(): 50 print('all DONE at:{0}'.format(ctime())) 51 52 if __name__ == '__main__': 53 _main()
执行结果为:
1 /usr/local/bin/python3.6 ~/Test_Temporary/candy.py 2 starting at:Sun Jul 29 21:12:50 2018 3 THE CANDY MACHINE (full with 5 bars)! 4 Buying candy... 5 OK 6 Refilling candy 7 OK 8 Refilling candy 9 full, skipping 10 Buying candy... 11 OK 12 Buying candy... 13 OK 14 all DONE at:Sun Jul 29 21:12:52 2018