一文了解Python的线程

问题python

  • 什么是线程?算法

  • 如何建立、执行线程?数据库

  • 如何使用线程池ThreadPoolExecutor?编程

  • 如何避免资源竞争问题?安全

  • 如何使用Python中线程模块threading提供的经常使用工具?网络

 

目录数据结构

1. 什么是线程多线程

2. 建立线程并发

    2.1. 守护线程app

    2.2. 加入线程

3. 多线程

4. 线程池

5. 竞态条件

    5.1. 单线程

    5.2. 两个线程

    5.3. 示例的意义

6. 同步锁

7. 死锁

8. 生产者-消费者模型中的线程

    8.1 在生产者-消费者模型中使用锁

    8.2 在生产者-消费者模型中使用队列

9. 线程对象

    9.1 信号量

    9.2 定时器

    9.3 栅栏

1. 什么是线程

  线程是操做系统可以进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运做单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中能够并发多个线程,每条线程并行执行不一样的任务。

  在Python3中实现的大部分运行任务里,不一样的线程实际上并无同时运行:它们只是看起来像是同时运行的。

  你们很容易认为线程化是在程序上运行两个(或多个)不一样的处理器,每一个处理器同时执行一个独立的任务。这种理解并不彻底正确,线程可能会在不一样的处理器上运行,但一次只能运行一个线程。

  同时执行多个任务须要使用非标准的Python运行方式:用不一样的语言编写一部分代码,或者使用多进程模块multiprocessing,但这么作会带来一些额外的开销。

  因为Python默认的运行环境是CPython(C语言开发的Python),因此线程化可能不会提高全部任务的运行速度。这是由于和GIL(Global Interpreter Lock)的交互造成了限制:一次只能运行一个Python线程。

  线程化的通常替代方法是:让各项任务花费大量时间等待外部事件。但问题是,若是想缩短等待时间,会须要大量的CPU计算,结果是程序的运行速度可能并不会提高。

  当代码是用Python语言编写并在默认执行环境CPython上运行时,会出现这种状况。若是线程代码是用C语言写的,那它们就可以释放GIL并同时运行。若是是在别的Python执行环境(如IPython, PyPy,Jython, IronPython)上运行,请参考相关文档了解它们是如何处理线程的。

  若是只用Python语言在默认的Python执行环境下运行,而且遇到CPU受限的问题,那就应该用多进程模块multiprocessing来解决。

  在程序中使用线程也能够简化设计。本文中的大部分示例并不保证能够提高程序运行速度,其目的是使设计结构更加清晰、便于逻辑推理。

2. 建立线程

既然已经对什么是线程有了初步了解,下面让咱们来学习如何建立一个线程。

Python标准库提供了threading模块,里面包含将在本文中介绍的大部分基本模块。在这个模块中,Thread类很好地封装了有关线程的子类,为咱们提供了干净的接口来使用它们。

要启动一个线程,须要建立一个Thread实例,而后调用.start()方法:

import logging
import threading
import time


def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)


if __name__ == "__main__":
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,))
    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    # x.join()
    logging.info("Main    : all done")

查看日志语句,能够看到__main__部分正在建立并启动线程:

x = threading.Thread(target=thread_function, args=(1,))
x.start()

建立线程时,咱们须要传递两个参数,第一个参数target是函数名,指定这个线程去哪一个函数里面去执行代码,第二个参数args是一个元组类型,指定为这个函数传递的参数。在本例中,Thread运行函数thread_function(),并将1做为参数传递给该函数。

在本文中,咱们用连续整数为线程命名。虽然threading.get_ident()方法可以为每个线程生成惟一的名称,但这些名称一般会比较长,并且可读性差。

这里的thread_function()函数自己没作什么,它只是简单地记录了一些信息,并用time.sleep()隔开。

运行程序(注释掉倒数第二行代码),结果以下:

15:42:26: Main    : before creating thread
15:42:26: Main    : before running thread
15:42:26: Thread 1: starting
15:42:26: Main    : wait for the thread to finish
15:42:26: Main    : all done
15:42:28: Thread 1: finishing

能够看到,线程Thread__main__部分代码运行完后才结束。下一节会对这一现象作出解释,并讨论被注释掉那行代码。

2.1. 守护线程

在计算机科学中,守护进程daemon是一类在后台运行的特殊进程,用于执行特定的系统任务。

守护进程daemon在Python线程模块threading中有着特殊的含义。当程序退出时,守护线程将当即关闭。能够这么理解,守护线程是一个在后台运行,且不用费心去关闭它的线程,由于它会随程序自动关闭。

若是程序运行的线程是非守护线程,那么程序将等待全部线程结束后再终止。但若是运行的是守护线程,当程序退出时,守护线程会被自动杀死。

咱们仔细研究一下上面程序运行的结果,注意看最后两行。当运行程序时,在__main__部分打印完all done信息后、线程结束前,有一个大约2秒的停顿。

这时,Python在等待非守护线程完成运行。当Python程序结束时,关闭过程的一部分是清理线程。

查看Python线程模块的源代码,能够看到thread ._shutdown()方法遍历全部正在运行的线程,并在每一个非守护线程上调用.join()函数,检查它们是否已经结束运行。

所以,程序退出时须要等待,由于守护线程自己会在休眠中等待其余非守护线程运行结束。一旦thread ._shutdown()运行完毕并打印出信息,程序就能够退出。

守护线程这种自动退出的特性很实用,但其实还有其余的方法能实现相同的功能。咱们先用守护线程重复运行一下上面的程序,看看结果。只需在建立线程时,添加参数daemon=True

x = threading.Thread(target=thread_function, args=(1,), daemon=True)

如今运行程序,结果以下:

15:46:50: Main    : before creating thread
15:46:50: Main    : before running thread
15:46:50: Thread 1: starting
15:46:50: Main    : wait for the thread to finish
15:46:50: Main    : all done
15:46:52: Thread 1: finishing

添加参数daemon=True前

15:46:13: Main    : before creating thread
15:46:13: Main    : before running thread
15:46:13: Thread 1: starting
15:46:13: Main    : wait for the thread to finish
15:46:13: Main    : all done

添加参数daemon=True后

不一样的地方是,以前输出的最后一行不见了,说明thread_function()函数没有机会完成运行。这是一个守护线程,因此当__main__部分运行完最后一行代码,程序终止,守护线程被杀死。

2.2. 加入一个线程

守护线程用起来很方便,但若是想让守护线程运行完毕后再结束程序该怎么办?或者想让守护线程运行完后不退出程序呢?

让咱们来看一下刚刚注释掉的那行代码:

# x.join()

要让一个线程等待另外一个线程完成,能够调用.join()函数。若是取消对这行代码的注释,主线程将会暂停,等待线程x完成运行。

这个功能在守护线程和非守护线程上一样适用。若是用.join()函数加入了一个线程,则主线程将一直等待,直到被加入的线程运行完成。

15:48:06: Main    : before creating thread
15:48:06: Main    : before running thread
15:48:06: Thread 1: starting
15:48:06: Main    : wait for the thread to finish
15:48:08: Thread 1: finishing
15:48:08: Main    : all done

3. 多线程

到目前为止,示例代码中只用到了两个线程:主线程和一个threading.Thread线程对象。

一般,咱们但愿同时启动多个线程,让它们执行不一样的任务。先来看看比较复杂的建立多线程的方法,而后再看简单的。

这个复杂的建立方法其实前面已经展现过了:

import logging
import threading
import time


def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)


if __name__ == "__main__":
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    threads = list()
    for index in range(3):
        logging.info("Main    : create and start thread %d.", index)
        x = threading.Thread(target=thread_function, args=(index,))
        threads.append(x)
        x.start()

    for index, thread in enumerate(threads):
        logging.info("Main    : before joining thread %d.", index)
        thread.join()
        logging.info("Main    : thread %d done", index)

这段代码和前面提到的建立单线程时的结构是同样的,建立线程对象,而后调用.start()方法。程序中会保存一个包含多个线程对象的列表,为稍后使用.join()函数作准备。

屡次运行这段代码可能会产生一些有趣的结果:

15:51:43: Main    : create and start thread 0.
15:51:43: Thread 0: starting
15:51:43: Main    : create and start thread 1.
15:51:43: Thread 1: starting
15:51:43: Main    : create and start thread 2.
15:51:43: Thread 2: starting
15:51:43: Main    : before joining thread 0.
15:51:45: Thread 0: finishing
15:51:45: Main    : thread 0 done
15:51:45: Main    : before joining thread 1.
15:51:45: Thread 2: finishing
15:51:45: Thread 1: finishing
15:51:45: Main    : thread 1 done
15:51:45: Main    : before joining thread 2.
15:51:45: Main    : thread 2 done

仔细看一下输出结果,三个线程都按照预想的顺序建立0,1,2,但它们的结束顺序倒是相反的!屡次运行将会生成不一样的顺序。查看线程Thread x: finish中的信息,能够知道每一个线程都在什么时候完成。

线程的运行顺序是由操做系统决定的,而且很难预测。颇有可能每次运行所获得的顺序都不同,因此在用线程设计算法时须要注意这一点。

幸运的是,Python中提供了几个基础模块,能够用来协调线程并让它们一块儿运行。在介绍这部份内容以前,让咱们先看看如何更简单地建立一组线程。

4. 线程池

咱们能够用一种更简单的方法来建立一组线程:线程池ThreadPoolExecutor,它是Python中concurrent.futures标准库的一部分。(Python 3.2 以上版本适用)。

最简单的方式是把它建立成上下文管理器,并使用with语句管理线程池的建立和销毁。

ThreadPoolExecutor重写上例中的__main__部分,代码以下:

import concurrent.futures
import logging
import time


def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)


if __name__ == "__main__":
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))

这段代码建立一个线程池ThreadPoolExecutor做为上下文管理器,并传入须要的工做线程数量。而后使用.map()遍历可迭代对象,本例中是range(3),每一个对象生成池中的一个线程。

with模块的结尾,会让线程池ThreadPoolExecutor对池中的每一个线程调用.join()。强烈建议使用线程池ThreadPoolExecutor做为上下文管理器,由于这样就不会忘记写.join()

注:

使用线程池ThreadPoolExecutor可能会报一些奇怪的错误。例如,调用一个没有参数的函数,但将参数传入.map()时,线程将抛出异常。

不幸的是,线程池ThreadPoolExecutor会隐藏该异常,程序会在没有任何输出的状况下终止。刚开始调试时,这会让人很头疼。

运行修改后的示例代码,结果以下:

15:54:29: Thread 0: starting
15:54:29: Thread 1: starting
15:54:29: Thread 2: starting
15:54:31: Thread 0: finishing
15:54:31: Thread 2: finishing
15:54:31: Thread 1: finishing

再提醒一下,这里的线程1在线程0以前完成,这是由于线程的调度是由操做系统决定的,并不遵循一个特定的顺序。

5. 竞态条件

在继续介绍Python线程模块的一些其余特性以前,让咱们先讨论一下在编写线程化程序时会遇到的一个更头疼的问题: 竞态条件。

咱们先了解一下竞态条件的含义,而后看一个实例,再继续学习标准库提供的其余模块,来防止竞态条件的发生。

当两个或多个线程访问共享的数据或资源时,可能会出现竞态条件。在本例中,咱们建立了一个每次都会发生的大型竞态条件,但请注意,大多数竞态条件不会如此频繁发生。一般状况下,它们不多发生,但一旦发生,会很难进行调试。

在本例中,咱们会写一个更新数据库的类,但这里并不须要一个真正的数据库,只是一个虚拟的,由于这不是本文讨论的重点。

这个FakeDatabase类包括.__init__().update()方法。

class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info("Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: finishing update", name)

FakeDatabase类会一直跟踪一个值: .value,它是共享数据,这里会出现竞态条件。

.__init__()方法将.value的值初始化为0。.update()方法从数据库中读取一个值,对其进行一些计算,而后将新值写回数据库。

FakeDatabase类的使用实例以下:

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)

该程序建立一个线程池ThreadPoolExecutor,里面包含两个线程,而后在每一个线程上调用.submit()方法,告诉它们运行database.update()函数。

.submit()容许将位置参数和关键字参数传递给正在线程中运行的函数:

.submit(function, *args, **kwargs)

示例代码中,index做为惟一一个位置参数传递给database.update()函数,后面会介绍,也能够用相似的方式传递多个参数。

因为每一个线程都会运行.update(), 让.value的变量值加上1,因此最后打印出的database.value值应该是2。但若是是这样的话,举这个例子就没有什么意义了。

实际上,运行上面这段代码的输出以下:

16:03:32: Testing update. Starting value is 0.
16:03:32: Thread 0: starting update
16:03:32: Thread 1: starting update
16:03:32: Thread 1: finishing update
16:03:32: Thread 0: finishing update
16:03:32: Testing update. Ending value is 1.

咱们来仔细研究一下这里究竟发生了什么,有助于更好地理解有关这个问题的解决方案。 

5.1. 单线程

在深刻研究上面有关两个线程的问题以前,咱们先回过头看一下线程究竟是如何工做的。

这里不会讨论全部的细节,由于在目前这个学习阶段还不必掌握这么多内容。咱们还将简化一些东西,虽然可能在技术上不够精确,但能够方便你们理解其中的原理。

当线程池ThreadPoolExecutor运行每一个线程时,咱们会指定运行哪一个函数,以及传递给该函数的参数:executor.submit(database.update, index),这里是指运行database.update函数,并传入index参数。

这么作的结果是,线程池中的每一个线程都将调用database.update(index)。注意,主线程__main__中建立的database是对FakeDatabase对象的引用。在这个对象上调用.update(),会调用该对象的实例方法。

每一个线程都将引用同一个FakeDatabase对象:database。每一个线程还有一个独特的index值,使得日志语句更易阅读:

当线程开始运行.update()函数时,它拥有局部变量local_copy。这绝对是一件好事,不然,运行相同函数的两个线程老是会相互混淆。也就是说,函数内定义的局部变量是线程安全的。

如今咱们能够看一下,若是使用单线程、调用一次.update()函数运行上面的程序会发生什么。

下图展现了在只运行一个线程的状况下,.update()函数是如何逐步执行的。代码显示在左上角,后面跟着一张图,显示线程中局部变量local_value和共享数据database.value的值:

 

这张图是这样布局的,从上至下时间增长,它以建立线程1开始,并在线程1终止时结束。

线程1启动时,FakeDatabase.value的值为0。第一行代码将值0复制给局部变量local_copy。接下来,local_copy += 1语句让local_copy的值增长1,能够看到线程1中的.value值变成了1。

而后调用time.sleep()方法,暂停当前线程,并容许其余线程运行。由于本例中只有一个线程,这里没什么影响。

当线程1被唤醒继续运行时,它将新值从局部变量local_copy复制到FakeDatabase.value,线程完成运行。能够看到database.value的值被设为1。

到目前为止,一切顺利。咱们运行了一次.update()函数,FakeDatabase.value值增长到1。

5.2. 两个线程

回到竞态条件,这两个线程会并发运行,但不会同时运行。它们都有各自的局部变量local_copy,并指向相同的database对象。正是database这个共享数据致使了这些问题。

程序建立线程1,运行update()函数:

当线程1调用time.sleep()方法时,它容许另外一个线程开始运行。这时,线程2启动并执行相同的操做。它也将database.value的值复制给私有变量local_copy,但共享数据database.value的值还未更新,仍为0:

当线程2进入休眠状态时,共享数据database.value的值仍是未被修改的0,并且两个线程中的私有变量local_copy的值都是1。

如今线程1被唤醒并保存其私有变量local_copy的值,而后终止,线程2继续运行。线程2在休眠的时候并不知道线程1已经运行完毕并更新了database.value中的值,当继续运行时, 它将本身私有变量local_copy的值存储到database.value中,也是1。

这两个线程交错访问同一个共享对象,覆盖了彼此的结果。当一个线程释放内存或在另外一个线程完成访问以前关闭文件句柄时,可能会出现相似的竞争条件。

5.3. 示例的意义

上面的例子是为了确保每次运行程序时都发生竞态条件。由于操做系统能够在任什么时候候交换出一个线程,因此有可能在读取了x的值以后,像x = x + 1这样的语句会中断,致使写回数据库的值不是咱们想要的。

这一过程当中的细节很是有趣,但本文剩下部分的学习不须要了解具体细节,因此能够先跳过。

看完有关竞态条件的实例,让咱们接下来看看如何解决它们!

6. 同步锁

有不少方法能够避免或解决竞态条件,这里不会介绍全部的解决方法,但会提到一些会常常用到的。让咱们先从锁Lock开始学习。

要解决上述竞态条件问题,须要找到一种方法,每次只容许一个线程进入代码的read-modify-write部分。最经常使用就是Python中的锁。在一些其余语言中,一样的思想被称为互斥锁mutex。互斥锁mutex属于进程互斥MUTual EXclusion的一部分,它和锁所作的工做是同样的。

锁是一种相似于通行证的东西,每次只有一个线程能够拥有锁,任何其余想要得到锁的线程必须等待,直到该锁的全部者将它释放出来。

完成此任务的基本函数是.acquire().release()。线程将调用my_lock.acquire()来获取锁。若是锁已经存在,则调用线程将会等待,直到锁被释放。这里有一点很重要,若是一个线程得到了锁,但从未释放,程序会被卡住。稍后会介绍更多关于这方面的内容。

幸运的是,Python的锁也将做为上下文管理器运行,因此能够在with语句中使用它,而且当with模块出于任何缘由退出时,锁会自动释放。

让咱们看看添加了锁的FakeDatabase,调用函数保持不变:

import logging
import time
import threading


class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        logging.info("Thread %s: starting update", name)
        logging.debug("Thread %s about to lock", name)
        with self._lock:
            logging.debug("Thread %s has lock", name)
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
            logging.debug("Thread %s about to release lock", name)
        logging.debug("Thread %s after release", name)
        logging.info("Thread %s: finishing update", name)

除了添加一些调试日志以便更清楚地查看锁的运行以外,这里最大的变化是添加了一个叫._lock的成员,它是一个thread . lock()对象。这个._lock在未锁定状态下被初始化,并由with语句锁定和释放。

值得注意的是,运行该函数的线程将一直持有这个锁,直到它彻底更新完数据库。在本例中,这意味着它将在复制、更新、休眠并将值写回数据库的整个过程当中持有锁。

日志设置为警告级别,运行程序,结果以下:

16:08:44: Testing update. Starting value is 0.
16:08:44: Thread 0: starting update
16:08:44: Thread 1: starting update
16:08:44: Thread 0: finishing update
16:08:44: Thread 1: finishing update
16:08:44: Testing update. Ending value is 2.

在主线程__main__中配置完日志输出后,将日志级别设置为DEBUG能够打开完整的日志:

logging.getLogger().setLevel(logging.DEBUG) 

用调试日志运行程序的结果以下:

16:09:59: Testing update. Starting value is 0.
16:09:59: Thread 0: starting update
16:09:59: Thread 0 about to lock
16:09:59: Thread 1: starting update
16:09:59: Thread 0 has lock
16:09:59: Thread 1 about to lock
16:09:59: Thread 0 about to release lock
16:09:59: Thread 0 after release
16:09:59: Thread 1 has lock
16:09:59: Thread 0: finishing update
16:10:00: Thread 1 about to release lock
16:10:00: Thread 1 after release
16:10:00: Thread 1: finishing update
16:10:00: Testing update. Ending value is 2.

线程0得到锁,而且在它进入睡眠状态时仍然持有锁。而后线程1启动并尝试获取同一个锁,由于线程0仍然持有它,线程1就必须等待。这就是互斥锁。

本文其他部分的许多示例都有警告和调试级别的日志记录。咱们一般只显示警告级别的输出,由于调试日志可能很是长。

7. 死锁

在继续学习以前,咱们先看一下使用锁时会出现的常见问题。在上例中,若是锁已经被某个线程获取,那么第二次调用.acquire()时将一直等待,直到持有锁的线程调用.release()将锁释放。

思考一下,运行下面这段代码会获得什么结果:

import threading

l = threading.Lock()
print("before first acquire")
l.acquire()
print("before second acquire")
l.acquire()
print("acquired lock twice")

当程序第二次调用l.acquire()时,它须要等待锁被释放。在本例中,能够删除第二次调用修复死锁,可是死锁一般在如下两种状况下会发生:

① 锁没有被正确释放时会产生运行错误;

② 在一个实用程序函数须要被其余函数调用的地方会出现设计问题,这些函数可能已经拥有或者没有锁。

第一种状况有时会发生,可是使用锁做为上下文管理器能够大大减小这种状况发生的频率。建议充分利用上下文管理器来编写代码,由于它们有助于避免出现异常跳过.release()调用的状况。

在某些语言中,设计问题可能有点棘手。庆幸的是,Python的线程模块还提供了另外一个锁对象RLock。它容许线程在调用.release()以前屡次获取.acquire()锁,且程序不会阻塞。该线程仍须要保证.release().acquire()的调用次数相同,但它是用了另外一种方式而已。

LockRLock是线程化编程中用来防止竞争条件的两个基本工具,还有一些其余的工具。在研究它们以前,咱们先转移到一个稍微不一样的领域。

8. 生产者-消费者模型中的线程

生产者-消费者模型是一个标准的计算机科学领域的问题,用于解决线程同步或进程同步。咱们先介绍一个它的变形,大体了解一下Python中的线程模块提供了哪些基础模块。 

本例中,假设须要写一个从网络读取消息并将其写入磁盘的程序。该程序不会主动请求消息,它必须在消息传入时侦听并接受它们。并且这些消息不会以固定的速度传入,而是以突发的方式传入。这一部分程序叫作生产者。

另外一方面,一旦传入了消息,就须要将其写入数据库。数据库访问很慢,但访问速度足以跟上消息传入的平均速度。但当大量消息同时传入时,速度会跟不上。这部分程序叫消费者。

在生产者和消费者之间,须要建立一个管道Pipeline,随着对不一样同步对象的深刻了解,咱们须要对管道里面的内容进行修改。

这就是基本的框架。让咱们看看使用Lock的解决方案。虽然它并非最佳的解决方法,但它运用的是前面已经介绍过的工具,因此比较容易理解。

8.1. 在生产者-消费者模型中使用锁

既然这是一篇关于Python线程的文章,并且刚刚已经阅读了有关锁的内容,因此让咱们尝试用锁解决竞态条件问题。

先写一个生产者线程,从虚拟网络中读取消息并放入管道中:

SENTINEL = object()

def producer(pipeline):
    """Pretend we're getting a message from the network."""
    for index in range(10):
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    # Send a sentinel message to tell consumer we're done
    pipeline.set_message(SENTINEL, "Producer")

生产者得到一个介于1到100之间的随机数,做为生成的虚拟消息。它调用管道上的.set_message()方法将其发送给消费者。

生产者还用一个SENTINEL值来警告消费者,在它发送10个值以后中止。这有点奇怪,但没必要担忧,在完成本示例后,会介绍如何去掉这个SENTINEL值。

管道pipeline的另外一端是消费者:

def consumer(pipeline):
    """Pretend we're saving a number in the database."""
    message = 0
    while message is not SENTINEL:
        message = pipeline.get_message("Consumer")
        if message is not SENTINEL:
            logging.info("Consumer storing message: %s", message)

消费者从管道中读取一条消息并将其写入虚拟数据库,在本例中,只是将其储存到磁盘中。若是消费者获取了SENTINEL值,线程会终止。

在研究管道Pipeline以前,先看一下生成这些线程的主线程__main__部分:

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)

看起来应该很熟悉,由于它和前面示例中介绍过的__main__部分相似。

注意,打开调试日志能够查看全部的日志消息,方法是取消对这一行的注释:

# logging.getLogger().setLevel(logging.DEBUG)

咱们有必要遍历调试日志消息,来查看每一个线程是在何处得到和释放锁的。

如今让咱们看一下将消息从生产者传递给消费者的管道Pipeline:

class Pipeline(object):
    """
    Class to allow a single element pipeline between producer and consumer.
    """
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        logging.debug("%s:about to acquire getlock", name)
        self.consumer_lock.acquire()
        logging.debug("%s:have getlock", name)
        message = self.message
        logging.debug("%s:about to release setlock", name)
        self.producer_lock.release()
        logging.debug("%s:setlock released", name)
        return message

    def set_message(self, message, name):
        logging.debug("%s:about to acquire setlock", name)
        self.producer_lock.acquire()
        logging.debug("%s:have setlock", name)
        self.message = message
        logging.debug("%s:about to release getlock", name)
        self.consumer_lock.release()
        logging.debug("%s:getlock released", name)

好长一段代码!别惧怕,大部分是日志语句,删除全部日志语句后的代码以下:

class Pipeline:
    """
    Class to allow a single element pipeline between producer and consumer.
    """
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        self.consumer_lock.acquire()
        message = self.message
        self.producer_lock.release()
        return message

    def set_message(self, message, name):
        self.producer_lock.acquire()
        self.message = message
        self.consumer_lock.release()

这样看起来更清晰,管道类中有三个成员:

 .message存储要传递的消息;

 .producer_lock是一个线程锁对象,限制生产者线程对消息的访问;

.consumer_lock也是一个线程锁,限制消费者线程对消息的访问。

__init__() 初始化这三个成员,而后在.consumer_lock上调用.acquire(),消费者得到锁。生产者能够添加新消息,但消费者须要等待消息出现。

get_message().set_messages()几乎是相反的操做。.get_message()consumer_lock上调用.acquire(),这么作的目的是让消费者等待,直到有消息传入。

一旦消费者得到了锁.consumer_lock,它会将self.message的值复制给.message,而后在.producer_lock上调用.release()。释放此锁容许生产者在管道中插入下一条消息。

.get_message()函数中有一些细节很容易被忽略。你们思考一下,为何不把message变量删掉,直接返回self.message的值呢?

答案以下。

只要消费者调用.producer_lock.release(),它就被交换出去,生产者开始运行,这可能发生在锁被彻底释放以前!也就是说,存在一种微小的可能性,当函数返回self.message时,这个值是生产者生成的下一条消息,致使第一条消息丢失。这是竞态条件的另外一个例子。

咱们继续看事务的另外一端:.set_message()。生产者经过传入一条消息来调用该函数,得到锁.producer_lock,传入.message值,而后调用consumer_lock.release()释放锁,这将容许消费者读取该值。

运行代码,日志设置为警告级别,结果以下:

16:17:31: Producer got message: 3
16:17:31: Producer got message: 98
16:17:31: Consumer storing message: 3
16:17:31: Producer got message: 83
16:17:31: Consumer storing message: 98
16:17:31: Producer got message: 96
16:17:31: Consumer storing message: 83
16:17:31: Producer got message: 34
16:17:31: Consumer storing message: 96
16:17:31: Producer got message: 71
16:17:31: Consumer storing message: 34
16:17:31: Producer got message: 79
16:17:31: Consumer storing message: 71
16:17:31: Producer got message: 90
16:17:31: Consumer storing message: 79
16:17:31: Producer got message: 3
16:17:31: Consumer storing message: 90
16:17:31: Producer got message: 58
16:17:31: Consumer storing message: 3
16:17:31: Consumer storing message: 58

你们可能会以为奇怪,生产者在消费者还没运行以前就得到了两条消息。回过头仔细看一下生产者和.set_message()函数,生产者先获取消息,打印出日志语句,而后试图将消息放入管道中,这时才须要等待锁。

当生产者试图传入第二条消息时,它会第二次调用.set_message(),发生阻塞。

操做系统能够在任什么时候候交换线程,但它一般会容许每一个线程在交换以前有一段合理的运行时间。这就是为何生产者会一直运行,直到第二次调用.set_message()时被阻塞。

一旦线程被阻塞,操做系统老是会把它交换出去,并找到另外一个线程去运行。在本例中,就是消费者线程。

消费者调用.get_message()函数,它读取消息并在.producer_lock上调用.release()方法,释放锁,容许生产者再次运行。

注意,第一个值是43,正是消费者所读取的值,虽然生产者已经生成了新值45。

尽管使用锁的这种方法适用于本例,但对于常见的生产者-消费者模式问题,这不是一个很好的解决方法,由于它一次只容许管道中有一个值。当生产者收到大量值时,将无处安放。

让咱们继续看一个更好的解决方法:使用队列Queue.

8.2. 在生产者-消费者模型中使用队列

若是想在管道中一次处理多个值,咱们须要为管道提供一个数据结构,当从生产者线程备份数据时,该结构容许管道中的数据量灵活变更,再也不是单一值。

Python标准库中有一个模块叫队列queue,里面有一个类叫Queue。让咱们用队列Queue改写一下上面受锁保护的管道。

此外,咱们还会介绍另外一种中止工做线程的方法,使用Python线程模块中的事件Event对象。

事件的触发机制能够是多种多样的。在本例中,主线程只是休眠一段时间,而后调用event.set()方法,通知全部处于等待阻塞状态的线程恢复运行状态:

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

这里唯一的变化是在第8行建立了事件对象event,在第10行和第11行传递了event参数,代码的最后一个部分13-15行,先休眠0.1秒,记录一条消息,而后在事件上调用.set()方法。

生产者也不用变太多:

def producer(pipeline, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
    message = random.randint(1, 101)
    logging.info("Producer got message: %s", message)
    pipeline.set_message(message, "Producer")

    logging.info("Producer received EXIT event. Exiting")

在第3行循环部分设置了事件,并且也不用再把SENTINEL值放入管道中。

消费者的变化稍多:

def consumer(pipeline, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get_message("Consumer")
        logging.info(
            "Consumer storing message: %s  (queue size=%s)",
            message,
            pipeline.qsize(),
        )

    logging.info("Consumer received EXIT event. Exiting") 

除了须要删掉和SENTINEL值相关的代码,还要执行稍微复杂一点的循环条件。它会一直循环,直到事件结束,管道中的数据被清空。

必定要确保当消费者退出时,队列是空的。若是消费者在管道包含消息时退出,可能会出现两个问题。一是会丢失那部分数据,但更严重的是生产者会被锁住。

在生产者检查.is_set()条件后、但在调用pipeline.set_message()前触发事件,则会发生这种状况。

一旦发生这种状况,生产者可能被唤醒并退出,但此时锁仍被消费者持有。而后,生产者将尝试用.acquire()方法获取锁,可是消费者已经退出,并且永远不会释放锁,因此生产者就会一直等下去。

消费者的其他部分看起来应该很熟悉。

管道类的写法变化最大:

Pipelinequeue.Queue的一个子类。Queue队列里面有一个可选参数,在初始化时指定队列所能容纳的最大数据量。

.get_message().set_message()变得更简短,被队列中的.get().put()方法替代。

你们可能想知道,防止竞争条件的代码都跑哪里去了?

class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = self.get()
        logging.debug("%s:got %d from queue", name, value)
        return value

    def set_message(self, value, name):
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)

编写标准库的核心开发人员知道,在多线程环境中常用队列Queue,所以将全部锁定代码合并到了队列Queue模块内部。队列Queue自己就是线程安全的。

程序运行结果以下:

16:24:24: Producer got message: 96
16:24:24: Producer got message: 90
16:24:24: Consumer storing message: 96  (queue size=0)
16:24:24: Producer got message: 92
16:24:24: Consumer storing message: 90  (queue size=0)
16:24:24: Producer got message: 40
16:24:24: Consumer storing message: 92  (queue size=0)
16:24:24: Producer got message: 21
16:24:24: Consumer storing message: 40  (queue size=0)
16:24:24: Producer got message: 19
16:24:24: Consumer storing message: 21  (queue size=0)
16:24:24: Producer got message: 29
16:24:24: Consumer storing message: 19  (queue size=0)
16:24:24: Producer got message: 43
16:24:24: Consumer storing message: 29  (queue size=0)
16:24:24: Producer got message: 76
16:24:24: Consumer storing message: 43  (queue size=0)
16:24:24: Producer got message: 4
16:24:24: Consumer storing message: 76  (queue size=0)
16:24:24: Producer got message: 38
16:24:24: Consumer storing message: 4  (queue size=0)
16:24:24: Producer got message: 83
16:24:24: Consumer storing message: 38  (queue size=0)
16:24:24: Producer got message: 38
16:24:24: Consumer storing message: 83  (queue size=0)
16:24:24: Producer got message: 54
16:24:24: Consumer storing message: 38  (queue size=0)
16:24:24: Producer got message: 80
16:24:24: Consumer storing message: 54  (queue size=0)
16:24:24: Producer got message: 94
16:24:24: Consumer storing message: 80  (queue size=0)
16:24:24: Producer got message: 11
16:24:24: Consumer storing message: 94  (queue size=0)
16:24:24: Producer got message: 98
16:24:24: Consumer storing message: 11  (queue size=0)
16:24:24: Producer got message: 88
16:24:24: Consumer storing message: 98  (queue size=0)
16:24:24: Producer got message: 31
16:24:24: Consumer storing message: 88  (queue size=0)
16:24:24: Producer got message: 78
16:24:24: Consumer storing message: 31  (queue size=0)
16:24:24: Producer got message: 84
16:24:24: Consumer storing message: 78  (queue size=0)
16:24:24: Producer got message: 47
16:24:24: Consumer storing message: 84  (queue size=0)
16:24:24: Producer got message: 60
16:24:24: Consumer storing message: 47  (queue size=0)
16:24:24: Producer got message: 29
16:24:24: Consumer storing message: 60  (queue size=0)
16:24:24: Producer got message: 59
16:24:24: Consumer storing message: 29  (queue size=0)
16:24:24: Producer got message: 19
16:24:24: Consumer storing message: 59  (queue size=0)
16:24:24: Producer got message: 97
16:24:24: Consumer storing message: 19  (queue size=0)
16:24:24: Producer got message: 37
16:24:24: Consumer storing message: 97  (queue size=0)
16:24:24: Producer got message: 39
16:24:24: Consumer storing message: 37  (queue size=0)
16:24:24: Producer got message: 78
16:24:24: Consumer storing message: 39  (queue size=0)
16:24:24: Producer got message: 63
16:24:24: Consumer storing message: 78  (queue size=0)
16:24:24: Producer got message: 51
16:24:24: Consumer storing message: 63  (queue size=0)
16:24:24: Producer got message: 37
16:24:24: Consumer storing message: 51  (queue size=0)
16:24:24: Producer got message: 34
16:24:24: Consumer storing message: 37  (queue size=0)
16:24:24: Producer got message: 46
16:24:24: Consumer storing message: 34  (queue size=0)
16:24:24: Producer got message: 33
16:24:24: Consumer storing message: 46  (queue size=0)
16:24:24: Producer got message: 32
16:24:24: Consumer storing message: 33  (queue size=0)
16:24:24: Producer got message: 39
16:24:24: Consumer storing message: 32  (queue size=0)
16:24:24: Producer got message: 18
16:24:24: Consumer storing message: 39  (queue size=0)
16:24:24: Producer got message: 68
16:24:24: Consumer storing message: 18  (queue size=0)
16:24:24: Producer got message: 28
16:24:24: Consumer storing message: 68  (queue size=0)
16:24:24: Producer got message: 32
16:24:24: Consumer storing message: 28  (queue size=0)
16:24:24: Producer got message: 35
16:24:24: Consumer storing message: 32  (queue size=0)
16:24:24: Producer got message: 20
16:24:24: Consumer storing message: 35  (queue size=0)
16:24:24: Producer got message: 100
16:24:24: Consumer storing message: 20  (queue size=0)
16:24:24: Producer got message: 88
16:24:24: Consumer storing message: 100  (queue size=0)
16:24:24: Producer got message: 84
16:24:24: Consumer storing message: 88  (queue size=0)
16:24:24: Producer got message: 87
16:24:24: Consumer storing message: 84  (queue size=0)
16:24:24: Producer got message: 90
16:24:24: Consumer storing message: 87  (queue size=0)
16:24:24: Producer got message: 65
16:24:24: Consumer storing message: 90  (queue size=0)
16:24:24: Producer got message: 29
16:24:24: Consumer storing message: 65  (queue size=0)
16:24:24: Producer got message: 91
16:24:24: Consumer storing message: 29  (queue size=0)
16:24:24: Producer got message: 71
16:24:24: Consumer storing message: 91  (queue size=0)
16:24:24: Producer got message: 10
16:24:24: Consumer storing message: 71  (queue size=0)
16:24:24: Producer got message: 9
16:24:24: Consumer storing message: 10  (queue size=0)
16:24:24: Producer got message: 44
16:24:24: Consumer storing message: 9  (queue size=0)
16:24:24: Producer got message: 21
16:24:24: Consumer storing message: 44  (queue size=0)
16:24:24: Producer got message: 28
16:24:24: Consumer storing message: 21  (queue size=0)
16:24:24: Producer got message: 69
16:24:24: Consumer storing message: 28  (queue size=0)
16:24:24: Producer got message: 83
16:24:24: Consumer storing message: 69  (queue size=0)
16:24:24: Producer got message: 81
16:24:24: Consumer storing message: 83  (queue size=0)
16:24:24: Producer got message: 65
16:24:24: Consumer storing message: 81  (queue size=0)
16:24:24: Producer got message: 26
16:24:24: Consumer storing message: 65  (queue size=0)
16:24:24: Producer got message: 74
16:24:24: Consumer storing message: 26  (queue size=0)
16:24:24: Producer got message: 33
16:24:24: Consumer storing message: 74  (queue size=0)
16:24:24: Producer got message: 89
16:24:24: Consumer storing message: 33  (queue size=0)
16:24:24: Producer got message: 27
16:24:24: Consumer storing message: 89  (queue size=0)
16:24:24: Producer got message: 21
16:24:24: Consumer storing message: 27  (queue size=0)
16:24:24: Producer got message: 75
16:24:24: Consumer storing message: 21  (queue size=0)
16:24:24: Producer got message: 74
16:24:24: Consumer storing message: 75  (queue size=0)
16:24:24: Producer got message: 79
16:24:24: Consumer storing message: 74  (queue size=0)
16:24:24: Producer got message: 66
16:24:24: Consumer storing message: 79  (queue size=0)
16:24:24: Producer got message: 87
16:24:24: Consumer storing message: 66  (queue size=0)
16:24:24: Producer got message: 47
16:24:24: Consumer storing message: 87  (queue size=0)
16:24:24: Producer got message: 13
16:24:24: Consumer storing message: 47  (queue size=0)
16:24:24: Producer got message: 9
16:24:24: Consumer storing message: 13  (queue size=0)
16:24:24: Producer got message: 62
16:24:24: Consumer storing message: 9  (queue size=0)
16:24:24: Producer got message: 6
16:24:24: Consumer storing message: 62  (queue size=0)
16:24:24: Producer got message: 70
16:24:24: Consumer storing message: 6  (queue size=0)
16:24:24: Producer got message: 18
16:24:24: Consumer storing message: 70  (queue size=0)
16:24:24: Producer got message: 44
16:24:24: Consumer storing message: 18  (queue size=0)
16:24:24: Producer got message: 14
16:24:24: Consumer storing message: 44  (queue size=0)
16:24:24: Producer got message: 88
16:24:24: Consumer storing message: 14  (queue size=0)
16:24:24: Producer got message: 21
16:24:24: Consumer storing message: 88  (queue size=0)
16:24:24: Producer got message: 28
16:24:24: Consumer storing message: 21  (queue size=0)
16:24:24: Producer got message: 86
16:24:24: Consumer storing message: 28  (queue size=0)
16:24:24: Producer got message: 55
16:24:24: Consumer storing message: 86  (queue size=0)
16:24:24: Producer got message: 75
16:24:24: Consumer storing message: 55  (queue size=0)
16:24:24: Producer got message: 78
16:24:24: Consumer storing message: 75  (queue size=0)
16:24:24: Producer got message: 72
16:24:24: Consumer storing message: 78  (queue size=0)
16:24:24: Producer got message: 36
16:24:24: Consumer storing message: 72  (queue size=0)
16:24:24: Producer got message: 45
16:24:24: Consumer storing message: 36  (queue size=0)
16:24:24: Producer got message: 59
16:24:24: Consumer storing message: 45  (queue size=0)
16:24:24: Producer got message: 66
16:24:24: Consumer storing message: 59  (queue size=0)
16:24:24: Producer got message: 67
16:24:24: Consumer storing message: 66  (queue size=0)
16:24:24: Producer got message: 70
16:24:24: Consumer storing message: 67  (queue size=0)
16:24:24: Producer got message: 41
16:24:24: Consumer storing message: 70  (queue size=0)
16:24:24: Producer got message: 91
16:24:24: Consumer storing message: 41  (queue size=0)
16:24:24: Producer got message: 85
16:24:24: Consumer storing message: 91  (queue size=0)
16:24:24: Producer got message: 59
16:24:24: Consumer storing message: 85  (queue size=0)
16:24:24: Producer got message: 46
16:24:24: Consumer storing message: 59  (queue size=0)
16:24:24: Producer got message: 14
16:24:24: Consumer storing message: 46  (queue size=0)
16:24:24: Producer got message: 9
16:24:24: Consumer storing message: 14  (queue size=0)
16:24:24: Producer got message: 88
16:24:24: Consumer storing message: 9  (queue size=0)
16:24:24: Producer got message: 16
16:24:24: Consumer storing message: 88  (queue size=0)
16:24:24: Producer got message: 82
16:24:24: Consumer storing message: 16  (queue size=0)
16:24:24: Producer got message: 42
16:24:24: Consumer storing message: 82  (queue size=0)
16:24:24: Producer got message: 7
16:24:24: Consumer storing message: 42  (queue size=0)
16:24:24: Producer got message: 21
16:24:24: Consumer storing message: 7  (queue size=0)
16:24:24: Producer got message: 84
16:24:24: Consumer storing message: 21  (queue size=0)
16:24:24: Producer got message: 2
16:24:24: Consumer storing message: 84  (queue size=0)

16:24:24: Producer got message: 84
16:24:24: Consumer storing message: 48  (queue size=0)
16:24:24: Producer got message: 88
16:24:24: Consumer storing message: 84  (queue size=0)
16:24:24: Producer got message: 22
16:24:24: Consumer storing message: 88  (queue size=0)
16:24:24: Producer got message: 39
16:24:24: Consumer storing message: 22  (queue size=0)
16:24:24: Producer got message: 52
16:24:24: Consumer storing message: 39  (queue size=0)
16:24:24: Main: about to set event
16:24:24: Producer got message: 98
16:24:24: Consumer storing message: 52  (queue size=0)
16:24:24: Producer received EXIT event. Exiting
16:24:24: Consumer storing message: 98  (queue size=0)
16:24:24: Consumer received EXIT event. Exiting

生产者建立了5条消息,并将其中4条放到队列中。但在放置第5条消息以前,它被操做系统交换出去了。

而后消费者开始运行并储存第1条消息,打印出该消息和队列大小:

Consumer storing message: 32 (queue size=3)

这就是为何第5条消息没有成功进入管道。删除一条消息后,队列的大小缩减到3个。由于队列最多能够容纳10条消息,因此生产者线程没有被队列阻塞,而是被操做系统交换出去了。

注意:每次运行所获得的结果会不一样。这就是使用线程的乐趣所在!

当程序开始结束时,主线程触发事件,生产者当即退出。但消费者仍有不少工做要作,因此它会继续运行,直到清理完管道中的数据为止。

尝试修改生产者或消费者中的队列大小和time.sleep()中的休眠时间,来分别模拟更长的网络或磁盘访问时间。即便是轻微的更改,也会对结果产生很大的影响。

对于生产者-消费者模型,这是一个更好的解决方法,但其实能够进一步简化。去掉管道Pipeline和日志语句,就只剩下和queue.Queue相关的语句了。

直接使用queue.Queue的最终代码以下:

import concurrent.futures
import logging
import random
import threading
import time
import queue


class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = self.get()
        logging.debug("%s:got %d from queue", name, value)
        return value

    def set_message(self, value, name):
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)


def consumer(pipeline, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get_message("Consumer")
        logging.info(
            "Consumer storing message: %s  (queue size=%s)",
            message,
            pipeline.qsize(),
        )

    logging.info("Consumer received EXIT event. Exiting")


def producer(pipeline, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    logging.info("Producer received EXIT event. Exiting")


if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

能够看到,使用Python的内置基础模块可以简化复杂的问题,让代码阅读起来更清晰。

Lock和队列Queue是解决并发问题很是方便的两个类,但其实标准库还提供了其余类。在结束本教程以前,让咱们快速浏览一下还有哪些类。

9. 线程对象

Python的线程threading模块还有其余一些基本类型。虽然在上面的例子中没有用到,但它们会在不一样的状况下派上用场,因此熟悉一下仍是很好处的。

9.1 信号量

首先要介绍的是信号量thread.semaphore,信号量是具备一些特殊属性的计数器。

第一个属性是计数的原子性,能够保证操做系统不会在计数器递增或递减的过程当中交换线程。

内部计数器在调用.release()时递增,在调用.acquire()时递减。

另外一个特殊属性是,若是线程在计数器为0时调用.acquire(),那么该线程将阻塞,直到另外一个线程调用.release()并将计数器的值增长到1。

信号量一般用于保护容量有限的资源。例如,咱们有一个链接池,而且但愿限制该链接池中的元素数量,就能够用信号量来进行管理。

9.2 定时器

threading.Timer是一个定时器功能的类,指定函数在间隔特定时间后执行任务。咱们能够经过传入须要等待的时间和函数来建立一个定时器:

t = threading.Timer(30.0, my_function)

调用.start()启动定时器,函数将在指定时间事后的某个时间点上被新线程调用。但请注意,这里并不能保证函数会在咱们所指望的确切时间被调用,可能会存在偏差。  

若是想要中止已经启动的定时器,能够调用.cancel()。在定时器触发后调用.cancel()不会执行任何操做,也不会产生异常。

定时器可用于在特定时间以后提示用户执行操做。若是用户在定时器过期以前执行了操做,能够调用.cancel()取消定时。

9.3 栅栏

threading模块中的栅栏Barrier能够用来指定须要同步运行的线程数量。建立栅栏Barrier时,咱们必须指定所需同步的线程数。每一个线程都会在Barrier上调用.wait()方法,它们会先保持阻塞状态,直到等待的线程数量达到指定值时,会被同时释放。

注意,线程是由操做系统调度的,所以,即便全部线程同时被释放,一次也只能运行一个线程。

栅栏能够用来初始化一个线程池。让线程初始化后在栅栏里等待,能够确保程序在全部线程都完成初始化后再开始运行。

相关文章
相关标签/搜索