并发程序含有多个逻辑上的独立执行块,他们能够独立的并行执行,也能够串行执行。 并行程序解决问题的速度比串行程序快的多,由于其能够同时执行整个任务的多个部分。并行程序可能有多个独立执行块,也可能只有一个。node
引用Rob Pike的经典描述就是: 并发是同一时间应对多件事情的能力; 并行是同一时间动手作多件事情的能力。python
常见的并发模型有:git
这篇主要介绍线程与锁模型github
线程与锁模型是对底层硬件运行过程的形式化,很是简单直接,几乎全部的编程语言都对其提供了支持,且不对其使用方法加以限制(易出错)。算法
这篇文章主要使用python语言来演示线程与锁模型。文章结构来自《七周七并发模型》编程
from threading import Thread
def hello_world():
print("Hello from new thread")
def main():
my_thread = Thread(target=hello_world)
my_thread.start()
print("Hello from main thread")
my_thread.join()
main()
复制代码
这段代码建立并启动了一个Thread
实例,首先从start()
开始,my_thread.start() main()
函数的余下部分一块儿并发执行。最后调用join()
来等待my_thread
线程结束。缓存
运行这段代码输出结果有几种:安全
Hello from new thread
Hello from main thread
复制代码
或者bash
Hello from main thread
Hello from new thread
复制代码
或者多线程
Hello from new threadHello from main thread
复制代码
究竟哪一个结果取决于哪一个线程先执行print()
。多线程编程很难的缘由之一就是运行结果可能依赖于时序,屡次运行结果并不稳定。
from threading import Thread, Lock
class Counter(object):
def __init__(self, count=0):
self.count = count
def increment(self):
self.count += 1
def get_count(self):
print("Count: %s" % self.count)
return self.count
def test_count():
counter = Counter()
class CounterThread(Thread):
def run(self):
for i in range(10000):
counter.increment()
t1 = CounterThread()
t2 = CounterThread()
t1.start()
t2.start()
t1.join()
t2.join()
counter.get_count()
test_count()
复制代码
这段代码建立一个简单的类Counter 和两个线程,每一个线程都调用counter.increment() 10000次。
屡次运行这段代码会获得不一样的值,缘由是两个线程在使用 counter.count 时发生了竞态条件
(代码行为取决于各操做的时序)。
一个可能的操做是:
竞态条件的解决方法是对 count 进行同步(synchronize)访问。一种操做是使用 内置锁
(也称互斥锁(mutex)、管程(monitor)或临界区(critical section))来同步对increment() 的调用。
线程同步可以保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。互斥锁为资源引入一个状态:锁定/非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其余线程不能更改; 直到该线程释放资源,将资源的状态变成“非锁定”,其余的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操做,从而保证了多线程状况下数据的正确性。
当一个线程调用锁的acquire()方法得到锁时,锁就进入“locked”状态。每次只有一个线程能够得到锁。若是此时另外一个线程试图得到这个锁,该线程就会变为“blocked”状态,称为“同步阻塞”。 直到拥有锁的线程调用锁的release()方法释放锁以后,锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来得到锁,并使得该线程进入运行(running)状态。
python 锁的使用流程以下:
#建立锁
mutex = threading.Lock()
#锁定
mutex.acquire([timeout])
#释放
mutex.release()
复制代码
推荐使用上下文管理器来操做锁,
with lock:
do someting
# 至关于
lock.acquire()
try:
# do something...
finally:
lock.release()
复制代码
acquire(blocking=True, timeout=-1) 能够阻塞或非阻塞地得到锁。 当调用时参数 blocking 设置为 True (缺省值),阻塞直到锁被释放,而后将锁锁定并返回 True 。 在参数 blocking 被设置为 False 的状况下调用,将不会发生阻塞。若是调用时 blocking 设为 True 会阻塞,并当即返回 False ;不然,将锁锁定并返回 True。 当浮点型 timeout 参数被设置为正值调用时,只要没法得到锁,将最多阻塞 timeout 设定的秒数。timeout 参数被设置为 -1 时将无限等待。当 blocking 为 false 时,timeout 指定的值将被忽略。 若是成功得到锁,则返回 True,不然返回 False (例如发生 超时 的时候)。 timeout 参数须要
python3.2+
from threading import Thread, Lock
mutex = Lock()
class SynchronizeCounter(object):
def __init__(self, count=0):
self.count = count
def increment(self):
# if mutex.acquire(1): # 获取锁
# self.count += 1
# mutex.release() # 释放锁
# 等同于上述代码
with mutex:
self.count += 1
def get_count(self):
print("Count: %s" % self.count)
return self.count
def test_synchronize_count():
counter = SynchronizeCounter()
class CounterThread(Thread):
def run(self):
for i in range(100000):
counter.increment()
t1 = CounterThread()
t2 = CounterThread()
t1.start()
t2.start()
t1.join()
t2.join()
counter.get_count()
if __name__ == "__main__":
for i in range(100):
test_synchronize_count()
复制代码
这段代码还有一个隐藏的bug,那就是 get_count(),这里get_count() 是在join()以后调用的,所以是线程安全的,可是若是在其它地方调用了 get_count() 函数。 因为在 get_count() 中没有进行线程同步,调用时可能会获取到一个失效的值。
对于JAVA等竞态编译语言,
更糟糕的是,有时一个线程产生的修改可能会对另外一个线程不可见。
从直觉上来讲,编译器、JVM、硬件都不该插手修改本来的代码逻辑。可是近几年的运行效率提高,尤为是共享内存交媾的运行效率提高,都仰仗于此类代码优化。 具体的反作用,Java 内存模型有明确说明。 Java 内存模型定义了什么时候一个线程对内存的修改对另外一个线程可见。
基本原则是:若是读线程和写线程不进行同步,就不能保证可见性。
一个重点: 两个线程都须要进行同步。只在其中一个线程进行同步是不够的。
可若是全部的方法都同步,大多数线程可能都会被阻塞,失去了并发的意义,而且可能会出现死锁。
哲学家就餐问题
:假设有五位哲学家围坐在一张圆形餐桌旁,作如下两件事情之一:吃饭,或者思考。吃东西的时候,他们就中止思考,思考的时候也中止吃东西。餐桌中间有一大碗意大利面,每两个哲学家之间有一只餐叉。由于用一只餐叉很难吃到意大利面,因此假设哲学家必须用两只餐叉吃东西。他们只能使用本身左右手边的那两只餐叉。
哲学家历来不交谈,这就很危险,可能产生死锁,每一个哲学家都拿着左手的餐叉,永远都在等右边的餐叉(或者相反)。 即便没有死锁,也有可能发生资源耗尽。例如,假设规定当哲学家等待另外一只餐叉超过五分钟后就放下本身手里的那一只餐叉,而且再等五分钟后进行下一次尝试。这个策略消除了死锁(系统总会进入到下一个状态),但仍然有可能发生“活锁”。若是五位哲学家在彻底相同的时刻进入餐厅,并同时拿起左边的餐叉,那么这些哲学家就会等待五分钟,同时放下手中的餐叉,再等五分钟,又同时拿起这些餐叉。
下面是哲学家进餐问题的一个实现:
import threading
import random
import time
class Philosopher(threading.Thread):
running = True
def __init__(self, xname, forkOnLeft, forkOnRight):
threading.Thread.__init__(self)
self.name = xname
self.forkOnLeft = forkOnLeft
self.forkOnRight = forkOnRight
def run(self):
while self.running:
# Philosopher is thinking (but really is sleeping).
time.sleep(random.uniform(1, 3))
print("%s is hungry." % self.name)
self.dine()
def dine(self):
fork1, fork2 = self.forkOnLeft, self.forkOnRight
while self.running:
fork1.acquire(True) # 阻塞式获取left 锁
# locked = fork2.acquire(True) # 阻塞式 获取right 锁 容易产生死锁
locked = fork2.acquire(False) # 非阻塞式 获取right 锁
if locked:
break # 若是被锁定,释放 left 退出等待
fork1.release()
print("%s swaps forks" % self.name)
fork1, fork2 = fork2, fork1
else:
return
self.dining()
fork2.release()
fork1.release()
def dining(self):
print("%s starts eating " % self.name)
time.sleep(random.uniform(1, 5))
print("%s finishes eating and leaves to think." % self.name)
def DiningPhilosophers():
forks = [threading.Lock() for n in range(5)]
philosopherNames = ("Aristotle", "Kant", "Buddha", "Marx", "Russel")
philosophers = [
Philosopher(philosopherNames[i], forks[i % 5], forks[(i + 1) % 5])
for i in range(5)
]
Philosopher.running = True
for p in philosophers:
p.start()
for p in philosophers:
p.join()
time.sleep(100)
Philosopher.running = False
print("Now we're finishing.")
DiningPhilosophers()
复制代码
规模较大的程序经常使用监听器模式来解耦模块,这里咱们构造一个类从一个URL进行下载,Listeners 监听下载进度。
import requests
import threading
class Listeners(object):
def __init__(self, count=0):
self.count = count
self.done_count = 0.0
self.listeners = []
def append(self, listener):
self.listeners.append(listener)
def remove(self, listener):
self.listeners.remove(listener)
def on_progress(self, n):
# 一些咱们不知道的实现
# do someting
# self.done_count += 1
# print("Process: %f" % (self.done_count / self.count))
pass
listeners = Listeners(5)
class Downloader(threading.Thread):
def __init__( self, group=None, target=None, name=None, args=(), kwargs=None, daemon=None ):
threading.Thread.__init__(
self, group=group, target=target, name=name, daemon=daemon
)
self.url = kwargs.get("url")
def download(self):
resp = requests.get(self.url)
def add_listener(self, listener):
listeners.append(listener)
def remove_listener(self, listener):
listeners.delete(listener)
def update_progress(self, n):
for listener in listeners:
listner.on_progress(n)
def run(self):
self.download()
print(self.url)
listeners.on_progress(1)
def test():
urls = [
"https://www.baidu.com",
"https://www.google.com",
"https://www.bing.com",
"https://www.zaih.com",
"https://www.github.com",
]
ts = [Downloader(kwargs=dict(url=url)) for url in urls]
print(ts)
[t.start() for t in ts]
[t.join() for t in ts]
if __name__ == "__main__":
test()
复制代码
这段代码中,add_listener, remove_listener 和 update_progress 都是同步方法,但 update_progress 调用了一个咱们不知道如何实现的方法。若是这个方法中,获取了一把锁,程序在执行的过程当中就可能发生死锁。因此,咱们要尽可能避免使用这种方法。还有一种方法是在遍历以前对 listeners 进行保护性复制,再针对这份副本进行遍历。(如今调用外星方法再也不须要加锁)
Lock() 虽然方便,但限制不少:
重入锁是(threading.RLock)一个能够被同一个线程屡次获取的同步基元组件。在内部,它在基元锁的锁定/非锁定状态上附加了 "所属线程" 和 "递归等级" 的概念。在锁定状态下,某些线程拥有锁 ; 在非锁定状态下, 没有线程拥有它。
若要锁定锁,线程调用其 acquire() 方法;一旦线程拥有了锁,方法将返回。若要解锁,线程调用 release() 方法。 acquire()/release() 对能够嵌套;只有最终 release() (最外面一对的 release() ) 将锁解开,才能让其余线程继续处理 acquire() 阻塞。
threading.RLock 提供了显式的 acquire() 和 release() 方法 一个好的实践是:
lock = threading.RLock()
复制代码
Lock 和 RLock 的使用区别以下:
#rlock_tut.py
import threading
num = 0
lock = Threading.Lock()
lock.acquire()
num += 1
lock.acquire() # 这里会被阻塞
num += 2
lock.release()
# With RLock, that problem doesn’t happen.
lock = Threading.RLock()
lock.acquire()
num += 3
lock.acquire() # 不会被阻塞.
num += 4
lock.release()
lock.release() # 两个锁都须要调用 release() 来释放.
复制代码
使用内置锁时,阻塞的线程没法被中断,程序不能从死锁恢复,能够给锁设置超时时间来解决这个问题。
timeout 参数须要 python3.2+
import time
from threading import Thread, Lock
lock1 = RLock()
lock2 = RLock()
# 这个程序会一直死锁下去,若是想突破这个限制,能够在获取锁的时候加上超时时间
# > python threading 没有实现 销毁(destroy),中止(stop),暂停(suspend),继续(resume),中断(interrupt)等
class T1(Thread):
def run(self):
print("start run T1")
lock1.acquire()
# lock1.acquire(timeout=2) # 设置超时时间可避免死锁
time.sleep(1)
lock2.acquire()
# lock2.acquire(timeout=2) # 设置超时时间可避免死锁
lock1.release()
lock2.release()
class T2(Thread):
def run(self):
print("start run T2")
lock2.acquire()
# lock2.acquire(timeout=2) # 设置超时时间可避免死锁
time.sleep(1)
lock1.acquire()
# lock1.acquire(timeout=2) # 设置超时时间可避免死锁
lock2.release()
lock1.release()
def test():
t1, t2 = T1(), T2()
t1.start()
t2.start()
t1.join()
t2.join()
if __name__ == "__main__":
test()
复制代码
若是咱们要在链表中插入一个节点。一种作法是用锁保护整个链表,但链表加锁时其它使用者没法访问。交替锁能够只所追杀链表的一部分,容许不涉及被锁部分的其它线程自由访问。
from random import randint
from threading import Thread, Lock
class Node(object):
def __init__(self, value, prev=None, next=None):
self.value = value
self.prev = prev
self.next = next
self.lock = Lock()
class SortedList(Thread):
def __init__(self, head):
Thread.__init__(self)
self.head = head
def insert(self, value):
head = self.head
node = Node(value)
print("insert: %d" % value)
while True:
if head.value <= value:
if head.next != None:
head = head.next
else:
head.lock.acquire()
head.next = node
node.prev = head
head.lock.release()
break
else:
prev = head.prev
prev.lock.acquire()
head.lock.acquire()
if prev != None:
prev.next = node
else:
self.head = node
node.prev = prev
prev.lock.release()
node.next = head
head.prev = node
head.lock.release()
break
def run(self):
for i in range(5):
self.insert(randint(10, 20))
def test():
head = Node(10)
t1 = SortedList(head)
t2 = SortedList(head)
t1.start()
t2.start()
t1.join()
t2.join()
while head:
print(head.value)
head = head.next
if __name__ == "__main__":
test()
复制代码
这种方案不只可让多个线程并发的进行链表插入操做,还能让其余的链表操做安全的并发。
并发编程常常须要等待某个事件发生。好比从队列删除元素前须要等待队列非空、向缓存添加数据前须要等待缓存有足够的空间。条件变量就是为这种状况设计的。
条件变量老是与某种类型的锁对象相关联,锁对象能够经过传入得到,或者在缺省的状况下自动建立。当多个条件变量须要共享同一个锁时,传入一个锁颇有用。锁是条件对象的一部分,没必要单独地跟踪它。
条件变量服从上下文管理协议:使用 with 语句会在它包围的代码块内获取关联的锁。 acquire() 和 release() 方法也能调用关联锁的相关方法。
其它方法必须在持有关联的锁的状况下调用。 wait() 方法释放锁,而后阻塞直到其它线程调用 notify() 方法或 notify_all() 方法唤醒它。一旦被唤醒, wait() 方法从新获取锁并返回。它也能够指定超时时间。
#condition_tut.py
import random, time
from threading import Condition, Thread
""" 'condition' variable will be used to represent the availability of a produced item. """
condition = Condition()
box = []
def producer(box, nitems):
for i in range(nitems):
time.sleep(random.randrange(2, 5)) # Sleeps for some time.
condition.acquire()
num = random.randint(1, 10)
box.append(num) # Puts an item into box for consumption.
condition.notify() # Notifies the consumer about the availability.
print("Produced:", num)
condition.release()
def consumer(box, nitems):
for i in range(nitems):
condition.acquire()
condition.wait() # Blocks until an item is available for consumption.
print("%s: Acquired: %s" % (time.ctime(), box.pop()))
condition.release()
threads = []
""" 'nloops' is the number of times an item will be produced and consumed. """
nloops = random.randrange(3, 6)
for func in [producer, consumer]:
threads.append(Thread(target=func, args=(box, nloops)))
threads[-1].start() # Starts the thread.
for thread in threads:
"""Waits for the threads to complete before moving on with the main script. """
thread.join()
print("All done.")
复制代码
与锁相比使用原子变量的优势:
- 不会忘记在正确的时候获取锁
- 因为没有锁的参与,对原子变量的操做不会引起死锁。
- 原子变量时无锁(lock-free)非阻塞(non-blocking)算法的基础,这种算法能够不用锁和阻塞来达到同步的目的。
python 不支持原子变量
线程与锁模型最大的优势是适用面广,更接近于“本质”--近似于对硬件工做方式的形式化--正确使用时效率高。 此外,线程与锁模型也可轻松的集成到大多数编程语言。
[1] 哲学家进餐问题: zh.wikipedia.org/wiki/哲学家就餐问… [2] Let’s Synchronize Threads in Python: hackernoon.com/synchroniza…