py基础---多线程、多进程、协程

Python基础__线程、进程、协程

一、什么是线程(thread)?

线程是操做系统可以进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运做单位,一条线程指的是进程中一个单一顺序的控制流,一个进程中能够并发多个线程,每条线程中并发执行不一样的任务。git

官方解释:github

A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.编程

Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.windows

If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.api

Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.安全

On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.服务器

Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.多线程

Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).并发

二、什么是进程(process)?

一个进程就是一个应用程序,是系统进行资源分配和调度的基本单位,是操做系统结构的基础,在早期面向进程设计的计算机结构中, 进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器,进程是线程的容器,程序是指令、数据以及其组织形式的描述,一个进程包含多个线程。

官方解释:

An executing instance of a program is called a process.

Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.

三、进程和线程的区别

  1. 线程共享建立它的进程的地址空间(每一个进程有本身的地址空间)。
  2. 线程能够直接访问其进程的数据(每一个进程拥有本身父进程的数据副本。)
  3. 线程能够直接与其进程的其余线程通讯(每一个进程必须使用第三方手段来与兄弟进程进行通讯,好比消息队列)
  4. 新线程很容易建立(建立新的进程须要父进程来建立,子进程是父进程的拷贝)
  5. 线程能够对同一进程的线程进行至关大的控制(进程只能控制子进程)
  6. 对主线程的更改(取消、优先级更改等),可能会影响进程的其余线程的行为(对父进程的更改不会影响子进程)

四、GIL全局解释器锁

在python中,一次只能有一个线程在执行,若是想要利用多核多处理器资源,尽可能使用多进程去处理。

官方解释:

Cpython实现细节:在Cpython中,因为Global Interpreter Lock,只有一个线程能够同时执行python代码(某些面向性能的库可能会避免这个限制)若是你但愿应用程序更好地利用多核机器的计算资源,建议你使用多处理,可是,若是要同时运行多个I/O绑定任务,则线程仍然是一个合适的模型。

总结:因此若是底层python解释器的话,咱们python的多线程只适合IO密集型任务,不适合CPU计算密集型任务。

五、多线程(threading模块)

  1. 模块以及相关方法

    threading模块:

    threading.Thread(target=?,args=(arg1,arg2...)):

    建立threading线程对象,target参数将要运行的函数的变量名填写到这里,args参数是要运行函数须要的参数。

    start():让线程开始执行任务

    join():默认线程开起来以后就和主线程没有关系了,主线程结束和join不要紧,调用join方法,会让主线程再调用join方法的地方等待,知道子线程完成操做,主线程才继续往下执行代码。
    setDaemon(True):默认为False,默认主线程与其余线程没有关系,可是有时候,咱们想要主线程关闭的时候,把其余线程也关闭了(不关心其余线程是否是执行完任务了),咱们将其余线程设置为主线程的守护进程。注意:该方法须要在线程开始以前执行

    threading.currentThread(): 返回当前的线程变量。
    threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
    threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

    除了使用方法外,线程模块一样提供了Thread类来处理线程,Thread类提供了如下方法:
    run(): 用以表示线程活动的方法。
    start():启动线程活动。
    join([time]): 等待至线程停止。这阻塞调用线程直至线程的join() 方法被调用停止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
    isAlive(): 返回线程是否活动的。
    getName(): 返回线程名。
    setName(): 设置线程名。

  2. 如何使用threading模块建立多线程

    • 直接调用

      # author:Dman
      # date:2019/3/26
      import threading
      import time
      
      def foo(n):
          print('foo___start___%s' % n)
          time.sleep(1)
          print('end foo____')
      
      
      def bar():
          print('bar___start')
          time.sleep(2)     #time.sleep的时候并不会占用cpu
          print('end bar____')
      
      
      
      t1 = threading.Thread(target=foo,args=(2,))
      t2 = threading.Thread(target=bar)
      t1.start()
      t2.start()
      print('_________main______')
    • 使用继承的方式调用

      # author:Dman
      # date:2019/3/27
      
      """
      使用自定义类的方式去建立thread
      步骤:
      一、继承threading.Thread类
      二、必须重写父方法,run()方法。
      三、能够重写父类方法
      
      """
      import threading
      import  time
      class Mythred(threading.Thread):
      
          def __init__(self,num):
              # super(Mythred,self).__init__()
              threading.Thread.__init__(self)
              self.num = num
      
          def run(self):
              print('threading %s is running at %s ' % (threading.current_thread(), time.ctime()))
              time.sleep(3)
      
      if __name__ == '__main__':
          t1 = Mythred(1)
          t2 = Mythred(2)
          t1.start()
          t2.start()
  3. join方法的疑问

    # author:Dman
    # date:2019/3/27
    
    """
    threading 中方法,join方法的理解与使用。
    join方法
    """
    
    import threading
    from time import ctime,sleep
    import time
    
    def music(func):
        for i in range(2):
            print ("Begin listening to %s. %s" %(func,ctime()))
            sleep(4)
            print("end listening %s"%ctime())
    
    def move(func):
        for i in range(2):
            print ("Begin watching at the %s! %s" %(func,ctime()))
            sleep(5)
            print('end watching %s'%ctime())
    
    
    
    if __name__ == '__main__':
    
        threads = []
        t1 = threading.Thread(target=music, args=('七里香',))
        threads.append(t1)
        t2 = threading.Thread(target=move, args=('阿甘正传',))
        threads.append(t2)
    
        for t in threads:
            # t.setDaemon(True)
            t.start()
            # t.join()   #位置1
        # t1.join()      #位置2
        # t2.join()########  #位置3
        print ("all over %s" %ctime())

    总结:

    一、位置1:会阻塞全部的子线程,父进程会在全部程序执行完成以后就执行

    二、位置2:只会阻塞线程 t1,在 t1子线程执行完毕以后,主线程就会继续执行print函数。

    三、位置3:只会阻塞线程 t2,在 t2子线程执行完毕以后,主线程就会继续执行print函数。

  4. 多线程数据共享和同步

    若是多个线程共同对某个数据进行修改,则可能出现不可预料的结果,为了保证数据的正确性,须要对多个线程进行同步,使用thread类的Lock和RLock能够是实现简单的线程同步。

    • 同步锁:又称互斥锁 ------threading.Lock

      做用:解决多个线程访问共享变量的时候出现的数据的问题。

      # author:Dman
      # date:2019/3/27
      
      """
      没有同步锁案例
      """
      """
      下面程序执行顺序:
      一、咱们打开了100个线程去执行addnum函数,其中addnum是对一个全局变量num进行-1的操做,咱们的理想的状态时num左后等于0
      
      实际运行结果是:
      100个线程开始抢GIL,抢到的将被CPU执行
          一、执行global num 
          二、temp = num 赋值操做
          三、执行time.sleep(0.1) ---(解释:这个语句至关于发生IO阻塞,挂起,GIL释放,下一步num=temp-1还未被执行,所以全局变量num的值仍为
          100)
          四、其余99个线程开始抢GIL锁,重复上面的步骤
          五、其余98个线程开始抢GIL锁,重复上面的步骤
          ...
          (备注:若是阻塞的事件够长,因为cpu的执行速度很快,也就是切换的快,在发生阻塞的0.1秒钟,若是100个
          线程都切换一遍那么,每一个线程就都拿到num=100这个变量,后面再执行-1操做,那么当全部线程结束,获得的结果都是99.)
      """
      
      import time
      import threading
      
      def addNum():
          global num #在每一个线程中都获取这个全局变量
          # num-=1  #这个操做速度很快
      
          temp=num
          # print('--get num:',num)    # 每一个线程执行到这一步就只能拿到变量num = 100,
          time.sleep(0.1)
          num =temp-1 #对此公共变量进行-1操做
      
      
      if __name__ == '__main__':
          num = 100  #设定一个共享变量
          thread_list = []
          for i in range(5):
              t = threading.Thread(target=addNum)
              t.start()
              thread_list.append(t)
      
          for t in thread_list: #等待全部线程执行完毕
              t.join()
      
          print('final num:', num )
      #------运行结果-------
      final num: 99

      # author:Dman
      # date:2019/3/28
      """
      线程锁使用实例
      一、使用threading.Lock()方法获得锁对象
      二、在方法中调用acquire()和release()方法去包围咱们的代码,
      那么同一时刻就只有一个线程能够访问acquire和release包围的代码块。
      """
      
      import time
      import threading
      
      
      def addNum():
          global num  # 在每一个线程中都获取这个全局变量
          # num-=1  #这个操做速度很快
          lock.acquire()                  # 获取锁
          temp = num
          # print('--get num:',num)    # 每一个线程执行到这一步就只能拿到变量num = 100,
          time.sleep(0.1)
          num = temp - 1  # 对此公共变量进行-1操做
          lock.release()               # 释放锁
      
      if __name__ == '__main__':
          lock = threading.Lock()
          num = 100  # 设定一个共享变量
          thread_list = []
          for i in range(100):
              t = threading.Thread(target=addNum)
              t.start()
              thread_list.append(t)
      
          for t in thread_list:  # 等待全部线程执行完毕
              t.join()
      
          print('final num:', num)

      思考:同步锁和GIL的区别

      总结:

      一、线程锁又称互斥锁、同步锁,为了防止多个代码同时访问共享变量的时候,出现问题。

      二、在threading模块中,经过Lock类来建立锁对象,经过aquire方法和release方法去包围须要保护的代码

    • 线程死锁和递归锁

      • 死锁

        死锁现象,见代码以下:

        # author:Dman
        # date:2019/3/30
        
        """
        线程死锁:
        在线程间共享多个资源的时候,若是两个线程分别占有一部分资源而且同时等待对方的资源,就会形成死锁,由于系统判断这部分资源都正在使用,全部这两个线程在无外力做用下将一直等待下去。
        """
        import threading
        import threading,time
        
        class myThread(threading.Thread):
            def doA(self):
                lockA.acquire()
                print(self.name,"gotlockA",time.ctime())
                time.sleep(3)
                lockB.acquire()                     # 要求获取LockB
                print(self.name,"gotlockB",time.ctime())
                lockB.release()
                lockA.release()
        
            def doB(self):
                lockB.acquire()
                print(self.name,"gotlockB",time.ctime())
                time.sleep(2)
                lockA.acquire()                     # 要求获取LockA
                print(self.name,"gotlockA",time.ctime())
                lockA.release()
                lockB.release()
            def run(self):
                self.doA()
                self.doB()
        if __name__=="__main__":
        
            lockA=threading.Lock()
            lockB=threading.Lock()
            threads=[]
            for i in range(5):
                threads.append(myThread())
            for t in threads:
                t.start()
            for t in threads:
                t.join()#等待线程结束,后面再讲。
      • 递归锁-----threading.RLock

        做用:为了解决锁嵌套的问题,解决死锁问题。

        # author:Dman
        # date:2019/3/30
        
        """
        递归锁(RLock)也叫可重入锁:解决死锁问题,看    线程锁_test3.py
        特色:能够屡次acquire。
        内部使用计数器来维护。acquire的时候计数器加1,release的时候计数器减1
        
        结果:锁的是内部代码块,同一时刻保证只有一个线程执行该代码块。
        使用场景:当咱们修改多个变量是有关联的,咱们只能对本身的方法去锁定,可是不能保证别人的方法是锁定的,因此当咱们内部锁定了以后,其余函数也可能锁定,这样就出现了多把锁的状况。
        
        """
        
        import threading
        import threading,time
        
        class myThread(threading.Thread):
            def doA(self):
                # lockA.acquire()
                lock.acquire()
                print(self.name,"gotlockA",time.ctime())
                time.sleep(3)
                # lockB.acquire()                     # 要求获取LockB
                lock.acquire()
                print(self.name,"gotlockB",time.ctime())
                # lockB.release()
                # lockA.release()
                lock.release()
                lock.release()
        
            def doB(self):
                lock.acquire()
                print(self.name,"gotlockB",time.ctime())
                time.sleep(2)
                lock.acquire()                     # 要求获取LockA
                print(self.name,"gotlockA",time.ctime())
                lock.release()
                lock.release()
            def run(self):
                self.doA()
                self.doB()
        if __name__=="__main__":
        
            # lockA=threading.Lock()
            # lockB=threading.Lock()
            lock = threading.RLock()
            threads=[]
            for i in range(5):
                threads.append(myThread())
            for t in threads:
                t.start()
            for t in threads:
                t.join()#等待线程结束,后面再讲。
      • 案例使用案例----银行取钱:

        # author:Dman
        # date:2019/3/30
        """
        递归锁场景---案例
        """
        import threading
        
        class Account:
        
            def __init__(self,name,money):
                self.name = name
                self.balance = money
                self.lock = threading.RLock()
        
            def withdraw(self,amount):
                with self.lock:
                    self.balance -= amount
        
            def deposit(self,amount):
                with self.lock:           # with上下文管理,帮咱们acquire 和release
                    self.balance += amount
        
        def transfer(from_user, to_user,amount):
            # 锁不能够加在这里,由于其余的线程执行其余方法在不加锁的状况下数据一样是不安全的
            from_user.withdraw(amount)
            to_user.deposit(amount)
        
        if __name__ == '__main__':
            alex = Account('alex',100)
            dman = Account('xiaohu',20000)
            t1 = threading.Thread(target=transfer, args=(alex, dman, 100))
            t1.start()
        
            t2 = threading.Thread(target=transfer, args=(dman, dman, 200))
            t2.start()
        
            t1.join()
            t2.join()
        
            print('>>>', alex.balance)
            print('>>>', dman.balance)

        总结:

        一、建立递归锁的方法:使用threading.RLock类去建立递归锁对象。同互斥锁同样,使用aquire和release方法去包围代码块

        二、递归锁是为了解决锁嵌套的时候的问题。

    • 条件变量同步---threading.Condition

      • 做用:为了实现多个线程之间的交互,它自己也提供了RLock或Lock的方法,还提供了wait()、notify()、notifyAll()方法

        wait():条件不知足时调用,线程会释放锁并进入等待阻塞;
        notify():条件创造后调用,通知等待池激活一个线程;
        notifyAll():条件创造后调用,通知等待池激活全部线程。

        # author:Dman
        # date:2019/3/30
        """
        条件变量------实现线程的限制
        应用场景:有一类线程须要知足条件以后才能继续执行。,为了在知足必定条件后,唤醒某个线程,防止该线程一直不被执行
        """
        
        import  threading,time
        from random import randint
        class Producer(threading.Thread):
            def run(self):
                global L
                while True:
                    val=randint(0,100)
                    print('生产者',self.name,":Append"+str(val),L)
                    if lock_con.acquire():
                        L.append(val)
                        lock_con.notify()     #
                        lock_con.release()
                    time.sleep(3)
        class Consumer(threading.Thread):
            def run(self):
                global L
                while True:
                        lock_con.acquire()
                        # print('ok1')
                        if len(L)==0:
                            lock_con.wait()
                        print('消费者',self.name,":Delete"+str(L[0]),L)
                        del L[0]
                        lock_con.release()
                        time.sleep(0.25)
        
        if __name__=="__main__":
        
            L=[]
            lock_con=threading.Condition()#获取一个Condition对象
            threads=[]
            for i in range(5):
                threads.append(Producer())
            threads.append(Consumer())
            for t in threads:
                t.start()
            for t in threads:
                t.join()

        总结:

        一、使用threading.Condition()获取一个Condition对象,里面默认使用RLock,也能够本身手动传参数。

    • 同步条件---threading.Event

      • 做用:Event和Condition差很少,只是少了锁的功能,所以Event用于不访问共享变量的条件环境

        event.isSet():返回event的状态值;

        event.wait():若是 event.isSet()==False将阻塞线程;

        event.set(): 设置event的状态值为True,全部阻塞池的线程激活进入就绪状态, 等待操做系统调度;

        event.clear():恢复event的状态值为False。

        # author:Dman
        # date:2019/3/30
        """
        event没有锁功能,可是实现了线程之间的交互。内部有标志位
        实现了函数:
        isSet():返回event 的状态值
        wait():若是event的状态值位False将阻塞线程
        set(): 设置event的状态值位True
        clear():设置event的状态值为False
        
        交叉执行。
        """
        import threading,time
        
        class Boss(threading.Thread):
        
            def run(self):
                print("BOSS:今晚你们都要加班到22:00。")
                event.isSet() or event.set()
                time.sleep(5)
                print("BOSS:<22:00>能够下班了。")
                event.isSet() or event.set()
        
        class Worker(threading.Thread):
        
            def run(self):
                event.wait()
                print("Worker:哎……命苦啊!")
                time.sleep(0.25)
                event.clear()
                event.wait()
                print("Worker:OhYeah!")
        
        if __name__=="__main__":
            event=threading.Event()  #获取event对象
            threads=[]
            for i in range(5):
                threads.append(Worker())
            threads.append(Boss())
            for t in threads:
                t.start()
            for t in threads:
                t.join()
        #---------运行结果---------------
        BOSS:今晚你们都要加班到22:00。
        Worker:哎……命苦啊!
        Worker:哎……命苦啊!Worker:哎……命苦啊!
        Worker:哎……命苦啊!
        
        Worker:哎……命苦啊!
        BOSS:<22:00>能够下班了。
        Worker:OhYeah!
        Worker:OhYeah!
        Worker:OhYeah!Worker:OhYeah!
        
        Worker:OhYeah!
    • 信号量

      • 做用:用来控制线程并发数的,使用BoundedSemaphore或Semaphore类来管理一个内置的计数器,每当调用acquire方法时-1,调用release方法时+1.

        计数器不能小于0,当计数器为0时,acquire方法将阻塞线程至同步锁定状态,知道其余线程调用release方法。(相似停车场的概念)

        BoundedSemaphore与Semaphore的惟一区别在于前者将调用release时检查计数器是否超过了计数器的初始值,若是超过了将抛出一个异常。

        # author:Dman
        # date:2019/3/30
        """
        一、信号量
        二、信号量和递归锁的区别:
        三、应用场景:
        四、信号量的建立:
        
        """
        import  threading,time
        
        class MyThread(threading.Thread):
            def run(self):
                if semaphore.acquire():
                    print(self.name)
                    time.sleep(5)
                    semaphore.release()
        
        
        
        
        if __name__ =='__main__':
            semaphore = threading.BoundedSemaphore(5)
            thrs = []
            for i in range(13):
                thrs.append(MyThread())
        
        
            for i in thrs:
                i.start()
        
            # print('___main function close _____')
    • 多线程数据共享利器--queue队列模块

      • 做用:多个线程间进行安全的信息交互的时候

        queue队列类的方法

        建立一个“队列”对象
        import Queue
        q = Queue.Queue(maxsize = 10)
        Queue.Queue类便是一个队列的同步实现。队列长度可为无限或者有限。可经过Queue的构造函数的可选参数maxsize来设定队列长度。若是maxsize小于1就表示队列长度无限。

        将一个值放入队列中
        q.put(10)
        调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
        1。若是队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。若是block为0,put方法将引起Full异常。

        将一个值从队列中取出
        q.get()
        调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。若是队列为空且block为True,get()就使调用线程暂停,直至有项目可用。若是队列为空且block为False,队列将引起Empty异常。

        Python Queue模块有三种队列及构造函数:
        一、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize)
        二、LIFO相似于堆,即先进后出。 class queue.LifoQueue(maxsize)
        三、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize)

        此包中的经常使用方法(q = Queue.Queue()):
        q.qsize() 返回队列的大小
        q.empty() 若是队列为空,返回True,反之False
        q.full() 若是队列满了,返回True,反之False
        q.full 与 maxsize 大小对应
        q.get([block[, timeout]]) 获取队列,timeout等待时间
        q.get_nowait() 至关q.get(False)
        非阻塞 q.put(item) 写入队列,timeout等待时间
        q.put_nowait(item) 至关q.put(item, False)
        q.task_done() 在完成一项工做以后,q.task_done() 函数向任务已经完成的队列发送一个信号
        q.join() 实际上意味着等到队列为空,再执行别的操做

        案例一

        # author:Dman
        # date:2019/3/30
        import queue
        
        """
        队列 queue:是线程安全的
        相比较列表:为何队列是线程安全的
        """
        import threading,queue,time,random
        
        class Production(threading.Thread):
            def run(self):
                while True:
                    r = random.randint(0,100)
                    q.put(r)
                    print('生产出来%s号包子' % r)
                    time.sleep(1)
        
        class Proces(threading.Thread):
            def run(self):
                while True:
                    re = q.get()
                    print('吃掉%s号包子'% re)
        
        
        
        if __name__ == '__main__':
            q = queue.Queue(10)
            threads = [Production(),Production(),Proces()]
            for t in threads:
                t.start()

        案例二:

        # author:Dman
        # date:2019/4/3
        #实现一个线程不断生成一个随机数到一个队列中(考虑使用Queue这个模块)
        # 实现一个线程从上面的队列里面不断的取出奇数
        # 实现另一个线程从上面的队列里面不断取出偶数
        
        import random,threading,time
        from queue import Queue
        #Producer thread
        class Producer(threading.Thread):
          def __init__(self, t_name, queue):
            threading.Thread.__init__(self,name=t_name)
            self.data=queue
          def run(self):
            for i in range(10):  #随机产生10个数字 ,能够修改成任意大小
              randomnum=random.randint(1,99)
              print ("%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum))
              self.data.put(randomnum) #将数据依次存入队列
              time.sleep(1)
            print ("%s: %s finished!" %(time.ctime(), self.getName()))
        
        #Consumer thread
        class Consumer_even(threading.Thread):
          def __init__(self,t_name,queue):
            threading.Thread.__init__(self,name=t_name)
            self.data=queue
          def run(self):
            while 1:
              try:
                val_even = self.data.get(1,5) #get(self, block=True, timeout=None) ,1就是阻塞等待,5是超时5秒
                if val_even%2==0:
                  print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even))
                  time.sleep(2)
                else:
                  self.data.put(val_even)
                  time.sleep(2)
              except:   #等待输入,超过5秒 就报异常
                print ("%s: %s finished!" %(time.ctime(),self.getName()))
                break
        class Consumer_odd(threading.Thread):
          def __init__(self,t_name,queue):
            threading.Thread.__init__(self, name=t_name)
            self.data=queue
          def run(self):
            while 1:
              try:
                val_odd = self.data.get(1,5)
                if val_odd%2!=0:
                  print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd))
                  time.sleep(2)
                else:
                  self.data.put(val_odd)
                  time.sleep(2)
              except:
                print ("%s: %s finished!" % (time.ctime(), self.getName()))
                break
        #Main thread
        def main():
          queue = Queue()
          producer = Producer('Pro.', queue)
          consumer_even = Consumer_even('Con_even.', queue)
          consumer_odd = Consumer_odd('Con_odd.',queue)
          producer.start()
          consumer_even.start()
          consumer_odd.start()
          producer.join()
          consumer_even.join()
          consumer_odd.join()
          print ('All threads terminate!')
        
        if __name__ == '__main__':
          main()

        案例3:相比较,list不是线程安全的

        import threading,time
        
        li=[1,2,3,4,5]
        
        def pri():
            while li:
                a=li[-1]
                print(a)
                time.sleep(1)
                try:
                    li.remove(a)
                except:
                    print('----',a)
        
        t1=threading.Thread(target=pri,args=())
        t1.start()
        t2=threading.Thread(target=pri,args=())
        t2.start()

六、多进程

  1. 多进程概念

    因为GIL的存在,默认的Cpython解释器中的多线程其实并非真正的多线程,若是想要充分地使用多核CPU资源,在python中大部分须要使用多进程,Python提供了multiprocessing模块,这个模块支持子进程、通讯和共享数据、执行不一样形式的同步,提供了Process、Queue、Pipe、Lock等组件。

    该模块的使用和threading模块相似,api大体相同,可是须要注意几点:

    一、在unix平台上,某个进程终结以后,该进程须要被父进程调用wait,不然进程成为僵尸进程,因此有必要对每一个process对象调用join方法(实际上等于wait),对于多线程来讲 ,因为只有一个进程,因此不存在此必要性。

    二、multiprocessing模块提供了Pipe和Queue,效率上更高,赢优先考虑Pipe和Queue,避免使用Lock、Event等同步方式(由于他们占据的不是进程的资源。)

    三、多进程应该避免共享资源,在多线程中,咱们能够比较容易的共享资源,好比使用全局变量或者传递参数,在多进程的状况下,因为每一个进程有本身独立的内存空间,以上方法不合适。此时咱们能够经过共享内存和Manager的方法来共享资源,但这样作提升了程序的复杂度。

    四、另外、在windows系统下,须要注意的是想要启动一个子进程,必须加上 if __name__ == '__main__':

  2. 建立多进程------multiprocessing.Process

    • 直接调用

      from multiprocessing import Process
      import time
      def f(name):
          time.sleep(1)
          print('hello', name,time.ctime())
      
      if __name__ == '__main__':
          p_list=[]
          for i in range(3):
              p = Process(target=f, args=('alvin',))
              p_list.append(p)
              p.start()
          for i in p_list:
              p.join()
          print('end')
    • 类的方式调用

      from multiprocessing import Process
      import time
      
      class MyProcess(Process):
          def __init__(self):
              super(MyProcess, self).__init__()
              #self.name = name
      
          def run(self):
              time.sleep(1)
              print ('hello', self.name,time.ctime())
      
      
      if __name__ == '__main__':
          p_list=[]
          for i in range(3):
              p = MyProcess()
              p.start()
              p_list.append(p)
      
          for p in p_list:
              p.join()
      
          print('end')
  3. 多进程之间的通讯,有三种方式

    • multiprocessing.Queue

      from multiprocessing import Process, Queue
      
      def f(q,n):
          q.put([42, n, 'hello'])
      
      if __name__ == '__main__':
          q = Queue()
          p_list=[]
          for i in range(3):
              p = Process(target=f, args=(q,i))
              p_list.append(p)
              p.start()
          print(q.get())
          print(q.get())
          print(q.get())
          for i in p_list:
                  i.join()
    • multiprocessing.Pipe

      from multiprocessing import Process, Pipe
      
      def f(conn):
          conn.send([42, None, 'hello'])
          conn.close()
      
      if __name__ == '__main__':
          parent_conn, child_conn = Pipe()
          p = Process(target=f, args=(child_conn,))
          p.start()
          print(parent_conn.recv())   # prints "[42, None, 'hello']"
          p.join()

      一、Pipe()函数返回一个由管道链接的链接对象,默认状况下是双工(双向)。

      二、Pipe()返回的两个链接对象表明管道的两端。 每一个链接对象都有send() 和recv()方法(以及其余方法)。 请注意,若是两个进程(或线程)同时尝试读取或写入管道的同一端,则管道中的数据可能会损坏。 固然,同时使用管道的不一样端的进程不存在损坏的风险。

    • multiprocessing.Manager

      from multiprocessing import Process, Manager
      
      def f(d, l,n):
          d[n] = '1'
          d['2'] = 2
          d[0.25] = None
          l.append(n)
          print(l)
      
      if __name__ == '__main__':
          with Manager() as manager:
              d = manager.dict()
      
              l = manager.list(range(5))
              p_list = []
              for i in range(10):
                  p = Process(target=f, args=(d, l,i))
                  p.start()
                  p_list.append(p)
              for res in p_list:
                  res.join()
      
              print(d)
              print(l)

      一、Manager()返回的管理器对象控制一个服务器进程,该进程保存Python对象并容许其余进程使用代理操做它们。

      二、Manager()返回的管理器将支持类型列表,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barrier,Queue,Value和Array。

  4. 进程间同步----multiprocessing.Lock

    from multiprocessing import Process, Lock
    
    def f(l, i):
        l.acquire()
        try:
            print('hello world', i)
        finally:
            l.release()
    
    if __name__ == '__main__':
        lock = Lock()
    
        for num in range(10):
            Process(target=f, args=(lock, num)).start()

    进程间同步,只使用父进程的锁,(另外尽可能避免这种状况)

  5. 进程池----multiprocessing.Pool

    from  multiprocessing import Process,Pool
    import time
    
    def Foo(i):
        time.sleep(2)
        return i+100
    
    def Bar(arg):
        print('-->exec done:',arg)
    
    pool = Pool(5)
    
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,),callback=Bar)
        #pool.apply(func=Foo, args=(i,))
    
    print('end')
    pool.close()
    pool.join()

    进程池内部维护一个进程序列,当使用时,就去进程池中获取一个进程,若是进程池中没有可供使用的进程,那么程序就会等待,直到进程池中有可用的进程为止。

    进程池中的两个方法:

    一、apply

    二、map

    三、apply_async 是异步的,也就是在启动进程以后会继续后续的代码,不用等待进程函数返回

    四、map_async 是异步的,

    五、join语句要放在close语句后面

七、协程

  • 协程是什么?

    • 协程,又称微线程,英文名为Coroutine,协程是用户态的轻量级的线程。协程拥有本身的寄存器上下文和栈。协程调用切换时,将寄存器上下文和栈保存到其余地方,在切换回来的时候,能够恢复先前保存的寄存器上下文和栈,所以:

      协程能保留上一次调用的状态,每次过程重入的时候,就至关于进入上一次调用的状态,换种说话,进入上一次离开时所处的逻辑流的位置

      总结:

      一、协程必须在只有一个单线程里实现并发

      二、修改共享数据不须要加锁

      三、用户程序里本身保存多个控制流的上下文和栈

      四、一个协程遇到IO操做自动切换到其余线程

  • 协程的好处?

    一、无需线程上下文切换的开销

    二、无需院子操做锁定以及同步的开销(原子操做是不须要同步,所谓原子操做是指不会被线程调度机制打断的操做,也就是说该操做必须执行完毕,才能进行线程切换;原子操做能够是一个步骤,也能够是多个操做步骤)

    三、方便切换控制流,简化编程模型

    四、高并发+高扩展+低成本:一个CPU支持上万的协程都不是问题,因此很适合高并发的问题。

  • 协程的缺点

    一、没法利用多核资源:协程的本质是一个单线程,它不能同时将单个CPU的多个核用上,协程须要和进程配合才能利用多核CPU;咱们平常所编写的大部分应用没有这个必要,除非是CPU密集性应用

    二、进行阻塞操做入IO会阻塞掉整个程序

  • yield实现协程案例

    # author:Dman
    # date:2019/4/1
    import  time
    import  queue
    
    def consumer(name):
        print('---开始生产包子')
        while True:
            new_baozi = yield
            print("[%s] is eating baozi %s" % (name,new_baozi))
    
    def producer():
        next(con1)
        next(con2)
        n = 0
        while n<5:
            n += 1
            con1.send(n)
            con2.send(n)
            print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
    
    if __name__ == '__main__':
        con1 = consumer('c1')
        con2 = consumer('c2')
        p = producer()
    #------------------运行结果---------------
    ---开始生产包子
    ---开始生产包子
    [c1] is eating baozi 1
    [c2] is eating baozi 1
    [producer] is making baozi 1
    [c1] is eating baozi 2
    [c2] is eating baozi 2
    [producer] is making baozi 2
    [c1] is eating baozi 3
    [c2] is eating baozi 3
    [producer] is making baozi 3
    [c1] is eating baozi 4
    [c2] is eating baozi 4
    [producer] is making baozi 4
    [c1] is eating baozi 5
    [c2] is eating baozi 5
    [producer] is making baozi 5
  • greenlet模块支持的协程

    相比较yield,能够在任意函数之间随意切换,而不须要把这个函数先声明成为generaor。(可是它没法自动遇到IO阻塞去切换,必须手动去切换)

    from greenlet import greenlet
    
    def test1():
        print(12)
        gr2.switch()
        print(34)
        gr2.switch()
    
    
    def test2():
        print(56)
        gr1.switch()
        print(78)
    
    
    gr1 = greenlet(test1)
    gr2 = greenlet(test2)
    gr1.switch()        #调用switch去切换执行函数
    #----------执行结果----------------
    12
    56
    34
    78
  • gevent模块支持的协程

    • 理解

      使用gevent,能够得到极高的并发性能,但gevent只能在Unix/Linux下运行,在Windows下不保证正常安装和运行。(它能够在遇到IO阻塞的时候自动切换)

      因为gevent是基于IO切换的协程,因此最神奇的是,咱们编写的Web App代码,不须要引入gevent的包,也不须要改任何代码,仅仅在部署的时候,用一个支持gevent的WSGI服务器,马上就得到了数倍的性能提高。具体部署方式能够参考后续“实战”-“部署Web App”一节。

    • 简单案例

      # author:Dman
      # date:2019/4/1
      
      """
      gevent 封装了greenlet,这个不须要本身去切换,遇到io阻塞,模块会本身去切换任务。
      咱们只须要把gevent对象加到里面
      """
      import gevent
      
      
      def func1():
          print('\033[31;1m李闯在跟海涛搞...\033[0m')
          gevent.sleep(2)                     #模拟IO阻塞,自动开始切换
          print('\033[31;1m李闯又回去跟继续跟海涛搞...\033[0m')
      
      
      def func2():
          print('\033[32;1m李闯切换到了跟海龙搞...\033[0m')
          gevent.sleep(1)
          print('\033[32;1m李闯搞完了海涛,回来继续跟海龙搞...\033[0m')
      
      
      gevent.joinall([
          gevent.spawn(func1),      #将函数加到里面。
          gevent.spawn(func2),
          # gevent.spawn(func3),
      ])
      #-----------执行结果-------------
      李闯在跟海涛搞...
      李闯切换到了跟海龙搞...
      李闯搞完了海涛,回来继续跟海龙搞...
      李闯又回去跟继续跟海涛搞...
    • 同步IO和异步IO的区别

      # author:Dman
      # date:2019/4/1
      
      import gevent
      
      def task(pid):
          """
          Some non-deterministic task
          """
          gevent.sleep(0.5)
          print('Task %s done' % pid)
      
      
      def synchronous():
          for i in range(1, 10):
              task(i)
      
      
      def asynchronous():     #异步io函数
          threads = [gevent.spawn(task, i) for i in range(10)]
          gevent.joinall(threads)
      
      
      print('Synchronous:'.center(20,'-'))
      synchronous()
      
      print('Asynchronous:'.center(20,'-'))
      asynchronous()
    • 简单的异步爬虫,遇到IO阻塞会自动切换任务

      from gevent import monkey
      import time
      
      monkey.patch_all()
      # 在最开头的地方gevent.monkey.patch_all();把标准库中的thread/socket等给替换掉,
      # 这样咱们在后面使用socket的时候能够跟日常同样使用,无需修改任何代码,可是它变成非阻塞的了.
      #
      
      
      import gevent
      from urllib.request import urlopen
      
      
      def f(url):
          print('GET: %s' % url)
          resp = urlopen(url)
          data = resp.read()
          print('%d bytes received from %s.' % (len(data), url))
      
      list = ['https://www.python.org/','https://www.yahoo.com/','https://github.com/']
      start = time.time()
      # for url in l:
      #     f(url)
      
      gevent.joinall([
          gevent.spawn(f, list[0]),
          gevent.spawn(f, list[1]),
          gevent.spawn(f, list[2]),
      ])
      print(time.time()-start)
      
      #-----------输出结果---------------
      GET: https://www.python.org/
      GET: https://www.yahoo.com/
      GET: https://github.com/
      48560 bytes received from https://www.python.org/.
      82655 bytes received from https://github.com/.
      536556 bytes received from https://www.yahoo.com/.
      3.361192226409912

八、事件驱动和异步IO

相关文章
相关标签/搜索