【Python】 多线程并发threading & 任务队列Queue

threadinghtml

  【这篇文章的阅读量愈来愈多了… 所以我以为有必要声明下,文章的性质是我我的的学习记录和总结,并不是教程,文中不免有表达不严谨,甚至错误的地方。若是您只是相对threading相关内容作个大概的了解,但愿能对您有所参考。若是想要精密地学习,请移步正规教材、文档以及大牛的博客】python

  python程序默认是单线程的,也就是说在前一句语句执行完以前后面的语句不能继续执行(不知道我理解得对不对)多线程

  先感觉一下线程,通常状况下:并发

def testa():
    sleep(1)
    print "a"

def testb():
    sleep(1)
    print "b"

testa()
testb()
#先隔出一秒打印出a,再过一秒打出b

  可是若是用了threading的话:app

ta = threading.Thread(target=testa)
tb = threading.Thread(target=testb)
for t in [ta,tb]:
    t.start()
for t in [ta,tb]:
    t.join()
print "DONE"

#输出是ab或者ba(紧贴着的)而后空一行再来DONE的结果。

   获得这样的结果是由于这样的,在start以后,ta首先开始跑,可是主线程(脚本自己)没有等其完成就继续开始下一轮循环,而后tb也开始了,在以后的一段时间里,ta和tb两条线程(分别表明了testa和testb这两个过程)共同执行。相对于一个个迭代而言,这样作无疑是大大提升了运行的速度。dom

  Thread类为线程的抽象类,其构造方法的参数target指向一个函数对象,即该线程的具体操做。此外还能够有args=<tuple>来给target函数传参数。须要注意的是当传任何一个序列进去的话Thread会自动把它分解成单个单个的元素而后分解传给target函数。我估计在定义的时候确定是*args了。ide

  join方法是个很tricky的东西,至今还不是很清楚地懂这是个什么玩意儿。join([timeout])方法阻塞了主线程,直到调用此方法的子线程完成以后主线程才继续往下运行。(以前我糊里糊涂地把join就牢牢接在start后面写了,若是这么写了的话那么多线程在速度上就毫无优点,和单线程同样了= =)。而像上面这个示例同样,先一个遍历把全部线程 都启动起来,再用一个遍历把全部线程都join一遍彷佛是比较通行的作法。函数

 

  ●  关于线程锁工具

  多线程程序涉及到一个问题,那就是当不一样线程要对同一个资源进行修改或利用时会出现混乱,因此有必要引入线程锁。性能

  (经网友提醒,补充一下相关例子)好比下面这一段程序:

import threading

class MyThread(threading.Thread):
    def __init__(self,counter,name):
        self.counter = counter
     self.name = name

    def run(self):
        self.counter[0] += 1
        print self.counter[0]

if __name__ == '__main__':
    counter = [0]
    for i in range(1,11):
        t = MyThread(counter,i)
        t.start()

 

 

  这里并发了10个线程,在没有混乱的状况下,很明显一个线程的name和通过它处理事后的counter中的数字应该相同。由于没有锁可能引起混乱,想象中,咱们可能认为,当某个线程要打印counter中的数字时,别的线程对其做出了改变,从而致使打印出的counter中的数字不符合预期。实际上,这段代码的运行结果很大几率是很整齐的1\n2\n3....10。若是要解释一下,1. 虽然称并发10个线程。可是实际上线程是不可能真的在同一个时间点开始,好比在这个例子中t1启动后,要将循环进入下一轮,建立新的线程对象t2,而后再让t2启动。这段时间虽然很短很短,可是确实是存在的。而这段时间的长度,足够让t1的run中,进行自增而且打印的操做。最终,整个结果看上去彷佛没什么毛病。

  若是咱们想要看到“混乱”的状况,显然两个方法。要么缩短for i in range以及建立线程对象的时间,使得线程在自增以后来不及打印时counter被第二个线程自增,这个比较困难;另外一个方法就是延长自增后到打印前的这段时间。天然想到,最简单的,用time.sleep(1)睡一秒便可。此时结果多是10\n10\n...。主要看第一行的结果。再也不是1而是10了。说明在自增操做结束,打印数字以前睡的这一秒里,到第10个线程都成功自增了counter,所以即便是第一个线程,打印到的也是通过第10个线程修改的counter了。

  上述结果虽然数的值上改变了,可是极可能输出仍然是整齐的一行行的。有时候好几个数字会挤在一块儿输出,尤为是把并发量调大,好比调到100或1000,尤其明显。挤在一块儿主要是由于,time.sleep(1)并非精精确确地睡1秒。有多是0.999或者1.001(具体差别可能更小,打个比方)。此时可能tk线程睡了1.001秒而tk+1线程睡了0.999秒,致使二者打印时内容被杂乱地一块儿写入缓冲区,因此打印出来的就凌乱了。根据时间偏差的不一样,甚至有可能出现大数字先打印出来的状况。

 

  能够经过Thread.Lock类来建立简单的线程锁。lock = threading.Lock()便可。在某线程start中,处理会被争抢的资源以前,让lock.acquire(),且lock在acquire()以后不能再acquire,不然会报错。当线程结束处理后调用lock.release()来释放锁就行了。通常而言,有锁的多线程场景能够提高一部分效率,但在写文件等时机下会有阻塞等待的状况。

  为了说明简单的lock,咱们改一下上面那段程序:

import threading
import time

class MyThread(threading.Thread):
    def __init__(self,lock,name):
        threading.Thread.__init__(self)
        self.lock = lock
        self.name = name

    def run(self):
        time.sleep(1)
        # self.lock.acquire()
        print self.name
        # self.lock.release()

if __name__ == '__main__':
    lock = threading.Lock()
    for i in range(1,10):
        t = MyThread(lock,i)
        t.start()

  根据启动的顺序,每一个线程有了name属性。而后启动,在没有锁的状况下可能会出现挤在一块儿,而且数字乱序输出的状况。把两句注释去掉,加上锁以后,获得的输出确定是一行一行的,可是数字仍然有多是乱序的。分析一下,加上锁以后,每次进行print,实际上是线程对于sys.stdout写入内容,有多个线程都要print就造成了竞争,所以就会致使挤在一块儿。加上锁,acquire以后,本线程拥有了对sys.stdout的独享,所以能够正确输出内容+换行,再解开锁供下一个须要打印的线程使用。那为何乱序问题仍是没有解决呢?这个就是(推测)由于前面提到的time.sleep的不精确性。有可能6号线程sleep了稍微久而7号稍微短了些,致使7号先于6号得到锁。天然7就比6先打印出来了。若是稍微有意思地改动一下,好比sleep的秒数时间错开来,1号线程睡1秒,2号线程睡2秒这样子的话,时间上的错开使得没有了对资源的竞争的状况,所以即便没有锁也不会乱。

  总结一下,1. 对于run过程当中对于可能有竞争的资源以前所作的操做,花费时间越是接近,越有可能发生资源竞争从而致使混乱。(废话…)2. 当run中有print或者相似操做时须要注意,其实隐含着要对stdout作出竞争的意义

  

  相比之下,无所多线程场景能够进一步提高效率,可是可能会引发读写冲突等问题,因此要慎用。必定要确认各个线程间没有共同的资源之类的问题后再实行无锁多线程。

  和Lock类相似的还有一个RLock类,与Lock类的区别在于RLock类锁能够嵌套地acquire和release。也就是说在同一个线程中acquire以后再acquire也不会报错,而是将锁的层级加深一层。只有当每一层锁从下到上依次都release开这个锁才算是被解开。

 

  ●  更增强大的锁——Condition

  上面提到的threading.Lock类提供了最为简单的线程锁的功能。除了Lock和RLock之外,其实threading还补充了其余一些不少的带有锁功能的类。Condition就是其中最为强大的类之一。

  在说Condition以前还须要明确一下线程的几个概念。线程的阻塞和挂起,线程的这两个状态乍一看都是线程暂停再也不继续往前运行,可是引发的缘由不太同样。阻塞是指线程间互相的制约,当一个线程得到了锁,其余的线程就被阻塞了,而挂起是出于统一调度的考虑。换句话说,挂起是一种主动的行为,在程序中咱们主动挂起某个线程而后能够主动放下让线程继续运行;而阻塞更多时候是被动发生的,当有线程操做冲突了那么必然是有一方要被阻塞的。从层级上看,挂起操做是高于阻塞的,也就说一个线程能够在阻塞的时候被挂起,而后被唤醒后依然是阻塞状态。若是在挂起过程当中具有了运行条件(即再也不阻塞),线程也不会往前运行。

  再来看看Condition类的一些方法。首先是acquire和release,Condition内部也维护了一把锁,默认是RLock类,全部关联了同一个Condition对象的线程也都会遵照这把锁规定的来进行运行。

  Condition.wait([timeout])  这个方法必定要在获取锁定以后调用,调用这个方法的Condition对象所在的线程会被挂起而且释放这个线程得到着的全部锁,直到接到通知被唤醒或者超时(若是设置了Timeout的话),当被唤醒以后线程将从新获取锁定。

  Condition.notify()  notify就是上面所说的通知,调用这个方法以后会唤醒一个被挂起的线程。线程的选择尚不明确,彷佛是随机的。须要注意的是notify方法只进行挂起的唤醒而不涉及锁的释放

  Condition.notify_all()  唤醒全部挂起的线程

  基于上面这几个方法,就能够作出比较好的线程管理的demo了,好比下面这段网上常见的一个捉迷藏的模拟程序:

import threading,time

class Seeker(threading.Thread):
    def __init__(self,cond,name):
        Thread.__init__(self)
        self.cond = cond
        self.name = name

    def run(self):
        time.sleep(1)    #1.确保seeker晚于hider开始执行

        self.cond.acquire()    #4. hider的锁释放了因此这里得到了锁
        print '我把眼睛蒙上了'
        self.cond.notify()    #5.蒙上眼后通知hider,hider线程此时被唤醒并试图获取锁,可是锁还在seeker身上,因此hider被阻塞,seeker继续往下
        self.cond.wait()  #6. seeker锁被释放而且挂起,hider就获取锁开始继续往下运行了
        
        print '我找到你了'
        self.cond.notify()  #9.找到了以后通知hider,hider意图获取锁但不行因此被阻塞,seeker往下
        self.cond.release()  #10.释放锁 print '我赢了'

class Hider(threading.Thread):
        def __init__(self,cond,name):
            Thread.__init__(self)
            self.cond = cond
            self.name = name
        
        def run(self):
            self.cond.acquire()    #2.hider获取锁
            self.cond.wait()    #3.hider被挂起而后释放锁
            
            print '我已经藏好了'
            self.cond.notify()  #7.藏好后通知seeker,seeker意图获取锁,可是锁在hider身上因此seeker被阻塞
            self.cond.wait()    #8.hider被挂起,释放锁,seeker获取锁,seeker继续往下运行

            self.cond.release()  #11. 在此句以前一点,seeker释放了锁(#10),hider获得锁,随即这句hider释放锁 print '被你找到了'

cond = threading.Condition()
seeker = Seeker(cond,'seeker')
hider = Hider(cond,'hider')
seeker.start()
hider.start()

'''
结果:
我把眼睛蒙上了
我已经藏好了
我找到你了
我赢了
被你找到了
'''

  这里须要注意的是self.cond.release方法不能省,不然会引发死锁。

  

 

  ●  以上的包装线程的方式是一种面向过程的方法,下面介绍一下如何面向对象地来抽象线程

  面向对象地抽象线程须要自定义一个类继承Thread类。好比自定义class MyThread(Thread)。这个类的一个实例就是表明了一个线程,而后经过重载这个类中的run方法(是run,不是start!!但start的动做确实就是调用run)来执行具体的操做。此时锁能够做为一个构造方法的参数,将一个锁传进不一样的实例中以实现线程锁控制。好比:

  引用自http://www.cnblogs.com/tkqasn/p/5700281.html

#方法二:从Thread继承,并重写run()
class MyThread(threading.Thread):
    def __init__(self,arg):
        super(MyThread, self).__init__()#注意:必定要显式的调用父类的初始化函数。
        self.arg=arg
    def run(self):#定义每一个线程要运行的函数
        time.sleep(1)
        print 'the arg is:%s\r' % self.arg

for i in xrange(4):
    t =MyThread(i)
    t.start()

print 'main thread end!'

  

  Thread类还有如下的一些方法,自定义的类也能够调用

    getName()

    setName(...)  //其实Thread类在构造方法中有一个name参数,能够为相应的线程取一个名字。这两个方法就是相关这个名字属性的

    isAlive()  一个线程从start()开始到run()结束的过程当中没有异常,则其实alive的。

    setDaemon(True/False)  是否设置一个线程为守护线程。当你设置一个线程为守护线程以后,程序不会等待这个线程结束再退出程序,可参考http://blog.csdn.net/u012063703/article/details/51601579

 

  ●  除了Thread类,threading中还有如下一些属性,简单介绍一下:

    Timer类,Timer(int,target=func)  和Thread类相似,只不过它在int秒事后才以target指定的函数开始线程运行

    currentThread()  得到当前线程对象

    activeCount()  得到当前活动的线程总个数

    enumerate()  得到全部活动线程的列表

    settrace(func)  设置一跟踪函数,在run执行前执行

    setprofile(func)  设置一跟踪函数,在run执行完毕以后执行

 

  以上内容是目前我所能驾驭的,而threading类还有不少很NB的东西好比RLock类,Condition类,Event类等等。没什么时间再仔细研究它们,先写到这里为止。

 

Queue

  Queue用于创建和操做队列,常和threading类一块儿用来创建一个简单的线程队列。

  首先,队列有不少种,根据进出顺序来分类,能够分红

    Queue.Queue(maxsize)  FIFO(先进先出队列)

    Queue.LifoQueue(maxsize)  LIFO(先进后出队列)

    Queue.PriorityQueue(maxsize)  为优先级越高的越先出来,对于一个队列中的全部元素组成的entries,优先队列优先返回的一个元素是sorted(list(entries))[0]。至于对于通常的数据,优先队列取什么东西做为优先度要素进行判断,官方文档给出的建议是一个tuple如(priority, data),取priority做为优先度。

    若是设置的maxsize小于1,则表示队列的长度无限长

  FIFO是经常使用的队列,其一些经常使用的方法有:

    Queue.qsize()  返回队列大小

    Queue.empty()  判断队列是否为空

    Queue.full()  判断队列是否满了

    Queue.get([block[,timeout]])  从队列头删除并返回一个item,block默认为True,表示当队列为空却去get的时候会阻塞线程,等待直到有有item出现为止来get出这个item。若是是False的话代表当队列为空你却去get的时候,会引起异常。在block为True的状况下能够再设置timeout参数。表示当队列为空,get阻塞timeout指定的秒数以后尚未get到的话就引起Full异常。

    Queue.put(...[,block[,timeout]])  向队尾插入一个item,一样若block=True的话队列满时就阻塞等待有空位出来再put,block=False时引起异常。同get的timeout,put的timeout是在block为True的时候进行超时设置的参数。

    Queue.task_done()  从场景上来讲,处理完一个get出来的item以后,调用task_done将向队列发出一个信号,表示本任务已经完成

    Queue.join()  监视全部item并阻塞主线程,直到全部item都调用了task_done以后主线程才继续向下执行。这么作的好处在于,假如一个线程开始处理最后一个任务,它从任务队列中拿走最后一个任务,此时任务队列就空了但最后那个线程还没处理完。当调用了join以后,主线程就不会由于队列空了而擅自结束,而是等待最后那个线程处理完成了。

 

  结合threading和Queue能够构建出一个简单的生产者-消费者模型,好比:

  下面的代码引用自http://blog.csdn.net/l1902090/article/details/24804085

    import threading  
    import Queue  
    import time  
    class worker(threading.Thread):  
     def __init__(self,queue):  
      threading.Thread.__init__(self)  
      self.queue=queue  
      self.thread_stop=False  
       
     def run(self):  
      while not self.thread_stop:  
       print("thread%d %s: waiting for tast" %(self.ident,self.name))  
       try:  
        task=q.get(block=True, timeout=20)#接收消息  
       except Queue.Empty:  
        print("Nothing to do!i will go home!")  
        self.thread_stop=True  
        break  
       print("task recv:%s ,task No:%d" % (task[0],task[1]))  
       print("i am working")  
       time.sleep(3)  
       print("work finished!")  
       q.task_done()#完成一个任务  
       res=q.qsize()#判断消息队列大小  
       if res>0:  
        print("fuck!There are still %d tasks to do" % (res))  
       
     def stop(self):  
      self.thread_stop = True  
       
    if __name__ == "__main__":  
     q=Queue.Queue(3)  
     worker=worker(q)  
     worker.start()  
     q.put(["produce one cup!",1], block=True, timeout=None)#产生任务消息  
     q.put(["produce one desk!",2], block=True, timeout=None)  
     q.put(["produce one apple!",3], block=True, timeout=None)  
     q.put(["produce one banana!",4], block=True, timeout=None)  
     q.put(["produce one bag!",5], block=True, timeout=None)  
     print("***************leader:wait for finish!")  
     q.join()#等待全部任务完成  
     print("***************leader:all task finished!")  

 

  (嗯。。姑且不论他的F-word哈哈哈,开玩笑的,这例子还能够,至少很清晰地说明了如何把这两个模块结合起来用)

  输出是这样的:

    thread139958685849344 Thread-1: waiting for tast 1
    task recv:produce one cup! ,task No:1
    i am working
    work finished!
    fuck!There are still 3 tasks to do
    thread139958685849344 Thread-1: waiting for tast 1
    task recv:produce one desk! ,task No:2
    i am workingleader:wait for finish!
    work finished!
    fuck!There are still 3 tasks to do
    thread139958685849344 Thread-1: waiting for tast 1
    task recv:produce one apple! ,task No:3
    i am working
    work finished!
    fuck!There are still 2 tasks to do
    thread139958685849344 Thread-1: waiting for tast 1
    task recv:produce one banana! ,task No:4
    i am working
    work finished!
    fuck!There are still 1 tasks to do
    thread139958685849344 Thread-1: waiting for tast 1
    task recv:produce one bag! ,task No:5
    i am working
    work finished!
    thread139958685849344 Thread-1: waiting for tast 1
     ***************leader:all task finished!
    Nothing to do!i will go home!

   运行一下就知道,上例中并无性能的提高(毕竟仍是只有一个线程在跑)。线程队列的意义并非进一步提升运行效率,而是使线程的并发更加有组织。能够看到,在增长了线程队列以后,程序对于线程的并发数量就有了控制。新线程想要加入队列开始执行,必须等一个既存的线程完成以后才能够。举个例子,好比

for i in range(x):
  t = MyThread(queue)
  t.start()

  x在这里是个变量,咱们不知道这个循环会触发多少线程并发,若是多的话就会很冒险。可是有了队列以后,把一个队列做为全部线程构建线程对象时的一个参数,让线程必须按照这个队列规定的大小来执行的话,就不担忧过多线程带来的危险了。

 

■  线程池实现

  不得不说一年前仍是太simple。。 一年后再来补充点内容吧

  首先咱们要明确,线程池,线程,队列这几个概念之间的区别和联系。

  举一个不太恰当的例子。好比有五个很饿的人去吃旋转寿司。旋转寿司店里有一个传送带,将寿司运送到他们面前。他们一字排开坐好准备好吃,当寿司过来,食客可能会选择一个喜欢的口味开吃。在吃的过程当中,他一般就不会再去“吃着碗里看着传送带上的”了。之因此是很饿的人,由于咱们假定他们一旦吃完一盘就会马上着手下一盘,绝不停歇。

  在这个场景中,五我的组成的集体是线程池,每一个人就是一个线程,而旋转寿司的传送带是队列,每盘寿司就是一个队列中的任务。之因此说这个例子不太恰当,是由于场景中食客能够本身选择想吃的寿司而线程池-队列中,队列才是任务分配的主导。就比如是传送带发现某个食客说他已经吃完一盘寿司,还想再来一盘的时候,会不顾食客的喜爱,强行将一盘寿司推到一个空闲的食客面前让他吃。

  更加抽象点来讲,线程在这个语境中其实就像是一个工具,而线程池就是一个工具的集合。因为一般一个线程池面向的是一类任务,因此线程池中的线程基本上也是同质的。即上述的五个食客是五胞胎(误hh)。另外一方面,之因此说面向的是一类任务,是由于队列中的任务一般是具备某些共性的。共性程度高低取决于队列以及线程池的具体实现,可是确定是有的。这就比如寿司能够有握り,巻き而上面的具能够有いくら、マグロ、ウニ可是归根结底确定仍是要有米饭的。

  在正式的开发中,队列一般是由第三方服务提供好比RabbitMQ,Redis等。而线程池一般由程序本身实现。下面这段代码则是在一个python程序中,基于Queue加上自制的建议线程池创建起来的模型。

# -*- coding:utf-8 -*-

import threading
import Queue
import time
import random

from faker import Faker

class MyThread(threading.Thread):
  '''
  线程模型
  '''
  def __init__(self,queue):
    threading.Thread.__init__(self)
    self.queue = queue
    self.start()  # 由于做为一个工具,线程必须永远“在线”,因此不如让它在建立完成后直接运行,免得咱们手动再去start它

    def run(self):
        while True:  # 除非确认队列中已经无任务,不然时刻保持线程在运行
            try:
                task = self.queue.get(block=False)    # 若是队列空了,直接结束线程。根据具体场景不一样可能不合理,能够修改
                time.sleep(random.random())  # 假设处理了一段时间
                print 'Task %s Done' % task  # 提示信息而已
                self.queue.task_done()
            except Exception,e:
                break

class MyThreadPool():
    def __init__(self,queue,size):
        self.queue = queue
        self.pool = []
        for i in range(size):
            self.pool.append(MyThread(queue))

    def joinAll(self):
        for thd in self.pool:
            if thd.isAlive():  thd.join()

if __name__ == '__main__':
    q = Queue.Queue(10)
    fake = Faker()
    for i in range(5):
        q.put(fake.word())
    pool = MyThreadPool(queue=q,size=2)
    pool.joinAll()

  网上有一部分示例,将队列做为一个属性维护在了线程池类中,也不失为一种办法,我这里为了可以条理清晰,没有放在类里面。这段程序首先生成了一个maxsize是10的队列。fake.word()能够随机生成一个单词,这里仅做测试用。因此向队列中添加了5个task。

  这里有个坑: 若是put的数量大于队列最大长度,并且put没有设置block=False的话,那么显然程序会阻塞在put这边。此时ThreadPool未被创建,也就是说工做线程都尚未启动,所以会引发这样一个死锁。若是把线程池的创建放到put以前也不行,此时线程发现队列为空,因此全部线程都会直接结束(固然这是线程中get的block是False的时候,若是为True那么也是死锁),最终队列中的task没人处理,程序输出为空。解决这个坑的办法,一个是像上面同样保持最开始put的量小于队列长度;第二个就是干脆不要限制队列长度,用q = Queue.Queue()生产队列便可。

  好的,继续往下,进入了线程池的生成。线程池内部的列表才是真·线程池,另外其关联了queue对象,因此在建立的时候能够将队列对象传递给线程对象。线程对象在建立时就启动了,而且被添加到线程池的那个列表中。线程池的大小由参数给出,线程启动后会去队列里面get任务,而且进行处理。处理完成后进行task_done声明而且再次去尝试get。若是队列为空那么就直接抛出异常,也就是跳出循环,线程结束。

  经过这样一个模型,根据线程池的大小,这才真正地给线程并发作了一个限制,可促进较大程度的资源利用。

 

  ●  进一步地…

  在上面这个示例中,实际上处理任务的实际逻辑是被写在了MyThread类里面。若是咱们想要一个通用性更加高的工具类,那么势必要想一想如何将这个线程类解耦具体逻辑。另外一方面,队列中的任务的内容,不只仅能够是字符串,也能够是任何python对象。这就使得灵活性大大提升。

  好比咱们能够在队列中put内容是(func, args, kwargs)这样一个元组。其中func是一个函数对象,描述了任务的处理逻辑过程,args是一个元组,表明全部func函数的匿名参数,kwargs则是func函数的全部具名参数。如此,能够将线程类的run方法改写成这样:

def run(self):
    while True:
        try:
            func,args,kwargs = self.queue.get()
            try:
                func(*args,**kwargs)
            except Exception,e:
                raise ('bad execution: %s' % str(e))
            self.queue.task_done()
        except Exception,e:
            break

  这样一个run就能够作到很大程度的解耦了。

  相似的思想,线程池类和线程类也没必要是一一对应的。能够将线程类做为一个参数传递给线程池类。这样一个线程池类就能够做为容器容纳各类各样的线程了。具体实例就不写了。

相关文章
相关标签/搜索