multiprocessor(中)

 

1、进程同步(锁)

  • 经过以前的学习,咱们想方设法实现了程序的异步,让多个任务能够同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受咱们控制。尽管并发编程让咱们能更加充分的利用IO资源,可是也给咱们带来了新的问题:进程之间数据不共享,可是共享同一套文件系统,因此访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。html

  • 案例一、多个进程抢占输出资源,致使打印混乱实例node

     
     
     
    x
     
     
     
     
    import os
    import time
    import random
    from multiprocessing import Process
    def work(n):
        print('%s: %s is running' %(n,os.getpid()))
        time.sleep(random.random())
        print('%s:%s is done' %(n,os.getpid()))
    if __name__ == '__main__':
        for i in range(5):
            p=Process(target=work,args=(i,))
            p.start()
    # 看结果:经过结果能够看出两个问题:】
    #问题一:每一个进程中work函数的第一个打印就不是按照咱们for循环的0-4的顺序来打印的
    #问题二:咱们发现,每一个work进程中有两个打印,可是咱们看到全部进程中第一个打印的顺序为0-2-1-4-3,可是第二个打印没有按照这个顺序,变成了2-1-0-3-4,说明咱们一个进程中的程序的执行顺序都混乱了。
    #问题的解决方法,第二个问题加锁来解决,第一个问题是没有办法解决的,由于进程开到了内核,有操做系统来决定进程的调度,咱们本身控制不了
    # 0: 9560 is running
    # 2: 13824 is running
    # 1: 7476 is running
    # 4: 11296 is running
    # 3: 14364 is running
    # 2:13824 is done
    # 1:7476 is done
    # 0:9560 is done
    # 3:14364 is done
    # 4:11296 is done
    #经过加锁解决第二个问题
    #由并发变成了串行,牺牲了运行效率,但避免了竞争
    def work(n,lock):
        #加锁,保证每次只有一个进程在执行锁里面的程序,这一段程序对于全部写上这个锁的进程,你们都变成了串行
        lock.acquire()
        print('%s: %s is running' %(n,os.getpid()))
        time.sleep(1)
        print('%s:%s is done' %(n,os.getpid()))
        #解锁,解锁以后其余进程才能去执行本身的程序
        lock.release()
        
        #注意这里可使用with上下位处理的形式,而且这里的with还为咱们实现了报错处理机制,若是使用上面的方式,一旦报错,子进程将称为孤儿进程
       # with lock:
            #print('%s: %s is running' %(n,os.getpid()))
       #time.sleep(1)
        #print('%s:%s is done' %(n,os.getpid()))
            
    if __name__ == '__main__':
        lock=Lock()
        for i in range(5):
            p=Process(target=work,args=(i,lock))
            p.start()
    #打印结果:
    # 2: 10968 is running
    # 2:10968 is done
    # 0: 7932 is running
    # 0:7932 is done
    # 4: 4404 is running
    # 4:4404 is done
    # 1: 12852 is running
    # 1:12852 is done
    # 3: 980 is running
    # 3:980 is done
    #结果分析:(本身去屡次运行一下,看看结果,我拿出其中一个结果来看)经过结果咱们能够看出,多进程刚开始去执行的时候,每次运行,首先打印出来哪一个进程的程序是不固定的,可是咱们解决了上面打印混乱示例代码的第二个问题,那就是同一个进程中的两次打印都是先完成的,而后才切换到下一个进程去,打印下一个进程中的两个打印结果,说明咱们控制住了同一进程中的代码执行顺序,若是涉及到多个进程去操做同一个数据或者文件的时候,就不担忧数据算错或者文件中的内容写入混乱了。
     
  • 锁的应用场景:当多个进程须要操做同一个文件/数据库的时候 ,会产生数据不安全,咱们应该使用锁来避免多个进程同时修改一个文件python

  • 特色:web

    • 1.牺牲了效率 保证了数据的安全 2.用户就会以为很慢 体验不好
  • 案例二、并发运行,效率高,可是竞争同一个文件,致使数据混乱数据库

     
     
     
    xxxxxxxxxx
     
     
     
     
    #注意:首先在当前文件目录下建立一个名为db的文件
    #文件db的内容为:{"count":1},只有这一行数据,而且注意,每次运行完了以后,文件中的1变成了0,你须要手动将0改成1,而后在去运行代码。注意必定要用双引号,否则json没法识别
    from multiprocessing import Process,Lock
    import time,json,random
    #查看剩余票数
    def search():
        dic=json.load(open('db')) #打开文件,直接load文件中的内容,拿到文件中的包含剩余票数的字典
        print('\033[43m剩余票数%s\033[0m' %dic['count'])
    def get():  #抢票
        dic=json.load(open('db'))
        time.sleep(0.1)       #模拟读数据的网络延迟,那么进程之间的切换,致使全部人拿到的字典都是{"count": 1},也就是每一个人都拿到了这一票。
        if dic['count'] >0:
            dic['count']-=1
            time.sleep(0.2)   #模拟写数据的网络延迟
            json.dump(dic,open('db','w'))
            #最终结果致使,每一个人显示都抢到了票,这就出现了问题~
            print('\033[43m购票成功\033[0m')
    def task():
        search()
        get()
    if __name__ == '__main__':
        for i in range(3): #模拟并发100个客户端抢票
            p=Process(target=task)
            p.start()
    #看结果分析:因为网络延迟等缘由使得进程切换,致使每一个人都抢到了这最后一张票
    # 剩余票数1
    # 剩余票数1
    # 剩余票数1
    # 购票成功
    # 购票成功
    # 购票成功
    #加锁版本
    def search():
        dic=json.load(open('db')) #打开文件,直接load文件中的内容,拿到文件中的包含剩余票数的字典
        print('\033[43m剩余票数%s\033[0m' %dic['count'])
    def get():  #抢票
        dic=json.load(open('db'))
        time.sleep(0.1)       #模拟读数据的网络延迟,那么进程之间的切换,致使全部人拿到的字典都是{"count": 1},也就是每一个人都拿到了这一票。
        if dic['count'] >0:
            dic['count']-=1
            time.sleep(0.2)   #模拟写数据的网络延迟
            json.dump(dic,open('db','w'))
            #最终结果致使,每一个人显示都抢到了票,这就出现了问题~
            print('\033[43m购票成功\033[0m')
        else:
            print('sorry,没票了亲!')
    def task(lock):
        search()
        #由于抢票的时候是发生数据变化的时候,全部咱们将锁加加到这里
        lock.acquire()
        get()
        lock.release()
    if __name__ == '__main__':
        lock = Lock() #建立一个锁
        for i in range(3): #模拟并发100个客户端抢票
            p=Process(target=task,args=(lock,)) #将锁做为参数传给task函数
            p.start()
    #看结果分析:只有一我的抢到了票
    # 剩余票数1
    # 剩余票数1
    # 剩余票数1
    # 购票成功   #幸运的人儿
    # sorry,没票了亲!
    # sorry,没票了亲!
     

    1555407596609

  • 进程锁的总结:编程

 
 
 
xxxxxxxxxx
 
 
 
 
加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然能够用文件共享数据实现进程间通讯,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.须要本身加锁处理
所以咱们最好找寻一种解决方案可以兼顾:
一、效率高(多个进程共享一块内存的数据)
二、帮咱们处理好锁问题。
这就是mutiprocessing模块为咱们提供的基于消息的IPC通讯机制:队列和管道
 

2、队列和管道

  • 咱们经过上面的学习知道锁实际上是基于文件的基础上实现的,也就是当咱们要读写一个数据时,须要涉及到使用锁,来约束多个进程之间的秩序 。可是效率底,因此咱们想改变这种文件类型的操做,那么咱们将学习使用基于socket和管道实现的高效方式:队列json

  • 队列和管道都是将数据存放于内存中。windows

    • 队列又是基于(管道+锁)实现的,可让咱们从复杂的锁问题中解脱出来, 咱们应该尽可能避免使用共享数据,尽量使用消息传递和队列,避免处理复杂的同步和锁问题,并且在进程数目增多时,每每能够得到更好的可获展性。安全

      1555408563614

    • IPC通讯机制:Inter Process Communication,进程间通讯或者跨进程通讯,是指两个进程之间进行数据交换的过程。IPC不是某个系统所独有的,任何一个操做系统都须要有相应的IPC机制, 好比Windows上能够经过剪贴板、管道和邮槽等来进行进程间通讯,而Linux上能够经过命名共享内容、信号量等来进行进程间通讯。Android它也有本身的进程间通讯方式,Android建构在Linux基础上,继承了一部分Linux的通讯方式网络

  • 进程彼此之间互相隔离,要实现进程间通讯(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。队列就像一个特殊的列表,可是能够设置固定长度,而且从前面插入数据,从后面取出数据,先进先出

  • 队列的建立方式:

    •  
       
       
      xxxxxxxxxx
       
       
       
       
      Queue([maxsize]) 建立共享的进程队列。
      参数 :maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。底层队列使用管道和锁实现。
       
  • 队列须要记住的方法是:

     
     
     
    xxxxxxxxxx
     
     
     
     
    q = Queue([maxsize]) 
    建立共享的进程队列。maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还须要运行支持线程以便队列中的数据传输到底层管道中。 
    Queue的实例q具备如下方法:
    q.get( [ block [ ,timeout ] ] ) 
    返回q中的一个项目。若是q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 若是设置为False,将引起Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。若是在制定的时间间隔内没有项目变为可用,将引起Queue.Empty异常。若是没有值就会等到天荒地老
    q.get_nowait( ) 
    同q.get(False)方法。
    q.put(item [, block [,timeout ] ] ) 
    将item放入队列。若是队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。若是设置为False,将引起Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引起Queue.Full异常。若是设置了最大的容量,put只能一直等
    q.qsize() 
    返回队列中目前项目的正确数量。此函数的结果并不可靠,由于在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引起NotImplementedError异常。
    q.empty() 
    若是调用此方法时 q为空,返回True。若是其余进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
    q.full() 
    若是q已满,返回为True. 因为线程的存在,结果也多是不可靠的(参考q.empty()方法)。。
    q.close() 
    关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但还没有写入的数据,但将在此方法完成时立刻关闭。若是q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,若是某个使用者正被阻塞在get()操做上,关闭生产者中的队列不会致使get()方法返回错误。
    q.cancel_join_thread() 
    不会再进程退出时自动链接后台线程。这能够防止join_thread()方法阻塞。
    q.join_thread() 
    链接队列的后台线程。此方法用于在调用q.close()方法后,等待全部队列项被消耗。默认状况下,此方法由不是q的原始建立者的全部进程调用。调用q.cancel_join_thread()方法能够禁止这种行为
     
  • 队列代码实例

     
     
     
    xxxxxxxxxx
     
     
     
     
    from multiprocessing import Queue # 此Queue 是进程队列,区别于from queue import Queue
    q=Queue(3) #建立一个队列对象,队列长度为3
    #put ,get ,put_nowait,get_nowait,full,empty
    q.put(3)   #往队列中添加数据
    q.put(2)
    q.put(1)
    # q.put(4)   # 若是队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
                # 若是队列中的数据一直不被取走,程序就会永远停在这里。天荒地老
    try:
        q.put_nowait(4) # 可使用put_nowait,若是队列满了不会阻塞,可是会由于队列满了而报错。
    except: # 所以咱们能够用一个try语句来处理这个错误。这样程序不会一直阻塞下去,可是会丢掉这个消息。
        print('队列已经满了')
    # 所以,咱们再放入数据以前,能够先看一下队列的状态,若是已经满了,就不继续put了。
    print(q.full()) #查看是否满了,满了返回True,不满返回False
    print(q.get())  #取出数据
    print(q.get())
    print(q.get())
    # print(q.get()) # 同put方法同样,若是队列已经空了,那么继续取就会出现阻塞。天荒地老
    try:
        q.get_nowait(3) # 可使用get_nowait,若是队列满了不会阻塞,可是会由于没取到值而报错。
    except: # 所以咱们能够用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
        print('队列已经空了')
    print(q.empty()) #空了
    #看下面的队列的时候 按照编号看
    import time
    from multiprocessing import Process, Queue
    def f(q):
        # q = Queue() #9. 咱们在主进程中开启了一个q,若是咱们在子进程中的函数里面再开一个q,那么你下面q.put('姑娘,多少钱~')添加到了新建立的这q里里面了
        q.put('姑娘,多少钱~')  #4.调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。
        print(q.qsize()) #6.查看队列中有多少条数据了 # 2 这里有不肯定性,有时候这里为1
    def f2(q):
        print('》》》》》》》》')
        print(q.get())  #5.取数据
    if __name__ == '__main__':
        q = Queue() #1.建立一个Queue对象
        q.put('小鬼')
        p = Process(target=f, args=(q,)) #2.建立一个进程
        p2 = Process(target=f2, args=(q,)) #3.建立一个进程
        p.start()
        p2.start()
        print(q.qsize())
        time.sleep(1) #7.若是阻塞一点时间,就会出现主进程运行太快,致使咱们在子进程中查看qsize为1个。
        print(q.get()) #结果:姑娘,多少钱~
        p.join() 
     # 以上代码证实这个队列是能够实现进程之间的数据共享的
    #一个复杂一点的例子
    import os
    import time
    import multiprocessing
    # 向queue中输入数据的函数
    def inputQ(queue):
        info = str(os.getpid()) + '(put):' + str(time.asctime())
        queue.put(info)
    # 向queue中输出数据的函数
    def outputQ(queue):
        info = queue.get()
        print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))
    # Main
    if __name__ == '__main__':
        #windows下,若是开启的进程比较多的话,程序会崩溃,为了防止这个问题,使用freeze_support()方法来解决。知道就行啦
        multiprocessing.freeze_support()
        record1 = []   # store input processes
        record2 = []   # store output processes
        queue = multiprocessing.Queue(3)
        # 输入进程
        for i in range(1,10):  
            process = multiprocessing.Process(target=inputQ,args=(queue,))
            process.start()
            record1.append(process)
        # 输出进程
        for i in range(10):
            process = multiprocessing.Process(target=outputQ,args=(queue,))
            process.start()
            record2.append(process)
        for p in record1:
            p.join()
        for p in record2:
            p.join()
            
      #队列是进程安全的:同一时间只能一个进程拿到队列中的一个数据,你拿到了一个数据,这个数据别人就拿不到了。
     

3、生产者消费者模型

  • 在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生产线程和消费线程的工做能力来提升程序的总体处理数据的速度,来是整个生产和消费达到最大化和平衡。

  • 为何要使用生产者和消费者模型:

    • 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。
  • 什么是生产者消费者模型:

    • 生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力,而且我能够根据生产速度和消费速度来均衡一下多少个生产者能够为多少个消费者提供足够的服务,就能够开多进程等等,而这些进程都是到阻塞队列或者说是缓冲区中去获取或者添加数据。

1555412939184

img

  • 基于队列来实现一个生产者消费者模型:

     
     
     
    xxxxxxxxxx
     
     
     
     
    #生产者消费者模型总结
        #程序中有两类角色
            一类负责生产数据(生产者)
            一类负责处理数据(消费者)  
        #引入生产者消费者模型为了解决的问题是:
            平衡生产者与消费者之间的工做能力,从而提升程序总体处理数据的速度   
        #如何实现:
            生产者<-->队列<——>消费者
        #生产者消费者模型实现类程序的解耦和
    from multiprocessing import Process,Queue
    import time,random,os
    def consumer(q):
        while True:
            res=q.get()
            time.sleep(random.randint(1,3))
            print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
    def producer(q):
        for i in range(1,10):
            time.sleep(random.randint(1,3))
            res='包子%s' %i
            q.put(res)
            print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
    if __name__ == '__main__':
        q=Queue()
        #生产者们:即厨师们
        p1=Process(target=producer,args=(q,))
        #消费者们:即吃货们
        c1=Process(target=consumer,args=(q,))
        p1.start() #开始
        c1.start()
        print('主')  #可是这里有个问题,while True的子进程并未结束,经过上面基于队列的生产者消费者代码示例,咱们发现一个问题:主进程永远不会结束,缘由是:生产者p在生产完后就结束了,可是消费者c在取空了q以后,则一直处于死循环中且卡在q.get()这一步。
        
     #改进版本,解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就能够break出死循环:子进程生产者在生产完毕后发送结束信号None。
    from multiprocessing import Process,Queue
    import time,random,os
    def consumer(q):
        while True:
            res=q.get()
            if res is None:break #收到结束信号则结束
            time.sleep(random.randint(1,3))
            print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
    def producer(q):
        for i in range(1,5):
            time.sleep(random.randint(1,3))
            res='包子%s' %i
            q.put(res)
            print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
        q.put(None) #在本身的子进程的最后加入一个结束信号
    if __name__ == '__main__':
        q=Queue()
        #生产者们:即厨师们
        p1=Process(target=producer,args=(q,))
        #消费者们:即吃货们
        c1=Process(target=consumer,args=(q,))
        #开始
        p1.start()
        c1.start()
        print('主')
    #注意:结束信号None,不必定要由生产者发,主进程里一样能够发,但主进程须要等生产者结束后才应该发送该信号
    import time
    import random
    from multiprocessing import Process,Queue
    def producer(q):
        for i in range(1,10): #不能是 put(0) 就会直接结束一个进程由于if not food
            time.sleep(random.random())
            food = '泔水%s'%i
            print('%s生产了%s'%('taibai',food))
            q.put(food)
    def consumer(q,name):
        while True:
            food = q.get()   # food = 食物/None
            if not food : break
            time.sleep(random.uniform(1,2))
            print('%s 吃了 %s'%(name,food))
    if __name__ == '__main__':
        q = Queue()
        p1 = Process(target=producer,args=(q,))
        p1.start()
        c1 = Process(target=consumer,args=(q,'alex'))
        c1.start()
        c2 = Process(target=consumer,args=(q,'wusir'))
        c2.start()
        p1.join()  #必须等待p1执行完
        q.put(None) # 这里须要注意的是,又有几个消费者(while True)就要发几回None,不然会出现孤儿进程
        q.put(None)
    #但上述解决方式,在有多个生产者和多个消费者时,因为队列咱们说了是进程安全的,我一个进程拿走告终束信号,另一个进程就拿不到了,还须要多发送一个结束信号,有几个取数据的进程就要发送几个结束信号,咱们则须要用一个很low的方式去解决。
    from multiprocessing import Process,Queue
    import random
    import time
    def producer(name,q):
        for i in range(1,7):
            # 处理数据延迟
         &nbnbsp;  time.sleep(random.random())
            data= 'data %s'%i
            q.put(data)
            print('生产者 %s 生产了数据: %s'%(name,data))
    def consumer(name,q):
        while 1:
            data= q.get()
            if data is None:  #当接受到None是跳出循环
                print('\033[31;1m %s 消费完了 \033[0m'%name)
                break
            print('消费者 %s 收到数据: %s'%(name,data))
    if __name__ =='__main__':
        q=Queue()
        pro_name=['alex','wusir','wang']
        con_name=['aaa','bb','c','ddd']
        pro_lis=[]
        for i in pro_name:  # 将全部的生产者写入列表,而且启动
            p=Process(target=producer,args=(i,q))
            p.start()
            pro_lis.append(p)
        for i in con_name:  # 启动消费者的进程
            p=Process(target=consumer,args=(i,q))
            p.start()
        for i in pro_lis:  # 等待每一个进程阻塞,知道结束完
            i.join()
        for i in range(len(con_name)):  #发送四个None结束相应的消费者
            q.put(None)
     

     

4、JoinableQueue 队列

  •  
     
     
    xxxxxxxxxx
     
     
     
     
    JoinableQueue([maxsize]):这就像是一个Queue对象,但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
    #参数介绍:
        maxsize是队列中容许最大项数,省略则无大小限制。    
      #方法介绍:
        JoinableQueue的实例p除了与Queue对象相同的方法以外还具备:
        q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常
        q.join():生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止,也就是队列中的数据所有被get拿走了。
     

1555420266275

  • 使用JoinableQueue实现生产消费者模型

    from multiprocessing import Process,JoinableQueue
    import time,random,os
    def consumer(q):
        while True:
            res=q.get()
            # time.sleep(random.randint(1,3))
            time.sleep(random.random())
            print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
            q.task_done() #向q.join()发送一次信号,证实一个数据已经被取走并执行完了,减一
    
    def producer(name,q):
        for i in range(10):
            # time.sleep(random.randint(1,3))
            time.sleep(random.random())
            res='%s%s' %(name,i)
            q.put(res)
            print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
        print('%s生产结束'%name)
        q.join() #生产完毕,使用此方法进行阻塞,直到队列中全部项目均被处理。
        print('%s生产结束~~~~~~'%name)
    
    if __name__ == '__main__':
        q=JoinableQueue()
        #生产者们:即厨师们
        p1=Process(target=producer,args=('包子',q))
        p2=Process(target=producer,args=('骨头',q))
        p3=Process(target=producer,args=('泔水',q))
    
        #消费者们:即吃货们
        c1=Process(target=consumer,args=(q,))
        c2=Process(target=consumer,args=(q,))
        c1.daemon=True #若是不加守护,那么主进程结束不了,可是加了守护以后,必须确保生产者的内容生产完而且被处理完了,全部必须还要在主进程给生产者设置join,才能确保生产者生产的任务被执行完了,而且可以确保守护进程在全部任务执行完成以后才随着主进程的结束而结束。
        c2.daemon=True
    
        #开始
        p_l=[p1,p2,p3,c1,c2]
        for p in p_l:
            p.start()
    
        p1.join() #我要确保你的生产者进程结束了,生产者进程的结束标志着你生产的全部的人任务都已经被处理完了
        p2.join()
        p3.join()
        print('主')
        
        # 主进程等--->p1,p2,p3等---->c1,c2
        # p1,p2,p3结束了,证实c1,c2确定全都收完了p1,p2,p3发到队列的数据
        # 于是c1,c2也没有存在的价值了,不须要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,因此设置成守护进程就能够了。
    

    1555421072044

5、管道

  • 进程间通讯(IPC)方式二:管道(不推荐使用,了解便可),会致使数据不安全的状况出现,后面咱们会说到为何会带来数据 不安全的问题。

    • 管道介绍:

      #建立管道的类:
      Pipe([duplex]):在进程之间建立一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的链接对象,强调一点:必须在产生Process对象以前产生管道
      #参数介绍:
      dumplex:默认管道是全双工的,若是将duplex射成False,conn1只能用于接收,conn2只能用于发送。
      #主要方法:
          conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接收,recv方法会一直阻塞。若是链接的另一端已经关闭,那么recv方法会抛出EOFError。
          conn1.send(obj):经过链接发送对象。obj是与序列化兼容的任意对象
       #其余方法:
      conn1.close():关闭链接。若是conn1被垃圾回收,将自动调用此方法
      conn1.fileno():返回链接使用的整数文件描述符
      conn1.poll([timeout]):若是链接上的数据可用,返回True。timeout指定等待的最长时限。若是省略此参数,方法将当即返回结果。若是将timeout射成None,操做将无限期地等待数据到达。
       
      conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。若是进入的消息,超过了这个最大值,将引起IOError异常,而且在链接上没法进行进一步读取。若是链接的另一端已经关闭,不再存在任何数据,将引起EOFError异常。
      conn.send_bytes(buffer [, offset [, size]]):经过链接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,而后调用c.recv_bytes()函数进行接收    
       
      conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或相似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。若是消息长度大于可用的缓冲区空间,将引起BufferTooShort异常。
      
    • 管道的使用

      from multiprocessing import Process, Pipe
      def f(conn):
          conn.send("Hello 妹妹") #子进程发送了消息
          conn.close()
      
      if __name__ == '__main__':
          parent_conn, child_conn = Pipe() #创建管道,拿到管道的两端,双工通讯方式,两端均可以收发消息
          p = Process(target=f, args=(child_conn,)) #将管道的一段给子进程
          p.start() #开启子进程
          print(parent_conn.recv()) #主进程接受了消息
          p.join()
      

      img

    • 应该特别注意管道端点的正确管理问题。若是是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为什么在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。若是忘记执行这些步骤,程序可能在消费者中的recv()操做上挂起(就是阻塞)。管道是由操做系统进行引用计数的,必须在全部进程中关闭管道的相同一端就会能生成EOFError异常。所以,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。

      • 如下会触发EOFError报错

        from multiprocessing import Process, Pipe
        
        def f(parent_conn,child_conn):
            #parent_conn.close() #不写close将不会引起EOFError
            while True:
                try:
                    print(child_conn.recv())
                except EOFError:
                    child_conn.close()
                    break
        
        if __name__ == '__main__':
            parent_conn, child_conn = Pipe()
            p = Process(target=f, args=(parent_conn,child_conn,))
            p.start()
            child_conn.close()
            parent_conn.send('hello')
            parent_conn.close()
            p.join()    
            
         #主进程将管道的两端都传送给子进程,子进程和主进程共用管道的两种报错状况,都是在recv接收的时候报错的:
            1.主进程和子进程中的管道的相同一端都关闭了,出现EOFError;
            2.若是你管道的一端在主进程和子进程中都关闭了,可是你还用这个关闭的一端去接收消息,那么就会出现OSError;
            因此你关闭管道的时候,就容易出现问题,须要将全部只用这个管道的进程中的两端所有关闭才行。固然也能够经过异常捕获(try:except EOFerror)来处理。
            虽然咱们在主进程和子进程中都打印了一下conn1一端的对象,发现两个再也不同一个地址,可是子进程中的管道和主进程中的管道仍是能够通讯的,由于管道是同一套,系统可以记录
        
相关文章
相关标签/搜索