Python并发编程之多进程

Python并发编程之多进程

1、什么是进程

进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu。html

进程是资源分配的基本单位python

进程有:代码段,数据段,进程控制块(PCB)组成git

2、进程与程序的区别

程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。github

举例:算法

想象一位有一手好厨艺的计算机科学家正在为他的女儿烘制生日蛋糕。编程

他有作生日蛋糕的食谱,json

厨房里有所需的原料:面粉、鸡蛋、韭菜,蒜泥等。windows

在这个比喻中:数组

作蛋糕的食谱就是程序(即用适当形式描述的算法)安全

计算机科学家就是处理器(cpu)

而作蛋糕的各类原料就是输入数据

进程就是厨师阅读食谱、取来各类原料以及烘制蛋糕等一系列动做的总和

须要强调的是:同一个程序执行两次,那也是两个进程,好比打开暴风影音,虽然都是同一个软件,可是一个能够播放苍井空,一个能够播放饭岛爱。

3、并发与并行

不管是并行仍是并发,在用户看来都是'同时'运行的,无论是进程仍是线程,都只是一个任务而已,真是干活的是cpu,cpu来作这些任务,而一个cpu同一时刻只能执行一个任务

  1. 并发:在同一个时间段内多个任务同时进行,伪并行,即看起来是同时运行。单个cpu+多道技术就能够实现并发(并行也属于并发)

    举例:

    你是一个cpu,你同时谈了三个女友,每个均可以是一个恋爱任务,你被这三个任务共享,要玩出并发恋爱的效果,应该是你先跟女朋友1去看电影,看了一会说:很差,我要拉肚子,而后跑去跟第二个女朋友吃饭,吃了一会说:那啥,我去趟洗手间,而后跑去跟女朋友3开了个房

  2. 并行:在同一个时间点上多个任务同时进行,同时运行,只有具有多个cpu才能实现并行

    单核下,能够利用多道技术,多个核,每一个核也均可以利用多道技术(多道技术是针对单核而言的

    举例:

    有四个核,六个任务,这样同一时间有四个任务被执行,假设分别被分配给了cpu1,cpu2,cpu3,cpu4, 一旦任务1遇到I/O就被迫中断执行,此时任务5就拿到cpu1的时间片去执行,这就是单核下的多道技术,而一旦任务1的I/O结束了,操做系统会从新调用它(需知进程的调度、分配给哪一个cpu运行,由操做系统说了算),可能被分配给四个cpu中的任意一个去执行

4、同步、异步、阻塞、非阻塞

  1. 同步

    同步:某一个任务的执行必须依赖于另外一个任务的返回结果

    所谓同步,就是在发出一个功能调用时,在没有获得结果以前,该调用就不会返回。按照这个定义,其实绝大多数函数都是同步调用。可是通常而言,咱们在说同步、异步的时候,特指那些须要其余部件协做或者须要必定时间完成的任务

  2. 异步

    异步:某一个任务的执行,不须要依赖于另外一个任务的返回,只须要告诉另外一个任务一声

    异步的概念和同步相对。当一个异步功能调用发出后,调用者不能马上获得结果。当该异步功能完成后,经过状态、通知或回调来通知调用者。

  3. 阻塞

    阻塞:程序由于相似于IO等待、等待事件等致使没法继续执行。

    阻塞调用是指调用结果返回以前,当前线程会被挂起(如遇到io操做)。函数只有在获得结果以后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,实际上他是不一样的。对于同步调用来讲,不少时候当前线程仍是激活的,只是从逻辑上当前函数没有返回而已。

  4. 非阻塞

    程序遇到相似于IO操做时,再也不阻塞等待,若是没有及时的处理IO,就报错或者跳过等其余操做

    非阻塞和阻塞的概念相对应,指在不能马上获得结果以前也会马上返回,同时该函数不会阻塞当前线程。

5、进程的基本状态

进程的三大基本状态:

  1. 就绪状态:全部进程须要的资源都获取到了,等待着CPU的调用
  2. 执行状态:获取到了全部资源包括CPU,进程处于运行状态
  3. 阻塞状态:程停滞再也不运行,放弃了CPU,进程此时处于内存里

6、multiprocessing模块介绍

Python中的多线程没法利用多核优点,若是想要充分地使用多核CPU的资源(os.cpu_count()查看),在Python中大部分状况须要使用多进程。Python提供了multiprocessing。
multiprocessing模块用来开启子进程,并在子进程中执行咱们定制的任务(好比函数),该模块与多线程模块threading的编程接口相似。

multiprocessing模块的功能众多:支持子进程、通讯和共享数据、执行不一样形式的同步,提供了Process、Queue、Pipe、Lock等组件。

须要再次强调的一点是:与线程不一样,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

7、Process类的介绍

  1. 建立进程的类

    Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化获得的对象,表示一个子进程中的任务(还没有启动)
    
    强调:
    1. 须要使用关键字的方式来指定参数
    2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
  2. 参数介绍

    group参数未使用,值始终为None
    
    target表示调用对象,即子进程要执行的任务
    
    args表示调用对象的位置参数元组,args=(1,2,'egon',)
    
    kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
    
    name为子进程的名称
  3. 方法介绍

    p.start():启动进程,并调用该子进程中的p.run() 
     p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法  
    
     p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁
     p.is_alive():若是p仍然运行,返回True
    
     p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
  4. 属性介绍

    p.daemon:默认值为False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设定为True后,p不能建立本身的新进程,必须在p.start()以前设置
    
    p.name:进程的名称
    
    p.pid:进程的pid
    
    p.exitcode:进程在运行时为None、若是为–N,表示被信号N结束
    
    p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络链接的底层进程间通讯提供安全性,这类链接只有在具备相同的身份验证键时才能成功

8、Process类的使用

注意:在windows中Process()必须放到# if name == 'main':下

  1. 建立并开启子进程的两种方式

    from multiprocessing import Process
    import os
    
    
    def child_process():
        print("这是子进程{0},父进程是{1}".format(os.getpid(), os.getppid()))
    
    if __name__ == '__main__':
        child_p = Process(target=child_process)
        child_p.start()
    
        # child_p.join()
        print("这是父进程{0}".format(os.getpid()))
    from multiprocessing import Process
    import os
    
    
    class ChildProcess(Process):
        def __init__(self):
            super(ChildProcess, self).__init__()
    
        def run(self):
            print("这是子进程{0},父进程是{1}".format(os.getpid(), os.getppid()))
    
    
    if __name__ == '__main__':
        child_p = ChildProcess()
        child_p.start()
    
        # child_p.join()
        print("这是父进程{0}".format(os.getpid()))
  2. 进程之间的内存空间是隔离的

    from multiprocessing import Process
    import os
    
    num = 100
    
    def chile_process():
        global num
        num = 0
        print("子进程中:{0}".format(num))
    
    if __name__ == '__main__':
        p = Process(target=chile_process)
        p.start()
    
        print("父进程中:{0}".format(num))
    
    # 父进程中:100
    # 子进程中:0
  3. Process中的join()方法

    join():主进程等待,等待子进程结束

    from multiprocessing import Process
    import os
    import time
    
    def child_process():
        time.sleep(3)
        print("这是子进程")
    
    
    if __name__ == '__main__':
        p = Process(target=child_process)
        p.start()
        # p.join()
    
        print("这是主进程")
    
    # 这是主进程
    # 这是子进程
    # 分析:若是不加join那么则是先打印主进程中的“这是主进程”,而后等待三秒在打印“这是子进程”
    
    
    from multiprocessing import Process
    import os
    import time
    
    def child_process():
        time.sleep(3)
        print("这是子进程")
    
    
    if __name__ == '__main__':
        p = Process(target=child_process)
        p.start()
        p.join()
    
        print("这是主进程")
    # 这是子进程
    # 这是主进程
    # 分析:若是加了join那么主进程会等待子进程执行完以后再执行主进程,也就是说会先等待三秒而后同时打印出“这是子进程”和“这是主进程”

9、守护进程

守护进程的特色:

  1. 守护进程会在主进程执行完成后终止

  2. 设置了守护进程后,守护进程不能再开启子进程,不然会报异常

    from multiprocessing import Process
    import time
    
    def func(name):
        time.sleep(1)
        print("我是{0}".format(name))
    
    
    def foo(name):
        time.sleep(3)
        print("{0}是谁".format(name))
    
    
    if __name__ == '__main__':
        p1 = Process(target=func, args=("oldwang",))
        p2 = Process(target=foo, args=("oldwang",))
    
        p1.daemon = True
        # 必定要在p.start()前设置,设置p为守护进程,禁止p建立子进程,而且父进程代码执行结束,p即终止运行
        p1.start()
        p2.start()
    
        print("这是主进程...")
    
    # 执行结果:
    # 这是主进程...
    # oldwang是谁

10、进程同步

进程之间数据不共享,可是共享同一套文件系统,因此访问同一个文件,或同一个打印终端,是没有问题的,

而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理

# 模拟抢票,购票行为由并行变成了串行,牺牲了效率,提升了数据安全性
from multiprocessing import Process, Lock
import json
import time
import os

def search_ticket():
    with open("file/ticket", mode="r", encoding="utf-8") as f:
        ticket_num = int(f.read())
        print("剩余票数:{0}".format(ticket_num))

def get_ticket():
    with open("file/ticket", mode="r", encoding="utf-8") as f:
        ticket = int(f.read())
        time.sleep(0.1)  # 模拟抢票延时
        if ticket:
            ticket -= 1
            print("{0}抢到了一张票,还剩{1}张票".format(os.getpid(), ticket))

        else:
            print("{0}没有抢到票".format(os.getpid()))

    f = open("file/ticket", mode="w", encoding="utf-8")
    f.write(str(ticket))

def task(lock):
    search_ticket()
    lock.acquire()
    get_ticket()
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(100):
        p = Process(target=task, args=(lock,))
        p.start()

总结:加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

虽然能够用文件共享数据实现进程间通讯,但问题是:

  1. 效率低(共享数据基于文件,而文件是硬盘上的数据)
  2. 须要本身加锁处理

11、队列(推荐使用)

进程彼此之间互相隔离,要实现进程间通讯(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

  1. Queue(底层就是以管道和锁的方式实现

    方法介绍

    maxsize是队列中容许最大项数,省略则无大小限制。    
    
    q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。若是超时,会抛出Queue.Full异常。若是blocked为False,但该Queue已满,会当即抛出Queue.Full异常。
    
    q.get方法能够从队列读取而且删除一个元素。一样,get方法有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。若是blocked为False,有两种状况存在,若是Queue有一个值可用,则当即返回该值,不然,若是队列为空,则当即抛出Queue.Empty异常.
    
    q.get_nowait():同q.get(False)
    q.put_nowait():同q.put(False)
    
    q.empty():调用此方法时q为空则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中又加入了项目。
    q.full():调用此方法时q已满则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中的项目被取走。
    q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()同样
    
    q.cancel_join_thread():不会在进程退出时自动链接后台线程。能够防止join_thread()方法阻塞
    
    q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但还没有写入的数据,但将在此方法完成时立刻关闭。若是q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,若是某个使用者正在被阻塞在get()操做上,关闭生产者中的队列不会致使get()方法返回错误。
    
    q.join_thread():链接队列的后台线程。此方法用于在调用q.close()方法以后,等待全部队列项被消耗。默认状况下,此方法由不是q的原始建立者的全部进程调用。调用q.cancel_join_thread方法能够禁止这种行为
    from multiprocessing import Queue
    
    q = Queue(maxsize=3)
    
    q.put(1)
    q.put({"name":"dogfa"})
    q.put([1,2,3])
    
    print(q.full())      # True
    
    print(q.get())       # 1
    print(q.get())       # {'name': 'dogfa'}
    print(q.get())       # [1, 2, 3]
    
    print(q.empty()) # True
  2. 生产者消费者模型

    在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生产线程和消费线程的工做能力来提升程序的总体处理数据的速度。

  3. 为何使用生产者消费者模式

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。

  4. 什么是生产者消费者模式

    生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。

  5. JoinableQueue()

    #JoinableQueue([maxsize]):这就像是一个Queue对象,但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
    
       #参数介绍:
        maxsize是队列中容许最大项数,省略则无大小限制。    
      #方法介绍:
        JoinableQueue的实例p除了与Queue对象相同的方法以外还具备:
        q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常
        q.join():生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止
  6. 生产者消费者模型的实现

    from multiprocessing import JoinableQueue, Process
    import os
    import random
    import time
    
    def customer(q):
        while 1:
            time.sleep(0.5)
            print("{0}号顾客吃了{1}".format(os.getpid(), q.get()))
            q.task_done()
    
    
    def producter(food, q):
        for i in range(10):
            time.sleep(random.randint(1, 2))
            q.put(food)
            print("{0}号厨师完成了{1}的制做".format(os.getpid(), food))
    
        q.join()
    
    
    if __name__ == '__main__':
        q = JoinableQueue()
    
        pro1 = Process(target=producter, args=("包子", q))
        pro2 = Process(target=producter, args=("油条", q))
        pro3 = Process(target=producter, args=("花卷", q))
    
        cus1 = Process(target=customer, args=(q,))
        cus2 = Process(target=customer, args=(q,))
    
        cus1.daemon = True
        cus2.daemon = True
        lst = [pro1, pro2, pro3, cus1, cus2]
        [i.start() for i in lst]
    
        pro1.join()
        pro2.join()
        pro3.join()
    
        print("ending...")
        # 主进程等待pro1,Pro2,pro3执行完成,当pro执行完成意味着cus一定执行完成,因此能够将cus设置成守护进程
  7. 生产者消费者模式总结

    #程序中有两类角色
            一类负责生产数据(生产者)
            一类负责处理数据(消费者)
    
        #引入生产者消费者模型为了解决的问题是:
            平衡生产者与消费者之间的工做能力,从而提升程序总体处理数据的速度
    
        #如何实现:
            生产者<-->队列<——>消费者
        #生产者消费者模型实现类程序的解耦和

12、管道(不推荐使用)

  1. 管道

    #建立管道的类:
    Pipe([duplex]):在进程之间建立一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的链接对象,强调一点:必须在产生Process对象以前产生管道
    #参数介绍:
    dumplex:默认管道是全双工的,若是将duplex射成False,conn1只能用于接收,conn2只能用于发送。
    #主要方法:
        conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接收,recv方法会一直阻塞。若是链接的另一端已经关闭,那么recv方法会抛出EOFError。
        conn1.send(obj):经过链接发送对象。obj是与序列化兼容的任意对象
     #其余方法:
    conn1.close():关闭链接。若是conn1被垃圾回收,将自动调用此方法
    conn1.fileno():返回链接使用的整数文件描述符
    conn1.poll([timeout]):若是链接上的数据可用,返回True。timeout指定等待的最长时限。若是省略此参数,方法将当即返回结果。若是将timeout射成None,操做将无限期地等待数据到达。
    
    conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。若是进入的消息,超过了这个最大值,将引起IOError异常,而且在链接上没法进行进一步读取。若是链接的另一端已经关闭,不再存在任何数据,将引起EOFError异常。
    conn.send_bytes(buffer [, offset [, size]]):经过链接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,而后调用c.recv_bytes()函数进行接收    
    
    conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或相似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。若是消息长度大于可用的缓冲区空间,将引起BufferTooShort异常。
  2. 利用管道实现进程间的通讯

    from multiprocessing import Process,Pipe
    
    import time,os
    def consumer(p,name):
        left,right=p
        left.close()
        while True:
            try:
                baozi=right.recv()
                print('%s 收到包子:%s' %(name,baozi))
            except EOFError:
                right.close()
                break
    def producer(seq,p):
        left,right=p
        right.close()
        for i in seq:
            left.send(i)
            # time.sleep(1)
        else:
            left.close()
    if __name__ == '__main__':
        left,right=Pipe()
    
        c1=Process(target=consumer,args=((left,right),'c1'))
        c1.start()
    
    
        seq=(i for i in range(10))
        producer(seq,(left,right))
    
        right.close()
        left.close()
    
        c1.join()
        print('主进程')

十3、数据共享

进程间数据是独立的,能够借助于队列或管道实现通讯,两者都是基于消息传递的,虽然进程间数据独立,但能够经过Manager实现数据共享

from multiprocessing import Process, Manager, Lock
import os
import random
import time


def func(dic, lock):
    lock.acquire()  # 不加锁确定会形成数据混乱
    time.sleep(random.randrange(2))
    dic["count"] -= 1
    lock.release()



if __name__ == '__main__':
    lock = Lock()
    with Manager() as m:
        dic = m.dict({"count": 100})
        lst = []
        for i in range(100):
            p = Process(target=func, args=(dic, lock))
            lst.append(p)
            p.start()

        [i.join() for i in lst]
        print(dic["count"])

进程间通讯应该尽可能避免使用上述共享数据的方式

十3、进程池

在利用Python进行系统管理的时候,特别是同时操做多个文件目录,或者远程控制多台主机,并行操做能够节约大量的时间。多进程是实现并发的手段之一,须要注意的问题是:

  1. 很明显须要并发执行的任务一般要远大于核数
  2. 一个操做系统不可能无限开启进程,一般有几个核就开几个进程
  3. 进程开启过多,效率反而会降低(开启进程是须要占用系统资源的,并且开启多余核数目的进程也没法作到并行)

若是当被操做对象数目不大时,能够直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但若是是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时能够发挥进程池的功效。咱们就能够经过维护一个进程池来控制进程数目。

ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool能够提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,若是池尚未满,那么就会建立一个新的进程用来执行该请求;但若是池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

建立进程池的类:若是指定numprocess为3,则进程池会从无到有建立三个进程,而后自始至终使用这三个进程去执行全部任务,不会开启其余进程

  1. 建立进程池

    Pool([numprocess  [,initializer [, initargs]]]):建立进程池
  2. 参数介绍

    numprocess:要建立的进程数,若是省略,将默认使用cpu_count()的值
    initializer:是每一个工做进程启动时要执行的可调用对象,默认为None
    initargs:是要传给initializer的参数组
  3. 方法介绍

    p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async()
    
    p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。
    
    p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成
    P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具备如下方法:
    
    obj.get():返回结果,若是有必要则等待结果到达。
    obj.ready():若是调用完成,返回True
    obj.successful():若是调用完成且没有引起异常,返回True,若是在结果就绪以前调用此方法,引起异常
    obj.wait([timeout]):等待结果变为可用。
    obj.terminate():当即终止全部工做进程,同时不执行任何清理或结束任何挂起工做。若是p被垃圾回收,将自动调用此函数
  4. 进程池的使用

    1. 使用进程池(异步调用,apply_async)

      from multiprocessing import Process, Pool
      import os
      import time
      import socket
      
      def func(i):
          print(i)
          time.sleep(1)
          return i ** 2
      
      if __name__ == '__main__':
          pool = Pool(os.cpu_count() + 1)
          ret_lst = []
      
          for i in range(100):
              # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
              ret = pool.apply_async(func, args=(i,))
              ret_lst.append(ret)
      
          # 没有后面的join,或get,则程序总体结束,进程池中的任务还没来得及所有执行完也都跟着主进程一块儿结束了
          print("=======================")
           # 关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成
          pool.close()
          # 调用join以前,先调用close函数,不然会出错。执行完close后不会有新的进程加入到pool,join函数等待全部子进程结束
          pool.join()
          # 看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>对象组成的列表,而非最终的结果,但这一步是在join后执行的,证实结果已经计算完毕,剩下的事情就是调用每一个对象下的get方法去获取结果
          print(ret_lst)
        # 使用get来获取apply_aync的结果,若是是apply,则没有get方法,由于apply是同步执行,马上获取结果,也根本无需get
          [print(i.get()) for i in ret_lst]
    2. 使用进程池(同步调用,apply)

      from multiprocessing import Process, Pool
      import os
      import time
      import socket
      
      def func(i):
          print(i)
          time.sleep(1)
          return i ** 2
      
      if __name__ == '__main__':
        # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
          pool = Pool(os.cpu_count() + 1)
          # 同步执行,即执行完一个拿到结果,再去执行另一个
          ret_lst = []
      
          for i in range(100):
              ret = pool.apply(func, args=(i,))
              ret_lst.append(ret)
        # 看到的就是最终的结果组成的列表,apply是同步的,因此直接获得结果,没有get()方法
          print(ret_lst)
    3. 进程池实现基于TCP协议的socket并发效果

      # 服务端
      from multiprocessing import Process, Pool
      import os
      import time
      import socket
      
      def func(conn, client_addr):
          print("进程:{0}".format(os.getpid()))
          while 1:
              try:
                  c_msg = conn.recv(1024).decode("utf-8")
                  if not c_msg: break
                  print(c_msg)
                  conn.send(c_msg.upper().encode("utf-8"))
              except Exception:
                  break
      
      if __name__ == '__main__':
          sk = socket.socket()
          sk.bind(("127.0.0.1", 8080))
          sk.listen(5)
          pool = Pool(os.cpu_count() + 1)
          while 1:
              conn, addr = sk.accept()
              pool.apply_async(func, args=(conn, addr))
      
      
      
      # 服务端
      import socket
      
      sk = socket.socket()
      
      sk.connect(("127.0.0.1", 8080))
      
      while 1:
          c_msg = input(">>")
          if not c_msg: continue
          sk.send(c_msg.encode("utf-8"))
          s_msg = sk.recv(1024).decode("utf-8")
          print(s_msg)

      当链接数达到开启的进程池中的最大进程数量时,再有其它客户端进行链接,将会阻塞等待,当另外的客户端结束链接时才会创建起会话链接。

  5. 回调函数(callback())

    须要回调函数的场景:进程池中任何一个任务一旦处理完了,就当即告知主进程:我好了额,你能够处理个人结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

    咱们能够把耗时间(阻塞)的任务放到进程池中,而后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

    from multiprocessing import Pool
    import requests
    import json
    import os
    
    def get_page(url):
        print('<进程%s> get %s' %(os.getpid(),url))
        respone=requests.get(url)
        if respone.status_code == 200:
            return {'url':url,'text':respone.text}
    
    def pasrse_page(res):
        print('<进程%s> parse %s' %(os.getpid(),res['url']))
        parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
        with open('db.txt','a') as f:
            f.write(parse_res)
    
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
    
        p=Pool(3)
        res_l=[]
        for url in urls:
            res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
            res_l.append(res)
    
        p.close()
        p.join()
        print([res.get() for res in res_l]) # 拿到的是get_page的结果,其实彻底不必拿该结果,该结果已经传给回调函数处理了
    
    '''
    打印结果:
    <进程3388> get https://www.baidu.com
    <进程3389> get https://www.python.org
    <进程3390> get https://www.openstack.org
    <进程3388> get https://help.github.com/
    <进程3387> parse https://www.baidu.com
    <进程3389> get http://www.sina.com.cn/
    <进程3387> parse https://www.python.org
    <进程3387> parse https://help.github.com/
    <进程3387> parse http://www.sina.com.cn/
    <进程3387> parse https://www.openstack.org
    [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]
    '''

    若是在主进程中等待进程池中全部任务都执行完毕后,再统一处理结果,则无需回调函数

十4、信号量

互斥锁 同时只容许一个线程更改数据,而Semaphore是同时容许必定数量的线程更改数据 ,好比厕全部3个坑,那最多只容许3我的上厕所,后面的人只能等里面有人出来了才能再进去,若是指定信号量为3,那么来一我的得到一把锁,计数加1,当计数等于3时,后面的人均须要等待。一旦释放,就有人能够得到一把锁

信号量与进程池的概念很像,可是要区分开,信号量涉及到加锁的概念

from multiprocessing import Process,Semaphore
import time,random

def go_wc(sem,user):
    sem.acquire()
    print('%s 占到一个茅坑' %user)
    time.sleep(random.randint(0,3)) #模拟每一个人拉屎速度不同,0表明有的人蹲下就起来了
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(5)
    p_l=[]
    for i in range(13):
        p=Process(target=go_wc,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')

十5、事件

Python线程的事件用于主线程控制其余线程的执行,事件主要提供了三个方法 set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,若是“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,若是“Flag”值为True,那么event.wait 方法时便再也不阻塞。

clear:将“Flag”设置为False
set:将“Flag”设置为True

from multiprocessing import Process,Event
import time,random

def car(e,n):
    while True:
        if not e.is_set(): #Flase
            print('\033[31m红灯亮\033[0m,car%s等着' %n)
            e.wait()
            print('\033[32m车%s 看见绿灯亮了\033[0m' %n)
            time.sleep(random.randint(3,6))
            if not e.is_set():
                continue
            print('走你,car', n)
            break

def police_car(e,n):
    while True:
        if not e.is_set():
            print('\033[31m红灯亮\033[0m,car%s等着' % n)
            e.wait(1)
            print('灯的是%s,警车走了,car %s' %(e.is_set(),n))
            break

def traffic_lights(e,inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            e.clear() #e.is_set() ---->False
        else:
            e.set()

if __name__ == '__main__':
    e=Event()
    # for i in range(10):
    #     p=Process(target=car,args=(e,i,))
    #     p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))
        p.start()
    t=Process(target=traffic_lights,args=(e,10))
    t.start()

    print('============》')

..............

相关文章
相关标签/搜索