1.1 线程状态html
线程有5种状态,状态转换的过程以下图所示:python
1.2 线程同步——锁算法
多线程的优点在于能够同时运行多个任务(至少感受起来是这样,其实Python中是伪多线程)。可是当线程须要共享数据时,可能存在数据不一样步的问题。考虑这样一种状况:一个列表里全部元素都是0,线程"set"从后向前把全部元素改为1,而线程"print"负责从前日后读取列表并打印。那么,可能线程"set"开始改的时候,线程"print"便来打印列表了,输出就成了一半0一半1,这就是数据的不一样步。为了不这种状况,引入了锁的概念。数组
锁有两种状态—锁定和未锁定。每当一个线程好比"set"要访问共享数据时,必须先得到锁定;若是已经有别的线程好比"print"得到锁定了,那么就让线程"set"暂停,也就是同步阻塞;等到线程"print"访问完毕,释放锁之后,再让线程"set"继续。通过这样的处理,打印列表时要么所有输出0, 要么所有输出1,不会再出现一半0一半1的尴尬场面。服务器
线程与锁的交互以下图所示:多线程
1.3 线程通讯(条件变量)并发
然而还有另一种尴尬的状况:列表并非一开始就有的;而是经过线程"create"建立 的。若是"set"或者"print" 在"create"尚未运行的时候就访问列表,将会出现一个异常。使用锁能够解决这个问题,可是"set"和"print"将须要一个无限循环——他们不知道"create"何时会运行,让"create"在运行后通知"set"和"print"显然是一个更好的解决方案。因而,引入了条件变量。app
条件变量容许线程好比"set"和"print"在条件不知足的时候(列表为None时)等待,等到条件知足的时候(列表已经建立)发出一个通知,告诉"set" 和"print"条件已经有了,大家该起床干活了;而后"set"和"print"才继续运行。dom
线程与条件变量的交互以下图所示:ide
1.4 线程运行和阻塞的状态转换
最后看看线程运行和阻塞状态的转换。
阻塞有三种状况:
同步阻塞是指处于竞争锁定的状态,线程请求锁定时将进入这个状态,一旦成功得到锁定又恢复到运行状态;
等待阻塞是指等待其余线程通知的状态,线程得到条件锁定后,调用“等待”将进入这个状态,一旦其余线程发出通知,线程将进入同步阻塞状态,再次竞争条件锁定;
而其余阻塞是指调用time.sleep()、anotherthread.join()或等待IO时的阻塞,这个状态下线程不会释放已得到的锁定。
为充分利用cpu资源,减小资源浪费,Python中也提供了多线程技术,但因为存在GIL的关系,实际上同一时刻只存在一个线程在执行,即Python中不存在真正意义上的多线程,可是多线程技术仍是提升了CPU资源的利用率,因此仍是颇有价值的。
咱们知道系统处理任务时,须要为每一个请求建立和销毁对象。当有大量并发任务须要处理时,再使用传统的多线程就会形成大量的资源建立销毁致使服务器效率的降低。这时候,线程池就派上用场了。线程池技术为线程建立、销毁的开销问题和系统资源不足问题提供了很好的解决方案。
3.1 优势:
(1)能够控制产生线程的数量。经过预先建立必定数量的工做线程并限制其数量,控制线程对象的内存消耗。
(2)下降系统开销和资源消耗。经过对多个请求重用线程,线程建立、销毁的开销被分摊到了多个请求上。另外经过限制线程数量,下降虚拟机在垃圾回收方面的开销。
(3)提升系统响应速度。线程事先已被建立,请求到达时可直接进行处理,消除了因线程建立所带来的延迟,另外多个线程可并发处理。
“池”的概念使得人们能够定制必定量的资源,而后对这些资源进行复用,而不是频繁的建立和销毁。
3.2 注意事项
虽然线程池是构建多线程应用程序的强大机制,但使用它并非没有风险的。在使用线程池时需注意线程池大小与性能的关系,注意并发风险、死锁、资源不足和线程泄漏等问题。
一、线程池大小。多线程应用并不是线程越多越好,须要根据系统运行的软硬件环境以及应用自己的特色决定线程池的大小。通常来讲,若是代码结构合理的话,线程数目与CPU 数量相适合便可。若是线程运行时可能出现阻塞现象,可相应增长池的大小;若有必要可采用自适应算法来动态调整线程池的大小,以提升CPU 的有效利用率和系统的总体性能。
二、并发错误。多线程应用要特别注意并发错误,要从逻辑上保证程序的正确性,注意避免死锁现象的发生。
三、线程泄漏。这是线程池应用中一个严重的问题,当任务执行完毕而线程没能返回池中就会发生线程泄漏现象。
3.3 线程池设计的关键点
一、取任务数与线程数的最小值来决定开启线程的数量,即min(任务数,线程数);
二、当前线程池中线程的状态,即正在运行的线程数量和正在等待的线程数量;
三、关闭线程。
1 #!/usr/bin/env python 2 #-*- coding:utf-8 -*- 3 4 import Queue 5 import threading 6 import time 7 8 ''' 9 要点: 10 一、利用Queue队列特性,将建立的线程对象放入队列中; 11 二、当执行任务时,从queue队列中取线程执行任务; 12 三、当任务执行完毕后,线程放回线程池 13 ''' 14 15 #线程池类 16 class ThreadPool(object): 17 def __init__(self,max_thread=20): #默认最大线程数为20 18 self.queue = Queue.Queue(max_thread) #建立队列,大小为max_thread 19 #把线程对象(线程类名)加入到队列中,此时并无建立线程对象,只占用很小内存 20 for i in xrange(max_thread): 21 self.queue.put(threading.Thread) #注意此处是类名,若是这里self.queue.put(threading.Thread())(即把建立的对象放入队列中),那么每次都要在内存中开辟空间,这样内存浪费很大 22 23 24 def get_thread(self):#从队列里获取线程 25 return self.queue.get() 26 27 def add_thread(self):#在队列里添加线程 28 self.queue.put(threading.Thread) #注意此处是类名 29 30 pool = ThreadPool(10) #建立大小为10的线程池 31 32 #任务函数 33 def func(arg,p): 34 print arg 35 time.sleep(2) 36 p.add_thread() #当前线程执行完,在队列里增长一个线程 37 38 for i in xrange(300): 39 #获取队列中的线程对象(此时线程对象还没建立),默认queue.get(),若是队列里没有线程就会阻塞等待。 40 thread = pool.get_thread() 41 t = thread(target=func,args=(i,pool)) #此时才真正建立线程对象,并传递参数 42 t.start() #开启线程 43 44 ''' 45 for i in xrange(300): 46 thread = pool.get_thread() 47 t = thread(target=func,args=(i,pool)) 48 t.start() 49 在threading.Thread的构造函数中:self.__args = args self.__target = target 50 51 #固然也能够经过下面方法赋值,可是不推荐(对于私有字段,通常建议在构造函数中传参赋值) 52 for i in xrange(300): 53 ret = pool.get_thread() 54 ret._Thread__target = func 55 ret._Thread__args = (i,pool) 56 ret.start() 57 '''
3.4 复杂线程池预备知识
1 #!/usr/bin/env python 2 #-*- coding:utf-8 -*- 3 4 import Queue 5 6 obj = object() #先建立一个object对象obj 7 q = Queue.Queue() #建立队列 8 9 #把obj对象放入队列,注意此处与上述例子中的区别,上例是每次都建立对象,那么每一个对象都会占用内存; 10 #而此处是先建立对象,再把此对象加入队列,即对象只建立了1次,而队列中加入了10次,那么对象只会占用一分内存 11 for i in range(10): 12 q.put(obj) 13 14 for i in range(10): 15 print id(q.get()) #实验结果是此处id彻底相同
1 #!/usr/bin/env python 2 #-*- coding:utf-8 -*- 3 4 import contextlib 5 import threading 6 import time 7 import random 8 9 doing = [] #线程列表 10 11 #打印正在运行的线程个数函数 12 def num(l2): 13 while True: 14 print len(l2) 15 time.sleep(1) 16 17 #单首创建一个线程,用于打印当前正在运行的线程数量 18 t = threading.Thread(target=num,args=(doing,)) 19 t.start() 20 21 #管理上下文的装饰器 22 @contextlib.contextmanager 23 def show(l1,item): 24 l1.append(item) #把正在执行的当前线程加入列表 25 yield #运行到此处,程序就会跳出当前函数,去执行with关键字中的内容,执行完后在进入当前函数执行 26 l1.remove(item) #当前线程执行完成后,则从列表中移除 27 28 #任务函数 29 def task(i): 30 flag = threading.current_thread() 31 with show(doing,flag): #with管理上下文,进行切换 32 print len(doing) 33 time.sleep(random.randint(1,4)) #等待 34 35 for i in range(20): #建立20个线程 36 temp = threading.Thread(target=task,args=(i,)) 37 temp.start()
更多with上下文管理知识请参考:
http://www.cnblogs.com/alan-babyblog/p/5153343.html
http://www.cnblogs.com/alan-babyblog/p/5153386.html
3.5 twisted源码中经典线程池的实现
1 #!/usr/bin/env python 2 #-*- coding:utf-8 -*- 3 4 from Queue import Queue 5 import contextlib 6 import threading 7 8 WorkerStop = object() 9 10 class ThreadPool: 11 workers = 0 12 threadFactory = threading.Thread #类名 13 currentThread = staticmethod(threading.currentThread) #静态方法 14 15 def __init__(self, maxthreads=20, name=None): 16 self.q = Queue(0) #建立队列,参数0表示队列大小不限,队列用于存听任务 17 self.max = maxthreads #定义最大线程数 18 self.name = name 19 self.waiters = [] #存放等待线程的列表 20 self.working = [] #存放工做线程的列表 21 22 def start(self): 23 needSize = self.q.qsize() #获取任务所需的线程数 24 while self.workers < min(self.max, needSize): #wokers初始值为0 25 self.startAWorker() #调用开启线程的方法 26 27 def startAWorker(self): 28 self.workers += 1 #workers自增1 29 newThread = self.threadFactory(target=self._worker, name='test') #建立1个线程并去执行_worker方法 30 newThread.start() 31 32 def callInThread(self, func, *args, **kw): 33 self.callInThreadWithCallback(None, func, *args, **kw) 34 35 def callInThreadWithCallback(self, onResult, func, *args, **kw): 36 o = (func, args, kw, onResult) #把参数组合成元组 37 self.q.put(o) #把任务加入队列 38 39 #上下文管理装饰器 40 @contextlib.contextmanager 41 def _workerState(self, stateList, workerThread): 42 stateList.append(workerThread) #把当前执行线程加入线程状态类表stateList 43 try: 44 yield 45 finally: 46 stateList.remove(workerThread) #执行完后移除 47 48 def _worker(self): 49 ct = self.currentThread() #获取当前线程id 50 o = self.q.get() #队列中获取执行任务 51 52 while o is not WorkerStop: #当获取的o不是workstop信号时,执行while循环 53 with self._workerState(self.working, ct): #上下文切换 54 function, args, kwargs, onResult = o #元组分别赋值给变量 55 del o #删除元组o 56 try: 57 result = function(*args, **kwargs) 58 success = True 59 except: 60 success = False 61 if onResult is None: 62 pass 63 else: 64 pass 65 66 del function, args, kwargs 67 68 if onResult is not None: 69 try: 70 onResult(success, result) 71 except: 72 #context.call(ctx, log.err) 73 pass 74 75 del onResult, result 76 77 with self._workerState(self.waiters, ct): 78 o = self.q.get() #获取任务 79 80 def stop(self): #关闭线程 81 while self.workers: #循环workers 82 self.q.put(WorkerStop) #在队列中增长一个信号~ 83 self.workers -= 1 #workers值-1,直到全部线程关闭 84 85 def show(arg): 86 import time 87 time.sleep(1) 88 print arg 89 90 pool = ThreadPool(10) 91 92 #建立500个任务,队列里添加500个任务 93 #每一个任务都是一个元组(方法名,动态参数,动态参数,默认为NoNe) 94 for i in range(100): 95 pool.callInThread(show, i) 96 97 pool.start() #开启执行任务 98 99 pool.stop() #执行关闭线程方法
参考资料:
http://www.cnblogs.com/huxi/archive/2010/06/26/1765808.html
http://www.cnblogs.com/wupeiqi/articles/4839959.html