Python 模块源码分析:queue 队列

起步

queue 模块提供适用于多线程编程的先进先出(FIFO)数据结构。由于它是线程安全的,因此多个线程很轻松地使用同一个实例。html

源码分析

先从初始化的函数来看:编程

class Queue:
    def __init__(self, maxsize=0):
        # 设置队列的最大容量
        self.maxsize = maxsize
        self._init(maxsize)

        # 线程锁,互斥变量
        self.mutex = threading.Lock()
        # 由锁衍生出三个条件变量
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)

        self.unfinished_tasks = 0

    def _init(self, maxsize):
        # 初始化底层数据结构
        self.queue = deque()

从这初始化函数能获得哪些信息呢?首先,队列是能够设置其容量大小的,而且具体的底层存放元素的它使用了 collections.deque() 双端列表的数据结构,这使得能很方便的作先进先出操做。这里还特意抽象为 _init 函数是为了方便其子类进行覆盖,容许子类使用其余结构来存放元素(好比优先队列使用了 list)。安全

而后就是线程锁 self.mutex ,对于底层数据结构 self.queue 的操做都要先得到这把锁;再往下是三个条件变量,这三个 Condition 都以 self.mutex 做为参数,也就是说它们共用一把锁;从这能够知道诸如 with self.mutexwith self.not_empty 等都是互斥的。数据结构

基于这些锁而作的一些简单的操做:多线程

class Queue:
    ...
    def qsize(self):
        # 返回队列中的元素数
        with self.mutex:
            return self._qsize()

    def empty(self):
        # 队列是否为空
        with self.mutex:
            return not self._qsize()

    def full(self):
        # 队列是否已满
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def _qsize(self):
        return len(self.queue)

这个代码片断挺好理解的,无需分析。app

做为队列,主要得完成入队与出队的操做,首先是入队:函数

class Queue:
    ...
    def put(self, item, block=True, timeout=None):
        with self.not_full: # 获取条件变量not_full
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full # 若是 block 是 False,而且队列已满,那么抛出 Full 异常
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait() # 阻塞直到由剩余空间
                elif timeout < 0: # 不合格的参数值,抛出ValueError
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout  # 计算等待的结束时间
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full # 等待期间一直没空间,抛出 Full 异常
                        self.not_full.wait(remaining)
            self._put(item) # 往底层数据结构中加入一个元素
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def _put(self, item):
        self.queue.append(item)

尽管只有二十几行的代码,但这里的逻辑仍是比较复杂的。它要处理超时与队列剩余空间不足的状况,具体几种状况以下:源码分析

  1. 若是 block 是 False,忽略timeout参数线程

    • 若此时队列已满,则抛出 Full 异常;
    • 若此时队列未满,则当即把元素保存到底层数据结构中;
  2. 若是 block 是 True设计

    • timeoutNone 时,那么put操做可能会阻塞,直到队列中有空闲的空间(默认);
    • timeout 是非负数,则会阻塞相应时间直到队列中有剩余空间,在这个期间,若是队列中一直没有空间,抛出 Full 异常;

处理好参数逻辑后,,将元素保存到底层数据结构中,并递增unfinished_tasks,同时通知 not_empty ,唤醒在其中等待数据的线程。

出队操做:

class Queue:
    ...
    def get(self, block=True, timeout=None):
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def _get(self):     
        return self.queue.popleft()

get() 操做是 put() 相反的操做,代码块也及其类似,get() 是从队列中移除最早插入的元素并将其返回。

  1. 若是 block 是 False,忽略timeout参数

    • 若此时队列没有元素,则抛出 Empty 异常;
    • 若此时队列由元素,则当即把元素保存到底层数据结构中;
  2. 若是 block 是 True

    • timeoutNone 时,那么get操做可能会阻塞,直到队列中有元素(默认);
    • timeout 是非负数,则会阻塞相应时间直到队列中有元素,在这个期间,若是队列中一直没有元素,则抛出 Empty 异常;

最后,经过 self.queue.popleft() 将最先放入队列的元素移除,并通知 not_full ,唤醒在其中等待数据的线程。

这里有个值得注意的地方,在 put() 操做中递增了 self.unfinished_tasks ,而 get() 中却没有递减,这是为何?

这实际上是为了留给用户一个消费元素的时间,get() 仅仅是获取元素,并不表明消费者线程处理的该元素,用户须要调用 task_done() 来通知队列该任务处理完成了:

class Queue:
    ...
    def task_done(self):
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0: # 也就是成功调用put()的次数小于调用task_done()的次数时,会抛出异常
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all() # 当unfinished为0时,会通知all_tasks_done
            self.unfinished_tasks = unfinished

    def join(self):
        with self.all_tasks_done:
            while self.unfinished_tasks: # 若是有未完成的任务,将调用wait()方法等待
                self.all_tasks_done.wait()

因为 task_done() 使用方调用的,当 task_done() 次数大于 put() 次数时会抛出异常。

task_done() 操做的做用是唤醒正在阻塞的 join() 操做。join() 方法会一直阻塞,直到队列中全部的元素都被取出,并被处理了(和线程的join方法相似)。也就是说 join() 方法必须配合 task_done() 来使用才行。

LIFO 后进先出队列

LifoQueue使用后进先出顺序,与栈结构类似:

class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()

这就是 LifoQueue 所有代码了,这正是 Queue 设计很棒的一个缘由,它将底层的数据操做抽象成四个操做函数,自己来处理线程安全的问题,使得其子类只需关注底层的操做。

LifoQueue 底层数据结构改用 list 来存放,经过 self.queue.pop() 就能将 list 中最后一个元素移除,无需重置索引。

PriorityQueue 优先队列

from heapq import heappush, heappop

class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        heappush(self.queue, item)

    def _get(self):
        return heappop(self.queue)

优先队列使用了 heapq 模块的结构,也就是最小堆的结构。优先队列更为经常使用,队列中项目的处理顺序须要基于这些项目的特征,一个简单的例子:

import queue

class A:
    def __init__(self, priority, value):
        self.priority = priority
        self.value = value

    def __lt__(self, other):
        return self.priority < other.priority


q = queue.PriorityQueue()

q.put(A(1, 'a'))
q.put(A(0, 'b'))
q.put(A(1, 'c'))

print(q.get().value)  # 'b'

使用优先队列的时候,须要定义 __lt__ 魔术方法,来定义它们之间如何比较大小。若元素的 priority 相同,依然使用先进先出的顺序。

参考

相关文章
相关标签/搜索