一 multiprocessing模块介绍python
python中的多线程没法利用多核优点,若是想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分状况须要使用多进程。Python提供了multiprocessing。
multiprocessing模块用来开启子进程,并在子进程中执行咱们定制的任务(好比函数),该模块与多线程模块threading的编程接口相似。linux
multiprocessing模块的功能众多:支持子进程、通讯和共享数据、执行不一样形式的同步,提供了Process、Queue、Pipe、Lock等组件。编程
须要再次强调的一点是:与线程不一样,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。json
二 Process类的介绍windows
建立进程的类安全
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化获得的对象,表示一个子进程中的任务(还没有启动) #强调: 1. 须要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数介绍网络
group参数未使用,值始终为None target表示调用对象,即子进程要执行的任务 args表示调用对象的位置参数元组,args=(1,2,'egon',) kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} name为子进程的名称
1 p.start():启动进程,并调用该子进程中的p.run() 2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法 3 4 p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁 5 p.is_alive():若是p仍然运行,返回True 6 7 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
1 p.daemon:默认值为False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设定为True后,p不能建立本身的新进程,必须在p.start()以前设置 2 p.name:进程的名称 3 p.pid:进程的pid 4 p.exitcode:进程在运行时为None、若是为–N,表示被信号N结束(了解便可) 5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络链接的底层进程间通讯提供安全性,这类链接只有在具备相同的身份验证键时才能成功(了解便可)
3、Process类的使用数据结构
注意:在windows中Process()必须放到# if __name__ == '__main__':下多线程
Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). This is the reason for hiding calls to Process() inside if __name__ == "__main__" since statements inside this if-statement will not get called upon import. 因为Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 若是在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。
简言之,是因为在windows系统下,子进程是经过导入模块的方式拿到父进程的代码,若是没有main会一直开启子进程,而子进程的申请是须要开辟内存以及申请pid等的并发
建立进程的两种方式
方式一:调用内置的类
from multiprocessing import Process def func(name): print('%s,running...' % name) print('%s,ending...' % name) if __name__ == '__main__': obj = Process(target=func, args=('子进程一',)) # 若是只有一个参数,args括号内必定要加逗号,确保以元组的形式传入 obj.start() # 只是主进程给操做系统发送创建子进程的请求,并不是马上创建子进程 print('主>>>')
方式二:自定义类
class MyProcess(Process): def run(self): # 必须为这个名字 print('%s running...' % self.name) print('%s ending...' % self.name) if __name__ == '__main__': obj = MyProcess() obj.start() # 本质上是在调用父类的start方法,而start方法下会触发run方法 print('主>>>')
进程之间的内存空间是隔离的
from multiprocessing import Process n=100 def work(): global n n=0 print('子进程内: ',n) if __name__ == '__main__': p=Process(target=work) p.start() print('主进程内: ',n)
进程对象相关的属性和方法
join()
import time from multiprocessing import Process x = 1 def task(name, num): print('%s is running' % name) global x x = 0 time.sleep(num) print('%s is done' % name) if __name__ == '__main__': obj_list = [] start_time = time.time() for i in range(1, 3): obj = Process(target=task, args=('子进程%s' % i, i)) obj_list.append(obj) obj.start() for obj in obj_list: obj.join() # join()让父进程原地等待,等子进程运行完毕后,才执行下一行代码(也拥有wait方法,等子进程运行完毕后,向操做系统发送请求,回收子进程占用的PID) end_start = time.time() print(x, end_start - start_time) # 能够看出修改子进程内的变量值并不会影响到父进程--->进程之间内存空间彼此隔离
import time from multiprocessing import Process def task(name): print('%s is running'% name) time.sleep(15) # 给咱们在cmd内查看进程预留时间 if __name__ == '__main__': p = Process(target=task,args=('进程一',)) p.start() print(p.pid) p.join() print(p.pid) # 执行结果 26324 进程一 is running 26324 """ 说join()内也拥有wait()方法,会在子进程结束后向操做系统发送请求,回收子进程的pid,那么为何在join()以后仍然能查到子进程的pid? join()确实向操做系统发送了请求,操做系统也确实回收了子进程(经过cmd能够查看到),可是在子进程被建立时,pid已经成为父进程的一个属性(指向子进程的pid),join以后咱们并无删除这个属性(只是这个属性没有任何意义) """
os.get_pid与os.get_ppid()
import time,os from multiprocessing import Process def task(name): print('%s is running'% name) print(os.getpid()) time.sleep(5) if __name__ == '__main__': p = Process(target=task,args=('进程一',)) p.start() p.join() print(os.getpid(),os.getppid()) time.sleep(100)
咱们能够用os模块下的get_pid()查看当前进程的pid,以及get_ppid()查看父进程的pid
经过cmd能够看出子进程与父进程的pid都是指向python.exe,而父进程的父进程是pycharm
缘由是子进程与父进程执行的都是python代码,须要经过解释器执行(将所要执行的代码做为参数传入解释器内),咱们的进程是在pychram内执行的,若是经过cmd执行那么父进程就是cmd.exe
咱们经过cmd执行这个py文件,而后用另外一个cmd2去杀死cmd1,发现cmd1并无关闭
缘由是子进程在占用cmd的终端显示(cmd1确实被回收了,在cmd2内查不到cmd1的pid号),若是将子进程设置为后台运行,就会发如今咱们杀死cmd1时,cmd1窗口就会当即关闭(进程与进程的内存空间彼此是隔离的)
obj.terminate():用于杀死子进程
obj.is_alive(): 判断一个子进程是否存活
4、僵尸进程、孤儿进程、守护进程
僵尸进程
子进程在结束以后,释放掉其占用的绝大部份内存空间以及cpu等资源,可是会留下一个称为僵尸进程的数据结构(包含子进程的pid),等待父进程处理。这种状况下的僵尸进程是无害的(待全部的子进程结束后,父进程会统一贯操做系统发送回收子进程pid的请求,或者使用join(),其内部也拥有wait()方法),可是,若是父进程是一个死循环,不断的创造子进程,而又不发送回收请求,这就形成了大量的pid被占用
孤儿进程
在子进程结束以前,父进程就挂掉了,该子进程称之为孤儿进程(无害),最后会由全部进程的父进程进行发送回收请求(linux中为init进程)
守护进程
两个关键词:守护/进程
进程:其本质也是一种“子进程”
守护:伴随的意思
即,守护进程会伴随着父进程代码的执行结束而死亡(不折不扣,而非成为僵尸进程)
为何要用守护进程
当该进程的代码在父进程的代码执行完毕后就没有存在的意义了,则应该将该进程设置为守护进程(例如在生产者与消费者模型中,生产者是专门负责产生数据的任务,而消费者是负责处理数据的任务,当生产者对象join以后,意味生产者再也不生产数据,也意味着执行父进程的下一行代码,而消费者处理的数据来自生产者,因此应该将充当消费者的子进程设置为守护进程)
如何将进程设置为守护进程
import time from multiprocessing import Process def bar(name): print("%s is running" % name) time.sleep(3) print("%s is done" % name) def foo(name): print("%s is running" % name) time.sleep(3) print("%s is done" % name) if __name__ == '__main__': p1 = Process(target=bar, args=('守护进程',)) p2 = Process(target=foo, args=('子进程',)) p1.daemon = True # 开启守护进程必定要在父进程发送请求以前 p1.start() p2.start() print('主is done')
在这种状况下,执行结果可能有三种(取决于计算机的性能)
分析以前,首先要明确的是
1.start()操做只是表明父进程向操做系统发送建立子进程的请求(而非当即产生子进程),至于什么建立,先建立谁由操做系统决定
2.产生进程是须要开辟内存空间的,为子进程分配pid,经过导入的方式将父进程的代码复制到子进程中(还有其它资源),这些都是须要必定的时间
3.守护进程是伴随着父进程的代码执行完毕而结束(而非父进程的死亡)
4.print()操做,是先在内存中产生字符串这个对象,而后将其输出到屏幕上,这也是须要必定的时间
主进程is done 子进程 is running 子进程 is done #这是因为执行两个start()以后,当即执行了print()操做,待父进程代码执行完毕后,守护进程仍然没有被创造出来
主进程is done 守护进程 is running 子进程 is running 子进程 is done #这是因为,在父进程执行print()操做的时候,守护进程被建立出来
守护进程 is running 主进程is done 子进程 is running 子进程 is done #这是因为在父进程向操做系统发送申请子进程的时候,守护进程被建立出来
5、互斥锁
将多个任务对修改共享数据的操做由并发变为“串行”
# json文件 {"count": 2} # 执行文件 import os import time import json import random from multiprocessing import Process def check(): with open("a.json","r",encoding="utf-8") as f: data_dic = json.load(f) time.sleep(random.random()) print('[%s] 在%s 查看了车票,车票还剩:%s' % (os.getpid(), time.strftime('%Y-%m-%d %X'), data_dic['count'])) def pay(): with open("a.json","r",encoding="utf-8") as f: data_dic = json.load(f) time.sleep(random.random()) if data_dic['count'] > 0: data_dic['count'] -= 1 time.sleep(random.random()) with open("a.json","w",encoding="utf-8") as f: json.dump(data_dic,f) print('[%s] 在%s 购买了车票' % (os.getpid(), time.strftime('%Y-%m-%d %X'))) else: print('[%s] 在%s 购票失败' % (os.getpid(), time.strftime('%Y-%m-%d %X'))) def buy_tickets(): check() pay() if __name__ == '__main__': for i in range(8): p = Process(target=buy_tickets) p.start() #发现,两张票被8我的购买
# json 文件 {"count": 2} # 执行文件 import os import time import json import random from multiprocessing import Process, Lock def check(): with open("a.json", "r", encoding="utf-8") as f: data_dic = json.load(f) time.sleep(random.random()) print('[%s] 在%s 查看了车票,车票还剩:%s' % (os.getpid(), time.strftime('%Y-%m-%d %X'), data_dic['count'])) def pay(): with open("a.json", "r", encoding="utf-8") as f: data_dic = json.load(f) time.sleep(random.random()) if data_dic['count'] > 0: data_dic['count'] -= 1 time.sleep(random.random()) with open("a.json", "w", encoding="utf-8") as f: json.dump(data_dic, f) print('[%s] 在%s 购买了车票' % (os.getpid(), time.strftime('%Y-%m-%d %X'))) else: print('[%s] 在%s 购票失败' % (os.getpid(), time.strftime('%Y-%m-%d %X'))) def buy_tickets(lock): # 若是将锁加到这里,就将整个任务串行了 check() lock.acquire() # 该进程拿到锁,其他没有锁的进程等待 pay() lock.release() # 释放锁,其他等待进程竞争 ''' 或者使用with上下文管理 with lock: pay() ''' if __name__ == '__main__': mutex = Lock() # 得到锁对象 for i in range(8): p = Process(target=buy_tickets, args=(mutex,)) # p = Process(target=buy_tickets, kwargs={'lock': mutex}) p.start()
总结
1.多个任务并发的去操做共享数据会形成数据错乱,使用互斥锁,虽然使任务对共享数据的操做由并发变为“串行”,减低了效率,可是提升了数据的安全性
2.此方法的共享数据效率低(数据来自于硬盘) 而且须要本身去处理锁的问题
因此,为了解决第二个问题,就要寻求一种方法---->使多个进程共享一个内存空间中的共享数据,该方法能够替咱们处理好锁的问题
IPC通讯
管道与队列
管道与队列将数据存放于内存中,并且队列是经过管道+锁实现的
6、IPC
进程间的通讯
两种实现方式
管道:pipe
队列:queue(其实就是pipe+lock)
注意:二者实际上都是内存空间,不要往里面放入大数据,只能放数据量较小的消息
IPC所解决的问题
1.当多个任务并发的去修改共享数据,就可能会形成数据错乱,咱们经过加互斥锁使多个任务对共享数据的操做由并发变为“串行”,从而保证了共享数据的安全,而当出现须要修改多个共享数据的需求时,咱们就得再次加锁处理
---->IPC帮咱们解决了须要本身加锁的问题
2.进程间的内存空间是彼此隔离的,如何完成通讯(数据交互),就须要寻求一种共享的东西,硬盘是共享的,可是读取硬盘的速度慢
---->IPC实现了一种内存空间上的共享(两个进程之间经过队列交流)
队列的使用
from multiprocessing import Queue q = Queue(3) # 设置队列存放大小 q.put('你好') q.put({'name':"bob",'age':26}) q.put([1,2,3]) print(q.get()) print(q.get()) print(q.get())
注意
1.队列内能够存放的是python任意类型的数据
2.队列是先进先出
3.当put的数量大于队列容许放入的数量时,就会发生阻塞(block=True),直到队列中有消息被取走
4.当队列中的内容被取空时,get操做也会发生阻塞(block=True),直到有新的数据放入队列中
队列的其它参数
q.put(obj=,block=,timeout=)
q.get(block=,timeout=)
put()
obj:表明要放入队列的数据
block:默认为True,表明当放入的数据数量大于队列容许的数量时,就会发生阻塞;False,则直接抛出 queue.Full 异常
timeout:默认为-1,表明当发生阻塞时,会一直等待(等待队列中有数据被取走),若是阻塞时间大于自设置的时间,则抛出queue.Full 异常
get()
block:与put()的意思相似,默认值也为True
timeout:与put()的意思相似,默认值为-1
7、生产者与消费者模型
三个关键词:消费者/生产者/模型
模型:能够理解为用于解决一类问题的统一方法/模板
生产者:比喻的是在程序中负责产生数据的任务
消费者:比喻的是在程序中负责处理数据的任务
实现
生产者 ------> queue < ------ 消费者 (经过队列进行交互,实现解耦和)
好处
实现了生产者与消费者的解耦和,生产者能够不断的生产,消费者能够不断的消费,平衡了生产者的生产能力与消费者的消费能力,从而提高了程序的总体运行效率
何时使用该模型
当程序中明显的出现了两类任务,一类负责产生数据,一类负责处理数据,就可使用生产者与消费者模型来提高程序的总体效率
使用
import os import time import random from multiprocessing import Process, Queue def produce(q): for i in range(1, 5): res = '包子%s' % i time.sleep(random.uniform(1, 3)) q.put(res) print('厨师:%s生产了%s' % (os.getpid(), res)) def customer(q): while True: res = q.get() time.sleep(random.uniform(1, 3)) print('客户:%s吃了包子%s' % (os.getpid(), res)) if __name__ == '__main__': q = Queue() p1 = Process(target=produce, args=(q,)) c1 = Process(target=customer, args=(q,)) p1.start() c1.start() print('主进程 is done')
执行上诉代码,发现一个问题,当队列内的数据取空后(生产者生产完毕),消费者的get操做发生了阻塞,因此要寻求一种方法让消费者知道生产者生产完毕
import time import random from multiprocessing import Process,Queue def produce(name,food,q): for i in range(1,4): res = '%s%s'%(food,i) time.sleep(random.uniform(1,3)) q.put(res) print('\033[45m厨师%s生产了%s%s\033[0m'%(name,food,i)) def customer(name,q): while True: res = q.get() if res is None: break time.sleep(random.uniform(1,3)) print('\033[46m%s 吃了 %s\033[0m'%(name,res)) if __name__ == '__main__': q = Queue() p1 = Process(target=produce,args=('bob','包子',q)) p2 = Process(target=produce,args=('tom','馒头',q)) p3 = Process(target=produce,args=('tony','花卷',q)) c1 = Process(target=customer,args=('顾客1',q)) c2 = Process(target=customer,args=('顾客1',q)) p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() q.put(None) q.put(None) print('主进程 is done')
3个join以后意味着生产者生产完毕,由主进程为队列添加结束信号(也能够在生产者内添加),消费者拿到结束信号后结束
可是这种方法须要有几个消费者就发送几个信号(消费者个数没法预知,这种作法也low)
另外一种队列:JoinableQueue([maxsize]) # maxsize为容许队列存放的最大数,默认无限制
方法
1.拥有与Queue对象相同的方法
2.task_done().消费者调用该方法,记录get()方法的返回项已经被处理
3.join().调用此方法发生阻塞,直到队列中的内容所有被取干净
import time import random from multiprocessing import Process,JoinableQueue def produce(name,food,q): for i in range(1,4): res = '%s%s'%(food,i) time.sleep(random.uniform(1,3)) q.put(res) print('\033[45m厨师%s生产了%s%s\033[0m'%(name,food,i)) def customer(name,q): while True: res = q.get() time.sleep(random.uniform(1,3)) print('\033[46m%s 吃了 %s\033[0m'%(name,res)) q.task_done() if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=produce,args=('bob','包子',q)) p2 = Process(target=produce,args=('tom','馒头',q)) p3 = Process(target=produce,args=('tony','花卷',q)) c1 = Process(target=customer,args=('顾客1',q)) c2 = Process(target=customer,args=('顾客1',q)) c1.daemon = True c2.daemon = True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() ''' 让父进程等待生产者进程执行完毕后再执行 q.join执行结束意味着--->生产者代码执行完毕 ''' q.join() ''' 等待队列被取干净 执行结束意味着父进程代码执行结束,生产者执行结束而且队列被取干净--->消费者没有存在的意义 即,这种状况下,消费者代码应该伴随着父进程代码的结束而结束(守护进程) '''