【译】线程:概念和实现(4)

翻译:老齐python

译者注:与本文相关图书推荐:《Python大学实用教程》《跟老齐学Python:轻松入门》安全


第四部分

将队列应用于PCP

若是你但愿一次可以处理管道中的多个值,就须要一种针对管道的数据结构,它至关于producer的备份,能实现数量增长和减小。bash

Python标准库有一个queue模块,该模块有一个Queue 类,下面将Pipeline改成Queue,就能够再也不使用Lock锁定某些变量,此外,还将使用Python的threading模块中的Event来中止工做线程,这是一种与以往不一样的方法。微信

Event开始。当有不少线程等待threading.Event实例的时候,它可以将一个线程标记为一个事件。这段代码的关键是,等待事件的线程不必定须要中止它们正在作的事情,它们能够每隔一段时间检查一次Event的状态。网络

不少事情均可以触发event。在本例中,主线程将简单地休眠一段时间,而后运行.set()数据结构

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)
    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)
        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()
复制代码

这里惟一的变化是建立了event对象,而后将event做为参数传给后面的.submit方法,在with语句中,有一句要sleep一秒钟,再记录日志信息,最后调用event.set()多线程

producer也不须要改变太多:并发

def producer(pipeline, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")
    logging.info("Producer received EXIT event. Exiting")
复制代码

while循环中再也不为pipeline设置SENTINEL值。consumer须要相应作较大改动:dom

def consumer(pipeline, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get_message("Consumer")
        logging.info(
            "Consumer storing message: %s (queue size=%s)",
            message,
            pipeline.qsize(),
        )

    logging.info("Consumer received EXIT event. Exiting")
复制代码

必须删除SENTINEL值相关的代码,while循环的条件也所以更复杂了一些,如今须要考虑not event.is_set()not pipeline.empty()两个条件,也就是未设置event,或者pipeline未清空时。函数

要确保在consumer进程结束是队列中已是空的了,不然就会出现如下两种糟糕的状况。一是丢失了这些最终消息,但更严重的状况是第二种,producer若是视图将信息添加到完整队列中,会被锁住,从而不能返回。这种事件会发生在producer验证.is_set()条件以后,调用pipeline.set_message()以前。

这种事件会发生在producer验证.is_set()条件以后,调用pipeline.set_message()以前。

若是发生这种状况,producer可能会在队列仍然全满的状况下唤醒并退出。而后,调用.set_message().set_message()将一直等到队列中有新信息的空间。若consumer已经退出,这种状况就不会发生,并且producer不会退出。

consumer中的其余部分看起来应该很熟悉。

然而,Pipeline还须要重写:

class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = self.get()
        logging.debug("%s:got %d from queue", name, value)
        return value

    def set_message(self, value, name):
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)
复制代码

上面的Pipelinequeue.Queue的子类。Queue 在初始化时指定一个可选参数,以指定队列的最大长度。

若是为maxsize指定一个正数,则该数字为队列元素个数的极限,若是达到该值,.put()方法被锁定,直到元素的数量少于maxsize才解锁。若是不指定maxsize,则队列将增加到计算机内存的所许可的最值。

.get_message().set_message()两个方法代码更少了,它们基本上把.get().put()封装在Queue中。你可能想知道防止线程发生竞态条件的锁都去了哪里。

编写标准库的核心开发人员知道,Queue常常在多线程环境中使用,因而将锁合并到Queue自己中。Queue对于线程来讲是安全的。

此程序的运行以下所示:

$ ./prodcom_queue.py
Producer got message: 32
Producer got message: 51
Producer got message: 25
Producer got message: 94
Producer got message: 29
Consumer storing message: 32 (queue size=3)
Producer got message: 96
Consumer storing message: 51 (queue size=3)
Producer got message: 6
Consumer storing message: 25 (queue size=3)
Producer got message: 31

[many lines deleted]

Producer got message: 80
Consumer storing message: 94 (queue size=6)
Producer got message: 33
Consumer storing message: 20 (queue size=6)
Producer got message: 48
Consumer storing message: 31 (queue size=6)
Producer got message: 52
Consumer storing message: 98 (queue size=6)
Main: about to set event
Producer got message: 13
Consumer storing message: 59 (queue size=6)
Producer received EXIT event. Exiting
Consumer storing message: 75 (queue size=6)
Consumer storing message: 97 (queue size=5)
Consumer storing message: 80 (queue size=4)
Consumer storing message: 33 (queue size=3)
Consumer storing message: 48 (queue size=2)
Consumer storing message: 52 (queue size=1)
Consumer storing message: 13 (queue size=0)
Consumer received EXIT event. Exiting
复制代码

通读上述示例的输出,会发现,有的地方颇有意思。在顶部,你能够看到producer必须建立5条信息并将其中4条放在队列中,队列中最前面的一条被操做系统换掉以后,第5条条信息才能加入队列。

而后consumer运行,把第1条信息拉了出来,它打印出了该信息以及队列在此时的长度:

Consumer storing message: 32 (queue size=3)
复制代码

此时,标明第5条信息尚未进入pipeline ,删除单个信息后queue的减少到3。你也知道queue能够保存10条消息,所以queue线程不会被queue阻塞,它被操做系统置换了。

注意:你调试的输出结果会有所不一样。你的输出将随着运行次数的不一样而改变。这就是用线程工做的乐趣所在!

执行代码,你能看到主线程生成event事件,这会致使producer当即退出,consumer还有不少工做要作,因此它会一直运行,直到清理完pipeline

尝试操做大小不一样的队列,并调用producerconsumer中的time.sleep(),以分别模拟更长的网络或磁盘访问时间。即便对程序的这些内容稍加更改,也会使结果产生很大差别。

这是解决发PCP的一个好方法,可是你能够进一步简化它,不须要使用Pipeline,一旦去掉日志记录,它就会变成一个queue.Queue

下面是直接使用queue.Queue的最终代码:

import concurrent.futures
import logging
import queue
import random
import threading
import time

def producer(queue, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        queue.put(message)

    logging.info("Producer received event. Exiting")

def consumer(queue, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info(
            "Consumer storing message: %s (size=%d)", message, queue.qsize()
        )

    logging.info("Consumer received event. Exiting")

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()
复制代码

这更易于阅读,并展现了如何使用Python的内置模块来简化复杂的问题。

LockQueue是便于解决并发问题的类,但标准库还提供了其余类。在结束本文以前,让咱们浏览其中一些类。

Threading

Python的threading模块还提供了一些类,虽然上面的示例不须要这些,可是它们在不一样的用例中能够派上用场,因此熟悉它们是有好处的。

Semaphore

threading.Semaphore有一些特殊属性的计数器对象,这里实现的计数具备原子性,意味着能够保证操做系统不会在递增或递减计数器的过程当中交换线程。

内部计数器在调用.release()时递增,在调用.acquire()时递减。

另一个特殊属性,若是一个线程在计数器为零时调用.acquire(),则该线程将被锁定,直到另外一个线程调用.release(),并将计数器增长到1。

Semaphores一般用于保护容量有限的资源。例如,若是你有一个链接池,而且但愿将该池的大小限制为特定的数目。

Timer

threading.Timer用于在通过必定时间后调度要调用的函数,你能够经过传入等待的秒数和调用的函数来建立Timer实例:

t = threading.Timer(30.0, my_function)
复制代码

经过调用.start()启动Timer。在指定时间以后的某个时间点,将在新线程上调用该函数。但请注意,没法保证会在你但愿的时间准确调用该函数。

若是要中止已经启动的Timer,能够调用.cancel()。若是在Timer触发后调用.cancel(),不会执行任何操做,也不会产生异常。

Timer可用于在特定时间后提示用户执行操做。若是用户在Timer过时以前执行操做,则能够调用.cancel()

Barrier

threading.Barrier可用于保持固定数量的线程同步。建立Barrier时,调用方必须指定将要同步的线程数。每一个线程都调用Barrier.wait()方法,它们都将保持封锁状态,直到指定数量的线程在等待,而后所有同时释放。

请记住:线程是由操做系统调度的,所以,即便全部线程都是同时释放的,它们也将被调度为一次运行一个线程。

Barrier的一个用途是容许线程池对自身进行初始化。让这些线程初始化后在Barrier上等待,将确保在全部线程完成初始化以前,没有一个线程开始运行。

结论:Python中的线程

如今你已经了解了Python的threading提供的许多功能,以及一些如何写线程程序和用线程程序解决问题的示例。你还看到了在编写和调试线程程序时出现的一些问题。

原文连接:realpython.com/intro-to-py…

关注微信公众号:老齐教室。读深度文章,得精湛技艺,享绚丽人生。

相关文章
相关标签/搜索