Python:使用threading模块实现多线程(转)python
分类: python 标签: thread 评论: 暂无评论 阅读:5,420 views算法
综述编程
Python这门解释性语言也有专门的线程模型,Python虚拟机使用GIL(Global Interpreter Lock,全局解释器锁)来互斥线程对共享资源的访问,但暂时没法利用多处理器的优点。安全
在Python中咱们主要是经过thread和 threading这两个模块来实现的,其中Python的threading模块是对thread作了一些包装的,能够更加方便的被使用,因此咱们使用 threading模块实现多线程编程。这篇文章咱们主要来看看Python对多线程编程的支持。多线程
在语言层面,Python对多线程提供了很好的支持,能够方便地支持建立线程、互斥锁、信号量、同步等特性。下面就是官网上介绍threading模块的基本资料及功能:app
实现模块函数
thread:多线程的底层支持模块,通常不建议使用;ui
threading:对thread进行了封装,将一些线程的操做对象化线程
threading模块unix
Thread 线程类,这是咱们用的最多的一个类,你能够指定线程函数执行或者继承自它均可以实现子线程功能;
Timer与Thread相似,但要等待一段时间后才开始运行;
Lock 锁原语,这个咱们能够对全局变量互斥时使用;
RLock 可重入锁,使单线程能够再次得到已经得到的锁;
Condition 条件变量,能让一个线程停下来,等待其余线程知足某个“条件”;
Event 通用的条件变量。多个线程能够等待某个事件发生,在事件发生后,全部的线程都被激活;
Semaphore为等待锁的线程提供一个相似“等候室”的结构;
BoundedSemaphore 与semaphore相似,但不容许超过初始值;
Queue:实现了多生产者(Producer)、多消费者(Consumer)的队列,支持锁原语,可以在多个线程之间提供很好的同步支持。
其中Thread类
是你主要的线程类,能够建立进程实例。该类提供的函数包括:
getName(self) 返回线程的名字
isAlive(self) 布尔标志,表示这个线程是否还在运行中
isDaemon(self) 返回线程的daemon标志
join(self, timeout=None) 程序挂起,直到线程结束,若是给出timeout,则最多阻塞timeout秒
run(self) 定义线程的功能函数
setDaemon(self, daemonic) 把线程的daemon标志设为daemonic
setName(self, name) 设置线程的名字
start(self) 开始线程执行
其中Queue提供的类
Queue队列
LifoQueue后入先出(LIFO)队列
PriorityQueue 优先队列
接下来,咱们将会用一个一个示例来展现threading的各个功能,包括但不限于:两种方式起线程、threading.Thread类的重要函数、使用Lock互斥及RLock实现重入锁、使用Condition实现生产者和消费者模型、使用Event和Semaphore多线程通讯
两种方式起线程
在Python中咱们主要是经过thread和threading这两个模块来实现的,其中Python的threading模块是对thread作了一些包装的,能够更加方便的被使用,因此咱们使用threading模块实现多线程编程。通常来讲,使用线程有两种模式,一种是建立线程要执行的函数,把这个函数传递进Thread对象里,让它来执行;另外一种是直接从Thread继承,建立一个新的class,把线程执行的代码放到这个新的 class里。
将函数传递进Thread对象:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading
def thread_fun(num):
for n in range(0, int(num)):
print " I come from %s, num: %s" %( threading.currentThread().getName(), n)
def main(thread_num):
thread_list = list();
# 先建立线程对象
for i in range(0, thread_num):
thread_name = "thread_%s" %i
thread_list.append(threading.Thread(target = thread_fun, name = thread_name, args = (20,)))
# 启动全部线程
for thread in thread_list:
thread.start()
# 主线程中等待全部子线程退出
for thread in thread_list:
thread.join()
if __name__ == "__main__":
main(3)
程序启动了3个线程,而且打印了每个线程的线程名字,这个比较简单吧,处理重复任务就派出用场了,下面介绍使用继承threading的方式;
继承自threading.Thread类:
1
2
3
4
5
6
7
8
9
10
11
12
13
import threading
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self);
def run(self):
print "I am %s" %self.name
if __name__ == "__main__":
for thread in range(0, 5):
t = MyThread()
t.start()
接下来,将会介绍如何控制这些线程,包括子线程的退出,子线程是否存活及将子线程设置为守护线程(Daemon)。
threading.Thread类的重要函数
介绍threading模块中的主类Thread的一些主要方法,实例代码以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
import threading
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
print "I am %s" % (self.name)
if __name__ == "__main__":
for i in range(0, 5):
my_thread = MyThread()
my_thread.start()
一、name相关
你能够为每个thread指定name,默认的是Thread-No形式的,如上述实例代码打印出的同样:
I am Thread-1
I am Thread-2
I am Thread-3
I am Thread-4
I am Thread-5
固然你能够指定每个thread的name,这个经过setName方法,代码:
1
2
3
def __init__(self):
threading.Thread.__init__(self)
self.setName("new" + self.name)
二、join方法
join方法原型以下,这个方法是用来阻塞当前上下文,直至该线程运行结束:
1
def join(self, timeout=None):
timeout能够设置超时时间
三、setDaemon方法
当咱们在程序运行中,执行一个主线程,若是主线程又建立一个子线程,主线程和子线程就分兵两路,当主线程完成想退出时,会检验子线程是否完成。若是子线程未完成,则主线程会等待子线程完成后再退出。可是有时候咱们须要的是,只要主线程完成了,无论子线程是否完成,都要和主线程一块儿退出,这时就能够用setDaemon方法,并设置其参数为True。
使用Lock互斥锁
如今咱们考虑这样一个问题:假设各个线程须要访问同一公共资源,咱们的代码该怎么写?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading
import time
counter = 0
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global counter
time.sleep(1);
counter += 1
print "I am %s, set counter:%s" % (self.name, counter)
if __name__ == "__main__":
for i in range(0, 200):
my_thread = MyThread()
my_thread.start()
解决上面的问题,咱们兴许会写出这样的代码,咱们假设跑200个线程,可是这200个线程都会去访问counter这个公共资源,并对该资源进行处理(counter += 1),代码看起来就是这个样了,可是咱们看下运行结果:
I am Thread-69, set counter:64
I am Thread-73, set counter:66I am Thread-74, set counter:67I am Thread-75, set counter:68I am Thread-76, set counter:69I am Thread-78, set counter:70I am Thread-77, set counter:71I am Thread-58, set counter:72I am Thread-60, set counter:73I am Thread-62, set counter:74I am Thread-66, set counter:75I am Thread-70, set counter:76I am Thread-72, set counter:77I am Thread-79, set counter:78I am Thread-71, set counter:78
打印结果我只贴了一部分,从中咱们已经看出了这个全局资源(counter)被抢占的状况,问题产生的缘由就是没有控制多个线程对同一资源的访问,对数据形成破坏,使得线程运行的结果不可预期。这种现象称为“线程不安全”。在开发过程当中咱们必需要避免这种状况,那怎么避免?这就用到了咱们在综述中提到的互斥锁了。
互斥锁概念
Python编程中,引入了对象互斥锁的概念,来保证共享数据操做的完整性。每一个对象都对应于一个可称为” 互斥锁” 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。在Python中咱们使用threading模块提供的Lock类。
咱们对上面的程序进行整改,为此咱们须要添加一个互斥锁变量mutex = threading.Lock(),而后在争夺资源的时候以前咱们会先抢占这把锁mutex.acquire(),对资源使用完成以后咱们在释放这把锁mutex.release()。代码以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time
counter = 0
mutex = threading.Lock()
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global counter, mutex
time.sleep(1);
if mutex.acquire():
counter += 1
print "I am %s, set counter:%s" % (self.name, counter)
mutex.release()
if __name__ == "__main__":
for i in range(0, 100):
my_thread = MyThread()
my_thread.start()
同步阻塞
当一个线程调用Lock对象的acquire()方法得到锁时,这把锁就进入“locked”状态。由于每次只有一个线程1能够得到锁,因此若是此时另外一个线程2试图得到这个锁,该线程2就会变为“block“同步阻塞状态。直到拥有锁的线程1调用锁的release()方法释放锁以后,该锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来得到锁,并使得该线程进入运行(running)状态。
进一步考虑
经过对公共资源使用互斥锁,这样就简单的到达了咱们的目的,可是若是咱们又遇到下面的状况:
一、遇到锁嵌套的状况该怎么办,这个嵌套是指当我一个线程在获取临界资源时,又须要再次获取;
二、若是有多个公共资源,在线程间共享多个资源的时候,若是两个线程分别占有一部分资源而且同时等待对方的资源;
上述这两种状况会直接形成程序挂起,即死锁,下面咱们会谈死锁及可重入锁RLock。
死锁的造成
前一篇文章Python:使用threading模块实现多线程编程四[使用Lock互斥锁]咱们已经开始涉及到如何使用互斥锁来保护咱们的公共资源了,如今考虑下面的状况–
若是有多个公共资源,在线程间共享多个资源的时候,若是两个线程分别占有一部分资源而且同时等待对方的资源,这会引发什么问题?
死锁概念
所谓死锁: 是指两个或两个以上的进程在执行过程当中,因争夺资源而形成的一种互相等待的现象,若无外力做用,它们都将没法推动下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。 因为资源占用是互斥的,当某个进程提出申请资源后,使得有关进程在无外力协助下,永远分配不到必需的资源而没法继续运行,这就产生了一种特殊现象死锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import threading
counterA = 0
counterB = 0
mutexA = threading.Lock()
mutexB = threading.Lock()
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
self.fun1()
self.fun2()
def fun1(self):
global mutexA, mutexB
if mutexA.acquire():
print "I am %s , get res: %s" %(self.name, "ResA")
if mutexB.acquire():
print "I am %s , get res: %s" %(self.name, "ResB")
mutexB.release()
mutexA.release()
def fun2(self):
global mutexA, mutexB
if mutexB.acquire():
print "I am %s , get res: %s" %(self.name, "ResB")
if mutexA.acquire():
print "I am %s , get res: %s" %(self.name, "ResA")
mutexA.release()
mutexB.release()
if __name__ == "__main__":
for i in range(0, 100):
my_thread = MyThread()
my_thread.start()
代码中展现了一个线程的两个功能函数分别在获取了一个竞争资源以后再次获取另外的竞争资源,咱们看运行结果:
I am Thread-1 , get res: ResA
I am Thread-1 , get res: ResB
I am Thread-2 , get res: ResAI am Thread-1 , get res: ResB
能够看到,程序已经挂起在那儿了,这种现象咱们就称之为”死锁“。
避免死锁
避免死锁主要方法就是:正确有序的分配资源,避免死锁算法中最有表明性的算法是Dijkstra E.W 于1968年提出的银行家算法。
可重入锁RLock
考虑这种状况:若是一个线程遇到锁嵌套的状况该怎么办,这个嵌套是指当我一个线程在获取临界资源时,又须要再次获取。
根据这种状况,代码以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import threading
import time
counter = 0
mutex = threading.Lock()
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global counter, mutex
time.sleep(1);
if mutex.acquire():
counter += 1
print "I am %s, set counter:%s" % (self.name, counter)
if mutex.acquire():
counter += 1
print "I am %s, set counter:%s" % (self.name, counter)
mutex.release()
mutex.release()
if __name__ == "__main__":
for i in range(0, 200):
my_thread = MyThread()
my_thread.start()
这种状况的代码运行状况以下:
I am Thread-1, set counter:1
以后就直接挂起了,这种状况造成了最简单的死锁。
那有没有一种状况能够在某一个线程使用互斥锁访问某一个竞争资源时,能够再次获取呢?在Python中为了支持在同一线程中屡次请求同一资源,python提供了“可重入锁”:threading.RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源能够被屡次require。直到一个线程全部的acquire都被release,其余的线程才能得到资源。上面的例子若是使用RLock代替Lock,则不会发生死锁:
代码只需将上述的:
1
mutex = threading.Lock()
替换成:
1
mutex = threading.RLock()
使用Condition实现复杂同步
目前咱们已经会使用Lock去对公共资源进行互斥访问了,也探讨了同一线程可使用RLock去重入锁,可是尽管如此咱们只不过才处理了一些程序中简单的同步现象,咱们甚至还不能很合理的去解决使用Lock锁带来的死锁问题。因此咱们得学会使用更深层的解决同步问题。
Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock相似的acquire和release方法外,还提供了wait和notify方法。
使用Condition的主要方式为:线程首先acquire一个条件变量,而后判断一些条件。若是条件不知足则wait;若是条件知足,进行一些处理改变条件后,经过notify方法通知其余线程,其余处于wait状态的线程接到通知后会从新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
下面咱们经过很著名的“生产者-消费者”模型来来演示下,在Python中使用Condition实现复杂同步。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import threading
import time
condition = threading.Condition()
products = 0
class Producer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global condition, products
while True:
if condition.acquire():
if products < 10:
products += 1;
print "Producer(%s):deliver one, now products:%s" %(self.name, products)
condition.notify()
else:
print "Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products)
condition.wait();
condition.release()
time.sleep(2)
class Consumer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global condition, products
while True:
if condition.acquire():
if products > 1:
products -= 1
print "Consumer(%s):consume one, now products:%s" %(self.name, products)
condition.notify()
else:
print "Consumer(%s):only 1, stop consume, products:%s" %(self.name, products)
condition.wait();
condition.release()
time.sleep(2)
if __name__ == "__main__":
for p in range(0, 2):
p = Producer()
p.start()
for c in range(0, 10):
c = Consumer()
c.start()
代码中主要实现了生产者和消费者线程,双方将会围绕products来产生同步问题,首先是2个生成者生产products ,而接下来的10个消费者将会消耗products,代码运行以下:
Producer(Thread-1):deliver one, now products:1
Producer(Thread-2):deliver one, now products:2
Consumer(Thread-3):consume one, now products:1
Consumer(Thread-4):only 1, stop consume, products:1
Consumer(Thread-5):only 1, stop consume, products:1
Consumer(Thread-6):only 1, stop consume, products:1
Consumer(Thread-7):only 1, stop consume, products:1
Consumer(Thread-8):only 1, stop consume, products:1
Consumer(Thread-10):only 1, stop consume, products:1
Consumer(Thread-9):only 1, stop consume, products:1
Consumer(Thread-12):only 1, stop consume, products:1
Consumer(Thread-11):only 1, stop consume, products:1
另外:Condition对象的构造函数能够接受一个Lock/RLock对象做为参数,若是没有指定,则Condition对象会在内部自行建立一个RLock;除了notify方法外,Condition对象还提供了notifyAll方法,能够通知waiting池中的全部线程尝试acquire内部锁。因为上述机制,处于waiting状态的线程只能经过notify方法唤醒,因此notifyAll的做用在于防止有线程永远处于沉默状态。
使用Event实现线程间通讯
使用threading.Event能够实现线程间相互通讯,以前的Python:使用threading模块实现多线程编程七[使用Condition实现复杂同步]咱们已经初步实现了线程间通讯的基本功能,可是更为通用的一种作法是使用threading.Event对象。
使用threading.Event可使一个线程等待其余线程的通知,咱们把这个Event传递到线程对象中,Event默认内置了一个标志,初始值为False。一旦该线程经过wait()方法进入等待状态,直到另外一个线程调用该Event的set()方法将内置标志设置为True时,该Event会通知全部等待状态的线程恢复运行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading
import time
class MyThread(threading.Thread):
def __init__(self, signal):
threading.Thread.__init__(self)
self.singal = signal
def run(self):
print "I am %s,I will sleep ..."%self.name
self.singal.wait()
print "I am %s, I awake..." %self.name
if __name__ == "__main__":
singal = threading.Event()
for t in range(0, 3):
thread = MyThread(singal)
thread.start()
print "main thread sleep 3 seconds... "
time.sleep(3)
singal.set()
运行效果以下:
I am Thread-1,I will sleep ...
I am Thread-2,I will sleep ...
I am Thread-3,I will sleep ...
main thread sleep 3 seconds...
I am Thread-1, I awake...I am Thread-2, I awake...
I am Thread-3, I awake...
来自:http://www.ourunix.org/
http://xlambda.com/gevent-tutorial/