【译】线程:概念和实现(2)

翻译:老齐python

译者注:与本文相关图书推荐:《Python大学实用教程》《跟老齐学Python:轻松入门》数据库


第二部分

竞态条件

在讨论Python线程的其余特性以前,让咱们先讨论一下编写线程程序时遇到的一个更困难的问题:竞态条件。安全

一旦你了解了什么是竞态条件,并看到了正在发生的状况,而后就使用标准库提供的模块,以防止这些竞态条件的出现。bash

当两个或多个线程访问共享数据或资源时,可能会出现竞态状况。在本例中,你将建立一个每次都发生的大型竞态条件,但请注意,大多数它并非很明显。示例中的状况一般不多发生,并且会产生使人困惑的结果。能够想象,由于竞态条件而引发的bug很难被发现。微信

幸运的是,在下述示例中竞态问题每次都会发生,你将详细地了解它以便解释发生了什么。函数

对于本例,将编写一个更新数据库的类。你不会真的有一个数据库:你只是要伪造它,由于这不是本文的重点。工具

FakeDatabase类中有.__init__().update()方法:post

class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info("Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: finishing update", name)
复制代码

FakeDatabase中的属性.value,用于做为竞态条件中共享的数据。ui

.__init__()中将.value值初始化为0.,到目前为止,一切正常。spa

.update() 看起来有点奇怪,它模拟从数据库中读取一个值,对其进行一些计算,而后将一个新值写回数据库。

所谓从数据库中读取,即将.value的值复制到本地变量。计算就是在原值上加1,而后.sleep() 一小会儿。最后,它经过将本地值复制回.value,将值写回去。

下面是FakeDatabase的使用方法:

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)
复制代码

程序中建立了两个ThreadPoolExecutor,而后对每一个线程调用.submit(),告诉它们运行database.update()

.submit()有一个明显特征,它容许将位置参数和命名参数传给线程中运行的函数:

.submit(function, *args, **kwargs)
复制代码

在上面的用法中,index做为第一个也是惟一一个位置参数传给database.update()。你将在本文后面看到,能够用相似的方式传多个参数。

因为每一个线程都运行.update(),而.update()会让.value的值加1,所以在最后打印时,你可能会但愿database.value为2。但若是是这样的话,你就不会看这个例子了。若是运行上述代码,则输出以下:

$ ./racecond.py
Testing unlocked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing unlocked update. Ending value is 1.
复制代码

你可能已经预料到这种状况会发生,可是让咱们来看看实际状况的细节,由于这将使这个问题的解决方案更容易理解。

单线程

在用两个线程深刻讨论这个问题以前,让咱们先退一步,谈谈线程工做流程的一些细节。

咱们不会在这里深刻讨论全部的细节,由于这种全面深刻的讨论如今并不重要。咱们还将简化一些事情,这种作法虽然在技术上并不许确,但会让你对正在发生的事情有正确的认识。

当你告诉ThreadPoolExecutor运行每一个线程时,也就是告诉它要运行哪一个函数以及要传给它的参数:executor.submit(database.update, index)

其结果是线程池中的每一个线程都将调用database.update(index)。注意,database__main__中建立的FakeDatabase实例对象,调用它的方法.update()

每一个线程都将引用同一个FakeDatabase的实例database,每一个线程还将有一个惟一的值index。为了让上述过程更容易理解,请看下图:

当某线程开始运行.update()时,它有此方法的本地的数据,即.update()中的local_copy。这绝对是件好事,不然,在两个线程中运行同一个函数就会互相干扰了。这意味着该函数的全部做用域(或本地)变量对于线程来讲都是安全的。

如今,你已经理解,若是使用单个线程和对.update()的单个调用来运行上面的程序会发生什么状况。

若是只运行一个线程,以下图所示,会一步一步地执行.update()。下图中,语句显示在上面,下面用图示方式演示了线程中的local_value和共享的database.value 中的值的变化:

按照时间顺序,从上到下观察上面的示意图,从建立线程Thread 1开始,到Thread 1结束终止。

Thread 1启动时,FakeDatabase.value为零。方法中的第一行代码local_copy=self.value将0复制到局部变量。接下来,使用local_copy+=1语句增长local_copy的值。你能够看到Thread 1中的.value值为1。

而后,调用下一个time.sleep(),这将使当前线程暂停并容许其余线程运行。由于在这个例子中只有一个线程,因此这没有影响。

Thread 1唤醒并继续时,它将新值从local_copy复制到FakeDatabase.value,而后线程完成。你能够看到database.value为1。

到目前为止,一切正常。你只运行了一次.update()而且将FakeDatabase.value递增为1。

两个线程

回到竞态条件,两个线程并行,但不是同时运行。每一个线程都有本身的local_copy,并指向相同的database,正是这个共享数据库对象致使了这些问题。

程序仍是从Thread 1执行.update()开始:

Thread 1调用time.sleep()时,它容许另外一个线程开始运行。这就是事情变得有趣的地方。

Thread 2启动并执行相同的操做。它也将database.value复制到其私有的local_copy,而此时共享的database.value还没有更新:

Thread 1进入睡眠状态时,共享的database.value仍然未被修改,仍是0,而此时的local_copy的两个私有版本的值都为1。

Thread 1如今醒来并保存其local_copy的值,而后线程终止,给Thread 2机会。Thread 2不知道在它睡眠时Thread 1运行并更新了database.value的值。Thread 2也将它的local_copy值存储到database.value中,并将其设置为1:

这两个线程交替访问一个共享对象,覆盖彼此的结果。当一个线程释放内存或在另外一个线程完成访问以前关闭文件句柄时,可能会出现相似的竞态。

为何这不是一个愚蠢的示例

上面的例子是刻意而为,目的是确保每次运行程序时都会发生竞态。由于操做系统能够在任什么时候候交换线程,因此在读取x的值以后,而且在写回递增的值以前,能够中断相似x=x+1的语句。

发生这种状况的缘由细节很是有趣,但这篇文章的其他部分并不须要这些细节,因此能够跳过这个隐藏的部分。

既然你已经看到了运行过程当中的竞态条件,让咱们找出解决问题的方法!

使用锁实现同步

有不少方法能够避免或解决竞态。你不会在这里看到全部这些方法,可是有一些方法是常用的。让咱们从Lock开始。

要解决上述竞态条件,须要找到一种方法,使得在代码的“读-修改-写”操做中一次只容许一个线程。最多见的方法是使用Python中名为Lock的方法。在其余的一些语言中,相似的被称为MutexMutex源于MUTual EXclusion,这正是Lock的做用。

Lock像是通行证,一次只能有一个线程拥有Lock,任何其余想要Lock的线程都必须等到Lock的全部者放弃它。

执行此操做的基本函数是.acquire().release()。线程将调用my_lock.acquire()来获取本身的锁。若是锁已经被其余线程全部,则将等待它被释放。这里有一点很重要,若是一个线程获得了锁,但还没有返回,你的程序将被卡住。你稍后会读到更多关于这方面的内容。

幸运的是,Python的Lock也将做为上下文管理器运行,所以你能够在一个带有with的语句中使用它,而且当with代码块因为任何缘由退出时,锁也会自动释放。

让咱们看看添加了锁的FakeDatabase,其所调用函数保持不变:

class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        logging.info("Thread %s: starting update", name)
        logging.debug("Thread %s about to lock", name)
        with self._lock:
            logging.debug("Thread %s has lock", name)
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
            logging.debug("Thread %s about to release lock", name)
        logging.debug("Thread %s after release", name)
        logging.info("Thread %s: finishing update", name)
复制代码

除了添加一堆调试日志以便更清楚地看到锁操做以外,这里的大变化是添加一个名为._lock的属性,它是一个threading.Lock()实例对象。这个._lock在未锁定状态下初始化,并由with语句锁定和释放。

这里值得注意的是,运行此方法的线程将一直保持Lock,直到彻底完成对数据库的更新。在这种状况下,这意味着函数将在复制、更新、休眠时保持锁定,而后将值写回数据库。

若是在日志记录设置为警告级别的状况下运行此版本,你将看到如下内容:

$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing locked update. Ending value is 2.
复制代码

看看这个。你的程序终于成功了!

__main__中配置日志输出后,能够经过添加如下语句将级别设置为DEBUG来打开完整日志记录:

logging.getLogger().setLevel(logging.DEBUG)
复制代码

在启用DEBUG后,运行此程序,以下所示:

$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 0 about to lock
Thread 0 has lock
Thread 1: starting update
Thread 1 about to lock
Thread 0 about to release lock
Thread 0 after release
Thread 0: finishing update
Thread 1 has lock
Thread 1 about to release lock
Thread 1 after release
Thread 1: finishing update
Testing locked update. Ending value is 2.
复制代码

在输出中,你能够看到Thread 0获得了锁,并在进入睡眠状态时仍保持锁定。而后Thread 1启动并尝试获取相同的锁。由于Thread 0仍在持有锁,Thread 1必须等待。这就是Lock的互斥性。

本文其他部分中的许多示例将日志设置为WARNINGDEBUG级别。咱们一般只是DEBUG级别的输出,由于DEBUG日志可能很是长。在日志记录打开的状况下尝试这些程序,看看它们能作什么。

死锁

在继续探索以前,应该先看看使用锁时的一个常见问题。如你所见,若是已经获取了Lock,则对.acquire()的二次调用将等到持有Lock的线程调用.release()。运行此代码时,你认为会发生什么状况?

import threading

l = threading.Lock()
print("before first acquire")
l.acquire()
print("before second acquire")
l.acquire()
print("acquired lock twice")
复制代码

当程序第二次调用l.acquire()时,该函数将挂起,等待Lock的释放。在本例中,能够经过删除第二次调用来修复死锁,但死锁一般发生在如下两个微妙的事情之一:

  1. 未正确释放Lock的错误。
  2. 设计问题,其中一个函数须要由某些函数调用,这些函数可能具备或可能不具备Lock

第一种状况有时会发生,但使用Lock做为上下文管理器会大大减小错误出现的频率。建议尽量使用上下文管理器编写代码,由于它们有助于避免异常跳过.release()调用的状况。

在某些语言中,设计问题可能要复杂一些。值得庆幸的是,Python线程的又一个对象RLock就是为这种状况而设计的。它容许线程在调用.release()以前屡次经过.acquire()实现RLock。该线程中调用.release()的次数与调用.acquire()的次数相同。

LockRLock是线程中用来防止竞态条件的两个基本工具,还有一些其余工具以不一样的方式发挥做用。在你查看它们以前,让咱们转到一个稍微不一样的问题上。

未完待续

原文连接:realpython.com/intro-to-py…

关注微信公众号:老齐教室。读深度文章,得精湛技艺,享绚丽人生。

相关文章
相关标签/搜索