源码之Queue

看源码能够把python看得更透,更懂,想必也是开发人员的必经之路。python

如今有个任务,写个线程池。使用Queue就能写一个最简单的,下面就来学学Queue源码。app

 

源码之Queueide

class Queue:
    """Create a queue object with a given maximum size.  
    If maxsize is <= 0, the queue size is infinite.   

告诉你建立一个给出长度的队列,若是长度不大于0那么队列长度将变成无限。函数

Queue构造方法fetch

 1 def __init__(self, maxsize=0):
 2     self.maxsize = maxsize        #定义maxsize字段为最大值字段
 3     self._init(maxsize)         #调用_init方法
 4         
 5     # mutex must be held whenever the queue is mutating.  All methods  
 6     # that acquire mutex must release it before returning.  
 7     # is shared between the three conditions, so acquiring and
 8     # releasing the conditions also acquires and releases mutex.
 9     '''
10     当队列正在改变时,锁必须被持有,全部得到锁的方法必须在返回以前释放它。
11     锁在三种条件下被共享,因此获取和释放条件也就获取和释放锁。
12     ''' 
13   
14     self.mutex = _threading.Lock()      #定义mutex字段为一个锁对象
15         
16     # Notify not_empty whenever an item is added to the queue; a
17     # thread waiting to get is notified then.
18     '''
19     当有一项被加入进队列时通知非空,而后通知一个线程将被等待获得
20     '''
21   
22     self.not_empty = _threading.Condition(self.mutex) #返回一个Condition对象
23         
24     # Notify not_full whenever an item is removed from the queue;
25         # a thread waiting to put is notified then.
26     '''
27     当一项被移除出队列时通知未满,而后通知一个线程等待被放进队列
28     '''
29         
30     self.not_full = _threading.Condition(self.mutex) #返回一个Condition对象
31         
32     # Notify all_tasks_done whenever the number of unfinished tasks
33         # drops to zero; thread waiting to join() is notified to resume
34     '''
35     在未完成任务的数量被删除至0时,通知全部任务完成
36     '''
37         
38     self.all_tasks_done = _threading.Condition(self.mutex) #返回一个Condition对象
39     self.unfinished_tasks = 0        #定义未完成任务数量

解析:优化

  将maxsize参数传递给了_init()方法,后面再看这个方法,它实际上是建立了一个deque对象(双管道)。ui

以后建立了一个锁对象,又经过锁对象建立了3个Condition对象。关于Condition对象,它属于线程的领域,后面介绍。spa

 

类Queue中的方法:线程

1.task_donedebug

 1 def task_done(self):
 2         """Indicate that a formerly enqueued task is complete.
 3 
 4         Used by Queue consumer threads.  For each get() used to fetch a task,
 5         a subsequent call to task_done() tells the queue that the processing
 6         on the task is complete.
 7 
 8         If a join() is currently blocking, it will resume when all items
 9         have been processed (meaning that a task_done() call was received
10         for every item that had been put() into the queue).
11 
12         Raises a ValueError if called more times than there were items
13         placed in the queue.
14         代表一个之前的队列任务完成了
15 
16         使用队列消费者进程。对于每个get()用来获得任务,随后调用task_done 方法告诉队列任务的处理已经完成了
17         若是一个join正在阻塞,当全部项都已经被加工了他将从新占用。
18         若是调用次数超过队列中放置的项目,则会抛ValueError异常
19         """
20         self.all_tasks_done.acquire()       #得到锁
21         try:
22             unfinished = self.unfinished_tasks - 1  #判断队列中一个线程的任务是否所有完成
23             if unfinished <= 0:                     #是则进行通知,或在过量调用时报异常
24                 if unfinished < 0:
25                     raise ValueError('task_done() called too many times')
26                 self.all_tasks_done.notify_all()
27             self.unfinished_tasks = unfinished      #不然未完成任务数量-1
28         finally:
29             self.all_tasks_done.release()           #最后释放锁

解析:

  这个方法判断队列中一个线程的任务是否所有完成,首先会经过all_tasks_done对象得到锁,若是是则进行通知,最后释放锁。nodify这个方法在Condition对象中研究。

2.join

 1 def join(self):
 2     """Blocks until all items in the Queue have been gotten and processed.
 3 
 4     The count of unfinished tasks goes up whenever an item is added to the
 5     queue. The count goes down whenever a consumer thread calls task_done()
 6     to indicate the item was retrieved and all work on it is complete.
 7 
 8     When the count of unfinished tasks drops to zero, join() unblocks.
 9 
10      阻塞,直到队列中全部项都被获得和处理
11 
12      当一个项目被添加到队列中时,未完成任务上升。当一个消费线程调用task_done方法代表这项被恢复且全部工做都完成时数量降低。
13      当未完成为0时,join解除阻塞
14     """
15 
16     self.all_tasks_done.acquire()
17     try:
18         while self.unfinished_tasks:            #若是有未完成的任务,将调用wait()方法等待
19             self.all_tasks_done.wait()
20     finally:
21         self.all_tasks_done.release()

解析:

  阻塞方法,当队列中有未完成进程时,调用join方法来阻塞,直到他们都完成。wait方法后面看。

3.qsize

1 def qsize(self):
2         """Return the approximate size of the queue (not reliable!).
3             返回一个估计的队列大小,不可靠
4         """
5         self.mutex.acquire()
6         n = self._qsize()           #这个方法返回了deque对象的长度
7         self.mutex.release()
8         return n

解析:

  后面会提_qsize这个方法,它返回了deque对象的长度。这个一个估计值,并不可靠。

4.empty

1 def empty(self):
2         """Return True if the queue is empty, False otherwise (not reliable!).
3             当队列为空时返回True,不然False
4         """
5         self.mutex.acquire()
6         n = not self._qsize()       #若是长度为0返回True,不然False
7         self.mutex.release()
8         return n

解析:

  判断队列长度是否为空,也是基于qsize的方法,因此仍然不可靠。

5.full

1 def full(self):
2     """Return True if the queue is full, False otherwise (not reliable!)."""
3     self.mutex.acquire()
4     n = 0 < self.maxsize == self._qsize()
5     self.mutex.release()
6     return n

没啥说的了,判断队列是否满了

6.put

 1 def put(self, item, block=True, timeout=None):
 2         """Put an item into the queue.
 3 
 4         If optional args 'block' is true and 'timeout' is None (the default),
 5         block if necessary until a free slot is available. If 'timeout' is
 6         a non-negative number, it blocks at most 'timeout' seconds and raises
 7         the Full exception if no free slot was available within that time.
 8         Otherwise ('block' is false), put an item on the queue if a free slot
 9         is immediately available, else raise the Full exception ('timeout'
10         is ignored in that case).
11 
12         若是可选的参数block和timeout为空(默认),若是必要的话阻塞直到有一个空闲位置可用。
13         若是timeout是一个非负的数字,它将阻塞至多这个数字的秒数而且若是没有可用位置时报Full异常。
14         另外,block 为false时,若是有可用的位置将会放一项进去,不然报Full异常
15 
16         """
17         self.not_full.acquire()                  #not_full得到锁
18         try:
19             if self.maxsize > 0:                 #若是队列长度有限制
20                 if not block:                    #若是没阻塞
21                     if self._qsize() == self.maxsize:   #若是队列满了抛异常
22                         raise Full
23                 elif timeout is None:           #有阻塞且超时为空,等待
24                     while self._qsize() == self.maxsize:
25                         self.not_full.wait()
26                 elif timeout < 0:
27                     raise ValueError("'timeout' must be a non-negative number")
28                 else:                           #若是有阻塞,且超时非负时,结束时间=当前时间+超时时间
29                     endtime = _time() + timeout
30                     while self._qsize() == self.maxsize:
31                         remaining = endtime - _time()
32                         if remaining <= 0.0:       #到时后,抛异常
33                             raise Full
34                         self.not_full.wait(remaining)
35             self._put(item)                         #调用_put方法
36             self.unfinished_tasks += 1              #未完成任务+1
37             self.not_empty.notify()                 #通知非空
38         finally:
39             self.not_full.release()                 #not_full释放锁

解析:

  默认状况下block为True,timeout为None。若是队列满则会等待,未满则会调用_put方法将进程加入deque中(后面介绍),而且未完成任务加1还会通知队列非空。

若是设置block参数为Flase,队列满时则会抛异常。若是设置了超时那么在时间到以前进行阻塞,时间一到抛异常。这个方法使用not_full对象进行操做。

7.put_nowait

1 def put_nowait(self, item):
2         """Put an item into the queue without blocking.
3 
4         Only enqueue the item if a free slot is immediately available.
5         Otherwise raise the Full exception.
6         """
7         return self.put(item, False)

就是put方法的block设置成Fasle的效果,没啥说的。

8.get

 1 def get(self, block=True, timeout=None):
 2         """Remove and return an item from the queue.
 3 
 4         If optional args 'block' is true and 'timeout' is None (the default),
 5         block if necessary until an item is available. If 'timeout' is
 6         a non-negative number, it blocks at most 'timeout' seconds and raises
 7         the Empty exception if no item was available within that time.
 8         Otherwise ('block' is false), return an item if one is immediately
 9         available, else raise the Empty exception ('timeout' is ignored
10         in that case).
11         """
12         self.not_empty.acquire()                #not_empty得到锁
13         try:
14             if not block:                       #不阻塞时
15                 if not self._qsize():           #队列为空时抛异常
16                     raise Empty
17             elif timeout is None:               #不限时时,队列为空则会等待
18                 while not self._qsize():
19                     self.not_empty.wait()
20             elif timeout < 0:
21                 raise ValueError("'timeout' must be a non-negative number")
22             else:
23                 endtime = _time() + timeout
24                 while not self._qsize():
25                     remaining = endtime - _time()
26                     if remaining <= 0.0:
27                         raise Empty
28                     self.not_empty.wait(remaining)
29             item = self._get()                  #调用_get方法,移除并得到项目
30             self.not_full.notify()              #通知非满
31             return item                        #返回项目
32         finally:
33             self.not_empty.release()            #释放锁  

解析:

  能够看出逻辑同put相似,参数默认状况下队列空了则会等待,不然将会调用_get方法(往下看)移除并得到一个项,最后返回这个项。这个方法使用not_empty对象进行操做。

9.get_nowait

block=False版的get方法

10._init()

def _init(self, maxsize):
        self.queue = deque()

生成了一个deque对象,这个deque对象才是真正操做队列添加或者删除一项的源头,因此有必要读一下这个类。

 

源码之deque

class deque(object):
    """
    deque([iterable[, maxlen]]) --> deque object
    
    Build an ordered collection with optimized access from its endpoints.
    """

告诉你这个类是建立一个能优化访问它的端点的有序的集合

deque构造方法:

1 def __init__(self, iterable=(), maxlen=None): # known case of _collections.deque.__init__
2         """
3         deque([iterable[, maxlen]]) --> deque object
4         
5         Build an ordered collection with optimized access from its endpoints.
6         # (copied from class doc)
7         """
8         pass

构造函数中有两个参数,迭代器和一个最大长度,默认都为空。

类Queue/deque中的方法:

类中方法有不少,先挑Queue中遇到的看。

11._qsize

def _qsize(self, len=len):
        return len(self.queue)

这个方法直接获得deque对象的长度并返回

12._put

def _put(self, item):
    self.queue.append(item)

这个方法调用了deque的append方法:

def append(self, *args, **kwargs): # real signature unknown
    """ Add an element to the right side of the deque. """
    pass

在deque队列右边添加一个元素。注意,这个方法只有一句pass,并无实现其功能的代码。可是看到 real signature unknown(真正未知的签名)这句话,我以为就是把代码放在我看不见的地方了。

13._get

def _get(self):
    return self.queue.popleft()

这个方法调用了deque的popleft方法:

def popleft(self, *args, **kwargs): # real signature unknown
        """ Remove and return the leftmost element. """
        pass

从最左端删除并返回一个元素,这有点像序列中的pop方法。一样的,也是一句 real signayure unknown

 

到此Queue类的源码已经看完了,不过如今对Queue的原理只了解了一部分,接下来看看锁和Condition对象在队列中是如何工做的。

 

源码之Condition

class _Condition(_Verbose):
    """Condition variables allow one or more threads to wait until they are
       notified by another thread.
    """

这个类继承了Vserbose类,还告诉你条件变量容许一个或多个线程等待直到他们被其余线程通知

Condition构造方法:

 1 def __init__(self, lock=None, verbose=None):
 2         _Verbose.__init__(self, verbose)    #调用了父类的构造方法
 3         if lock is None:
 4             lock = RLock()                  #得到一个RLock对象
 5         self.__lock = lock
 6         # Export the lock's acquire() and release() methods
 7         self.acquire = lock.acquire         #两个字段分别引用锁的得到和释放
 8         self.release = lock.release
 9         # If the lock defines _release_save() and/or _acquire_restore(),
10         # these override the default implementations (which just call
11         # release() and acquire() on the lock).  Ditto for _is_owned().
12         try:
13             self._release_save = lock._release_save
14         except AttributeError:
15             pass
16         try:
17             self._acquire_restore = lock._acquire_restore
18         except AttributeError:
19             pass
20         try:
21             self._is_owned = lock._is_owned
22         except AttributeError:
23             pass
24         self.__waiters = []

这个方法中执行了父类的构造方法,而后得到了RLock对象,又将其方法的引用赋给了多个字段。来看Verbose的构造方法:

class _Verbose(object):
        def __init__(self, verbose=None):
            pass

什么也没作,下面来看RLock。

源码之RLock

class _RLock(_Verbose):
    """A reentrant lock must be released by the thread that acquired it. Once a
       thread has acquired a reentrant lock, the same thread may acquire it
       again without blocking; the thread must release it once for each time it
       has acquired it.
    """

说什么一个进程获得一个锁后必须释放它

类RLock构造方法

def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__block = _allocate_lock()
        self.__owner = None
        self.__count = 0

又调用了Vserbose的__init__(),一样是pass。还调用了_allocate_lock()方法。

def allocate_lock(): # real signature unknown; restored from __doc__
    """
    allocate_lock() -> lock object
    (allocate() is an obsolete synonym)
    
    Create a new lock object.  See help(LockType) for information about locks.
    """
    pass

又是real signature unknown,看注释:建立一个新的锁对象。它就干了这个。

还有两个字段,全部者和数量。接下来就看Condition中用到的RlLock的3个的方法。

 

1._release_save

def _release_save(self):
        if __debug__:
            self._note("%s._release_save()", self)
        count = self.__count
        self.__count = 0
        owner = self.__owner
        self.__owner = None
        self.__block.release()
        return (count, owner)

字面意思为保存,释放。若是debug为真,将本方法的返回值传递给了note方法。而后将owner和count字段保存,返回,重置。那么debug为什么,note方法又作了什么?

__debug__在__builtin__.py中,默认为True。来看note方法

 1 _VERBOSE = False
 2 
 3 if __debug__:
 4 
 5     class _Verbose(object):
 6 
 7         def __init__(self, verbose=None):
 8             if verbose is None:
 9                 verbose = _VERBOSE
10             self.__verbose = verbose
11 
12         def _note(self, format, *args):
13             if self.__verbose:
14                 format = format % args
15                 # Issue #4188: calling current_thread() can incur an infinite
16                 # recursion if it has to create a DummyThread on the fly.
17                 ident = _get_ident()
18                 try:
19                     name = _active[ident].name
20                 except KeyError:
21                     name = "<OS thread %d>" % ident
22                 format = "%s: %s\n" % (name, format)
23                 _sys.stderr.write(format)

__init__中将self._verbose赋值成了False,而在_note中若是self._verbose为假那么这个方法就啥也没执行。

若是self._verbose为真,就会调用了_get_ident()方法,简单看了下这个方法,解释为:返回一个非0整数,在其余同时存在的线程中惟一标识 一个进程。

因此这个方法干的活就是返回锁的拥有者,数量,而后释放锁。

 

2._acquire_restore

1 def _acquire_restore(self, count_owner):
2         count, owner = count_owner
3         self.__block.acquire()
4         self.__count = count
5         self.__owner = owner
6         if __debug__:
7             self._note("%s._acquire_restore()", self)

顾名思义,得到锁,而后将owner和count做为参数赋值给对象,恢复字段的值。

 

3._is_owned

def _is_owned(self):
        return self.__owner == _get_ident()

判断owner的值与get_ident的返回值是否相等。其返回值就是惟一标识线程的一个整数,那么这个比较的意义就是肯定这个锁对象的持有者是否为这个进程。

类Condition的方法

condition中主要用到了的wait,nodify方法,让其余线程等待或唤醒它们。因为时间有限,这部份内容待续。

 

 

至此,Queue源码已经看的差很少了,总结一下

Queue调用RLock对象使线程得到和释放锁,并记录线程的信息。调用Condition对象控制线程等待或是唤醒它们。而后deque对象负责操做管道中的元素。

心得

刚开始读源码时方法和类间调来调去有点难如下手,不过多读几遍就差很少搞明白了。个人思路是先攻主体,好比就先看Queue里的代码,遇到了什么deque,Condition的能够先跳过,以后再逐个击破。最后回过头来把各个部分组合整理一下,再画个类图什么的就更容易明白了。

最后附上一张类图。

 

相关文章
相关标签/搜索