Python 线程|进程

Python   线程

 

Threading是用于提供线程相关的操做,线程是应用程序中工做的最小单元。线程与进程的关系下图所示:html

  子线程是由主线程产生的,但二者并无关联。java

利用threading建立线程:python

 1 '''利用threading包建立'''  2 import threading  3 import time  4  5 def run(n):  6 time.sleep(2)  7 print("task:",n)  8  9 '''串行:一个运行完后,再运行另一个''' 10 run("t1") #并非线程,只是调用方法传参数 11 run("t2") 12 13 '''并发性''' 14 t1 = threading.Thread(target=run,args=("T1",)) #t1是线程,args为元组 15 t2 = threading.Thread(target=run,args=("T2",)) 16 t1.start() #并发性地工做 17 t2.start() 18 19 20 '''运行结果''' 21 task: t1 #t1运行后会间隔两秒,而后运行t2 22 task: t2 23 24 task: T2 #T1,T2同时运行 25 task: T1

上述建立了两个线程t1和t2,而后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。算法

更多方法:bootstrap

  • start    线程准备就绪,等待CPU调度;启动线程的活动,每一个线程对象最多只能调用一次。
  • join     逐个执行每一个线程,执行完毕后继续往下执行,该方法使得多线程变得无心义。
  • run      表示线程活动的方法。能够在子类中重写此方法。标准run()方法调用传递给对象构造函数的可调用对象做为目标参数(若是有的话),分别使用args和kwargs参数中的顺序参数和关键字参数。线程被cpu调度后自动执行线程对象的run方法
  • get_ident()    得到线程地址
  • setName    为线程设置名称
  • getName    获取线程名称
  • daemon     一个布尔值,指示此线程是否为守护线程。这必须在调用start()以前设置,不然会引起运行时错误。它的初始值继承自建立线程;主线程不是守护进程线程,所以在主线程中建立的全部线程默认为守护进程= False。当没有存活的非守护进程线程时,整个Python程序退出。
  • setDaemon   设置为后台线程或前台线程(默认)若是是后台线程,主线程执行过程当中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均中止;若是是前台线程,主线程执行过程当中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序中止
 1 #子线程是由主线程产生的,但二者并无关联  2 import threading  3 import time  4  5 def run(n):  6 print("task:",n)  7 time.sleep(0.1)  8 print("taskdone:",n)  9 10 Start_time = time.time() 11 for i in range(50): #共有51个线程,代码自己是一个主线程 12 t = threading.Thread(target=run,args=("t--%s" % i,)) 13  t.start() 14 t.join() #join使得主线程与子线程成串行运行 15 16 print(time.time()-Start_time) #print为建立子线程所产生的时间,而非运行时间
多个线程的建立
 1 import threading  2 import time  3  4 class My_Thread(threading.Thread):  5 def __init__(self,n):  6 super(My_Thread,self).__init__()  7 self.n = n  8  9 def run(self): 10 print("task:",self.n) 11 time.sleep(0.1) 12 t_obj=[] 13 start_time = time.time() 14 for i in range(50): #共有51个线程,代码自己是一个主线程 15 t = My_Thread("t--%s" % i) 16 t.setDaemon(True) #监听端口,当主程序执行完毕,将不会执行其余线程(前提是去掉join方法) 17  t.start() 18  t_obj.append(t) 19 print(time.time()-start_time) 20 21 22 '''运行结果''' 23 task: t--0 24 task: t--1 25 task: t--2 26 task: t--3 27 task: t--4 28 task: t--5 29 task: t--6 30 task: t--7 31 task: t--8 32 task: t--9 33 task: t--10 34 task: t--11 35 task: t--12 36 task: t--13 37 task: t--14 38 task: t--15 39 task: t--16 40 task: t--17 41 task: t--18 42 task: t--19 43 task: t--20 44 task: t--21 45 task: t--22 46 task: t--23 47 task: t--24 48 task: t--25 49 task: t--26 50 task: t--27 51 task: t--28 52 task: t--29 53 task: t--30 54 task: t--31 55 task: t--32 56 task: t--33 57 task: t--34 58 task: t--35 59 task: t--36 60 task: t--37 61 task: t--38 62 task: t--39 63 task: t--40 64 task: t--41 65 task: t--42 66 task: t--43 67 task: t--44 68 task: t--45 69 task: t--46 70 task: t--47 71 task: t--48 72 task: t--49 73 0.01196908950805664
监听端口(setDaemon)

线程锁(Lock):api

 

 

 

 

 

 

 

 

 

 

 

 

 1     def acquire(self, blocking=True, timeout=None):  2         """Acquire a semaphore, decrementing the internal counter by one.  3  When invoked without arguments: if the internal counter is larger than  4  zero on entry, decrement it by one and return immediately. If it is zero  5  on entry, block, waiting until some other thread has called release() to  6  make it larger than zero. This is done with proper interlocking so that  7  if multiple acquire() calls are blocked, release() will wake exactly one  8  of them up. The implementation may pick one at random, so the order in  9  which blocked threads are awakened should not be relied on. There is no 10  return value in this case. 11  When invoked with blocking set to true, do the same thing as when called 12  without arguments, and return true. 13  When invoked with blocking set to false, do not block. If a call without 14  an argument would block, return false immediately; otherwise, do the 15  same thing as when called without arguments, and return true. 16  When invoked with a timeout other than None, it will block for at 17  most timeout seconds. If acquire does not complete successfully in 18  that interval, return false. Return true otherwise. 19         """
20         #得到一个信号量,将内部计数器减1。在没有参数的状况下调用时:若是内部计数器在入口时
21         # 大于0,则将其递减1并当即返回。若是进入时为零,阻塞,等待其余线程调用release()
22         # 使其大于零。这是经过适当的联锁完成的,这样,若是多个acquire()调用被阻塞,
23         # release()就会唤醒其中一个调用。实现能够随机选择一个线程,所以不该该依赖于被阻塞
24         # 线程被唤醒的顺序。在本例中没有返回值。当阻塞集调用为true时,执行与没有参数调用
25         # 时相同的操做,并返回true。当阻塞设置为false时,不要阻塞。若是一个没有参数的
26         # 调用将阻塞,当即返回false;不然,执行与没有参数调用时相同的操做,并返回true。
27         # 当使用除None之外的超时调用时,它最多将阻塞超时秒。若是在那段时间里收购没有成功
28         # 完成,还假。不然返回true。
29         if not blocking and timeout is not None: 30             raise ValueError("can't specify timeout for non-blocking acquire") 31         rc = False 32         endtime = None 33  with self._cond: 34             while self._value == 0: 35                 if not blocking: 36                     break
37                 if timeout is not None: 38                     if endtime is None: 39                         endtime = _time() + timeout 40                     else: 41                         timeout = endtime - _time() 42                         if timeout <= 0: 43                             break
44  self._cond.wait(timeout) 45             else: 46                 self._value -= 1
47                 rc = True 48         return rc 49 
50     __enter__ = acquire 51 
52     def release(self): 53         """Release a semaphore, incrementing the internal counter by one. 54  When the counter is zero on entry and another thread is waiting for it 55  to become larger than zero again, wake up that thread. 56         """
57         #释放信号量,增长一个内部计数器。当进入时计数器为零,而另外一个线程正在等待计数器
58         # 再次大于零时,唤醒该线程。
59  with self._cond: 60             self._value += 1
61  self._cond.notify() 62 
63     def __exit__(self, t, v, tb): 64         self.release()
acquire、release源代码
 1 import threading  2 import time  3  4 lock = threading.Lock() #线程锁  5  6 def run(n):  7 lock.acquire() #锁定  8 global num  9 num+=1 10 lock.release() #释放锁 11 time.sleep(1) 12 13 t_obj = [] 14 num = 0 15 for i in range(50): 16 t = threading.Thread(target=run,args=("t--%s" % i,)) 17  t.start() 18  t_obj.append(t) 19 20 for i in t_obj: 21  i.join() 22 23 print("num:",num) 24 25 26 '''运行结果''' 27 num: 50
'''可用来作测试''' if __name__ == "__main__" #表示函数的开始位置,判断自主运行与否 

  线程池(信号量(semaphore)):缓存

  信号量管理一个计数器,该计数器表示release()调用的数量减去acquire()调用的数量,再加上一个初始值。acquire()方法若是有必要会阻塞,直到它能够返回而不会使计数器变为负数为止。若是未指定,值默认为1。安全

'''信号量''' import threading import time def run(n): Semaphore.acquire() print("task:",n) time.sleep(1) Semaphore.release() if __name__ == "__main__": Semaphore = threading.BoundedSemaphore(5) #每五个子进程运行一次,间隔一秒后,再运行下五个 for i in range(20): t = threading.Thread(target=run,args=(i,)) t.start() while threading.active_count()!=1: pass else: print("--all threading has done")
 1 """Thread module emulating a subset of Java's threading model."""  2 #线程模块模拟Java线程模型的一个子集。  3 import os as _os  4 import sys as _sys  5 import _thread  6  7 from time import monotonic as _time  8 from traceback import format_exc as _format_exc  9 from _weakrefset import WeakSet  10 from itertools import islice as _islice, count as _count  11 try:  12 from _collections import deque as _deque  13 except ImportError:  14 from collections import deque as _deque  15  16 # Note regarding PEP 8 compliant names  17 # This threading model was originally inspired by Java, and inherited  18 # the convention of camelCase function and method names from that  19 # language. Those original names are not in any imminent danger of  20 # being deprecated (even for Py3k),so this module provides them as an  21 # alias for the PEP 8 compliant names  22 # Note that using the new PEP 8 compliant names facilitates substitution  23 # with the multiprocessing module, which doesn't provide the old  24 # Java inspired names.  25  26 __all__ = ['get_ident', 'active_count', 'Condition', 'current_thread',  27 'enumerate', 'main_thread', 'TIMEOUT_MAX',  28 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',  29 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',  30 'setprofile', 'settrace', 'local', 'stack_size']  31  32 # Rename some stuff so "from threading import *" is safe  33 _start_new_thread = _thread.start_new_thread  34 _allocate_lock = _thread.allocate_lock  35 _set_sentinel = _thread._set_sentinel  36 get_ident = _thread.get_ident  37 ThreadError = _thread.error  38 try:  39 _CRLock = _thread.RLock  40 except AttributeError:  41 _CRLock = None  42 TIMEOUT_MAX = _thread.TIMEOUT_MAX  43 del _thread  44  45  46 # Support for profile and trace hooks  47 #支持配置文件和跟踪挂钩  48  49 _profile_hook = None  50 _trace_hook = None  51  52 def setprofile(func):  53 """Set a profile function for all threads started from the threading module.  54  The func will be passed to sys.setprofile() for each thread, before its  55  run() method is called.  56 """  57 #为从线程模块启动的全部线程设置一个配置文件函数。在调用其run()方法以前,  58 # func将被传递给每一个线程的sys.setprofile()。  59  60 global _profile_hook  61 _profile_hook = func  62  63 def settrace(func):  64 """Set a trace function for all threads started from the threading module.  65  The func will be passed to sys.settrace() for each thread, before its run()  66  method is called.  67 """  68 #为从线程模块启动的全部线程设置跟踪函数。在调用其run()方法以前,  69 # func将被传递给每一个线程的sys.settrace()。  70  71 global _trace_hook  72 _trace_hook = func  73  74 # Synchronization classes  75 # 同步类  76  77 Lock = _allocate_lock  78  79 def RLock(*args, **kwargs):  80 """Factory function that returns a new reentrant lock.  81  A reentrant lock must be released by the thread that acquired it. Once a  82  thread has acquired a reentrant lock, the same thread may acquire it again  83  without blocking; the thread must release it once for each time it has  84  acquired it.  85 """  86 #返回一个新的可重入锁的工厂函数。可重入锁必须由得到它的线程释放。  87 # 一旦一个线程得到了可重入锁,该线程能够在不阻塞的状况下再次得到该锁;  88 # 线程每次得到它时都必须释放它一次。  89  90 if _CRLock is None:  91 return _PyRLock(*args, **kwargs)  92 return _CRLock(*args, **kwargs)  93  94 class _RLock:  95 """This class implements reentrant lock objects.  96  A reentrant lock must be released by the thread that acquired it. Once a  97  thread has acquired a reentrant lock, the same thread may acquire it  98  again without blocking; the thread must release it once for each time it  99  has acquired it.  100 """  101 #该类实现可重入锁对象。可重入锁必须由得到它的线程释放。一旦一个线程得到了可重入锁,  102 # 该线程能够在不阻塞的状况下再次得到该锁;线程每次得到它时都必须释放它一次。  103  104 def __init__(self):  105 self._block = _allocate_lock()  106 self._owner = None  107 self._count = 0  108  109 def __repr__(self):  110 owner = self._owner  111 try:  112 owner = _active[owner].name  113 except KeyError:  114 pass  115 return "<%s %s.%s object owner=%r count=%d at %s>" % (  116 "locked" if self._block.locked() else "unlocked",  117 self.__class__.__module__,  118 self.__class__.__qualname__,  119  owner,  120  self._count,  121  hex(id(self))  122  )  123  124 def acquire(self, blocking=True, timeout=-1):  125 """Acquire a lock, blocking or non-blocking.  126  When invoked without arguments: if this thread already owns the lock,  127  increment the recursion level by one, and return immediately. Otherwise,  128  if another thread owns the lock, block until the lock is unlocked. Once  129  the lock is unlocked (not owned by any thread), then grab ownership, set  130  the recursion level to one, and return. If more than one thread is  131  blocked waiting until the lock is unlocked, only one at a time will be  132  able to grab ownership of the lock. There is no return value in this  133  case.  134  When invoked with the blocking argument set to true, do the same thing  135  as when called without arguments, and return true.  136  When invoked with the blocking argument set to false, do not block. If a  137  call without an argument would block, return false immediately;  138  otherwise, do the same thing as when called without arguments, and  139  return true.  140  When invoked with the floating-point timeout argument set to a positive  141  value, block for at most the number of seconds specified by timeout  142  and as long as the lock cannot be acquired. Return true if the lock has  143  been acquired, false if the timeout has elapsed.  144 """  145 #得到一个锁,阻塞或非阻塞。在没有参数的状况下调用时:若是这个线程已经拥有锁,  146 # 那么将递归级别增长1,并当即返回。不然,若是另外一个线程拥有锁,  147 # 则阻塞直到锁被解锁。一旦锁被解锁(不属于任何线程),而后获取全部权,  148 # 将递归级别设置为1,而后返回。若是有多个线程被阻塞,等待锁被解锁,  149 # 每次只有一个线程可以获取锁的全部权。在本例中没有返回值。当阻塞参数设置  150 # 为true时,执行与没有参数时相同的操做,并返回true。当阻塞参数设置为false时,  151 # 不要阻塞。若是一个没有参数的调用将阻塞,当即返回false;不然,执行与没有  152 # 参数调用时相同的操做,并返回true。当将浮点超时参数设置为正值时,若是得到  153 # 了锁,则最多阻塞超时指定的秒数,若是超时已过,则返回true;若是超时已过,则返回false。  154  155 me = get_ident()  156 if self._owner == me:  157 self._count += 1  158 return 1  159 rc = self._block.acquire(blocking, timeout)  160 if rc:  161 self._owner = me  162 self._count = 1  163 return rc  164  165 __enter__ = acquire  166  167 def release(self):  168 """Release a lock, decrementing the recursion level.  169  If after the decrement it is zero, reset the lock to unlocked (not owned  170  by any thread), and if any other threads are blocked waiting for the  171  lock to become unlocked, allow exactly one of them to proceed. If after  172  the decrement the recursion level is still nonzero, the lock remains  173  locked and owned by the calling thread.  174  Only call this method when the calling thread owns the lock. A  175  RuntimeError is raised if this method is called when the lock is  176  unlocked.  177  There is no return value.  178 """  179 #释放锁,下降递归级别。若是减量后为零,则将锁重置为解锁(不属于任何线程),  180 # 若是任何其余线程被阻塞,等待锁解锁,则只容许其中一个线程继续执行。若是在递减  181 # 以后递归级别仍然是非零,则锁仍然被锁定,而且由调用线程拥有。只有当调用线程拥有  182 # 锁时才调用此方法。若是在解锁锁时调用此方法,将引起运行时错误。没有返回值。  183  184 if self._owner != get_ident():  185 raise RuntimeError("cannot release un-acquired lock")  186 self._count = count = self._count - 1  187 if not count:  188 self._owner = None  189  self._block.release()  190  191 def __exit__(self, t, v, tb):  192  self.release()  193  194 # Internal methods used by condition variables  195 #条件变量使用的内部方法  196  197 def _acquire_restore(self, state):  198  self._block.acquire()  199 self._count, self._owner = state  200  201 def _release_save(self):  202 if self._count == 0:  203 raise RuntimeError("cannot release un-acquired lock")  204 count = self._count  205 self._count = 0  206 owner = self._owner  207 self._owner = None  208  self._block.release()  209 return (count, owner)  210  211 def _is_owned(self):  212 return self._owner == get_ident()  213  214 _PyRLock = _RLock  215  216  217 class Condition:  218 """Class that implements a condition variable.  219  A condition variable allows one or more threads to wait until they are  220  notified by another thread.  221  If the lock argument is given and not None, it must be a Lock or RLock  222  object, and it is used as the underlying lock. Otherwise, a new RLock object  223  is created and used as the underlying lock.  224 """  225 #实现条件变量的类。条件变量容许一个或多个线程等待,直到另外一个线程通知它们。  226 # 若是锁参数是给定的而不是空的,那么它必须是一个锁或RLock对象,而且它被用做底层锁。  227 # 不然,将建立一个新的RLock对象并将其用做底层锁。  228  229 def __init__(self, lock=None):  230 if lock is None:  231 lock = RLock()  232 self._lock = lock  233 # Export the lock's acquire() and release() methods  234 #导出锁的acquire()和release()方法  235 self.acquire = lock.acquire  236 self.release = lock.release  237 # If the lock defines _release_save() and/or _acquire_restore(),  238 # these override the default implementations (which just call  239 # release() and acquire() on the lock). Ditto for _is_owned().  240 #若是锁定义了_release_save()和/或_acquire_restore(),就会覆盖默认的实现  241 # (它只调用release()和acquire()对锁进行访问)。_is_owned同上()。  242 try:  243 self._release_save = lock._release_save  244 except AttributeError:  245 pass  246 try:  247 self._acquire_restore = lock._acquire_restore  248 except AttributeError:  249 pass  250 try:  251 self._is_owned = lock._is_owned  252 except AttributeError:  253 pass  254 self._waiters = _deque()  255  256 def __enter__(self):  257 return self._lock.__enter__()  258  259 def __exit__(self, *args):  260 return self._lock.__exit__(*args)  261  262 def __repr__(self):  263 return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))  264  265 def _release_save(self):  266 self._lock.release() # No state to save 没有状态保存  267  268 def _acquire_restore(self, x):  269 self._lock.acquire() # Ignore saved state 忽略保存的状态  270  271 def _is_owned(self):  272 # Return True if lock is owned by current_thread.  273 #若是锁属于current_thread,则返回True。  274 # This method is called only if _lock doesn't have _is_owned().  275 #只有当_lock没有_is_owned()时才调用该方法。  276 if self._lock.acquire(0):  277  self._lock.release()  278 return False  279 else:  280 return True  281  282 def wait(self, timeout=None):  283 """Wait until notified or until a timeout occurs.  284  If the calling thread has not acquired the lock when this method is  285  called, a RuntimeError is raised.  286  This method releases the underlying lock, and then blocks until it is  287  awakened by a notify() or notify_all() call for the same condition  288  variable in another thread, or until the optional timeout occurs. Once  289  awakened or timed out, it re-acquires the lock and returns.  290  When the timeout argument is present and not None, it should be a  291  floating point number specifying a timeout for the operation in seconds  292  (or fractions thereof).  293  When the underlying lock is an RLock, it is not released using its  294  release() method, since this may not actually unlock the lock when it  295  was acquired multiple times recursively. Instead, an internal interface  296  of the RLock class is used, which really unlocks it even when it has  297  been recursively acquired several times. Another internal interface is  298  then used to restore the recursion level when the lock is reacquired.  299 """  300 #等待直到通知或超时发生。若是调用该方法时调用的线程没有得到锁,则会引起运行时错误。  301 # 该方法释放底层锁,而后阻塞,直到它被另外一个线程中的notify()或notify_all()调用  302 # 唤醒,或者直到出现可选超时为止。一旦被唤醒或超时,它会从新得到锁并返回。  303 # 当出现timeout参数而不是None时,它应该是一个浮点数,以秒(或几分之一)为单位指定  304 # 操做的超时。当底层锁是RLock时,不会使用其release()方法释放它,由于当递归地屡次  305 # 获取锁时,这可能不会真正解锁它。相反,使用了RLock类的内部接口,即便递归地得到了  306 # 屡次,它也会真正地解锁它。而后使用另外一个内部接口在从新得到锁时恢复递归级别。  307  308 if not self._is_owned():  309 raise RuntimeError("cannot wait on un-acquired lock")  310 waiter = _allocate_lock()  311  waiter.acquire()  312  self._waiters.append(waiter)  313 saved_state = self._release_save()  314 gotit = False  315 try: # restore state no matter what (e.g., KeyboardInterrupt)  316 #不管如何都要恢复状态(例如,键盘中断)  317 if timeout is None:  318  waiter.acquire()  319 gotit = True  320 else:  321 if timeout > 0:  322 gotit = waiter.acquire(True, timeout)  323 else:  324 gotit = waiter.acquire(False)  325 return gotit  326 finally:  327  self._acquire_restore(saved_state)  328 if not gotit:  329 try:  330  self._waiters.remove(waiter)  331 except ValueError:  332 pass  333  334 def wait_for(self, predicate, timeout=None):  335 """Wait until a condition evaluates to True.  336  predicate should be a callable which result will be interpreted as a  337  boolean value. A timeout may be provided giving the maximum time to  338  wait.  339 """  340 #等待,直到条件的值为True。谓词应该是可调用的,其结果将被解释为布尔值。  341 # 可能会提供一个超时,以提供最长的等待时间。  342  343 endtime = None  344 waittime = timeout  345 result = predicate()  346 while not result:  347 if waittime is not None:  348 if endtime is None:  349 endtime = _time() + waittime  350 else:  351 waittime = endtime - _time()  352 if waittime <= 0:  353 break  354  self.wait(waittime)  355 result = predicate()  356 return result  357  358 def notify(self, n=1):  359 """Wake up one or more threads waiting on this condition, if any.  360  If the calling thread has not acquired the lock when this method is  361  called, a RuntimeError is raised.  362  This method wakes up at most n of the threads waiting for the condition  363  variable; it is a no-op if no threads are waiting.  364 """  365 #唤醒在此条件下等待的一个或多个线程(若是有的话)。若是调用该方法时调用的线程没有得到锁,  366 # 则会引起运行时错误。该方法最多唤醒n个等待条件变量的线程;若是没有线程在等待,那么  367 # 这是一个no-op。  368 if not self._is_owned():  369 raise RuntimeError("cannot notify on un-acquired lock")  370 all_waiters = self._waiters  371 waiters_to_notify = _deque(_islice(all_waiters, n))  372 if not waiters_to_notify:  373 return  374 for waiter in waiters_to_notify:  375  waiter.release()  376 try:  377  all_waiters.remove(waiter)  378 except ValueError:  379 pass  380  381 def notify_all(self):  382 """Wake up all threads waiting on this condition.  383  If the calling thread has not acquired the lock when this method  384  is called, a RuntimeError is raised.  385 """  386 #唤醒在此条件下等待的全部线程。若是调用该方法时调用的线程没有得到锁,  387 # 则会引起运行时错误。  388  self.notify(len(self._waiters))  389  390 notifyAll = notify_all  391  392  393 class Semaphore:  394 """This class implements semaphore objects.  395  Semaphores manage a counter representing the number of release() calls minus  396  the number of acquire() calls, plus an initial value. The acquire() method  397  blocks if necessary until it can return without making the counter  398  negative. If not given, value defaults to 1.  399 """  400 #这个类实现信号量对象。信号量管理一个计数器,该计数器表示release()调用的数量减去  401 # acquire()调用的数量,再加上一个初始值。acquire()方法若是有必要会阻塞,直到它能够  402 # 返回而不会使计数器变为负数为止。若是未指定,值默认为1。  403  404 # After Tim Peters' semaphore class, but not quite the same (no maximum)  405 #在Tim Peters的信号量类以后,但不彻底相同(没有最大值)  406  407 def __init__(self, value=1):  408 if value < 0:  409 raise ValueError("semaphore initial value must be >= 0")  410 self._cond = Condition(Lock())  411 self._value = value  412  413 def acquire(self, blocking=True, timeout=None):  414 """Acquire a semaphore, decrementing the internal counter by one.  415  When invoked without arguments: if the internal counter is larger than  416  zero on entry, decrement it by one and return immediately. If it is zero  417  on entry, block, waiting until some other thread has called release() to  418  make it larger than zero. This is done with proper interlocking so that  419  if multiple acquire() calls are blocked, release() will wake exactly one  420  of them up. The implementation may pick one at random, so the order in  421  which blocked threads are awakened should not be relied on. There is no  422  return value in this case.  423  When invoked with blocking set to true, do the same thing as when called  424  without arguments, and return true.  425  When invoked with blocking set to false, do not block. If a call without  426  an argument would block, return false immediately; otherwise, do the  427  same thing as when called without arguments, and return true.  428  When invoked with a timeout other than None, it will block for at  429  most timeout seconds. If acquire does not complete successfully in  430  that interval, return false. Return true otherwise.  431 """  432 #得到一个信号量,将内部计数器减1。在没有参数的状况下调用时:若是内部计数器在入口时  433 # 大于0,则将其递减1并当即返回。若是进入时为零,阻塞,等待其余线程调用release()  434 # 使其大于零。这是经过适当的联锁完成的,这样,若是多个acquire()调用被阻塞,  435 # release()就会唤醒其中一个调用。实现能够随机选择一个线程,所以不该该依赖于被阻塞  436 # 线程被唤醒的顺序。在本例中没有返回值。当阻塞集调用为true时,执行与没有参数调用  437 # 时相同的操做,并返回true。当阻塞设置为false时,不要阻塞。若是一个没有参数的  438 # 调用将阻塞,当即返回false;不然,执行与没有参数调用时相同的操做,并返回true。  439 # 当使用除None之外的超时调用时,它最多将阻塞超时秒。若是在那段时间里收购没有成功  440 # 完成,还假。不然返回true。  441 if not blocking and timeout is not None:  442 raise ValueError("can't specify timeout for non-blocking acquire")  443 rc = False  444 endtime = None  445  with self._cond:  446 while self._value == 0:  447 if not blocking:  448 break  449 if timeout is not None:  450 if endtime is None:  451 endtime = _time() + timeout  452 else:  453 timeout = endtime - _time()  454 if timeout <= 0:  455 break  456  self._cond.wait(timeout)  457 else:  458 self._value -= 1  459 rc = True  460 return rc  461  462 __enter__ = acquire  463  464 def release(self):  465 """Release a semaphore, incrementing the internal counter by one.  466  When the counter is zero on entry and another thread is waiting for it  467  to become larger than zero again, wake up that thread.  468 """  469 #释放信号量,增长一个内部计数器。当进入时计数器为零,而另外一个线程正在等待计数器  470 # 再次大于零时,唤醒该线程。  471  with self._cond:  472 self._value += 1  473  self._cond.notify()  474  475 def __exit__(self, t, v, tb):  476  self.release()  477  478  479 class BoundedSemaphore(Semaphore):  480 """Implements a bounded semaphore.  481  A bounded semaphore checks to make sure its current value doesn't exceed its  482  initial value. If it does, ValueError is raised. In most situations  483  semaphores are used to guard resources with limited capacity.  484  If the semaphore is released too many times it's a sign of a bug. If not  485  given, value defaults to 1.  486  Like regular semaphores, bounded semaphores manage a counter representing  487  the number of release() calls minus the number of acquire() calls, plus an  488  initial value. The acquire() method blocks if necessary until it can return  489  without making the counter negative. If not given, value defaults to 1.  490 """  491 #实现有界信号量。有界信号量检查其当前值是否不超过初始值。若是是,则会引起ValueError。  492 # 在大多数状况下,信号量被用来保护有限容量的资源。若是信号量被释放了太屡次,这是错误  493 # 的信号。若是未指定,值默认为1。与常规信号量同样,有界信号量管理一个计数器,  494 # 表示release()调用的数量减去acquire()调用的数量,再加上一个初始值。acquire()方法  495 # 若是有必要会阻塞,直到它能够返回而不会使计数器变为负数为止。若是未指定,值默认为1。  496  497 def __init__(self, value=1):  498 Semaphore.__init__(self, value)  499 self._initial_value = value  500  501 def release(self):  502 """Release a semaphore, incrementing the internal counter by one.  503  504  When the counter is zero on entry and another thread is waiting for it  505  to become larger than zero again, wake up that thread.  506  507  If the number of releases exceeds the number of acquires,  508  raise a ValueError.  509 """  510 #释放信号量,增长一个内部计数器。当进入时计数器为0,而另外一个线程正在等待i再次  511 # 大于0时,唤醒那个线程。若是发布的数量超过了得到的数量,则引起一个ValueError。  512  with self._cond:  513 if self._value >= self._initial_value:  514 raise ValueError("Semaphore released too many times")  515 self._value += 1  516  self._cond.notify()  517  518  519 class Event:  520 """Class implementing event objects.  521  522  Events manage a flag that can be set to true with the set() method and reset  523  to false with the clear() method. The wait() method blocks until the flag is  524  true. The flag is initially false.  525 """  526 #类实现事件对象。事件管理的标志能够用set()方法设置为true,用clear()方法重置为false。  527 # wait()方法将阻塞,直到标记为true。标志最初是假的。  528  529 # After Tim Peters' event class (without is_posted())  530 #在Tim Peters的事件类以后(没有is_post ())  531  532 def __init__(self):  533 self._cond = Condition(Lock())  534 self._flag = False  535  536 def _reset_internal_locks(self):  537 # private! called by Thread._reset_internal_locks by _after_fork()  538 #私人!调用线程._reset_internal_locks _after_fork()  539 self._cond.__init__(Lock())  540  541 def is_set(self):  542 """Return true if and only if the internal flag is true."""  543 #当且仅当内部标志为true时返回true。  544 return self._flag  545  546 isSet = is_set  547  548 def set(self):  549 """Set the internal flag to true.  550  All threads waiting for it to become true are awakened. Threads  551  that call wait() once the flag is true will not block at all.  552 """  553 #将内部标志设置为true。等待它成真的全部线程都被唤醒。一旦标志为true,  554 # 调用wait()的线程将不会阻塞。  555  with self._cond:  556 self._flag = True  557  self._cond.notify_all()  558  559 def clear(self):  560 """Reset the internal flag to false.  561  Subsequently, threads calling wait() will block until set() is called to  562  set the internal flag to true again.  563 """  564 #将内部标志重置为false。随后,调用wait()的线程将阻塞,直到调用set()将内部标志再次设置为true。  565  with self._cond:  566 self._flag = False  567  568 def wait(self, timeout=None):  569 """Block until the internal flag is true.  570  If the internal flag is true on entry, return immediately. Otherwise,  571  block until another thread calls set() to set the flag to true, or until  572  the optional timeout occurs.  573  When the timeout argument is present and not None, it should be a  574  floating point number specifying a timeout for the operation in seconds  575  (or fractions thereof).  576  This method returns the internal flag on exit, so it will always return  577  True except if a timeout is given and the operation times out.  578 """  579 #阻塞,直到内部标志为true。若是进入时内部标志为true,则当即返回。不然,阻塞直到  580 # 另外一个线程调用set()将标志设置为true,或者直到出现可选超时。当出现timeout参数  581 # 而不是None时,它应该是一个浮点数,以秒(或几分之一)为单位指定操做的超时。这个  582 # 方法在退出时返回内部标志,所以它老是返回True,除非超时和操做超时。  583  with self._cond:  584 signaled = self._flag  585 if not signaled:  586 signaled = self._cond.wait(timeout)  587 return signaled  588  589  590 # A barrier class. Inspired in part by the pthread_barrier_* api and  591 # the CyclicBarrier class from Java. See  592 '''一个障碍类。部分灵感来自于pthread_barrier_* api和来自Java的循环屏障类。看到'''  593 # http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and  594 # http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/  595 # CyclicBarrier.html  596 # for information. ##获取信息  597 # We maintain two main states, 'filling' and 'draining' enabling the barrier  598 # to be cyclic. Threads are not allowed into it until it has fully drained  599 # since the previous cycle. In addition, a 'resetting' state exists which is  600 # similar to 'draining' except that threads leave with a BrokenBarrierError,  601 # and a 'broken' state in which all threads get the exception.  602 '''咱们维持两种主要状态,“填充”和“排水”,使屏障是循环的。线程不容许进入它,直到它从  603 上一个循环中彻底耗尽为止。此外,存在一种“重置”状态,相似于“耗尽”状态,只是线程留下了  604 一个故障的barriererror错误,以及全部线程都获得异常的“中断”状态。'''  605 class Barrier:  606 """Implements a Barrier.  607  Useful for synchronizing a fixed number of threads at known synchronization  608  points. Threads block on 'wait()' and are simultaneously once they have all  609  made that call.  610 """  611 #实现了一个障碍。用于在已知同步点同步固定数量的线程。线程阻塞在'wait()'上,  612 # 而且一旦它们都进行了该调用,就会同时阻塞。  613  614 def __init__(self, parties, action=None, timeout=None):  615 """Create a barrier, initialised to 'parties' threads.  616  'action' is a callable which, when supplied, will be called by one of  617  the threads after they have all entered the barrier and just prior to  618  releasing them all. If a 'timeout' is provided, it is uses as the  619  default for all subsequent 'wait()' calls.  620 """  621 #建立一个障碍,初始化为“party”线程。“action”是一个可调用的线程,当它被提供时,  622 # 它将被其中一个线程在它们所有进入壁垒并释放它们以前调用。若是提供了'timeout',  623 # 那么它将用做全部后续'wait()'调用的默认值。  624 self._cond = Condition(Lock())  625 self._action = action  626 self._timeout = timeout  627 self._parties = parties  628 self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken  629 self._count = 0  630  631 def wait(self, timeout=None):  632 """Wait for the barrier.  633  When the specified number of threads have started waiting, they are all  634  simultaneously awoken. If an 'action' was provided for the barrier, one  635  of the threads will have executed that callback prior to returning.  636  Returns an individual index number from 0 to 'parties-1'.  637 """  638 #等待障碍。当指定数量的线程开始等待时,它们都同时被唤醒。若是为barrier提供了一个  639 # “操做”,其中一个线程将在返回以前执行该回调。返回从0到“parties-1”的单个索引号。  640  641 if timeout is None:  642 timeout = self._timeout  643  with self._cond:  644 self._enter() # Block while the barrier drains. 隔离墙排水时要进行隔离。  645 index = self._count  646 self._count += 1  647 try:  648 if index + 1 == self._parties:  649 # We release the barrier  650  self._release()  651 else:  652 # We wait until someone releases us  653  self._wait(timeout)  654 return index  655 finally:  656 self._count -= 1  657 # Wake up any threads waiting for barrier to drain.  658 #唤醒任何等待屏障耗尽的线程。  659  self._exit()  660  661 # Block until the barrier is ready for us, or raise an exception  662 # if it is broken.  663 #阻止,直到障碍为咱们准备好,或提出一个例外,若是它被打破。  664 def _enter(self):  665 while self._state in (-1, 1):  666 # It is draining or resetting, wait until done正在排水或重置,等待完成  667  self._cond.wait()  668 #see if the barrier is in a broken state看看势垒是否处于破碎状态  669 if self._state < 0:  670 raise BrokenBarrierError  671 assert self._state == 0  672  673 # Optionally run the 'action' and release the threads waiting  674 # in the barrier.  675 #能够选择运行“action”,并释放等待在barrier中的线程。  676  677 def _release(self):  678 try:  679 if self._action:  680  self._action()  681 # enter draining state 进入排水状态  682 self._state = 1  683  self._cond.notify_all()  684 except:  685 #an exception during the _action handler. Break and reraise  686 #_action处理程序期间的异常。打破和reraise  687  self._break()  688 raise  689  690 # Wait in the barrier until we are released. Raise an exception  691 # if the barrier is reset or broken.  692 #在障碍物里等着,直到咱们被释放。若是障碍被重置或破坏,则引起异常。  693 def _wait(self, timeout):  694 if not self._cond.wait_for(lambda : self._state != 0, timeout):  695 #timed out. Break the barrier  696  self._break()  697 raise BrokenBarrierError  698 if self._state < 0:  699 raise BrokenBarrierError  700 assert self._state == 1  701  702 # If we are the last thread to exit the barrier, signal any threads  703 # # waiting for the barrier to drain.  704 #若是咱们是最后一个退出屏障的线程,那么向等待屏障流出的线程发出信号。  705 def _exit(self):  706 if self._count == 0:  707 if self._state in (-1, 1):  708 #resetting or draining  709 self._state = 0  710  self._cond.notify_all()  711  712 def reset(self):  713 """Reset the barrier to the initial state.  714  Any threads currently waiting will get the BrokenBarrier exception  715  raised.  716 """  717 #将势垒重置为初始状态。当前等待的任何线程都将引起故障障碍异常。  718  with self._cond:  719 if self._count > 0:  720 if self._state == 0:  721 #reset the barrier, waking up threads 重置障碍,唤醒线程  722 self._state = -1  723 elif self._state == -2:  724 #was broken, set it to reset state 被破坏,设置为重置状态  725 #which clears when the last thread exits 最后一个线程退出时哪一个线程清除  726 self._state = -1  727 else:  728 self._state = 0  729  self._cond.notify_all()  730  731 def abort(self):  732 """Place the barrier into a 'broken' state.  733  Useful in case of error. Any currently waiting threads and threads  734  attempting to 'wait()' will have BrokenBarrierError raised.  735 """  736 #将障碍设置为“破碎”状态。在发生错误时颇有用。任何当前正在等待的线程和  737 # 试图“wait()”的线程都会出现故障障碍。  738  with self._cond:  739  self._break()  740  741 def _break(self):  742 # An internal error was detected. The barrier is set to  743 # a broken state all parties awakened.  744 #检测到内部错误。障碍被设置为一个破碎的国家,全部各方都觉醒了。  745 self._state = -2  746  self._cond.notify_all()  747  748  @property  749 def parties(self):  750 """Return the number of threads required to trip the barrier."""  751 #返回跳闸所需的线程数。  752 return self._parties  753  754  @property  755 def n_waiting(self):  756 """Return the number of threads currently waiting at the barrier."""  757 #返回阻塞处当前等待的线程数。  758 # We don't need synchronization here since this is an ephemeral result  759 # anyway. It returns the correct value in the steady state.  760 #咱们不须要同步,由于这是一个短暂的结果。它在稳定状态下返回正确的值。  761 if self._state == 0:  762 return self._count  763 return 0  764  765  @property  766 def broken(self):  767 """Return True if the barrier is in a broken state."""  768 #若是屏障处于破坏状态,返回True。  769 return self._state == -2  770  771 # exception raised by the Barrier class  772 #由Barrier类引起的异常  773 class BrokenBarrierError(RuntimeError):  774 pass  775  776  777 # Helper to generate new thread names  778 #帮助程序生成新的线程名称  779 _counter = _count().__next__  780 _counter() # Consume 0 so first non-main thread has id 1.  781 #消耗0,因此第一个非主线程id为1。  782 def _newname(template="Thread-%d"):  783 return template % _counter()  784  785 # Active thread administration #活动线程管理  786 _active_limbo_lock = _allocate_lock()  787 _active = {} # maps thread id to Thread object 将线程id映射到线程对象  788 _limbo = {}  789 _dangling = WeakSet()  790  791 # Main class for threads  792 '''线程的主类'''  793  794 class Thread:  795 """A class that represents a thread of control.  796  This class can be safely subclassed in a limited fashion. There are two ways  797  to specify the activity: by passing a callable object to the constructor, or  798  by overriding the run() method in a subclass.  799 """  800 #表示控制线程的类。这个类能够以有限的方式安全地子类化。有两种方法能够指定活动:  801 # 经过将可调用对象传递给构造函数,或者在子类中重写run()方法。  802  803 _initialized = False  804 # Need to store a reference to sys.exc_info for printing  805 # out exceptions when a thread tries to use a global var. during interp.  806 # shutdown and thus raises an exception about trying to perform some  807 # operation on/with a NoneType  808 #须要存储对sys的引用。exc_info用于在interp期间线程试图使用全局变量时打印异常。  809 # 关闭,所以引起了一个异常,即试图对/使用非etype执行某些操做  810 _exc_info = _sys.exc_info  811 # Keep sys.exc_clear too to clear the exception just before  812 # allowing .join() to return.  813 #Keep sys.ex_clear也能够在allowing.join()返回以前清除异常。  814 #XXX __exc_clear = _sys.exc_clear  815  816 def __init__(self, group=None, target=None, name=None,  817 args=(), kwargs=None, *, daemon=None):  818 """This constructor should always be called with keyword arguments. Arguments are:  819  *group* should be None; reserved for future extension when a ThreadGroup  820  class is implemented.  821  *target* is the callable object to be invoked by the run()  822  method. Defaults to None, meaning nothing is called.  823  *name* is the thread name. By default, a unique name is constructed of  824  the form "Thread-N" where N is a small decimal number.  825  *args* is the argument tuple for the target invocation. Defaults to ().  826  *kwargs* is a dictionary of keyword arguments for the target  827  invocation. Defaults to {}.  828  If a subclass overrides the constructor, it must make sure to invoke  829  the base class constructor (Thread.__init__()) before doing anything  830  else to the thread.  831 """  832 #这个构造函数应该老是使用关键字参数调用。论点是:*group*不该该是;在实现  833 # ThreadGroup类时为未来的扩展保留。*target*是run()方法调用的可调用对象。  834 # 默认为None,表示不调用任何东西。*name*是线程名。默认状况下,惟一的名称  835 # 是由“Thread-N”的形式构造的,其中N是一个小数。*args*是目标调用的参数元组。  836 # 默认为()。*kwargs*是目标调用的关键字参数字典。默认为{}。若是子类重写构造  837 # 函数,它必须确保在对线程执行其余操做以前调用基类构造函数(thread. __init__())。  838  839 assert group is None, "group argument must be None for now"  840 if kwargs is None:  841 kwargs = {}  842 self._target = target  843 self._name = str(name or _newname())  844 self._args = args  845 self._kwargs = kwargs  846 if daemon is not None:  847 self._daemonic = daemon  848 else:  849 self._daemonic = current_thread().daemon  850 self._ident = None  851 self._tstate_lock = None  852 self._started = Event()  853 self._is_stopped = False  854 self._initialized = True  855 # sys.stderr is not stored in the class like  856 # sys.exc_info since it can be changed between instances  857 self._stderr = _sys.stderr  858 # For debugging and _after_fork()  859  _dangling.add(self)  860  861 def _reset_internal_locks(self, is_alive):  862 # private! Called by _after_fork() to reset our internal locks as  863 # they may be in an invalid state leading to a deadlock or crash.  864 #私人!由_after_fork()调用,以重置内部锁,由于它们可能处于无效状态,致使死锁或崩溃。  865  self._started._reset_internal_locks()  866 if is_alive:  867  self._set_tstate_lock()  868 else:  869 # The thread isn't alive after fork: it doesn't have a tstate anymore.  870 #在fork以后,线程再也不是活的:它再也不有tstate。  871 self._is_stopped = True  872 self._tstate_lock = None  873  874 def __repr__(self):  875 assert self._initialized, "Thread.__init__() was not called"  876 status = "initial"  877 if self._started.is_set():  878 status = "started"  879 self.is_alive() # easy way to get ._is_stopped set when appropriate  880 #在适当的状况下,得到._is_stopped设置的简单方法  881 if self._is_stopped:  882 status = "stopped"  883 if self._daemonic:  884 status += " daemon"  885 if self._ident is not None:  886 status += " %s" % self._ident  887 return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)  888  889 def start(self):  890 """Start the thread's activity.  891  It must be called at most once per thread object. It arranges for the  892  object's run() method to be invoked in a separate thread of control.  893  This method will raise a RuntimeError if called more than once on the  894  same thread object.  895 """  896 #启动线程的活动。每一个线程对象最多只能调用一次。它安排在一个单独的控制线程中  897 # 调用对象的run()方法。若是在同一个线程对象上调用屡次,此方法将引起运行时错误。  898 if not self._initialized:  899 raise RuntimeError("thread.__init__() not called")  900  901 if self._started.is_set():  902 raise RuntimeError("threads can only be started once")  903  with _active_limbo_lock:  904 _limbo[self] = self  905 try:  906  _start_new_thread(self._bootstrap, ())  907 except Exception:  908  with _active_limbo_lock:  909 del _limbo[self]  910 raise  911  self._started.wait()  912  913 def run(self):  914 """Method representing the thread's activity.  915  You may override this method in a subclass. The standard run() method  916  invokes the callable object passed to the object's constructor as the  917  target argument, if any, with sequential and keyword arguments taken  918  from the args and kwargs arguments, respectively.  919 """  920 #表示线程活动的方法。您能够在子类中重写此方法。标准run()方法调用传递给对象  921 # 构造函数的可调用对象做为目标参数(若是有的话),分别使用args和kwargs参数  922 # 中的顺序参数和关键字参数。  923 try:  924 if self._target:  925 self._target(*self._args, **self._kwargs)  926 finally:  927 # Avoid a refcycle if the thread is running a function with  928 # an argument that has a member that points to the thread.  929 #若是线程正在运行一个具备指向线程的成员的参数的函数,请避免使用refcycle。  930 del self._target, self._args, self._kwargs  931  932 def _bootstrap(self):  933 # Wrapper around the real bootstrap code that ignores  934 # exceptions during interpreter cleanup. Those typically  935 # happen when a daemon thread wakes up at an unfortunate  936 # moment, finds the world around it destroyed, and raises some  937 # random exception *** while trying to report the exception in  938 # _bootstrap_inner() below ***. Those random exceptions  939 # don't help anybody, and they confuse users, so we suppress  940 # them. We suppress them only when it appears that the world  941 # indeed has already been destroyed, so that exceptions in  942 # _bootstrap_inner() during normal business hours are properly  943 # reported. Also, we only suppress them for daemonic threads;  944 # if a non-daemonic encounters this, something else is wrong.  945 '''包装真正的引导代码,在解释器清理期间忽略异常。这一般发生在守护进程线程  946  在一个不幸的时刻醒来,发现它周围的世界被破坏,并在试图报告***下面的异常  947  in_bootstrap_inner()时引起一些随机异常时。这些随机的异常对任何人都没有  948  帮助,并且它们混淆了用户,因此咱们抑制了它们。只有当世界彷佛确实已经被破坏  949  时,咱们才会抑制它们,以便在正常工做时间内正确报告_bootstrap_inner()中  950  的异常。并且,咱们只对daemonic线程禁止它们;若是一个非daemonic遇到了这个  951  问题,就会出现其余问题'''  952 try:  953  self._bootstrap_inner()  954 except:  955 if self._daemonic and _sys is None:  956 return  957 raise  958  959 def _set_ident(self):  960 self._ident = get_ident()  961  962 def _set_tstate_lock(self):  963 """  964  Set a lock object which will be released by the interpreter when  965  the underlying thread state (see pystate.h) gets deleted.  966 """  967 #设置一个锁对象,当底层线程状态(请参阅pystate.h)被删除时,解释器将释放这个锁对象。  968 self._tstate_lock = _set_sentinel()  969  self._tstate_lock.acquire()  970  971 def _bootstrap_inner(self):  972 try:  973  self._set_ident()  974  self._set_tstate_lock()  975  self._started.set()  976  with _active_limbo_lock:  977 _active[self._ident] = self  978 del _limbo[self]  979  980 if _trace_hook:  981  _sys.settrace(_trace_hook)  982 if _profile_hook:  983  _sys.setprofile(_profile_hook)  984  985 try:  986  self.run()  987 except SystemExit:  988 pass  989 except:  990 # If sys.stderr is no more (most likely from interpreter  991 # shutdown) use self._stderr. Otherwise still use sys (as in  992 # _sys) in case sys.stderr was redefined since the creation of  993 # self.  994 #若是系统。stderr再也不使用self._stderr(极可能是因为解释器关闭)。不然,  995 # 在case sys中仍然使用sys(如in_sys)。stderr自自我创造以来被从新定义。  996 if _sys and _sys.stderr is not None:  997 print("Exception in thread %s:\n%s" %  998 (self.name, _format_exc()), file=_sys.stderr)  999 elif self._stderr is not None: 1000 # Do the best job possible w/o a huge amt. of code to 1001 # approximate a traceback (code ideas from Lib/traceback.py) 1002 #尽最大的努力作最好的工做。近似回溯的代码(来自Lib/traceback.py的代码思想) 1003 exc_type, exc_value, exc_tb = self._exc_info() 1004 try: 1005 print(( 1006 "Exception in thread " + self.name + 1007 " (most likely raised during interpreter shutdown):"), file=self._stderr) 1008 print(( 1009 "Traceback (most recent call last):"), file=self._stderr) 1010 while exc_tb: 1011 print(( 1012 ' File "%s", line %s, in %s' % 1013  (exc_tb.tb_frame.f_code.co_filename, 1014  exc_tb.tb_lineno, 1015 exc_tb.tb_frame.f_code.co_name)), file=self._stderr) 1016 exc_tb = exc_tb.tb_next 1017 print(("%s: %s" % (exc_type, exc_value)), file=self._stderr) 1018  self._stderr.flush() 1019 # Make sure that exc_tb gets deleted since it is a memory 1020 # hog; deleting everything else is just for thoroughness 1021 #确保exc_tb被删除,由于它占用内存;删除全部其余内容只是为了完全 1022 finally: 1023 del exc_type, exc_value, exc_tb 1024 finally: 1025 # Prevent a race in 1026 # test_threading.test_no_refcycle_through_target when 1027 # the exception keeps the target alive past when we 1028 # assert that it's dead. 1029 #防止test_threading中的竞争。test_no_refcycle_through_target, 1030 # 当异常断言目标已死时,该异常将使目标保持存活。 1031 #XXX self._exc_clear() 1032 pass 1033 finally: 1034  with _active_limbo_lock: 1035 try: 1036 # We don't call self._delete() because it also 1037 # grabs _active_limbo_lock. 1038 #咱们不调用self._delete(),由于它也抓取_active_limbo_lock。 1039 del _active[get_ident()] 1040 except: 1041 pass 1042 1043 def _stop(self): 1044 # After calling ._stop(), .is_alive() returns False and .join() returns 1045 # immediately. ._tstate_lock must be released before calling ._stop(). 1046 #调用._stop()后,.is_alive()返回False, .join()当即返回。 1047 1048 # Normal case: C code at the end of the thread's life 1049 # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and 1050 # that's detected by our ._wait_for_tstate_lock(), called by .join() 1051 # and .is_alive(). Any number of threads _may_ call ._stop() 1052 # simultaneously (for example, if multiple threads are blocked in 1053 # .join() calls), and they're not serialized. That's harmless - 1054 # they'll just make redundant rebindings of ._is_stopped and 1055 # ._tstate_lock. Obscure: we rebind ._tstate_lock last so that the 1056 # "assert self._is_stopped" in ._wait_for_tstate_lock() always works 1057 # (the assert is executed only if ._tstate_lock is None). 1058 #正常状况:线程生命周期结束时的C代码(_threadmodule.c中的release_sentinel) 1059 # 释放了._tstate_lock,咱们的._wait_for_tstate_lock()检测到这一点, 1060 # 它被.join()和.is_alive()调用。同时调用任意数量的线程_may_ ._stop() 1061 # (例如,若是多个线程在.join()调用中被阻塞,而且它们没有被序列化)。这是无害的, 1062 # 他们只会对._is_stopped和._tstate_lock进行冗余的重绑定。晦涩的: 1063 # 咱们将._tstate_lock绑定到最后,以便“断言self”。_is_stopped()中 1064 # 的._wait_for_tstate_lock()老是有效的(只有当._tstate_lock为空时才执行断言)。 1065 1066 # Special case: _main_thread releases ._tstate_lock via this 1067 # module's _shutdown() function. 1068 #特殊状况:_main_thread经过这个模块的_shutdown()函数释放._tstate_lock。 1069 lock = self._tstate_lock 1070 if lock is not None: 1071 assert not lock.locked() 1072 self._is_stopped = True 1073 self._tstate_lock = None 1074 1075 def _delete(self): 1076 "Remove current thread from the dict of currently running threads." 1077  with _active_limbo_lock: 1078 del _active[get_ident()] 1079 # There must not be any python code between the previous line 1080 # and after the lock is released. Otherwise a tracing function 1081 # could try to acquire the lock again in the same thread, (in 1082 # current_thread()), and would block. 1083 #前一行和锁释放后之间不该该有任何python代码。不然,跟踪函数能够尝试在相 1084 # 同的线程(在current_thread()中)中再次获取锁,并将阻塞。 1085 1086 def join(self, timeout=None): 1087 """Wait until the thread terminates. 1088  This blocks the calling thread until the thread whose join() method is 1089  called terminates -- either normally or through an unhandled exception 1090  or until the optional timeout occurs. 1091  When the timeout argument is present and not None, it should be a 1092  floating point number specifying a timeout for the operation in seconds 1093  (or fractions thereof). As join() always returns None, you must call 1094  isAlive() after join() to decide whether a timeout happened -- if the 1095  thread is still alive, the join() call timed out. 1096  When the timeout argument is not present or None, the operation will 1097  block until the thread terminates. 1098  A thread can be join()ed many times. 1099  join() raises a RuntimeError if an attempt is made to join the current 1100  thread as that would cause a deadlock. It is also an error to join() a 1101  thread before it has been started and attempts to do so raises the same 1102  exception. 1103 """ 1104 #等待直到线程终止。这将阻塞调用线程,直到调用join()方法的线程终止——一般或经过 1105 # 未处理的异常终止,或直到出现可选超时为止。当出现timeout参数而不是None时, 1106 # 它应该是一个浮点数,以秒(或几分之一)为单位指定操做的超时。由于join()老是 1107 # 返回None,因此必须在join()以后调用isAlive(),以决定是否发生超时——若是线程 1108 # 仍然活着,则join()调用超时。当timeout参数不存在或不存在时,操做将阻塞, 1109 # 直到线程终止。一个线程能够屡次链接()ed。若是尝试链接当前线程,join()将引起 1110 # 一个运行时错误,由于这会致使死锁。在线程启动以前链接()线程也是一个错误, 1111 # 试图这样作会引起相同的异常。 1112 if not self._initialized: 1113 raise RuntimeError("Thread.__init__() not called") 1114 if not self._started.is_set(): 1115 raise RuntimeError("cannot join thread before it is started") 1116 if self is current_thread(): 1117 raise RuntimeError("cannot join current thread") 1118 1119 if timeout is None: 1120  self._wait_for_tstate_lock() 1121 else: 1122 # the behavior of a negative timeout isn't documented, but 1123 # historically .join(timeout=x) for x<0 has acted as if timeout=0 1124 #没有记录消极超时的行为,可是在历史上,x<0时的.join(timeout=x)就像timeout=0同样 1125 self._wait_for_tstate_lock(timeout=max(timeout, 0)) 1126 1127 def _wait_for_tstate_lock(self, block=True, timeout=-1): 1128 # Issue #18808: wait for the thread state to be gone. 1129 # At the end of the thread's life, after all knowledge of the thread 1130 # is removed from C data structures, C code releases our _tstate_lock. 1131 # This method passes its arguments to _tstate_lock.acquire(). 1132 # If the lock is acquired, the C code is done, and self._stop() is 1133 # called. That sets ._is_stopped to True, and ._tstate_lock to None. 1134 #问题#18808:等待线程状态消失。在线程生命周期结束时,在从C数据结构中删除全部 1135 # 线程知识以后,C代码释放咱们的_tstate_lock。该方法将其参数 1136 # 传递给_tstate_lock.acquire()。若是得到了锁,则完成C代码, 1137 # 并调用self._stop()。这将._is_stopped设置为True,._tstate_lock设置为None。 1138 lock = self._tstate_lock 1139 if lock is None: # already determined that the C code is done 已经肯定C代码已经完成 1140 assert self._is_stopped 1141 elif lock.acquire(block, timeout): 1142  lock.release() 1143  self._stop() 1144 1145  @property 1146 def name(self): 1147 """A string used for identification purposes only. 1148  It has no semantics. Multiple threads may be given the same name. The 1149  initial name is set by the constructor. 1150 """ 1151 #仅用于识别目的的字符串。它没有语义。多个线程可能被赋予相同的名称。初始名称由构造函数设置。 1152 assert self._initialized, "Thread.__init__() not called" 1153 return self._name 1154 1155  @name.setter 1156 def name(self, name): 1157 assert self._initialized, "Thread.__init__() not called" 1158 self._name = str(name) 1159 1160  @property 1161 def ident(self): 1162 """Thread identifier of this thread or None if it has not been started. 1163  This is a nonzero integer. See the get_ident() function. Thread 1164  identifiers may be recycled when a thread exits and another thread is 1165  created. The identifier is available even after the thread has exited. 1166 """ 1167 #此线程的线程标识符,若是没有启动,则为空。这是非零整数。请参阅get_ident()函数。 1168 # 当线程退出并建立另外一个线程时,能够回收线程标识符。即便线程已经退出,标识符也是可用的。 1169 assert self._initialized, "Thread.__init__() not called" 1170 return self._ident 1171 1172 def is_alive(self): 1173 """Return whether the thread is alive. 1174  This method returns True just before the run() method starts until just 1175  after the run() method terminates. The module function enumerate() 1176  returns a list of all alive threads. 1177 """ 1178 #返回线程是否存在。这个方法在run()方法开始以前返回True,直到run()方法终止以后。 1179 # 模块函数enumerate()返回一个包含全部活线程的列表。 1180 assert self._initialized, "Thread.__init__() not called" 1181 if self._is_stopped or not self._started.is_set(): 1182 return False 1183  self._wait_for_tstate_lock(False) 1184 return not self._is_stopped 1185 1186 isAlive = is_alive 1187 1188  @property 1189 def daemon(self): 1190 """A boolean value indicating whether this thread is a daemon thread. 1191  This must be set before start() is called, otherwise RuntimeError is 1192  raised. Its initial value is inherited from the creating thread; the 1193  main thread is not a daemon thread and therefore all threads created in 1194  the main thread default to daemon = False. 1195  The entire Python program exits when no alive non-daemon threads are 1196  left. 1197 """ 1198 #一个布尔值,指示此线程是否为守护线程。这必须在调用start()以前设置,不然会引起 1199 # 运行时错误。它的初始值继承自建立线程;主线程不是守护进程线程,所以在主线程中 1200 # 建立的全部线程默认为守护进程= False。当没有存活的非守护进程线程时, 1201 # 整个Python程序退出。 1202 assert self._initialized, "Thread.__init__() not called" 1203 return self._daemonic 1204 1205  @daemon.setter 1206 def daemon(self, daemonic): 1207 if not self._initialized: 1208 raise RuntimeError("Thread.__init__() not called") 1209 if self._started.is_set(): 1210 raise RuntimeError("cannot set daemon status of active thread") 1211 self._daemonic = daemonic 1212 1213 def isDaemon(self): #Daemon:守护进程 1214 return self.daemon 1215 1216 def setDaemon(self, daemonic): 1217 self.daemon = daemonic 1218 1219 def getName(self): 1220 return self.name 1221 1222 def setName(self, name): 1223 self.name = name 1224 1225 # The timer class was contributed by Itamar Shtull-Trauring 1226 #计时器类由Itamar Shtull-Trauring贡献 1227 1228 class Timer(Thread): 1229 """Call a function after a specified number of seconds: 1230  t = Timer(30.0, f, args=None, kwargs=None) 1231  t.start() 1232  t.cancel() # stop the timer's action if it's still waiting 1233 """ 1234 #在指定的秒数后调用一个函数:t = Timer(30.0, f, args=None, kwargs=None) 1235 #t.start() t.cancel()若是计时器仍在等待,则中止计时器的操做 1236 1237 def __init__(self, interval, function, args=None, kwargs=None): 1238 Thread.__init__(self) 1239 self.interval = interval 1240 self.function = function 1241 self.args = args if args is not None else [] 1242 self.kwargs = kwargs if kwargs is not None else {} 1243 self.finished = Event() 1244 1245 def cancel(self): 1246 """Stop the timer if it hasn't finished yet.""" 1247 #若是计时器尚未结束,请中止。 1248  self.finished.set() 1249 1250 def run(self): 1251  self.finished.wait(self.interval) 1252 if not self.finished.is_set(): 1253 self.function(*self.args, **self.kwargs) 1254  self.finished.set() 1255 1256 1257 # Special thread class to represent the main thread 1258 '''表示主线程的特殊线程类''' 1259 1260 class _MainThread(Thread): 1261 1262 def __init__(self): 1263 Thread.__init__(self, name="MainThread", daemon=False) 1264  self._set_tstate_lock() 1265  self._started.set() 1266  self._set_ident() 1267  with _active_limbo_lock: 1268 _active[self._ident] = self 1269 1270 1271 # Dummy thread class to represent threads not started here. 1272 # These aren't garbage collected when they die, nor can they be waited for. 1273 # If they invoke anything in threading.py that calls current_thread(), they 1274 # leave an entry in the _active dict forever after. 1275 # Their purpose is to return *something* from current_thread(). 1276 # They are marked as daemon threads so we won't wait for them 1277 # when we exit (conform previous semantics). 1278 #伪线程类来表示这里没有启动的线程。它们死后不会被垃圾收集,也不会被等待。若是它们在 1279 # 线程中调用任何东西。调用current_thread()的py在_active dict中永远留下一个条目。 1280 # 它们的目的是从current_thread()返回*something*。它们被标记为守护线程,所以在退出 1281 # 时咱们不会等待它们(符合前面的语义)。 1282 1283 class _DummyThread(Thread): 1284 1285 def __init__(self): 1286 Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True) 1287 1288  self._started.set() 1289  self._set_ident() 1290  with _active_limbo_lock: 1291 _active[self._ident] = self 1292 1293 def _stop(self): 1294 pass 1295 1296 def is_alive(self): 1297 assert not self._is_stopped and self._started.is_set() 1298 return True 1299 1300 def join(self, timeout=None): 1301 assert False, "cannot join a dummy thread" 1302 1303 1304 # Global API functions 1305 #全球API函数 1306 1307 def current_thread(): 1308 """Return the current Thread object, corresponding to the caller's thread of control. 1309  If the caller's thread of control was not created through the threading 1310  module, a dummy thread object with limited functionality is returned. 1311 """ 1312 #返回当前线程对象,对应于调用方的控制线程。若是没有经过线程模块建立调用者的控制线 1313 # 程,则返回具备有限功能的虚拟线程对象。 1314 try: 1315 return _active[get_ident()] 1316 except KeyError: 1317 return _DummyThread() 1318 1319 currentThread = current_thread 1320 1321 def active_count(): 1322 """Return the number of Thread objects currently alive. 1323  The returned count is equal to the length of the list returned by 1324  enumerate(). 1325 """ 1326 #返回当前存活的线程对象的数量。返回的计数等于enumerate()返回的列表的长度。 1327  with _active_limbo_lock: 1328 return len(_active) + len(_limbo) 1329 1330 activeCount = active_count 1331 1332 def _enumerate(): 1333 # Same as enumerate(), but without the lock. Internal use only. 1334 #与enumerate()相同,只是没有锁。内部使用。 1335 return list(_active.values()) + list(_limbo.values()) 1336 1337 def enumerate(): 1338 """Return a list of all Thread objects currently alive. 1339  The list includes daemonic threads, dummy thread objects created by 1340  current_thread(), and the main thread. It excludes terminated threads and 1341  threads that have not yet been started. 1342 """ 1343 #返回当前全部线程对象的列表。该列表包括daemonic线程、current_thread()建立的虚拟 1344 # 线程对象和主线程。它排除终止的线程和还没有启动的线程。 1345  with _active_limbo_lock: 1346 return list(_active.values()) + list(_limbo.values()) 1347 1348 from _thread import stack_size 1349 1350 # Create the main thread object, 1351 # and make it available for the interpreter 1352 # (Py_Main) as threading._shutdown. 1353 #建立主线程对象,并将其做为thread ._shutdown提供给解释器(Py_Main)。 1354 1355 _main_thread = _MainThread() 1356 1357 def _shutdown(): 1358 # Obscure: other threads may be waiting to join _main_thread. That's 1359 # dubious, but some code does it. We can't wait for C code to release 1360 # the main thread's tstate_lock - that won't happen until the interpreter 1361 # is nearly dead. So we release it here. Note that just calling _stop() 1362 # isn't enough: other threads may already be waiting on _tstate_lock. 1363 #晦涩:其余线程可能正在等待加入_main_thread。这很可疑,但有些代码能够作到。 1364 # 咱们不能等待C代码释放主线程的tstate_lock——这要等到解释器快死的时候才会发生。 1365 # 咱们在这里释放它。注意,仅仅调用_stop()是不够的:其余线程可能已经在 1366 # 等待_tstate_lock了。 1367 if _main_thread._is_stopped: 1368 # _shutdown() was already called 1369 return 1370 tlock = _main_thread._tstate_lock 1371 # The main thread isn't finished yet, so its thread state lock can't have 1372 # been released. 1373 #主线程还没有完成,所以它的线程状态锁没法释放。 1374 assert tlock is not None 1375 assert tlock.locked() 1376  tlock.release() 1377  _main_thread._stop() 1378 t = _pickSomeNonDaemonThread() 1379 while t: 1380  t.join() 1381 t = _pickSomeNonDaemonThread() 1382 1383 def _pickSomeNonDaemonThread(): 1384 for t in enumerate(): 1385 if not t.daemon and t.is_alive(): 1386 return t 1387 return None 1388 1389 def main_thread(): 1390 """Return the main thread object. 1391  In normal conditions, the main thread is the thread from which the 1392  Python interpreter was started. 1393 """ 1394 #返回主线程对象。在正常状况下,主线程是Python解释器启动的线程。 1395 return _main_thread 1396 1397 # get thread-local implementation, either from the thread 1398 # module, or from the python fallback 1399 #从线程模块或python回退中获取线程本地实现 1400 1401 try: 1402 from _thread import _local as local 1403 except ImportError: 1404 from _threading_local import local 1405 1406 1407 def _after_fork(): 1408 """ 1409  Cleanup threading module state that should not exist after a fork. 1410 """ 1411 # Reset _active_limbo_lock, in case we forked while the lock was held 1412 # by another (non-forked) thread. http://bugs.python.org/issue874900 1413 #Reset _active_limbo_lock,以防咱们分叉而锁被另外一个(非分叉的)线程持有。 1414 global _active_limbo_lock, _main_thread 1415 _active_limbo_lock = _allocate_lock() 1416 1417 # fork() only copied the current thread; clear references to others. 1418 #fork()只复制当前线程;明确说起他人。 1419 new_active = {} 1420 current = current_thread() 1421 _main_thread = current 1422  with _active_limbo_lock: 1423 # Dangling thread instances must still have their locks reset, 1424 # because someone may join() them. 1425 #悬空线程实例必须从新设置它们的锁,由于有人可能会加入()它们。 1426 threads = set(_enumerate()) 1427  threads.update(_dangling) 1428 for thread in threads: 1429 # Any lock/condition variable may be currently locked or in an 1430 # invalid state, so we reinitialize them. 1431 #任何锁/条件变量可能当前被锁定或处于无效状态,所以咱们从新初始化它们。 1432 if thread is current: 1433 # There is only one active thread. We reset the ident to 1434 # its new value since it can have changed. 1435 #只有一个活动线程。咱们将ident重置为它的新值,由于它可能已经更改。 1436  thread._reset_internal_locks(True) 1437 ident = get_ident() 1438 thread._ident = ident 1439 new_active[ident] = thread 1440 else: 1441 # All the others are already stopped. 1442  thread._reset_internal_locks(False) 1443  thread._stop() 1444 1445  _limbo.clear() 1446  _active.clear() 1447  _active.update(new_active) 1448 assert len(_active) == 1 1449 1450 1451 if hasattr(_os, "register_at_fork"): 1452 _os.register_at_fork(after_in_child=_after_fork)
threading源代码

  队列:数据结构

  Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,可以在多线程中直接使用。可使用队列来实现线程间的同步。多线程

 1 '''队列'''  2 import queue  3 q =queue.Queue() #设置队列  4 q.put("q1") #队列中放入数据  5 q.put("q2")  6 q.put("q3")  7  8 # print(q.qsize()) #获取队列大小  9 10 '''队列中获取数据,取出的数据超出存入数据时会等待,不会报错''' 11 print(q.get()) 12 print(q.get()) 13 print(q.get()) 14 # print(q.get()) 15 16 '''获取队列,但不会等待,超出后直接报错''' 17 print(q.get_nowait()) 18 print(q.get_nowait()) 19 print(q.get_nowait()) 20 # print(q.get_nowait()) 21 22 '''设置优先级排序的依据''' 23 q = queue.PriorityQueue(maxsize=0) 24 q.put((3,"q1")) #当maxsizie<=0时,队列无限大,>0时,给定数据即为队列大小 25 q.put((1,"q2")) 26 q.put((-4,"q3")) 27 print(q.get()) #获取时会从小到大按顺序获取 28 print(q.get()) 29 print(q.get())

上述代码只是队列的应用,下面将队列应用与线程之中:

 1 import queue  2 import time  3 import threading  4  5 q = queue.Queue(maxsize=10)  6 def gave(name):  7 count = 1  8 while True:  9 q.put("--骨头--%s" % count) 10 print("%s 生产骨头 %s" % (name,count)) 11 time.sleep(1) 12 count+=1 13 14 def consumer(name): 15 while q.qsize()>0: 16 # while True: 17 print("%s 吃掉 %s" % (name,q.get())) 18 # time.sleep(10) 19 20 g = threading.Thread(target=gave,args=("王二小",)) 21 c = threading.Thread(target=consumer,args=("旺财",)) 22 g.start() 23 c.start()
对垒事件
 1 #print('\033[41;1m--red light on---\033[0m') #红灯  2 #print('\033[43;1m--yellow light on---\033[0m') #黄灯  3 #print('\033[42;1m--green light on---\033[0m') #绿灯  4 '''主要用在数据同步上'''  5 '''红绿灯事件'''  6  7 # import threading  8 # import time  9 # # import queue  10 # event = threading.Event()  11 # # q = queue.Queue()  12 #  13 # def light():  14 # count = 1  15 # while True:  16 # if count<=5:  17 # event.set()  18 # print('\033[42;1m--green light on---\033[0m')  19 # elif 5<count<=10:  20 # event.clear()  21 # print('\033[43;1m--yellow light on---\033[0m')  22 # else:  23 # print('\033[41;1m--red light on---\033[0m')  24 # if count>=15:  25 # count = 0  26 # time.sleep(1)  27 # count+=1  28 #  29 # def car(name):  30 # while True:  31 # if event.is_set():  32 # time.sleep(1)  33 # print("%s is running..." % name)  34 # else:  35 # print("car is waiting...")  36 # event.wait() #等待事件event对象发生变化  37 #  38 #  39 # Light = threading.Thread(target=light,)  40 # Light.start()  41 # Car = threading.Thread(target=car,args=("BENZ",))  42 # Car.start()  43  44  45 import threading  46 import time  47 import queue  48  49 event=threading.Event()  50 q=queue.PriorityQueue(maxsize=20)  51 #在循环以前先放入十辆车:  52 for i in range(10):  53 q.put("旧车辆,%s" % "QQ")  54  55 def light():  56 count=0  57 while True:  58 if count<10:  59  event.set()  60 print("\033[42;1m--green light on---\033[0m",10-count)  61 elif 10<=count<15:  62  event.clear()  63 print("\033[43;1m--yellow light on---\033[0m",15-count)  64 else:  65  event.clear()  66 if count>=25:  67 count=0  68 continue  69 print("\033[41;1m--red light on---\033[0m",25-count)  70 time.sleep(1)  71 count+=1  72  73 def car(name):  74 while True:  75 if event.is_set() and q.qsize()>=1:  76 print("%s is running..." % name)  77 time.sleep(1)  78 print("道路还有【%s】辆车" % q.qsize())  79 else:  80 print("car is waiting...")  81 print("如今道路中有车%s辆" % q.qsize())  82 event.wait() #等待事件event对象发生变化  83  84 #路口停车  85 def Put():  86 n=0  87 while q.qsize()<20:  88 time.sleep(2)  89 q.put("新车辆%s车辆" % n)  90 n+=1  91 print("车辆已驶入")  92 else:  93  event.wait()  94 print("中止驶入")  95 print("中止驶入后道路中有车%s" % q.qsize())  96  97 #车辆行驶  98 def Get():  99 while True: 100 if event.is_set(): 101 time.sleep(2) 102 print("%s车辆--------经过" % q.get()) 103 else: 104 print("禁止通行!!") 105  event.wait() 106 107 108 109 C=threading.Thread(target=car,args=("...T...",)) 110 L=threading.Thread(target=light) 111 P=threading.Thread(target=Put) 112 G=threading.Thread(target=Get) 113 L.start() 114 C.start() 115 P.start() 116 G.start()
红绿灯事件
 1 import threading  2  3 money = 0  4 lock = threading.Lock()  5  6 #存钱  7 def get_money(Sum):  8 global money  9 money+=Sum #x=money+sum;money=x 10 11 #取钱 12 def put_money(Sum): 13 global money 14 money-=Sum 15 16 def run(Sum): 17  lock.acquire() 18 for i in range(10000): 19  put_money(Sum) 20  get_money(Sum) 21  lock.release() 22 23 #单线程中不会存在问题 24 #然而在多线程中,操做系统交叉处理赋值语句,致使 25 # 全局变量被一个线程修改,而另外一个线程殊不知情。 26 m1 = threading.Thread(target=run,args=(100,)) 27 m2 = threading.Thread(target=run,args=(1000,)) 28 m1.start() 29 m2.start() 30 m1.join() 31 m2.join() 32 print(money)
银行事件
 1 '''A multi-producer, multi-consumer queue.'''  2 #多生产者、多消费者队列。  3 import threading  4 from collections import deque  5 from heapq import heappush, heappop  6 from time import monotonic as time  7 try:  8 from _queue import SimpleQueue  9 except ImportError:  10 SimpleQueue = None  11  12 __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue']  13  14  15 try:  16 from _queue import Empty  17 except AttributeError:  18 class Empty(Exception):  19 'Exception raised by Queue.get(block=0)/get_nowait().'  20 pass  21  22 class Full(Exception):  23 'Exception raised by Queue.put(block=0)/put_nowait().'  24 pass  25  26  27 class Queue:  28 '''Create a queue object with a given maximum size.  29  If maxsize is <= 0, the queue size is infinite.  30 '''  31 #建立一个具备给定最大大小的队列对象。若是maxsize <= 0,则队列大小为无穷大。  32  33 def __init__(self, maxsize=0):  34 self.maxsize = maxsize  35  self._init(maxsize)  36  37 # mutex must be held whenever the queue is mutating. All methods  38 # that acquire mutex must release it before returning. mutex  39 # is shared between the three conditions, so acquiring and  40 # releasing the conditions also acquires and releases mutex.  41 #当队列发生变化时,必须持有互斥锁。全部得到互斥锁的方法都必须在返回以前释放它。  42 # 互斥锁在这三个条件之间是共享的,所以获取和释放条件也得到和释放互斥锁。  43 self.mutex = threading.Lock()  44  45 # Notify not_empty whenever an item is added to the queue; a  46 # thread waiting to get is notified then.  47 #当一个项目被添加到队列中时,通知not_empty;而后会通知等待获取的线程。  48 self.not_empty = threading.Condition(self.mutex)  49  50 # Notify not_full whenever an item is removed from the queue;  51 # a thread waiting to put is notified then.  52 #当一个项目从队列中删除时,通知not_full;而后会通知等待放置的线程。  53 self.not_full = threading.Condition(self.mutex)  54  55 # Notify all_tasks_done whenever the number of unfinished tasks  56 # drops to zero; thread waiting to join() is notified to resume  57 #当未完成任务的数量降为零时,通知all_tasks_done;等待加入()的线程被通知恢复  58 self.all_tasks_done = threading.Condition(self.mutex)  59 self.unfinished_tasks = 0  60  61 def task_done(self):  62 '''Indicate that a formerly enqueued task is complete.  63  Used by Queue consumer threads. For each get() used to fetch a task,  64  a subsequent call to task_done() tells the queue that the processing  65  on the task is complete.  66  If a join() is currently blocking, it will resume when all items  67  have been processed (meaning that a task_done() call was received  68  for every item that had been put() into the queue).  69  Raises a ValueError if called more times than there were items  70  placed in the queue.  71 '''  72 #指示之前加入队列的任务已经完成。由队列使用者线程使用。对于用于获取任务的每一个  73 # get(),对task_done()的后续调用将告诉队列任务的处理已经完成。若是一个join()  74 # 当前处于阻塞状态,那么当全部项都被处理完时(这意味着对于每一个已将()放入队列的  75 # 项都接收了task_done()调用),它将恢复。若是调用的次数超过了队列中放置的项的  76 # 次数,就会引起ValueError。  77  with self.all_tasks_done:  78 unfinished = self.unfinished_tasks - 1  79 if unfinished <= 0:  80 if unfinished < 0:  81 raise ValueError('task_done() called too many times')  82  self.all_tasks_done.notify_all()  83 self.unfinished_tasks = unfinished  84  85 def join(self):  86 '''Blocks until all items in the Queue have been gotten and processed.  87  The count of unfinished tasks goes up whenever an item is added to the  88  queue. The count goes down whenever a consumer thread calls task_done()  89  to indicate the item was retrieved and all work on it is complete.  90  When the count of unfinished tasks drops to zero, join() unblocks.  91 '''  92 #阻塞,直到获取和处理队列中的全部项。当一个项目被添加到队列中时,未完成任务的计数  93 # 就会上升。每当使用者线程调用task_done()时,计数就会降低,以指示检索了项目并完成  94 # 了对其的全部工做。当未完成任务的计数降为0时,join()将解块。  95  with self.all_tasks_done:  96 while self.unfinished_tasks:  97  self.all_tasks_done.wait()  98  99 def qsize(self): 100 '''Return the approximate size of the queue (not reliable!).''' 101 #返回队列的大体大小(不可靠!) 102  with self.mutex: 103 return self._qsize() 104 105 def empty(self): 106 '''Return True if the queue is empty, False otherwise (not reliable!). 107  This method is likely to be removed at some point. Use qsize() == 0 108  as a direct substitute, but be aware that either approach risks a race 109  condition where a queue can grow before the result of empty() or 110  qsize() can be used. 111  To create code that needs to wait for all queued tasks to be 112  completed, the preferred technique is to use the join() method. 113 ''' 114 #若是队列为空,返回True,不然返回False(不可靠!)这种方法可能会在某个时候被删除。 115 # 使用qsize() == 0做为直接的替代,可是要注意,在使用empty()或qsize()的结果以前, 116 # 队列可能会增加,这可能会带来竞争条件的风险。要建立须要等待全部排队任务完成的代码, 117 # 首选技术是使用join()方法。 118  with self.mutex: 119 return not self._qsize() 120 121 def full(self): 122 '''Return True if the queue is full, False otherwise (not reliable!). 123  This method is likely to be removed at some point. Use qsize() >= n 124  as a direct substitute, but be aware that either approach risks a race 125  condition where a queue can shrink before the result of full() or 126  qsize() can be used. 127 ''' 128 #若是队列满了,返回True,不然返回False(不可靠!)这种方法可能会在某个时候被删除。 129 # 使用qsize() >= n做为直接替代,可是要注意,在使用full()或qsize()的结果以前, 130 # 队列可能会收缩,这可能会致使竞争条件的风险。 131  with self.mutex: 132 return 0 < self.maxsize <= self._qsize() 133 134 def put(self, item, block=True, timeout=None): 135 '''Put an item into the queue. 136  If optional args 'block' is true and 'timeout' is None (the default), 137  block if necessary until a free slot is available. If 'timeout' is 138  a non-negative number, it blocks at most 'timeout' seconds and raises 139  the Full exception if no free slot was available within that time. 140  Otherwise ('block' is false), put an item on the queue if a free slot 141  is immediately available, else raise the Full exception ('timeout' 142  is ignored in that case). 143 ''' 144 #将项目放入队列中。若是可选的args 'block'为true,而'timeout'为None(默认值), 145 # 那么若是有必要,阻塞直到空闲的插槽可用为止。若是“timeout”是非负数,它最多会 146 # 阻塞“timeout”秒,若是在这段时间内没有可用的空闲时间,它就会引起彻底异常。 147 # 不然(‘block’为false),若是有空闲的插槽当即可用,就在队列中放置一个项目, 148 # 不然引起完整的异常(在这种状况下忽略‘timeout’)。 149  with self.not_full: 150 if self.maxsize > 0: 151 if not block: 152 if self._qsize() >= self.maxsize: 153 raise Full 154 elif timeout is None: 155 while self._qsize() >= self.maxsize: 156  self.not_full.wait() 157 elif timeout < 0: 158 raise ValueError("'timeout' must be a non-negative number") 159 else: 160 endtime = time() + timeout 161 while self._qsize() >= self.maxsize: 162 remaining = endtime - time() 163 if remaining <= 0.0: 164 raise Full 165  self.not_full.wait(remaining) 166  self._put(item) 167 self.unfinished_tasks += 1 168  self.not_empty.notify() 169 170 def get(self, block=True, timeout=None): 171 '''Remove and return an item from the queue. 172  If optional args 'block' is true and 'timeout' is None (the default), 173  block if necessary until an item is available. If 'timeout' is 174  a non-negative number, it blocks at most 'timeout' seconds and raises 175  the Empty exception if no item was available within that time. 176  Otherwise ('block' is false), return an item if one is immediately 177  available, else raise the Empty exception ('timeout' is ignored 178  in that case). 179 ''' 180 #从队列中删除并返回项。若是可选的args 'block'为true,而'timeout'为None 181 # (默认值),则在项可用以前,若是有必要,阻塞。若是“timeout”是非负数,它最多 182 # 会阻塞“timeout”秒,若是在这段时间内没有可用项,就会引起空异常。 183 # 不然(‘block’为false),若是一个项当即可用,返回一个项,不然引起空异常 184 # (在这种状况下忽略'timeout')。 185  with self.not_empty: 186 if not block: 187 if not self._qsize(): 188 raise Empty 189 elif timeout is None: 190 while not self._qsize(): 191  self.not_empty.wait() 192 elif timeout < 0: 193 raise ValueError("'timeout' must be a non-negative number") 194 else: 195 endtime = time() + timeout 196 while not self._qsize(): 197 remaining = endtime - time() 198 if remaining <= 0.0: 199 raise Empty 200  self.not_empty.wait(remaining) 201 item = self._get() 202  self.not_full.notify() 203 return item 204 205 def put_nowait(self, item): 206 '''Put an item into the queue without blocking. 207  Only enqueue the item if a free slot is immediately available. 208  Otherwise raise the Full exception. 209 ''' 210 #将项目放入队列中而不阻塞。只有当一个空闲的插槽当即可用时,才将项目加入队列。不然引起彻底异常。 211 return self.put(item, block=False) 212 213 def get_nowait(self): 214 '''Remove and return an item from the queue without blocking. 215  Only get an item if one is immediately available. Otherwise 216  raise the Empty exception. 217 ''' 218 #在不阻塞的状况下从队列中删除并返回项。只有当一个项目是当即可用的。不然引起空异常。 219 return self.get(block=False) 220 221 # Override these methods to implement other queue organizations 222 # (e.g. stack or priority queue). 223 # These will only be called with appropriate locks held 224 #重写这些方法以实现其余队列组织(例如堆栈或优先队列)。只有在持有适当的锁时才会调用这些函数 225 226 # Initialize the queue representation 227 '''初始化队列表示''' 228 def _init(self, maxsize): 229 self.queue = deque() 230 231 def _qsize(self): 232 return len(self.queue) 233 234 # Put a new item in the queue 235 def _put(self, item): 236  self.queue.append(item) 237 238 # Get an item from the queue 239 def _get(self): 240 return self.queue.popleft() 241 242 243 class PriorityQueue(Queue): 244 '''Variant of Queue that retrieves open entries in priority order (lowest first). 245  Entries are typically tuples of the form: (priority number, data). 246 ''' 247 #按优先级顺序(最低优先级)检索打开项的队列的变体。条目一般是表单的元组(优先级号、数据)。 248 249 def _init(self, maxsize): 250 self.queue = [] 251 252 def _qsize(self): 253 return len(self.queue) 254 255 def _put(self, item): 256  heappush(self.queue, item) 257 258 def _get(self): 259 return heappop(self.queue) 260 261 262 class LifoQueue(Queue): 263 '''Variant of Queue that retrieves most recently added entries first.''' 264 #队列的变体,它首先检索最近添加的条目。 265 266 def _init(self, maxsize): 267 self.queue = [] 268 269 def _qsize(self): 270 return len(self.queue) 271 272 def _put(self, item): 273  self.queue.append(item) 274 275 def _get(self): 276 return self.queue.pop() 277 278 279 class _PySimpleQueue: 280 '''Simple, unbounded FIFO queue. 281  This pure Python implementation is not reentrant. 282 ''' 283 #简单、无界的FIFO队列。这个纯Python实现是不可重入的。 284 285 # Note: while this pure Python version provides fairness 286 # (by using a threading.Semaphore which is itself fair, being based 287 # on threading.Condition), fairness is not part of the API contract. 288 # This allows the C version to use a different implementation. 289 #注意:虽然这个纯Python版本提供了公平性(经过使用线程)。信号量自己是公平的, 290 # 基于thread . condition),公平不是API契约的一部分。这容许C版本使用不一样的实现。 291 292 def __init__(self): 293 self._queue = deque() 294 self._count = threading.Semaphore(0) 295 296 def put(self, item, block=True, timeout=None): 297 '''Put the item on the queue. 298  The optional 'block' and 'timeout' arguments are ignored, as this method 299  never blocks. They are provided for compatibility with the Queue class. 300 ''' 301 #将项目放到队列中。可选的“block”和“timeout”参数被忽略,由于这个方法从不阻塞。 302 # 它们是为了与队列类兼容而提供的。 303  self._queue.append(item) 304  self._count.release() 305 306 def get(self, block=True, timeout=None): 307 '''Remove and return an item from the queue. 308  If optional args 'block' is true and 'timeout' is None (the default), 309  block if necessary until an item is available. If 'timeout' is 310  a non-negative number, it blocks at most 'timeout' seconds and raises 311  the Empty exception if no item was available within that time. 312  Otherwise ('block' is false), return an item if one is immediately 313  available, else raise the Empty exception ('timeout' is ignored 314  in that case). 315 ''' 316 #从队列中删除并返回项。若是可选的args 'block'为true,而'timeout'为None 317 # (默认值),则在项可用以前,若是有必要,阻塞。若是“timeout”是非负数,它最多 318 # 会阻塞“timeout”秒,若是在这段时间内没有可用项,就会引起空异常。不然 319 # (‘block’为false),若是一个项当即可用,返回一个项,不然引起空异常 320 # (在这种状况下忽略'timeout')。 321 if timeout is not None and timeout < 0: 322 raise ValueError("'timeout' must be a non-negative number") 323 if not self._count.acquire(block, timeout): 324 raise Empty 325 return self._queue.popleft() 326 327 def put_nowait(self, item): 328 '''Put an item into the queue without blocking. 329  This is exactly equivalent to `put(item)` and is only provided 330  for compatibility with the Queue class. 331 ''' 332 #将项目放入队列中而不阻塞。这彻底等同于‘put(item)’,而且只提供与队列类的兼容性。 333 return self.put(item, block=False) 334 335 def get_nowait(self): 336 '''Remove and return an item from the queue without blocking. 337  Only get an item if one is immediately available. Otherwise 338  raise the Empty exception. 339 ''' 340 #在不阻塞的状况下从队列中删除并返回项。只有当一个项目是当即可用的。不然引起空异常。 341 return self.get(block=False) 342 343 def empty(self): 344 '''Return True if the queue is empty, False otherwise (not reliable!).''' 345 #若是队列为空,返回True,不然返回False(不可靠!) 346 return len(self._queue) == 0 347 348 def qsize(self): 349 '''Return the approximate size of the queue (not reliable!).''' 350 #返回队列的大体大小(不可靠!) 351 return len(self._queue) 352 353 354 if SimpleQueue is None: 355 SimpleQueue = _PySimpleQueue
queue源代码

 

Python进程

 

进程(multiprocessing):

线程是进程最小的数据单元;每一个进程都是相互独立的,它们之间不能共享数据。

启动单个进程:

 1 '''启动一个进程'''  2 import multiprocessing  3 import time  4  5 def run(name):  6 time.sleep(2)  7 print("hello",name)  8  9 if __name__ == "__main__": 10 p = multiprocessing.Process(target=run,args=("pp",)) 11  p.start() 12  p.join() 13 14 15 '''运行结果''' 16 hello pp

启动多个进程:

 1 import multiprocessing  2 import time  3  4 def run(name):  5 time.sleep(2)  6 print("hello",name)  7  8 for i in range(10):  9 if __name__ == "__main__": 10 p = multiprocessing.Process(target=run, args=("pp",)) 11  p.start() 12 p.join()

在进程中建立进程:

 1 import multiprocessing  2 import threading  3 import time  4  5 def thread_run():  6 print(threading.get_ident()) #get_ident()得到线程地址  7  8 def run(name):  9 time.sleep(2) 10 print("hello",name) 11 t = threading.Thread(target=thread_run) 12  t.start() 13 14 if __name__ == "__main__": 15 p = multiprocessing.Process(target=run,args=("pp",)) 16  p.start() 17  p.join() 18 19 20 '''运行结果''' 21 hello pp 22 2404

数据共享:

 1 '''经过中间介质(pickle)使两个进程实现数据共享,实质上并非彻底的数据共享,  2 只是将子进程的对象(队列Queue)进行克隆'''  3 # import threading  4 # import queue  5 #  6 # def f():  7 # q.put("jfkdsljfkdls")  8 #  9 # if __name__=="__main__": 10 # q=queue.Queue() 11 # p=threading.Thread(target=f) 12 # p.start() 13 # print(q.get()) 14 # p.join() 15 16 '''进程''' 17 from multiprocessing import Process,Queue 18 import queue 19 20 def f(q2): 21 q2.put("hkshhdjskajdksa") 22 23 if __name__=="__main__": 24 q=Queue() 25 p=Process(target=f,args=(q,)) 26  p.start() 27 print(q.get()) 28 p.join()
经过中间介质(pickle)
 1 '''manager是用来传递对象'''  2 from multiprocessing import Process,Manager  3 import os  4  5 def f(d,l,l1):  6 d[os.getpid()] = os.getpid() #os.getpid():Return the current process id.  7  l.append(os.getpid())  8 l1.append(os.getpid()) #l1属于直接传递,不能回传  9 # print(l) 10 # print("l1:***",l1) #普通列表在子进程中每次会得到一个新值,但都会被下一个值覆盖 11 # print(d) 12 13 if __name__ == "__main__": 14  with Manager() as mager: 15 d = mager.dict() #由manager生成的字典 16 l = mager.list(range(5)) #由manager生成的列表 17 l1 = [] #普通列表没法共享数据,最后仍旧是空列表 18 p_list = [] 19 for i in range(10): 20 p = Process(target=f,args=(d,l,l1)) 21  p.start() 22  p.join() 23 print("l:",l) 24 print(l1) 25 print("d:",d)
数据共享(manager)

数据传递:

 1 '''主进程(父类进程) 子进程'''  2 '''管道通讯实现数据之间的传递'''  3 # from multiprocessing import Process,Pipe  4 #  5 # def f(con):  6 # con.send("hello from child1")  7 # con.send("hello from child2")  8 # print("parent news:",con.recv())  9 # con.close() 10 # 11 # if __name__ == "__main__": 12 # Parent_con,Child_con = Pipe() 13 # p = Process(target=f,args=(Child_con,)) 14 # p.start() 15 # print(Parent_con.recv()) 16 # print(Parent_con.recv()) 17 # Parent_con.send("from parent") 18 # p.join() 19 20 21 22 '''运行结果''' 23 hello from child1 24 hello from child2 25 parent news: from parent
管道通讯

 进程锁(Lock):

屏幕存在共享,多进程能够同时使用屏幕,进程加锁的目的在于,确保屏幕被独个进程使用。

from multiprocessing import Process,Lock def f(l,i): l.acquire() print("+++",i) l.release() if __name__ == "__main__": lock = Lock() #加锁的目的是为了确保屏幕被单独占用 for num in range(100): Process(target=f,args=(lock,num)).start() '''运行结果''' +++ 0 +++ 1 +++ 2 +++ 3 +++ 4 +++ 5 . . .(不在这里演示完全部内容)

进程池(pool):

python中,进程池内部会维护一个进程序列。当须要时,程序会去进程池中获取一个进程。

若是进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

from multiprocessing import Process,Pool import time import os def foo(i): time.sleep(2) print("in the process:",os.getpid()) return i+100 def bar(args): print("system done",args) if __name__ == "__main__": pool = Pool(5) for i in range(10): # pool.apply(func=foo,args=(i,)) #生成进程,把pool放入容器 #apply自己是一个串行方法,不受join影响  pool.apply_async(func=foo,args=(i,),callback=bar) #apply_async是一个并行方法,受join影响 #callback()回调函数为主进程操做,进程池一旦开始运行,回调函数会自动执行 print("end") pool.close() #pool关闭是须要时间的,因此在close以后再join pool.join()

pool的内置方法:

  • apply    串行方法。从进程池里取一个进程并同步执行,不受join影响
  • apply_async    并行方法。从进程池里取出一个进程并异步执行,受join影响
  • terminate    马上关闭进程池
  • join 主进程等待全部子进程执行完毕,必须在close或terminete以后(如上述代码)
  • close 等待全部进程结束才关闭线程池
 1 #  2 # Module providing the `Pool` class for managing a process pool  3 #模块提供用于管理进程池的“池”类  4 # multiprocessing/pool.py  5 #  6 # Copyright (c) 2006-2008, R Oudkerk  7 # Licensed to PSF under a Contributor Agreement.  8 #  9  10 __all__ = ['Pool', 'ThreadPool']  11  12 #  13 # Imports  14 #  15  16 import threading  17 import queue  18 import itertools  19 import collections  20 import os  21 import time  22 import traceback  23  24 # If threading is available then ThreadPool should be provided. Therefore  25 # we avoid top-level imports which are liable to fail on some systems.  26 from . import util  27 from . import get_context, TimeoutError  28  29 #  30 # Constants representing the state of a pool  31 #表示池状态的常数  32  33 RUN = 0  34 CLOSE = 1  35 TERMINATE = 2  36  37 #  38 # Miscellaneous  39 #  40  41 job_counter = itertools.count()  42  43 def mapstar(args):  44 return list(map(*args))  45  46 def starmapstar(args):  47 return list(itertools.starmap(args[0], args[1]))  48  49 #  50 # Hack to embed stringification of remote traceback in local traceback  51 #  52  53 class RemoteTraceback(Exception):  54 def __init__(self, tb):  55 self.tb = tb  56 def __str__(self):  57 return self.tb  58  59 class ExceptionWithTraceback:  60 def __init__(self, exc, tb):  61 tb = traceback.format_exception(type(exc), exc, tb)  62 tb = ''.join(tb)  63 self.exc = exc  64 self.tb = '\n"""\n%s"""' % tb  65 def __reduce__(self):  66 return rebuild_exc, (self.exc, self.tb)  67  68 def rebuild_exc(exc, tb):  69 exc.__cause__ = RemoteTraceback(tb)  70 return exc  71  72 #  73 # Code run by worker processes  74 #  75  76 class MaybeEncodingError(Exception):  77 """Wraps possible unpickleable errors, so they can be  78  safely sent through the socket."""  79 #包装可能出现的没法拾取的错误,以便经过套接字安全地发送这些错误。  80  81 def __init__(self, exc, value):  82 self.exc = repr(exc)  83 self.value = repr(value)  84 super(MaybeEncodingError, self).__init__(self.exc, self.value)  85  86 def __str__(self):  87 return "Error sending result: '%s'. Reason: '%s'" % (self.value,  88  self.exc)  89  90 def __repr__(self):  91 return "<%s: %s>" % (self.__class__.__name__, self)  92  93  94 def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,  95 wrap_exception=False):  96 if (maxtasks is not None) and not (isinstance(maxtasks, int)  97 and maxtasks >= 1):  98 raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))  99 put = outqueue.put 100 get = inqueue.get 101 if hasattr(inqueue, '_writer'): 102  inqueue._writer.close() 103  outqueue._reader.close() 104 105 if initializer is not None: 106 initializer(*initargs) 107 108 completed = 0 109 while maxtasks is None or (maxtasks and completed < maxtasks): 110 try: 111 task = get() 112 except (EOFError, OSError): 113 util.debug('worker got EOFError or OSError -- exiting') 114 break 115 116 if task is None: 117 util.debug('worker got sentinel -- exiting') 118 break 119 120 job, i, func, args, kwds = task 121 try: 122 result = (True, func(*args, **kwds)) 123 except Exception as e: 124 if wrap_exception and func is not _helper_reraises_exception: 125 e = ExceptionWithTraceback(e, e.__traceback__) 126 result = (False, e) 127 try: 128  put((job, i, result)) 129 except Exception as e: 130 wrapped = MaybeEncodingError(e, result[1]) 131 util.debug("Possible encoding error while sending result: %s" % ( 132  wrapped)) 133  put((job, i, (False, wrapped))) 134 135 task = job = result = func = args = kwds = None 136 completed += 1 137 util.debug('worker exiting after %d tasks' % completed) 138 139 def _helper_reraises_exception(ex): 140 'Pickle-able helper function for use by _guarded_task_generation.' 141 #用于_guarded_task_generation的可选择助手函数。 142 raise ex 143 144 # 145 # Class representing a process pool 类表示进程池 146 # 147 148 class Pool(object): 149 ''' 150  Class which supports an async version of applying functions to arguments. 151 ''' 152 #类,该类支持将函数应用于参数的异步版本。 153 _wrap_exception = True 154 155 def Process(self, *args, **kwds): 156 return self._ctx.Process(*args, **kwds) 157 158 def __init__(self, processes=None, initializer=None, initargs=(), 159 maxtasksperchild=None, context=None): 160 self._ctx = context or get_context() 161  self._setup_queues() 162 self._taskqueue = queue.SimpleQueue() 163 self._cache = {} 164 self._state = RUN 165 self._maxtasksperchild = maxtasksperchild 166 self._initializer = initializer 167 self._initargs = initargs 168 169 if processes is None: 170 processes = os.cpu_count() or 1 171 if processes < 1: 172 raise ValueError("Number of processes must be at least 1") 173 174 if initializer is not None and not callable(initializer): 175 raise TypeError('initializer must be a callable') 176 177 self._processes = processes 178 self._pool = [] 179  self._repopulate_pool() 180 181 self._worker_handler = threading.Thread( 182 target=Pool._handle_workers, 183 args=(self, ) 184  ) 185 self._worker_handler.daemon = True 186 self._worker_handler._state = RUN 187  self._worker_handler.start() 188 189 190 self._task_handler = threading.Thread( 191 target=Pool._handle_tasks, 192 args=(self._taskqueue, self._quick_put, self._outqueue, 193  self._pool, self._cache) 194  ) 195 self._task_handler.daemon = True 196 self._task_handler._state = RUN 197  self._task_handler.start() 198 199 self._result_handler = threading.Thread( 200 target=Pool._handle_results, 201 args=(self._outqueue, self._quick_get, self._cache) 202  ) 203 self._result_handler.daemon = True 204 self._result_handler._state = RUN 205  self._result_handler.start() 206 207 self._terminate = util.Finalize( 208  self, self._terminate_pool, 209 args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, 210  self._worker_handler, self._task_handler, 211  self._result_handler, self._cache), 212 exitpriority=15 213  ) 214 215 def _join_exited_workers(self): 216 """Cleanup after any worker processes which have exited due to reaching 217  their specified lifetime. Returns True if any workers were cleaned up. 218 """ 219 #在因为达到指定的生存期而退出的任何工做进程以后进行清理。若是有工人被清理干净, 220 # 返回True。 221 cleaned = False 222 for i in reversed(range(len(self._pool))): 223 worker = self._pool[i] 224 if worker.exitcode is not None: 225 # worker exited 226 util.debug('cleaning up worker %d' % i) 227  worker.join() 228 cleaned = True 229 del self._pool[i] 230 return cleaned 231 232 def _repopulate_pool(self): 233 """Bring the number of pool processes up to the specified number, 234  for use after reaping workers which have exited. 235 """ 236 #将池进程的数量增长到指定的数量,以便在收割已退出的工人后使用。 237 for i in range(self._processes - len(self._pool)): 238 w = self.Process(target=worker, 239 args=(self._inqueue, self._outqueue, 240  self._initializer, 241  self._initargs, self._maxtasksperchild, 242  self._wrap_exception) 243  ) 244  self._pool.append(w) 245 w.name = w.name.replace('Process', 'PoolWorker') 246 w.daemon = True 247  w.start() 248 util.debug('added worker') 249 250 def _maintain_pool(self): 251 """Clean up any exited workers and start replacements for them. 252 """ 253 #清理全部离职的员工,并开始替换他们。 254 if self._join_exited_workers(): 255  self._repopulate_pool() 256 257 def _setup_queues(self): 258 self._inqueue = self._ctx.SimpleQueue() 259 self._outqueue = self._ctx.SimpleQueue() 260 self._quick_put = self._inqueue._writer.send 261 self._quick_get = self._outqueue._reader.recv 262 263 def apply(self, func, args=(), kwds={}): 264 ''' 265  Equivalent of `func(*args, **kwds)`. 266  Pool must be running. 267 ''' 268 #至关于“func(*args, ** kwds)”。池必须正在运行。 269 return self.apply_async(func, args, kwds).get() 270 271 def map(self, func, iterable, chunksize=None): 272 ''' 273  Apply `func` to each element in `iterable`, collecting the results 274  in a list that is returned. 275 ''' 276 #对“iterable”中的每一个元素应用“func”,在返回的列表中收集结果。 277 return self._map_async(func, iterable, mapstar, chunksize).get() 278 279 def starmap(self, func, iterable, chunksize=None): 280 ''' 281  Like `map()` method but the elements of the `iterable` are expected to 282  be iterables as well and will be unpacked as arguments. Hence 283  `func` and (a, b) becomes func(a, b). 284 ''' 285 #方法相似于“map()”,但“iterable”的元素也应该是可迭代的,并将做为参数解压缩。 286 # 所以“func”和(a, b)变成了func(a, b)。 287 return self._map_async(func, iterable, starmapstar, chunksize).get() 288 289 def starmap_async(self, func, iterable, chunksize=None, callback=None, 290 error_callback=None): 291 ''' 292  Asynchronous version of `starmap()` method. 293 ''' 294 #异步版本的“starmap()”方法。 295 return self._map_async(func, iterable, starmapstar, chunksize, 296  callback, error_callback) 297 298 def _guarded_task_generation(self, result_job, func, iterable): 299 '''Provides a generator of tasks for imap and imap_unordered with 300  appropriate handling for iterables which throw exceptions during 301  iteration.''' 302 #为imap和imap_unordered提供任务生成器,并为迭代期间抛出异常的迭代提供适当的处理。 303 try: 304 i = -1 305 for i, x in enumerate(iterable): 306 yield (result_job, i, func, (x,), {}) 307 except Exception as e: 308 yield (result_job, i+1, _helper_reraises_exception, (e,), {}) 309 310 def imap(self, func, iterable, chunksize=1): 311 ''' 312  Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. 313 ''' 314 #等价于“map()”——可能比“Pool.map()”慢得多。 315 if self._state != RUN: 316 raise ValueError("Pool not running") 317 if chunksize == 1: 318 result = IMapIterator(self._cache) 319  self._taskqueue.put( 320  ( 321  self._guarded_task_generation(result._job, func, iterable), 322  result._set_length 323  )) 324 return result 325 else: 326 if chunksize < 1: 327 raise ValueError( 328 "Chunksize must be 1+, not {0:n}".format( 329  chunksize)) 330 task_batches = Pool._get_tasks(func, iterable, chunksize) 331 result = IMapIterator(self._cache) 332  self._taskqueue.put( 333  ( 334  self._guarded_task_generation(result._job, 335  mapstar, 336  task_batches), 337  result._set_length 338  )) 339 return (item for chunk in result for item in chunk) 340 341 def imap_unordered(self, func, iterable, chunksize=1): 342 ''' 343  Like `imap()` method but ordering of results is arbitrary. 344 ''' 345 #Like `imap()`方法,但结果的顺序是任意的。 346 if self._state != RUN: 347 raise ValueError("Pool not running") 348 if chunksize == 1: 349 result = IMapUnorderedIterator(self._cache) 350  self._taskqueue.put( 351  ( 352  self._guarded_task_generation(result._job, func, iterable), 353  result._set_length 354  )) 355 return result 356 else: 357 if chunksize < 1: 358 raise ValueError( 359 "Chunksize must be 1+, not {0!r}".format(chunksize)) 360 task_batches = Pool._get_tasks(func, iterable, chunksize) 361 result = IMapUnorderedIterator(self._cache) 362  self._taskqueue.put( 363  ( 364  self._guarded_task_generation(result._job, 365  mapstar, 366  task_batches), 367  result._set_length 368  )) 369 return (item for chunk in result for item in chunk) 370 371 def apply_async(self, func, args=(), kwds={}, callback=None, 372 error_callback=None): 373 ''' 374  Asynchronous version of `apply()` method. “apply()”方法的异步版本。 375 ''' 376 if self._state != RUN: 377 raise ValueError("Pool not running") 378 result = ApplyResult(self._cache, callback, error_callback) 379  self._taskqueue.put(([(result._job, 0, func, args, kwds)], None)) 380 return result 381 382 def map_async(self, func, iterable, chunksize=None, callback=None, 383 error_callback=None): 384 ''' 385  Asynchronous version of `map()` method. 方法的异步版本 386 ''' 387 return self._map_async(func, iterable, mapstar, chunksize, callback, 388  error_callback) 389 390 def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, 391 error_callback=None): 392 ''' 393  Helper function to implement map, starmap and their async counterparts. 394 ''' 395 #帮助函数实现映射,星图和他们的异步对等。 396 if self._state != RUN: 397 raise ValueError("Pool not running") 398 if not hasattr(iterable, '__len__'): 399 iterable = list(iterable) 400 401 if chunksize is None: 402 chunksize, extra = divmod(len(iterable), len(self._pool) * 4) 403 if extra: 404 chunksize += 1 405 if len(iterable) == 0: 406 chunksize = 0 407 408 task_batches = Pool._get_tasks(func, iterable, chunksize) 409 result = MapResult(self._cache, chunksize, len(iterable), callback, 410 error_callback=error_callback) 411  self._taskqueue.put( 412  ( 413  self._guarded_task_generation(result._job, 414  mapper, 415  task_batches), 416  None 417  ) 418  ) 419 return result 420 421  @staticmethod 422 def _handle_workers(pool): 423 thread = threading.current_thread() 424 425 # Keep maintaining workers until the cache gets drained, unless the pool 426 # is terminated. 427 #继续维护worker,直到缓存耗尽,除非池终止。 428 while thread._state == RUN or (pool._cache and thread._state != TERMINATE): 429  pool._maintain_pool() 430 time.sleep(0.1) 431 # send sentinel to stop workers 432  pool._taskqueue.put(None) 433 util.debug('worker handler exiting') 434 435  @staticmethod 436 def _handle_tasks(taskqueue, put, outqueue, pool, cache): 437 thread = threading.current_thread() 438 439 for taskseq, set_length in iter(taskqueue.get, None): 440 task = None 441 try: 442 # iterating taskseq cannot fail 443 #迭代taskseq不会失败 444 for task in taskseq: 445 if thread._state: 446 util.debug('task handler found thread._state != RUN') 447 break 448 try: 449  put(task) 450 except Exception as e: 451 job, idx = task[:2] 452 try: 453  cache[job]._set(idx, (False, e)) 454 except KeyError: 455 pass 456 else: 457 if set_length: 458 util.debug('doing set_length()') 459 idx = task[1] if task else -1 460 set_length(idx + 1) 461 continue 462 break 463 finally: 464 task = taskseq = job = None 465 else: 466 util.debug('task handler got sentinel') 467 468 try: 469 # tell result handler to finish when cache is empty 470 #告诉结果处理程序在缓存为空时结束 471 util.debug('task handler sending sentinel to result handler') 472  outqueue.put(None) 473 474 # tell workers there is no more work 475 util.debug('task handler sending sentinel to workers') 476 for p in pool: 477  put(None) 478 except OSError: 479 util.debug('task handler got OSError when sending sentinels') 480 481 util.debug('task handler exiting') 482 483  @staticmethod 484 def _handle_results(outqueue, get, cache): 485 thread = threading.current_thread() 486 487 while 1: 488 try: 489 task = get() 490 except (OSError, EOFError): 491 util.debug('result handler got EOFError/OSError -- exiting') 492 return 493 494 if thread._state: 495 assert thread._state == TERMINATE, "Thread not in TERMINATE" 496 util.debug('result handler found thread._state=TERMINATE') 497 break 498 499 if task is None: 500 util.debug('result handler got sentinel') 501 break 502 503 job, i, obj = task 504 try: 505  cache[job]._set(i, obj) 506 except KeyError: 507 pass 508 task = job = obj = None 509 510 while cache and thread._state != TERMINATE: 511 try: 512 task = get() 513 except (OSError, EOFError): 514 util.debug('result handler got EOFError/OSError -- exiting') 515 return 516 517 if task is None: 518 util.debug('result handler ignoring extra sentinel') 519 continue 520 job, i, obj = task 521 try: 522  cache[job]._set(i, obj) 523 except KeyError: 524 pass 525 task = job = obj = None 526 527 if hasattr(outqueue, '_reader'): 528 util.debug('ensuring that outqueue is not full') 529 # If we don't make room available in outqueue then 530 # attempts to add the sentinel (None) to outqueue may 531 # block. There is guaranteed to be no more than 2 sentinels. 532 #若是咱们不在outqueue中留出可用的空间,那么尝试将sentinel (None) 533 # 添加到outqueue可能会阻塞。保证不超过2个哨兵。 534 try: 535 for i in range(10): 536 if not outqueue._reader.poll(): 537 break 538  get() 539 except (OSError, EOFError): 540 pass 541 542 util.debug('result handler exiting: len(cache)=%s, thread._state=%s', 543  len(cache), thread._state) 544 545  @staticmethod 546 def _get_tasks(func, it, size): 547 it = iter(it) 548 while 1: 549 x = tuple(itertools.islice(it, size)) 550 if not x: 551 return 552 yield (func, x) 553 554 def __reduce__(self): 555 raise NotImplementedError( 556 'pool objects cannot be passed between processes or pickled' 557 #不能在进程之间传递池对象或pickle池对象 558  ) 559 560 def close(self): 561 util.debug('closing pool') 562 if self._state == RUN: 563 self._state = CLOSE 564 self._worker_handler._state = CLOSE 565 566 def terminate(self): 567 util.debug('terminating pool') 568 self._state = TERMINATE 569 self._worker_handler._state = TERMINATE 570  self._terminate() 571 572 def join(self): 573 util.debug('joining pool') 574 if self._state == RUN: 575 raise ValueError("Pool is still running") 576 elif self._state not in (CLOSE, TERMINATE): 577 raise ValueError("In unknown state") 578  self._worker_handler.join() 579  self._task_handler.join() 580  self._result_handler.join() 581 for p in self._pool: 582  p.join() 583 584  @staticmethod 585 def _help_stuff_finish(inqueue, task_handler, size): 586 # task_handler may be blocked trying to put items on inqueue 587 #试图将项放入inqueue时可能阻塞task_handler 588 util.debug('removing tasks from inqueue until task handler finished') 589  inqueue._rlock.acquire() 590 while task_handler.is_alive() and inqueue._reader.poll(): 591  inqueue._reader.recv() 592  time.sleep(0) 593 594  @classmethod 595 def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, 596  worker_handler, task_handler, result_handler, cache): 597 # this is guaranteed to only be called once 这保证只调用一次 598 util.debug('finalizing pool') 599 600 worker_handler._state = TERMINATE 601 task_handler._state = TERMINATE 602 603 util.debug('helping task handler/workers to finish') 604  cls._help_stuff_finish(inqueue, task_handler, len(pool)) 605 606 if (not result_handler.is_alive()) and (len(cache) != 0): 607 raise AssertionError( 608 "Cannot have cache with result_hander not alive") 609 610 result_handler._state = TERMINATE 611 outqueue.put(None) # sentinel 612 613 # We must wait for the worker handler to exit before terminating 614 # workers because we don't want workers to be restarted behind our back. 615 #咱们必须在终止工人以前等待工人处理程序退出,由于咱们不但愿工人在咱们背后从新启动。 616 util.debug('joining worker handler') 617 if threading.current_thread() is not worker_handler: 618  worker_handler.join() 619 620 # Terminate workers which haven't already finished. 621 if pool and hasattr(pool[0], 'terminate'): 622 util.debug('terminating workers') 623 for p in pool: 624 if p.exitcode is None: 625  p.terminate() 626 627 util.debug('joining task handler') 628 if threading.current_thread() is not task_handler: 629  task_handler.join() 630 631 util.debug('joining result handler') 632 if threading.current_thread() is not result_handler: 633  result_handler.join() 634 635 if pool and hasattr(pool[0], 'terminate'): 636 util.debug('joining pool workers') 637 for p in pool: 638 if p.is_alive(): 639 # worker has not yet exited 640 util.debug('cleaning up worker %d' % p.pid) 641  p.join() 642 643 def __enter__(self): 644 return self 645 646 def __exit__(self, exc_type, exc_val, exc_tb): 647  self.terminate() 648 649 # 650 # Class whose instances are returned by `Pool.apply_async()` 651 # 652 653 class ApplyResult(object): 654 655 def __init__(self, cache, callback, error_callback): 656 self._event = threading.Event() 657 self._job = next(job_counter) 658 self._cache = cache 659 self._callback = callback 660 self._error_callback = error_callback 661 cache[self._job] = self 662 663 def ready(self): 664 return self._event.is_set() 665 666 def successful(self): 667 if not self.ready(): 668 raise ValueError("{0!r} not ready".format(self)) 669 return self._success 670 671 def wait(self, timeout=None): 672  self._event.wait(timeout) 673 674 def get(self, timeout=None): 675  self.wait(timeout) 676 if not self.ready(): 677 raise TimeoutError 678 if self._success: 679 return self._value 680 else: 681 raise self._value 682 683 def _set(self, i, obj): 684 self._success, self._value = obj 685 if self._callback and self._success: 686  self._callback(self._value) 687 if self._error_callback and not self._success: 688  self._error_callback(self._value) 689  self._event.set() 690 del self._cache[self._job] 691 692 AsyncResult = ApplyResult # create alias -- see #17805 693 694 # 695 # Class whose instances are returned by `Pool.map_async()` 696 # 697 698 class MapResult(ApplyResult): 699 700 def __init__(self, cache, chunksize, length, callback, error_callback): 701 ApplyResult.__init__(self, cache, callback, 702 error_callback=error_callback) 703 self._success = True 704 self._value = [None] * length 705 self._chunksize = chunksize 706 if chunksize <= 0: 707 self._number_left = 0 708  self._event.set() 709 del cache[self._job] 710 else: 711 self._number_left = length//chunksize + bool(length % chunksize) 712 713 def _set(self, i, success_result): 714 self._number_left -= 1 715 success, result = success_result 716 if success and self._success: 717 self._value[i*self._chunksize:(i+1)*self._chunksize] = result 718 if self._number_left == 0: 719 if self._callback: 720  self._callback(self._value) 721 del self._cache[self._job] 722  self._event.set() 723 else: 724 if not success and self._success: 725 # only store first exception 726 self._success = False 727 self._value = result 728 if self._number_left == 0: 729 # only consider the result ready once all jobs are done 730 if self._error_callback: 731  self._error_callback(self._value) 732 del self._cache[self._job] 733  self._event.set() 734 735 # 736 # Class whose instances are returned by `Pool.imap()` 737 # 738 739 class IMapIterator(object): 740 741 def __init__(self, cache): 742 self._cond = threading.Condition(threading.Lock()) 743 self._job = next(job_counter) 744 self._cache = cache 745 self._items = collections.deque() 746 self._index = 0 747 self._length = None 748 self._unsorted = {} 749 cache[self._job] = self 750 751 def __iter__(self): 752 return self 753 754 def next(self, timeout=None): 755  with self._cond: 756 try: 757 item = self._items.popleft() 758 except IndexError: 759 if self._index == self._length: 760 raise StopIteration from None 761  self._cond.wait(timeout) 762 try: 763 item = self._items.popleft() 764 except IndexError: 765 if self._index == self._length: 766 raise StopIteration from None 767 raise TimeoutError from None 768 769 success, value = item 770 if success: 771 return value 772 raise value 773 774 __next__ = next # XXX 775 776 def _set(self, i, obj): 777  with self._cond: 778 if self._index == i: 779  self._items.append(obj) 780 self._index += 1 781 while self._index in self._unsorted: 782 obj = self._unsorted.pop(self._index) 783  self._items.append(obj) 784 self._index += 1 785  self._cond.notify() 786 else: 787 self._unsorted[i] = obj 788 789 if self._index == self._length: 790 del self._cache[self._job] 791 792 def _set_length(self, length): 793  with self._cond: 794 self._length = length 795 if self._index == self._length: 796  self._cond.notify() 797 del self._cache[self._job] 798 799 # 800 # Class whose instances are returned by `Pool.imap_unordered()` 801 #类,其实例由' Pool.imap_unordered() '返回 802 803 class IMapUnorderedIterator(IMapIterator): 804 805 def _set(self, i, obj): 806  with self._cond: 807  self._items.append(obj) 808 self._index += 1 809  self._cond.notify() 810 if self._index == self._length: 811 del self._cache[self._job] 812 813 # 814 # 815 # 816 817 class ThreadPool(Pool): 818 _wrap_exception = False 819 820  @staticmethod 821 def Process(*args, **kwds): 822 from .dummy import Process 823 return Process(*args, **kwds) 824 825 def __init__(self, processes=None, initializer=None, initargs=()): 826 Pool.__init__(self, processes, initializer, initargs) 827 828 def _setup_queues(self): 829 self._inqueue = queue.SimpleQueue() 830 self._outqueue = queue.SimpleQueue() 831 self._quick_put = self._inqueue.put 832 self._quick_get = self._outqueue.get 833 834  @staticmethod 835 def _help_stuff_finish(inqueue, task_handler, size): 836 # drain inqueue, and put sentinels at its head to make workers finish 837 #排干队伍内的水,并在其头部放置哨兵,使工人完成工做 838 try: 839 while True: 840 inqueue.get(block=False) 841 except queue.Empty: 842 pass 843 for i in range(size): 844 inqueue.put(None)
pool.py
相关文章
相关标签/搜索