线程,程序执行的最小单元,单线程处理多个任务只能一个处理完后继续处理下一个直到所有处理完,多线程处理任务会比单线程处理起来快吗?在python程序里得看状况,首先有GIL锁的存在致使同一时刻只能有一个线程执行(执行遇到中断释放GIL锁),这乍一看和单线程处理多任务没有区别,可是若是执行的任务是I/O密集型任务就可以提升任务执行效率,但若是任务是CPU密集型任务显然得不到任何效率提高,反而还会由于上下文切换等致使执行不如单线程执行。html
Python中实现多线程模块推荐使用threading,threading获得了java线程的启示。Queue模块对于线程同步是十分有用的,该模块提供了一个同步FIFO队列类型,这个类型很是便于处理线程之间的通讯和协调。java
__all__ = ['get_ident', 'active_count', 'Condition', 'current_thread', 'enumerate', 'main_thread', 'TIMEOUT_MAX', 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', 'stack_size']
threading模块提供对象 | 做用 |
---|---|
current_thread/currentThread | 为调用线程返回一个Thread对象。若是调用线程不是由threading模块建立的将建立并返回一个具备有限功能的Thread对象 |
Thread | 表示控制线程的类,建立一个线程能够直接实例化一个Thread对象,直接实例化时须要指定参数target,t对象调用run方法即执行target(*args, **kwargs); 还能够同个继承Thread类建立一个线程,这样建立须要覆盖run方法而不须要传参target。 |
getName,setName | t.getName(), t.setName(name) getName返回t的名称,setName设置t的名称。线程名能够是任意字符串而且不要求惟一。 |
isAlive | t.isAlive() 若是t是活动的,将返回True(t.start()已经执行,t.run()尚未结束)。不然返False |
isDaemon, setDaemon | t.isDaemon() t.setDaemon(daemonic) 若是t是一个驻留程序(即便t仍然是活动的,Python能够终止整个处理过程,这样能够结束线程t),isDaemon将返回True,不然返回False。最开始,当且仅当建立t的线程是一个驻留程序时,t才是一个驻留程序。只能在t.start()以前调用t.setDaemon(True)把t设置为一个驻留程序。通俗地讲正常状况下若是主线程执行内容中有两个子线程任务要执行,若是给子线程设置setDaemon(True),那么当主线程任务结束时两个子线程仍是活动的时候主线程不会等待子线程执行完成。默认状况下都是False |
join | t.join(timeout=None) 建立这个线程的线程将会挂起直到t结束。只能在t.start()以后调用t.join() |
run | t.run() run是执行代码(任务)的方法,不过统一使用t.start(), 让t..start()去调用执行run |
start | t.start() start可让t活动,并让run在一个单独的线程中执行,一个线程对象只能调用一次run方法 |
import threading import time def cal(n): print(">>>>>>>>%s" % n) time.sleep(n) temp = n * n * n print("result: {}".format(temp)) with open('{}.txt'.format(n), 'w') as f: f.write('{}abcde'.format(n)) print("total_thread_count is: {}".format(threading.active_count())) if __name__ == '__main__': t1 = threading.Thread(target=cal, args=(2,)) t2 = threading.Thread(target=cal, args=(3,)) t = threading.current_thread() print(t.isDaemon()) # False t2.start() t1.start() print("main thread ending") # 执行结果 """ False >>>>>>>>3 >>>>>>>>2 main thread ending result: 8 total_thread_count is: 3 result: 27 total_thread_count is: 2 """ # 同目录下生成2.txt和3.txt文件
import threading import time def cal(n): print(">>>>>>>>%s" % n) time.sleep(n) temp = n * n * n print("result: {}".format(temp)) with open('{}.txt'.format(n), 'w') as f: f.write('{}abcde'.format(n)) print("total_thread_count is: {}".format(threading.active_count())) if __name__ == '__main__': t1 = threading.Thread(target=cal, args=(2,)) t2 = threading.Thread(target=cal, args=(3,)) t = threading.current_thread() t1.setDaemon(True) t2.setDaemon(True) print(t.isDaemon()) # False t2.start() t1.start() print("main thread ending") # 执行结果 ''' False >>>>>>>>3 >>>>>>>>2 main thread ending ''' # 并不会生成2.txt和3.txt文件
import threading import time class CalTask(threading.Thread): def __init__(self, num): super(CalTask, self).__init__() self.num = num def run(self): n = self.num print(">>>>>>>>%s" % n) time.sleep(n) temp = n * n * n print("result: {}".format(temp)) with open('{}.txt'.format(n), 'w') as f: f.write('{}abcde'.format(n)) print("total_thread_count is: {}".format(threading.active_count())) if __name__ == '__main__': t1 = CalTask(num=2) t2 = CalTask(num=3) # t = threading.current_thread() t = threading.currentThread() # t1.setDaemon(True) t2.setDaemon(True) print(t.isDaemon()) # False t2.start() t1.start() print("main thread ending")
Lock(互斥锁)是在多线程编程中线程同步控制的一种方法。在编程中遇到多个线程都修改同一个共享数据的时候就须要考虑线程同步控制。python
线程同步可以保证多个线程安全访问竞争资源。互斥锁为资源引入两个状态:锁定/非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其余线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其余的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操做,从而保证了多线程状况下数据的正确性。redis
import threading import time total = 100 class CalTask(threading.Thread): def __init__(self): super(CalTask, self).__init__() def run(self): global total print(">>>>>>: ", total) time.sleep(0.0001) total -= 1 if __name__ == '__main__': for i in range(100): t = CalTask() t.start() print(threading.active_count()) while True: if threading.active_count() == 1: break print(total) ''' 执行结果: >>>>>>: 100 >>>>>>: 100 >>>>>>: 100 >>>>>>: 100 >>>>>>: 100 >>>>>>: 100 >>>>>>: 99 >>>>>>: 99 >>>>>>: 99 >>>>>>: 99 >>>>>>: 95 >>>>>>: 95 ... 9 0 ''' # 最终结果是total为0,每次直接修改全局变量total自己和Python存在的GIL锁的缘由致使结果最终为0
import threading import time total = 100 class CalTask(threading.Thread): def __init__(self): super(CalTask, self).__init__() def run(self): global total print(">>>>>>: ", total) temp = total time.sleep(0.0001) total = temp - 1 if __name__ == '__main__': for i in range(100): t = CalTask() t.start() print(threading.active_count()) while True: if threading.active_count() == 1: break print(total) ''' 执行结果: >>>>>>: 100 >>>>>>: 100 >>>>>>: 100 >>>>>>: 100 >>>>>>: 100 >>>>>>: 100 >>>>>>: 99 >>>>>>: 99 >>>>>>: 99 >>>>>>: 99 >>>>>>: 98 >>>>>>: 98 ... 7 89 ''' # 最终结果是total为89,这和代码实例一对比结果不一样的缘由是由于虽然有GIL锁的存在,但如今是经过一个中间变量来操做而后从新赋值给total,这样会形成数据不安全
import threading import time total = 100 lock = threading.Lock() class CalTask(threading.Thread): def __init__(self): super(CalTask, self).__init__() def run(self): global total lock.acquire() print(">>>>>>: ", total) temp = total time.sleep(0.0001) total = temp - 1 lock.release() if __name__ == '__main__': for i in range(100): t = CalTask() t.start() print(threading.active_count()) while True: if threading.active_count() == 1: break print(total) ''' 执行结果: >>>>>>: 100 >>>>>>: 99 >>>>>>: 98 >>>>>>: 97 >>>>>>: 96 >>>>>>: 95 >>>>>>: 94 >>>>>>: 93 >>>>>>: 92 >>>>>>: 91 >>>>>>: 90 >>>>>>: 89 90 >>>>>>: 88 >>>>>>: 87 ... 0 '''
import threading import time total = 100 lock = threading.Lock() class CalTask(threading.Thread): def __init__(self): super(CalTask, self).__init__() def run(self): global total print(">>>>>>: ", total) temp = total time.sleep(0.0001) total = temp - 1 if __name__ == '__main__': for i in range(100): t = CalTask() t.start() t.join() print(threading.active_count()) while True: if threading.active_count() == 1: break print(total) ''' 执行结果: >>>>>>: 100 >>>>>>: 99 >>>>>>: 98 >>>>>>: 97 >>>>>>: 96 >>>>>>: 95 >>>>>>: 94 >>>>>>: 93 >>>>>>: 92 >>>>>>: 91 >>>>>>: 90 >>>>>>: 89 >>>>>>: 88 >>>>>>: 87 ... 1 0 ''' # 串行执行,执行效率否则代码3(加锁)
join是等待全部,即总体串行,而锁只是锁住修改共享数据的部分,即部分‘串行’,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁均可以实现,毫无疑问,互斥锁的部分串行效率要更高编程
死锁: 是指两个或两个以上的进程或线程在执行过程当中,因争夺资源而形成的一种互相等待的现象,若无外力做用,它们都将没法推动下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程安全
from threading import Thread, Lock import time mutexA = Lock() mutexB = Lock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('33[41m%s 拿到A锁33[0m' % self.name) mutexB.acquire() print('33[42m%s 拿到B锁33[0m' % self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('33[43m%s 拿到B锁33[0m' % self.name) time.sleep(2) mutexA.acquire() print('33[44m%s 拿到A锁33[0m' % self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t = MyThread() t.start() ''' 33[41mThread-1 拿到A锁33[0m 33[42mThread-1 拿到B锁33[0m 33[43mThread-1 拿到B锁33[0m 33[41mThread-2 拿到A锁33[0m 死锁 '''
from threading import Thread, RLock import time mutexA = RLock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('33[41m%s 拿到A锁33[0m' % self.name) mutexA.acquire() print('33[42m%s 拿到B锁33[0m' % self.name) mutexA.release() mutexA.release() def func2(self): mutexA.acquire() print('33[43m%s 拿到B锁33[0m' % self.name) time.sleep(2) mutexA.acquire() print('33[44m%s 拿到A锁33[0m' % self.name) mutexA.release() mutexA.release() if __name__ == '__main__': for i in range(10): t = MyThread() t.start()
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源能够被屡次require。直到一个线程全部的acquire都被release,其余的线程才能得到资源。多线程
Condition对象提供了对复杂线程同步问题的支持。架构
Condition | class Condition(lock=None) Condition能够建立并返回一个新Condition对象,并将对象L设置为lock,若是lock=None,L将被设置为一个新建立的RLock对象 |
---|---|
acquire,release | c.acquire(wait=1) c.release() 这些方法将调用L的对应方法,除非一个线程拥有锁L,不然该线程不能对c调用任何其余方法。 |
notify,notifyAll | c.notify() c.notifyAll() notify能够唤醒正在等待c的线程中的某一个线程。调用线程在调用c.notify()以前必须拥有L,而且notify不会释放L。除非被唤醒的线程能够再次获得L,不然该线程不会变为就绪状态。所以,调用线程一般会在调用notify以后调用release。notifyAll相似于notify,区别在于notifyAll将唤醒全部正在等待的线程而不仅是其中的一个 |
wait | c.wait(timeout=None) wait将释放L, 而后挂起调用线程,直到其它线程对c调用notify或notifyAll。调用线程在调用c.wait()以前必须拥有L。 |
import threading import time class Seeker(threading.Thread): def __init__(self, cond, name): super(Seeker, self).__init__() self.cond = cond self.name = name def run(self): time.sleep(1) # 确保先运行Hider中的方法 self.cond.acquire() # 2 print(self.name + ': 我已经把眼睛蒙上了') self.cond.notify() print(self.name + ": 速度啊大兄弟") self.cond.wait() # 3 # 6 print(self.name + ': 我找到你了 大兄弟') self.cond.notify() self.cond.release() # 7 print(self.name + ': 我赢了') # 8 class Hider(threading.Thread): def __init__(self, cond, name): super(Hider, self).__init__() self.cond = cond self.name = name def run(self): self.cond.acquire() self.cond.wait() # 1 #释放对琐的占用,同时线程挂起在这里,直到被notify并从新占有琐。 # 4 print(self.name + ': 我已经藏好了,你快来找我吧') self.cond.notify() self.cond.wait() # 5 # 8 self.cond.release() print(self.name + ': 被你找到了,大爷的') cond = threading.Condition() seeker = Seeker(cond, 'seeker') hider = Hider(cond, 'hider') seeker.start() hider.start() ''' 执行结果: seeker: 我已经把眼睛蒙上了 seeker: 速度啊大兄弟 hider: 我已经藏好了,你快来找我吧 seeker: 我找到你了 大兄弟 seeker: 我赢了 hider: 被你找到了,大爷的 '''
Event对象可让任意数量的线程挂起并等待。等待事件对象e的全部线程在任何其余线程调用e.set()时将变为就绪状态。事件对象e有一个标记,能够记录该事件是否已经发送;在e被建立时,这个标记的初始值为False。Event就是这样一个相似于简化的Condition。并发
方法 | 做用 |
---|---|
Event | Event能够建立并返回一个新事件对象e,而且e的标记被设置为False |
clear | e.clear() 将e的标记设置为False |
isSet | e.isSet() 返回e的标记的值,True或False |
set | e.set() 将e的标记设置为True。全部等待e的线程将变为就绪 |
wait | e.wait(timeout=None) 若是e的标记为True,wait将当即返回。不然,wait将挂起调用线程,直到其余一些线程调用 |
import threading import time import logging logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) def worker(event): logging.debug('Waiting for redis ready...') event.wait() logging.debug( 'redis ready, and connect to redis server and do some work [%s]', time.ctime()) time.sleep(1) def main(): readis_ready = threading.Event() t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1') t1.start() t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2') t2.start() logging.debug( 'first of all, check redis server, make sure it is OK, and then trigger the redis ready event') time.sleep(3) # simulate the check progress readis_ready.set() if __name__ == "__main__": main() ''' 执行结果: (t1 ) Waiting for redis ready... (t2 ) Waiting for redis ready... (MainThread) first of all, check redis server, make sure it is OK, and then trigger the redis ready event (t1 ) redis ready, and connect to redis server and do some work [Thu Feb 14 14:58:53 2019] (t2 ) redis ready, and connect to redis server and do some work [Thu Feb 14 14:58:53 2019] '''
信号量(也被称为计数信号量,counting semaphore)是广义上的锁。Lock的状态能够被看做是True或False,信号量对象s的状态是一个0~n的数字,n是在s被建立设置的。信号量能够用于管理固定的资源池,相比信号量使用队列来实现这个功能更健壮一些。app
方法 | 做用 |
---|---|
Semaphore | class Semaphore(n=1) Semaphore能够建立并返回一个状态被设置为n的信号量对象s |
acquire | s.acquire(wait=True) 在s的状态大于0时,acquire将把状态值减1并返回True,在s的状态为0而且wait为True时,acquire将挂起调用线程并等待,直到其余一些线程调用了s.release。在s的状态为0,而且wait为False时,acquire将当即返回False |
release | s.release() 在s的状态大于0,或者在状态为0可是没有线程正在等待s时,release将把状态值增长1,在s的状态为0而且有线程正在等待s时,release将把s的状态设为0,并唤醒任意一个等待线程。调用release的线程将再也不被挂起,该线程将保持就绪,并继续正常执行 |
import threading import time semaphore = threading.Semaphore(5) # 设置同时能够有5个线程能够得到信号量锁 def func(): if semaphore.acquire(): print(threading.currentThread().getName() + ' get semaphore') time.sleep(2) semaphore.release() for i in range(20): t1 = threading.Thread(target=func) t1.start() # 执行结果:每两秒并发执行5个线程任务,即每2秒打印5次
threading模块提供了一个local类,线程可使用这个类得到线程本地存储,也被称为每线程数据。线程本地存储对象能够set设置和get获取属性,能够在__dict__中获取到,这个本地存储对象是线程安全的,多个线程同时设置和得到对象的属性是不会有问题的。
示例代码:
import threading L = threading.local() print("in main thread, setting zop to 42") L.zop = 42 def targ(): print("in subthread, setting zop to 23") L.zop = 23 print("in subthread, setting zop is now ", L.zop) if __name__ == '__main__': t = threading.Thread(target=targ) t.start() t.join() print("in main thread, setting zop is now ", L.zop) # 执行结果 """ in main thread, setting zop to 42 in subthread, setting zop to 23 in subthread, setting zop is now 23 in main thread, setting zop is now 42 """
只要线程程序必须处理某些外部对象,能够专门指定一个使用Queue对象的线程来实现这样的处理。经过这个Queue对象,外部接口线程能够经过这个Queue对象得到其余线程放入的工做请求。外部接口线程能够将结果放入到一个或多个其余Queue对象来返回这些结果。下面示例展现了若是将这种架构包装到一个通用的可重用类中:
import threading try: import Queue # Python 2 except ImportError: import queue as Queue # Python 3 class ExternalInterfacing(threading.Thread): def __init__(self, **kwargs): super(ExternalInterfacing, self).__init__() self.setDaemon(True) self.workRequestQueue = Queue.Queue() self.resultQueue = Queue.Queue() self.start() def apply(self, externalCallable, *args, **kwargs): "called by other threads as externalCallable would be" self.workRequestQueue.put((externalCallable, args, kwargs)) return self.resultQueue.get() def run(self): while 1: externalCallable, args, kwargs = self.workRequestQueue.get() self.resultQueue.put(externalCallable(args, kwargs))
搜了下资料发现threadpool的源码实现的基础架构就是这样的,threadpool源码有四百多行就不在这里分析了,写到这里了threadpool源码分析。