python之多进程

1、multiprocessing模块

python中的多线程没法利用多核优点,若是想要充分地使用多核cpu的资源(os.cpu_count()查看),在python中大部分状况须要使用多进程。python提供了multiprocessingcss

multiprocessing 模块用来开启子进程。并在子进程中执行咱们定制的任务(例如函数)。与多线程threading相似html

 

multiprocessing 模块支持不少功能 :子进程、通讯、共享数据,执行不一样的同步,提供Process、Queue、Pipe、Lock 等组件python

ps:与线程不一样,进程没有任何共享状态,修改的数据(改动等等)仅限于该进程内。编程

 2、Process类介绍

 建立进程的类

Procss([group [, target [,name [, args [, kwargs ] ] ] ] ] ) ,由该类实例化获得对象,表示一个子进程中的人物(还没有启动)json

ps:1. 须要指定关键字的方式来制定参数windows

  2. args 指定为传给 target 函数位置,是一个元组形式,必须有逗号安全

参数介绍

1. group 参数 未使用 ,值始终为 None

2. target 表示 调用对象 , 即子进程要执行的任务(不加括号)

3. args 表示调用对象的位置参数元组,args = (x , y ,)

4. kwargs 表示调用对象的字典,kwargs={'x':1 , 'y':99}

5. name 为子进程的名称

方法介绍

1. p.start() :启动进程,并调用该子进程的 p.run()

2. p.run() :进程启动时运行的方法 ,它去调用target指定的函数,自定义的类必定要实现该方法。

3. p.terminate() :强制终止进程p ,不会进行任何清理操做,若是p建立了子进程,该子进程就变成僵尸进程了,使用该方法须要当心这个状况,若是p还保存了一个锁 那么也将不会被释放,进而致使死锁

4. p.is_alive() : 若是p仍然运行,返回True 

5. p.join([timeout]) :主进程等待p终止 (ps:是主线程处于等 的状态,而p是处于运行的状态),timeout是可选的超时时间,须要强调的是,p.join 只能join 住 start 开启的进程,而不能join住 run开启的进程

属性介绍

1. p.daemon : 默认为False , 若是设为True ,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设置完后,p不能建立本身的新进程,必须在p.start() 以前设置。

2. p.name :进程的名称

3. p.pid :进程的pid

4. p.exitcode :进程运行时为None、若是为-N ,表示被信号N结束(了解)

5. p.authkey :进程的身份验证键,默认是由 os.urandom()随机生成的32位字符串,这个键的用途是为涉及网络链接的底层进程间通讯提供安全性,这类链接只有在具备相同身份验证键时才算成功(了解)

 

3、Process类的使用

在windows中Process()必须放到# if __name__ == '__main__':下网络

建立并开启子进程的两种方式多线程

# 开启子进程的方式1:

from multiprocessing import Process
import time

def task(name):
    print('%s is running' %name)
    time.sleep(3)
    print('%s is done' %name)

# 在windows系统上开启子进程的操做必须放到该行代码下
if __name__ == '__main__':
    p=Process(target=task,args=('子进程',)) # Process(target=task,kwargs={'name':'子进程'}) #
    p.start() # 仅仅只是向操做系统发送一个创造子进程的信号
    print('')
方式一
# 开启子进程的方式2:
from multiprocessing import Process
import time

class Myprocess(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name

    def run(self):
        print('%s is running' %self.name)
        time.sleep(3)
        print('%s is done' %self.name)

# 在windows系统上开启子进程的操做必须放到该行代码下
if __name__ == '__main__':
    p=Myprocess('子进程')
    p.start() # 仅仅只是向操做系统发送一个创造子进程的信号,start会调用run方法
    p.join()
    print('')
方式二

进程之间的内存空间是隔离的

from multiprocessing import Process
n=100 #在windows系统中应该把全局变量定义在if __name__ == '__main__'之上就能够了
def work():
    global n
    n=0
    print('子进程内: ',n)


if __name__ == '__main__':
    p=Process(target=work)
    p.start()
    print('主进程内: ',n)
View Code

将以前的基于TCP的套接字通讯改成支持并发

from socket import *
from multiprocessing import Process

FTPS=socket(AF_INET,SOCK_STREAM)
FTPS.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
FTPS.bind(('127.0.0.1',9999))
FTPS.listen(5)

def talk(conn):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg :break
            conn.sendall(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    while True:
        conn,client_addr=FTPS.accept()
        p=Process(target=talk,args=(conn,))
        p.start()
服务端
from socket import *

FTPC=socket(AF_INET,SOCK_STREAM)
FTPC.connect(('127.0.0.1',9999))

while True:
    cmd=input('>>:').strip()
    if not cmd:continue

    FTPC.sendall(cmd.encode('utf-8'))
    msg=FTPC.recv(1024)
    print(msg.decode('utf-8'))
客户端

这样作虽然能够实现并发的效果  可是有一个问题并发

每来一个客户端,都在服务端开启一个进程,若是并发来十万个客户端,要开启十万个进程吗
解决方法:进程池

 Process对象的join方法

from multiprocessing import Process
import time
import random

class Task(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name


    def run(self):
        print('%s is running' %self.name)
        time.sleep(random.randint(1,3))
        print('%s is ending' %self.name)

if __name__ == '__main__':
    p=Task('子进程')
    p.start()
    p.join(0.5) #等待p中止,等0.5秒就再也不等了
    print('开始')
View Code
from multiprocessing import Process
import time
import random
class Task(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name


    def run(self):
        print('%s is running' %self.name)
        time.sleep(random.randint(1,3))
        print('%s is ending' %self.name)
if __name__ == '__main__':
    p_msg = []
    for p in range(4):
        p=Task('子进程%s'%p)
        p_msg.append(p)
        p.start()
    for p in p_msg:
        p.join()
#固然不是了,必须明确:p.join()是让谁等?
#很明显p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p,

#详细解析以下:
#进程只要start就会在开始运行了,因此p1-p4.start()时,系统中已经有四个并发的进程了
#而咱们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
#join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其他p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接经过检测,无需等待
# 因此4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间


    print('主线程')


#上述启动进程与join进程能够简写为
# p_l=[p1,p2,p3,p4]
#
# for p in p_l:
#     p.start()
#
# for p in p_l:
#     p.join()
p.join详细说明

process对象的其余方法或属性

from multiprocessing import Process
import time
import random
class Myprocess(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法会执行self.name=Myprocess-1,
        #                    #因此加到这里,会覆盖咱们的self.name=name

        #为咱们开启的进程设置名字的作法
        super().__init__()
        self.name=name

    def run(self):
        print('%s is running' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is  ending' %self.name)
if __name__ == '__main__':

    p=Myprocess('子进程')
    p.start()
    print('开始')
    print(p.pid) #查看pid
name和pid
#进程对象的其余方法一:terminate,is_alive
from multiprocessing import Process
import time
import random
class Myprocess(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法会执行self.name=Myprocess-1,
        #                    #因此加到这里,会覆盖咱们的self.name=name

        #为咱们开启的进程设置名字的作法
        super().__init__()
        self.name=name

    def run(self):
        print('%s is running' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is  ending' %self.name)
if __name__ == '__main__':

    p=Myprocess('子进程')
    p.start()

    p.terminate()#关闭进程,不会当即关闭,因此is_alive马上查看的结果可能仍是存活
    print(p.is_alive()) #结果为True

    print('开始')
    print(p.is_alive()) #结果为False
terminate和is_alive

僵尸进程与孤儿进程

 参考博客:http://www.cnblogs.com/Anker/p/3271773.html

#总结僵尸进程和孤儿进程

#僵尸进程:有害
例若有个进程,它按期的产 生一个子进程,这个子进程须要作的事情不多,作完它该作的事情以后就退出了,所以这个子进程的生命周期很短,可是,父进程只管生成新的子进程,至于子进程 退出以后的事情,则一律漠不关心,这样,系统运行上一段时间以后,系统中就会存在不少的僵死进程,假若用ps命令查看的话,就会看到不少状态为Z的进程。 严格地来讲,僵死进程并非问题的根源,罪魁祸首是产生出大量僵死进程的那个父进程。所以,当咱们寻求如何消灭系统中大量的僵死进程时,答案就是把产生大 量僵死进程的那个元凶枪毙掉(也就是经过kill发送SIGTERM或者SIGKILL信号啦)。枪毙了元凶进程以后,它产生的僵死进程就变成了孤儿进 程,这些孤儿进程会被init进程接管,init进程会wait()这些孤儿进程,释放它们占用的系统进程表中的资源,这样,这些已经僵死的孤儿进程 就能瞑目而去了。

#孤儿进程无害:会被系统接管
总结孤儿和僵尸进程

4、守护进程

主进程建立守护进程

  其一:守护进程会在主进程代码执行结束后就终止

  其二:守护进程内没法再开启子进程,不然抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

from multiprocessing import Process
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")
if __name__ == '__main__':
    p1=Process(target=foo)
    p1.daemon = True
    p1.start()
    print('主进程')
View Code
#主进程代码运行完毕,守护进程就会结束
from multiprocessing import Process
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':

    p1=Process(target=foo)
    p2=Process(target=bar)

    p1.daemon=True
    p1.start()
    p2.start()
    print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,由于主进程打印main----时,p1也执行了,可是随即被终止
特殊的例子

5、互斥锁

进程之间数据不共享,可是共享同一套文件系统,因此访问同一个文件,或同一个打印终端,是没有问题的,

而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理

from multiprocessing import Process,Lock
import os,time

#互斥锁 可理解为生活中卫生间的锁

def work(mutex):
    # mutex.acquire() 抢锁操做
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is done' %os.getpid())
    # mutex.release()  #释放锁操做

if __name__ == '__main__':
    # mutex=Lock()    #多进程需加在main后
    for i in range(3):
        p=Process(target=work)#args=(mutex,)#传mutex
        p.start()
互斥锁mutex

加锁:由并发变成‘串行’,牺牲了运行效率,但保障了数据安全

例子:模拟12306抢票

from multiprocessing import Process,Lock
import json
import time,random

def search(i):
    with open('db.json','rt',encoding='utf-8') as f: #db.json 文件 {"count": 1},表示余一张票
        dic=json.load(f)
        time.sleep(1)
        print('路人%s查看到剩余票数:%s' %(i,dic['count']))

def get(i):
    with open('db.json','rt',encoding='utf-8') as f:
        dic=json.load(f)
    if dic['count'] > 0:
        # 有票
        dic['count']-=1
        time.sleep(random.randint(1,3)) #关键在于此处 睡的同时全部子进程都读取到1张票
        with open('db.json','wt',encoding='utf-8') as f:
            json.dump(dic,f)
            print('路人%s抢票成功' % i)

    else:
        print('路人%s抢票失败' %i)


def task(i,mutex):
    search(i)
    # mutex.acquire()
    get(i)
    # mutex.release()

if __name__ == '__main__':
    mutex = Lock()

    for i in range(1,11):
        p=Process(target=task,args=(i,mutex))
        p.start()
        # p.join()

    print('主进程')
模拟12306抢票:互斥锁的应用

总结

#加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然能够用文件共享数据实现进程间通讯,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.须要本身加锁处理



#所以咱们最好找寻一种解决方案可以兼顾:一、效率高(多个进程共享一块内存的数据)二、帮咱们处理好锁问题。这就是mutiprocessing模块为咱们提供的基于消息的IPC通讯机制:队列和管道。
1 队列和管道都是将数据存放于内存中
2 队列又是基于(管道+锁)实现的,可让咱们从复杂的锁问题中解脱出来,
咱们应该尽可能避免使用共享数据,尽量使用消息传递和队列,避免处理复杂的同步和锁问题,并且在进程数目增多时,每每能够得到更好的可获展性。

6、IPC机制

进程彼此之间互相隔离,要实现进程间通讯(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

一、队列

建立队列的类(底层就是以管道和锁定的方式实现的)

  Queue(maxsize):建立共享的进程队列,Queue是多进程安全的队列

可使用Queue实现多进程之间的数据传递。参数:maxsize 是队列中容许最大数,不写则无限制
from multiprocessing import Queue

主要方法:

'''
multiprocessing模块支持进程间通讯的两种主要形式:管道和队列
都是基于消息传递实现的,可是队列接口
'''

from multiprocessing import Process,Queue
import time
q=Queue(3)


#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
print(q.full()) #满了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
Queue 队列
1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。若是超时,会抛出Queue.Full异常。若是blocked为False,但该Queue已满,会当即抛出Queue.Full异常。
2 q.get方法能够从队列读取而且删除一个元素。一样,get方法有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。若是blocked为False,有两种状况存在,若是Queue有一个值可用,则当即返回该值,不然,若是队列为空,则当即抛出Queue.Empty异常.
3  
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6 
7 q.empty():调用此方法时q为空则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中又加入了项目。
8 q.full():调用此方法时q已满则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中的项目被取走。
9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()同样
详细

二、管道(了解)

建立管道的类

Pipe(duplex)在进程之间建立一条管道,并返回元组(coon1,coon2)表示管道的两端

  ps:必须在产生process对象以前产生管道

参数:dumplex:默认管道是全双工的,若是将duplex设成False,conn1只能用于接收,conn2只能用于发送。

主要方法:

    conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接收,recv方法会一直阻塞。若是链接的另一端已经关闭,那么recv方法会抛出EOFError。
    conn1.send(obj):经过链接发送对象。obj是与序列化兼容的任意对象
 #其余方法:
conn1.close():关闭链接。若是conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回链接使用的整数文件描述符
conn1.poll([timeout]):若是链接上的数据可用,返回True。timeout指定等待的最长时限。若是省略此参数,方法将当即返回结果。若是将timeout射成None,操做将无限期地等待数据到达。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。若是进入的消息,超过了这个最大值,将引起IOError异常,而且在链接上没法进行进一步读取。若是链接的另一端已经关闭,不再存在任何数据,将引起EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):经过链接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,而后调用c.recv_bytes()函数进行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或相似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。若是消息长度大于可用的缓冲区空间,将引起BufferTooShort异常。
Pipe 管道
from multiprocessing import Process,Pipe

import time,os
def consumer(p,name):
    left,right=p
    left.close()
    while True:
        try:
            baozi=right.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            right.close()
            break
def producer(seq,p):
    left,right=p
    right.close()
    for i in seq:
        left.send(i)
        # time.sleep(1)
    else:
        left.close()
if __name__ == '__main__':
    left,right=Pipe()

    c1=Process(target=consumer,args=((left,right),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(left,right))

    right.close()
    left.close()

    c1.join()
    print('主进程')
Pipe 示例

7、生产者消费者模型

在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生产线程和消费线程的工做能力来提升程序的总体处理数据的速度。

 特色:

  • 实现了 生产者 与 消费者 解耦合
  • 平衡了生产者的生产能力和消费者的消费能力

 为何使用此模式:

当程序中存在明显的两类任务,一类负责造数据,另一类负责处理数据,就能够用生产者消费者模型来实现
解耦和从而提高效率

如何实现

生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列实现生产者消费者模型
实现方法
生产者进程-------->queue<----------消费者进程
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print('')
示例

上述示例存在一个问题:主进程永远不会结束,由于子进程的生产者在生产完后就结束了,可是消费者在取空q以后,则以后一直处于死循环且阻塞在q.get()这步。

  解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,此时便可在收到结束信号时break出死循环

 

另一种队列提供了这种机制

   #JoinableQueue([maxsize]):这就像是一个Queue对象,但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

   #参数介绍:
    maxsize是队列中容许最大项数,省略则无大小限制。    
  #方法介绍:
    JoinableQueue的实例p除了与Queue对象相同的方法以外还具备:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止
from multiprocessing import Process,JoinableQueue
import time,random

def producer(food,name,q):
    for i in range(3):
        time.sleep(random.randint(1,3))
        res='%s%s' %(food,i)
        q.put(res)
        print('厨师%s生产了%s' %(name,res))


def consumer(name,q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print('吃货%s吃了%s' %(name,res))
        q.task_done() #向q.join()发送一次信号,证实一个数据已经被取走了

if __name__ == '__main__':
    q=JoinableQueue()
    #生产者们
    p1=Process(target=producer,args=('包子','tom',q))
    p2=Process(target=producer,args=('蛋汤','jerry',q))
    p3=Process(target=producer,args=('红烧肉','bob',q))
    #消费者们
    c1 = Process(target=consumer,args=('吃货1',q))
    c2 = Process(target=consumer,args=('吃货2',q))
    c1.daemon=True
    c2.daemon=True

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()


    p1.join()
    p2.join()
    p3.join()
    q.join() #主进程最后一行代码运行完毕,生产者所有正常死亡,消费者也没有存在的意义了

    print('')
另外一种Queue
#主进程等--->p1,p2,p3等---->c1,c2
#p1,p2,p3结束了,证实c1,c2确定全都收完了p1,p2,p3发到队列的数据
#于是c1,c2也没有存在的价值了,应该随着主进程的结束而结束,因此设置成守护进程

8、信号量(了解)

互斥锁 同时只容许一个线程更改数据,而Semaphore是同时容许必定数量的线程更改数据 ,好比厕全部3个坑,那最多只容许3我的上厕所,后面的人只能等里面有人出来了才能再进去,若是指定信号量为3,那么来一我的得到一把锁,计数加1,当计数等于3时,后面的人均须要等待。一旦释放,就有人能够得到一把锁
    信号量与进程池的概念很像,可是要区分开,信号量涉及到加锁的概念
from multiprocessing import Process,Semaphore
import time

def test(sem,user):
    sem.acquire()
    print('%s 占一个位置' %user)
    time.sleep(2)
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(3)
    p_l=[]
    for i in range(12):
        p=Process(target=test,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)


    for i in p_l:
        i.join()
    print('')
信号量

9、事件(了解)

python线程的事件用于主线程控制其余线程的执行,事件主要提供了三个方法 set、wait、clear。

    事件处理的机制:全局定义了一个“Flag”,若是“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,若是“Flag”值为True,那么event.wait 方法时便再也不阻塞。

clear:将“Flag”设置为False
set:将“Flag”设置为True

from multiprocessing import Process,Event
import time,random

def car(e,n):
    while True:
        if not e.is_set(): #Flase
            print('\033[31m红灯亮\033[0m,car%s等着' %n)
            e.wait()
            print('\033[32m车%s 看见绿灯亮了\033[0m' %n)
            time.sleep(random.randint(3,6))
            if not e.is_set():
                continue
            print('走你,car', n)
            break

def police_car(e,n):
    while True:
        if not e.is_set():
            print('\033[31m红灯亮\033[0m,car%s等着' % n)
            e.wait(1)
            print('灯的是%s,警车走了,car %s' %(e.is_set(),n))
            break

def traffic_lights(e,inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            e.clear() #e.is_set() ---->False
        else:
            e.set()

if __name__ == '__main__':
    e=Event()
    # for i in range(10):
    #     p=Process(target=car,args=(e,i,))
    #     p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))
        p.start()
    t=Process(target=traffic_lights,args=(e,10))
    t.start()

    print('============》')
View Code

10、进程池(重要)

在利用Python进行系统管理的时候,特别是同时操做多个文件目录,或者远程控制多台主机,并行操做能够节约大量的时间。多进程是实现并发的手段之一,须要注意的问题是:

  1. 很明显须要并发执行的任务一般要远大于核数
  2. 一个操做系统不可能无限开启进程,一般有几个核就开几个进程
  3. 进程开启过多,效率反而会降低(开启进程是须要占用系统资源的,并且开启多余核数目的进程也没法作到并行)

 

例如当被操做对象数目不大时,能够直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但若是是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时能够发挥进程池的功效。

用池是为了将并发的进程或线程数目控制在计算机可承受的范围内

咱们就能够经过维护一个进程池来控制进程数目,好比httpd的进程模式,规定最小进程数和最大进程数... 

ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool能够提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,若是池尚未满,那么就会建立一个新的进程用来执行该请求;但若是池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

    建立进程池的类:若是指定numprocess为3,则进程池会从无到有建立三个进程,而后自始至终使用这三个进程去执行全部任务,不会开启其余进程

from concurrent.futures import ProcessPoolExecutor 
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os
import time
import random

def task(x):
    print('%s is running' %os.getpid())
    time.sleep(random.randint(1,3))
    return x**2

if __name__ == '__main__':
    p=ProcessPoolExecutor() #不指定参数默认池的大写等于cpu的核数
    for i in range(10):
        p.submit(task,i)  #提交任务
    print('')
进程池

重点:

submit  提交任务

同步调用 vs 异步调用
提交任务的两种方式

同步调用:提交完任务后,就在原地等待任务执行完毕,拿到运行结果/返回值后再执行下一行代码
同步调用下任务的执行是串行执行

异步调用:提交完任务后,不会原地等待任务执行完毕,不等结果 直接执行下一行代码
同步调用下任务的执行是并发执行
if __name__ == '__main__':
    # 异步调用
    p=ProcessPoolExecutor() #不指定参数默认池的大写等于cpu的核数
    futures=[]
    for i in range(10):
        future=p.submit(task,i)  #提交任务
        futures.append(future)
    p.shutdown(wait=True) #关闭了进程池的入口,而后执行join操做
    for future in futures:
        print(future.result()) #拿到结果
    print('')

#main上方代码     与上面示例一致
异步调用
if __name__ == '__main__':
    p=ProcessPoolExecutor() #不指定参数默认池的大写等于cpu的核数
    for i in range(10):
        res=p.submit(task,i).result()
        print(res)

    print('')
同步调用
相关文章
相关标签/搜索