进程html
本节目录python
顾名思义,进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。linux
进程的概念起源于操做系统,是操做系统最核心的概念,也是操做系统提供的最古老也是最重要的抽象概念之一。操做系统的其余全部内容都是围绕进程的概念展开的。nginx
因此想要真正了解进程,必须事先了解操做系统,点击进入 git
PS:即便能够利用的cpu只有一个(早期的计算机确实如此),也能保证支持(伪)并发的能力。将一个单独的cpu变成多个虚拟的cpu(多道技术:时间多路复用和空间多路复用+硬件上支持隔离),没有进程的抽象,现代计算机将不复存在。github
必备的理论基础:web
#一 操做系统的做用: 1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口 2:管理、调度进程,而且将多个进程对硬件的竞争变得有序 #二 多道技术: 1.产生背景:针对单核,实现并发 ps: 如今的主机通常是多核,那么每一个核都会利用多道技术 有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再从新调度,会被调度到4个 cpu中的任意一个,具体由操做系统调度算法决定。 2.空间上的复用:如内存中同时有多道程序 3.时间上的复用:复用一个cpu的时间片 强调:遇到io切,占用cpu时间过长也切,核心在于切以前将进程的状态保存下来,这样 才能保证下次切换回来时,能基于上次切走的位置继续运行
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操做系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。咱们本身在python文件中写了一些代码,这叫作程序,运行这个python文件的时候,这叫作进程。面试
第一,进程是一个实体。每个进程都有它本身的地址空间,通常状况下,包括文本区域(text region)(python的文件)、数据区域(data region)(python文件中定义的一些变量数据)和堆栈(stack region)。文本区域存储处理器执行的代码;数据区域存储变量和进程执行期间使用的动态分配的内存;堆栈区域存储着活动过程调用的指令和本地变量。 第二,进程是一个“执行中的程序”。程序是一个没有生命的实体,只有处理器赋予程序生命时(操做系统执行之),它才能成为一个活动的实体,咱们称其为进程。[3] 进程是操做系统中最基本、重要的概念。是多道程序系统出现后,为了刻画系统内部出现的动态状况,描述系统内部各道程序的活动规律引进的一个概念,全部多道程序设计操做系统都创建在进程的基础上。
动态性:进程的实质是程序在多道程序系统中的一次执行过程,进程是动态产生,动态消亡的。 并发性:任何进程均可以同其余进程一块儿并发执行 独立性:进程是一个能独立运行的基本单位,同时也是系统分配资源和调度的独立单位; 异步性:因为进程间的相互制约,使进程具备执行的间断性,即进程按各自独立的、不可预知的速度向前推动 结构特征:进程由程序、数据和进程控制块三部分组成。 多个不一样的进程能够包含相同的程序:一个程序在不一样的数据集里就构成不一样的进程,能获得不一样的结果;可是执行过程当中,程序不能发生改变。
程序是指令和数据的有序集合,其自己没有任何运行的含义,是一个静态的概念。 而进程是程序在处理机上的一次执行过程,它是一个动态的概念。 程序能够做为一种软件资料长期存在,而进程是有必定生命期的。 程序是永久的,进程是暂时的。 举例:就像qq同样,qq是咱们安装在本身电脑上的客户端程序,其实就是一堆的代码文件,咱们不运行qq,那么他就是一堆代码程序,当咱们运行qq的时候,这些代码运行起来,就成为一个进程了。
注意:同一个程序执行两次,就会在操做系统中出现两个进程,因此咱们能够同时运行一个软件,分别作不一样的事情也不会混乱。好比打开暴风影音,虽然都是同一个软件,可是一个能够播放苍井空,一个能够播放饭岛爱。算法
要想多个进程交替运行,操做系统必须对这些进程进行调度,这个调度也不是随即进行的,而是须要遵循必定的法则,由此就有了进程的调度算法。shell
先来先服务(FCFS)调度算法是一种最简单的调度算法,该算法既可用于做业调度,也可用于进程调度。FCFS算法比较有利于长做业(进程),而不利于短做业(进程)。由此可知,本算法适合于CPU繁忙型做业,而不利于I/O繁忙型的做业(进程)。
短做业(进程)优先调度算法(SJ/PF)是指对短做业或短进程优先调度的算法,该算法既可用于做业调度,也可用于进程调度。但其对长做业不利;不能保证紧迫性做业(进程)被及时处理;做业的长短只是被估算出来的。
时间片轮转(Round Robin,RR)法的基本思路是让每一个进程在就绪队列中的等待时间与享受服务的时间成比例。在时间片轮转法中,须要将CPU的处理时间分红固定大小的时间片,例如,几十毫秒至几百毫秒。若是一个进程在被调度选中以后用完了系统规定的时间片,但又未完成要求的任务,则它自行释放本身所占有的CPU而排到就绪队列的末尾,等待下一次调度。同时,进程调度程序又去调度当前就绪队列中的第一个进程。 显然,轮转法只能用来调度分配一些能够抢占的资源。这些能够抢占的资源能够随时被剥夺,并且能够将它们再分配给别的进程。CPU是可抢占资源的一种。但打印机等资源是不可抢占的。因为做业调度是对除了CPU以外的全部系统硬件资源的分配,其中包含有不可抢占资源,因此做业调度不使用轮转法。 在轮转法中,时间片长度的选取很是重要。首先,时间片长度的选择会直接影响到系统的开销和响应时间。若是时间片长度太短,则调度程序抢占处理机的次数增多。这将使进程上下文切换次数也大大增长,从而加剧系统开销。反过来,若是时间片长度选择过长,例如,一个时间片能保证就绪队列中所需执行时间最长的进程能执行完毕,则轮转法变成了先来先服务法。时间片长度的选择是根据系统对响应时间的要求和就绪队列中所容许最大的进程数来肯定的。 在轮转法中,加入到就绪队列的进程有3种状况: 一种是分给它的时间片用完,但进程还未完成,回到就绪队列的末尾等待下次调度去继续执行。 另外一种状况是分给该进程的时间片并未用完,只是由于请求I/O或因为进程的互斥与同步关系而被阻塞。当阻塞解除以后再回到就绪队列。 第三种状况就是新建立进程进入就绪队列。 若是对这些进程区别对待,给予不一样的优先级和时间片从直观上看,能够进一步改善系统服务质量和效率。例如,咱们可把就绪队列按照进程到达就绪队列的类型和进程被阻塞时的阻塞缘由分红不一样的就绪队列,每一个队列按FCFS原则排列,各队列之间的进程享有不一样的优先级,但同一队列内优先级相同。这样,当一个进程在执行完它的时间片以后,或从睡眠中被唤醒以及被建立以后,将进入不一样的就绪队列。
前面介绍的各类用做进程调度的算法都有必定的局限性。如短进程优先的调度算法,仅照顾了短进程而忽略了长进程,并且若是并未指明进程的长度,则短进程优先和基于进程长度的抢占式调度算法都将没法使用。 而多级反馈队列调度算法则没必要事先知道各类进程所需的执行时间,并且还能够知足各类类型进程的须要,于是它是目前被公认的一种较好的进程调度算法。在采用多级反馈队列调度算法的系统中,调度算法的实施过程以下所述。 (1) 应设置多个就绪队列,并为各个队列赋予不一样的优先级。第一个队列的优先级最高,第二个队列次之,其他各队列的优先权逐个下降。该算法赋予各个队列中进程执行时间片的大小也各不相同,在优先权愈高的队列中,为每一个进程所规定的执行时间片就愈小。例如,第二个队列的时间片要比第一个队列的时间片长一倍,……,第i+1个队列的时间片要比第i个队列的时间片长一倍。 (2) 当一个新进程进入内存后,首先将它放入第一队列的末尾,按FCFS原则排队等待调度。当轮到该进程执行时,如它能在该时间片内完成,即可准备撤离系统;若是它在一个时间片结束时还没有完成,调度程序便将该进程转入第二队列的末尾,再一样地按FCFS原则等待调度执行;若是它在第二队列中运行一个时间片后仍未完成,再依次将它放入第三队列,……,如此下去,当一个长做业(进程)从第一队列依次降到第n队列后,在第n 队列便采起按时间片轮转的方式运行。 (3) 仅当第一队列空闲时,调度程序才调度第二队列中的进程运行;仅当第1~(i-1)队列均空时,才会调度第i队列中的进程运行。若是处理机正在第i队列中为某进程服务时,又有新进程进入优先权较高的队列(第1~(i-1)中的任何一个队列),则此时新进程将抢占正在运行进程的处理机,即由调度程序把正在运行的进程放回到第i队列的末尾,把处理机分配给新到的高优先权进程。
对于多级反馈队列,windows不太清楚,可是在linux里面能够设置某个进程的优先级,提升了有限级有可能就会多执行几个时间片。
经过进程之间的调度,也就是进程之间的切换,咱们用户感知到的好像是两个视频文件同时在播放,或者音乐和游戏同时在进行,那就让咱们来看一下什么叫作并发和并行
不管是并行仍是并发,在用户看来都是'同时'运行的,无论是进程仍是线程,都只是一个任务而已,真是干活的是cpu,cpu来作这些任务,而一个cpu同一时刻只能执行一个任务
并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就能够实现并发,(并行也属于并发)
你是一个cpu,你同时谈了三个女友,每个均可以是一个恋爱任务,你被这三个任务共享要玩出并发恋爱的效果, 应该是你先跟女朋友1去看电影,看了一会说:很差,我要拉肚子,而后跑去跟第二个女朋友吃饭,吃了一会说:那啥,我去趟洗手间,而后跑去跟女朋友3开了个房,而后在你的基友眼里,你就在和三个女朋友同时在一块儿玩。
并行:并行:同时运行,只有具有多个cpu才能实现并行
将多个cpu必须成高速公路上的多个车道,进程就比如每一个车道上行驶的车辆,并行就是说,你们在本身的车道上行驶,会不影响,同时在开车。这就是并行
单核下,能够利用多道技术,多个核,每一个核也均可以利用多道技术(多道技术是针对单核而言的)
有四个核,六个任务,这样同一时间有四个任务被执行,假设分别被分配给了cpu1,cpu2,cpu3,cpu4,
一旦任务1遇到I/O就被迫中断执行,此时任务5就拿到cpu1的时间片去执行,这就是单核下的多道技术
而一旦任务1的I/O结束了,操做系统会从新调用它(需知进程的调度、分配给哪一个cpu运行,由操做系统说了算),可能被分配给四个cpu中的任意一个去执行
全部现代计算机常常会在同一时间作不少件事,一个用户的PC(不管是单cpu仍是多cpu),均可以同时运行多个任务(一个任务能够理解为一个进程)。
启动一个进程来杀毒(360软件)
启动一个进程来看电影(暴风影音)
启动一个进程来聊天(腾讯QQ)
全部的这些进程都需被管理,因而一个支持多进程的多道程序系统是相当重要的
多道技术概念回顾:内存中同时存入多道(多个)程序,cpu从一个进程快速切换到另一个,使每一个进程各自运行几十或几百毫秒,这样,虽然在某一个瞬间,一个cpu只能执行一个任务,但在1秒内,cpu却能够运行多个进程,这就给人产生了并行的错觉,即伪并行,以此来区分多处理器操做系统的真正硬件并行(多个cpu共享同一个物理内存)
1.进程状态介绍
在了解其余概念以前,咱们首先要了解进程的几个状态。在程序运行的过程当中,因为被操做系统的调度算法控制,程序会进入几个状态:就绪,运行和阻塞。
(1)就绪(Ready)状态
当进程已分配到除CPU之外的全部必要的资源,只要得到处理机即可当即执行,这时的进程状态称为就绪状态。
(2)执行/运行(Running)状态当进程已得到处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。
(3)阻塞(Blocked)状态正在执行的进程,因为等待某个事件发生而没法执行时,便放弃处理机而处于阻塞状态。引发进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能知足、等待信件(信号)等。
事件请求:input、sleep、文件输入输出、recv、accept等
事件发生:sleep、input等完成了
时间片到了以后有回到就绪状态,这三个状态不断的在转换。
2.同步异步
所谓同步就是一个任务的完成须要依赖另一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列
。要么成功都成功,失败都失败,两个任务的状态能够保持一致。其实就是一个程序结束才执行另一个程序,串行的,不必定两个程序就有依赖关系。
所谓异步是不须要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工做,依赖的任务也当即执行,只要本身完成了整个任务就算完成了
。至于被依赖的任务最终是否真正完成,依赖它的任务没法肯定,因此它
是不可靠的
任务序列
。
好比咱们去楼下的老家肉饼吃饭,饭点好了,取餐的时候发生了一些同步异步的事情。 同步:咱们都站在队里等着取餐,前面有我的点了一份肉饼,后厨作了好久,可是因为同步机制,咱们仍是要站在队里等着前面那我的的肉饼作好取走,咱们才往前走一步。 异步:咱们点完餐以后,点餐员给了咱们一个取餐号码,跟你说,你不用在这里排队等着,去找个地方坐着玩手机去吧,等饭作好了,我叫你。这种机制(等待别人通知)就是异步等待消息通知。在异步消息处理中,等待消息通知者(在这个例子中等着取餐的你)每每注册一个回调机制,在所等待的事件被触发时由触发机制(点餐员)经过某种机制(喊号,‘250号你的包子好了‘)找到等待该事件的人。
3.阻塞与非阻塞
阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来讲的
继续上面的那个例子,不管是排队仍是使用号码等待通知,若是在这个等待的过程当中,等待者除了等待消息通知以外不能作其它的事情,那么该机制就是阻塞的,表如今程序中,也就是该程序一直阻塞在该函数调用处不能继续往下执行。 相反,有的人喜欢在等待取餐的时候一边打游戏一边等待,这样的状态就是非阻塞的,由于他(等待者)没有阻塞在这个消息通知上,而是一边作本身的事情一边等待。阻塞的方法:input、time.sleep,socket中的recv、accept等等。
4.同步/异步 与 阻塞和非阻塞
效率最低。拿上面的例子来讲,就是你专心排队,什么别的事都不作。
若是在排队取餐的人采用的是异步的方式去等待消息被触发(通知)
,也就是领了一张小纸条,假如在这段时间里他不能作其它的事情,就在那坐着等着,不能玩游戏等,那么很显然,这我的被阻塞在了这个等待的操做上面;
异步操做是能够被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。
其实是效率低下的。
想象一下你一边打着电话一边还须要抬头看到底队伍排到你了没有,若是把打电话和观察排队的位置当作是程序的两个操做的话,这个程序须要在这两种不一样的行为之间来回的切换
,效率可想而知是低下的。
效率更高,
由于打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不一样的操做中来回切换
。
好比说,这我的忽然发觉本身烟瘾犯了,须要出去抽根烟,因而他告诉点餐员说,排到我这个号码的时候麻烦到外面通知我一下,那么他就没有被阻塞在这个等待的操做上面,天然这个就是异步+非阻塞的方式了。
不少人会把同步和阻塞混淆,是由于不少时候同步操做会以阻塞的形式表现出来
,一样的,不少人也会把异步和非阻塞混淆,由于异步操做通常都不会在真正的IO操做处被阻塞
。
1.进程的建立
但凡是硬件,都须要有操做系统去管理,只要有操做系统,就有进程的概念,就须要有建立进程的方式,一些操做系统只为一个应用程序设计,好比微波炉中的控制器,一旦启动微波炉,全部的进程都已经存在。
而对于通用系统(跑不少应用程序),须要有系统运行过程当中建立或撤销进程的能力,主要分为4中形式建立新的进程
1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台而且只在须要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)
2. 一个进程在运行过程当中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)
3. 用户的交互式请求,而建立一个新进程(如用户双击暴风影音)
4. 一个批处理做业的初始化(只在大型机的批处理系统中应用)
不管哪种,新进程的建立都是由一个已经存在的进程执行了一个用于建立进程的系统调用而建立的:
1. 在UNIX中该系统调用是:fork,fork会建立一个与父进程如出一辙的副本,两者有相同的存储映像、一样的环境字符串和一样的打开文件(在shell解释器进程中,执行一个命令就会建立一个子进程)
2. 在windows中该系统调用是:CreateProcess,CreateProcess既处理进程的建立,也负责把正确的程序装入新进程。
关于建立的子进程,UNIX和windows
1.相同的是:进程建立后,父进程和子进程有各自不一样的地址空间(多道技术要求物理层面实现进程之间内存的隔离),任何一个进程的在其地址空间中的修改都不会影响到另一个进程。
2.不一样的是:在UNIX中,子进程的初始地址空间是父进程的一个副本,提示:子进程和父进程是能够有只读的共享内存区的。可是对于windows系统来讲,从一开始父进程与子进程的地址空间就是不一样的。
2.进程的结束
1. 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)
2. 出错退出(自愿,python a.py中a.py不存在)
3. 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,能够捕捉异常,try...except...)
4. 被其余进程杀死(非自愿,如kill -9)
3.进程并发的实现(了解)
进程并发的实如今于,硬件中断一个正在运行的进程,把此时进程运行的全部状态保存下来,为此,操做系统维护一张表格,即进程表(process table),每一个进程占用一个进程表项(这些表项也称为进程控制块)
该表存放了进程状态的重要信息:程序计数器、堆栈指针、内存分配情况、全部打开文件的状态、账号和调度信息,以及其余在进程由运行态转为就绪态或阻塞态时,必须保存的信息,从而保证该进程在再次启动时,就像从未被中断过同样。
===========================================================
上面的内容都是进程的一些理论基础,下面的内容是python中进程的应用实战
=====================================================================
今天的内容就到这个地方吧,同窗们好好整理一下~~~~~~~~~~~~~~~~
经过上面内容的学习,咱们已经了解了不少进程相关的理论知识,了解进程是什么应该再也不困难了,刚刚咱们已经了解了,运行中的程序就是一个进程。全部的进程都是经过它的父进程来建立的。所以,运行起来的python程序也是一个进程,那么咱们也能够在程序中再建立进程。多个进程能够实现并发效果,也就是说,当咱们的程序中存在多个进程的时候,在某些时候,就会让程序的执行速度变快。以咱们以前所学的知识,并不能实现建立进程这个功能,因此咱们就须要借助python中强大的模块。
仔细说来,multiprocess不是一个模块而是python中一个操做、管理进程的包。 之因此叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的全部子模块。因为提供的子模块很是多,为了方便你们归类记忆,我将这部分大体分为四个部分:建立进程部分,进程同步部分,进程池部分,进程之间数据共享。重点强调:进程没有任何共享状态,进程修改的数据,改动仅限于该进程内,可是经过一些特殊的方法,能够实现进程之间数据的共享。
1.process模块介绍
process模块是一个建立进程的模块,借助这个模块,就能够完成进程的建立。
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化获得的对象,表示一个子进程中的任务(还没有启动) 强调: 1. 须要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
咱们先写一个程序来看看:
#当前文件名称为test.py
# from multiprocessing import Process # # def func(): # print(12345) # # if __name__ == '__main__': #windows 下才须要写这个,这和系统建立进程的机制有关系,不用深究,记着windows下要写就好啦 # #首先我运行当前这个test.py文件,运行这个文件的程序,那么就产生了进程,这个进程咱们称为主进程 # # p = Process(target=func,) #将函数注册到一个进程中,p是一个进程对象,此时尚未启动进程,只是建立了一个进程对象。而且func是不加括号的,由于加上括号这个函数就直接运行了对吧。 # p.start() #告诉操做系统,给我开启一个进程,func这个函数就被咱们新开的这个进程执行了,而这个进程是我主进程运行过程当中建立出来的,因此称这个新建立的进程为主进程的子进程,而主进程又能够称为这个新进程的父进程。
#而这个子进程中执行的程序,至关于将如今这个test.py文件中的程序copy到一个你看不到的python文件中去执行了,就至关于当前这个文件,被另一个py文件import过去并执行了。
#start并非直接就去执行了,咱们知道进程有三个状态,进程会进入进程的三个状态,就绪,(被调度,也就是时间片切换到它的时候)执行,阻塞,而且在这个三个状态之间不断的转换,等待cpu执行时间片到了。 # print('*' * 10) #这是主进程的程序,上面开启的子进程的程序是和主进程的程序同时运行的,咱们称为异步
上面说了,咱们经过主进程建立的子进程是异步执行的,那么咱们就验证一下,而且看一会儿进程和主进程(也就是父进程)的ID号(讲一下pid和ppid,使用pycharm举例),来看看是不是父子关系。
打开windows下的任务管理器,看pycharm的pid进程号,是咱们上面运行的test.py这个文件主进程的父进程号:
看一个问题,说明linux和windows两个不一样的操做系统建立进程的不一样机制致使的不一样结果:
import time import os from multiprocessing import Process def func(): print('aaaa') time.sleep(1) print('子进程>>',os.getpid()) print('该子进程的父进程>>',os.getppid()) print(12345) print('太白老司机~~~~') #若是我在这里加了一个打印,你会发现运行结果中会出现两次打印出来的太白老司机,由于咱们在主进程中开了一个子进程,子进程中的程序至关于import的主进程中的程序,那么import的时候会不会执行你import的那个文件的程序啊,前面学的,是会执行的,因此出现了两次打印 #实际上是由于windows开起进程的机制决定的,在linux下是不存在这个效果的,由于windows使用的是process方法来开启进程,他就会拿到主进程中的全部程序,而linux下只是去执行我子进程中注册的那个函数,不会执行别的程序,这也是为何在windows下要加上执行程序的时候, 要加上if __name__ == '__main__':,不然会出现子进程中运行的时候还开启子进程,那就出现无限循环的建立进程了,就报错了
一个进程的生命周期:若是子进程的运行时间长,那么等到子进程执行结束程序才结束,若是主进程的执行时间长,那么主进程执行结束程序才结束,实际上咱们在子进程中打印的内容是在主进程的执行结果中看不出来的,可是pycharm帮咱们作了优化,由于它会识别到你这是开的子进程,帮你把子进程中打印的内容打印到了显示台上。
若是说一个主进程运行完了以后,咱们把pycharm关了,可是子进程尚未执行结束,那么子进程还存在吗?这要看你的进程是如何配置的,若是说咱们没有配置说我主进程结束,子进程要跟着结束,那么主进程结束的时候,子进程是不会跟着结束的,他会本身执行完,若是我设定的是主进程结束,子进程必须跟着结束,那么就不会出现单独的子进程(孤儿进程)了,具体如何设置,看下面的守护进程的讲解。好比说,咱们未来启动项目的时候,可能经过cmd来启动,那么我cmd关闭了你的项目就会关闭吗,不会的,由于你的项目不能中止对外的服务,对吧。
Process类中参数的介绍:
参数介绍: 1 group参数未使用,值始终为None 2 target表示调用对象,即子进程要执行的任务 3 args表示调用对象的位置参数元组,args=(1,2,'egon',) 4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} 5 name为子进程的名称
给要执行的函数传参数:
def func(x,y): print(x) time.sleep(1) print(y) if __name__ == '__main__': p = Process(target=func,args=('姑娘','来玩啊!'))#这是func须要接收的参数的传送方式。 p.start() print('父进程执行结束!') #执行结果: 父进程执行结束! 姑娘 来玩啊!
Process类中各方法的介绍:
1 p.start():启动进程,并调用该子进程中的p.run() 2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法 3 p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁 4 p.is_alive():若是p仍然运行,返回True 5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
join方法的例子:
让主进程加上join的地方等待(也就是阻塞住),等待子进程执行完以后,再继续往下执行个人主进程,好多时候,咱们主进程须要子进程的执行结果,因此必需要等待。join感受就像是将子进程和主进程拼接起来同样,将异步改成同步执行。
def func(x,y): print(x) time.sleep(1) print(y) if __name__ == '__main__': p = Process(target=func,args=('姑娘','来玩啊!')) p.start() print('我这里是异步的啊!') #这里相对于子进程仍是异步的 p.join() #只有在join的地方才会阻塞住,将子进程和主进程之间的异步改成同步 print('父进程执行结束!') #打印结果: 我这里是异步的啊! 姑娘 来玩啊! 父进程执行结束!
怎么样开启多个进程呢?for循环。而且我有个需求就是说,全部的子进程异步执行,而后全部的子进程所有执行完以后,我再执行主进程,怎么搞?看代码
#下面的注释按照编号去看,别忘啦! import time import os from multiprocessing import Process def func(x,y): print(x) # time.sleep(1) #进程切换:若是没有这个时间间隔,那么你会发现func执行结果是打印一个x而后一个y,再打印一个x一个y,不会出现打印多个x而后打印y的状况,由于两个打印距离太近了并且执行的也很是快,可是若是你这段程序运行慢的话,你就会发现进程之间的切换了。 print(y) if __name__ == '__main__': p_list= [] for i in range(10): p = Process(target=func,args=('姑娘%s'%i,'来玩啊!')) p_list.append(p) p.start() [ap.join() for ap in p_list] #四、这是解决办法,前提是咱们的子进程所有都已经去执行了,那么我在一次给全部正在执行的子进程加上join,那么主进程就须要等着全部子进程执行结束才会继续执行本身的程序了,而且保障了全部子进程是异步执行的。 # p.join() #一、若是加到for循环里面,那么全部子进程包括父进程就所有变为同步了,由于for循环也是主进程的,循环第一次的时候,一个进程去执行了,而后这个进程就join住了,那么for循环就不会继续执行了,等着第一个子进程执行结束才会继续执行for循环去建立第二个子进程。 #二、若是我不想这样的,也就是我想全部的子进程是异步的,而后全部的子进程执行完了再执行主进程 #p.join() #三、若是这样写的话,屡次运行以后,你会发现会出现主进程的程序比一些子进程先执行完,由于咱们p.join()是对最后一个子进程进行了join,也就是说若是这最后一个子进程先于其余子进程执行完,那么主进程就会去执行,而此时若是还有一些子进程没有执行完,而主进程执行 #完了,那么就会先打印主进程的内容了,这个cpu调度进程的机制有关系,由于咱们的电脑可能只有4个cpu,个人子进程加上住进程有11个,虽然我for循环是按顺序起进程的,可是操做系统必定会按照顺序给你执行你的进程吗,答案是不会的,操做系统会按照本身的算法来分配进 #程给cpu去执行,这里也解释了咱们打印出来的子进程中的内容也是没有固定顺序的缘由,由于打印结果也须要调用cpu,能够理解成进程在争抢cpu,若是同窗你想问这是什么算法,这就要去研究操做系统啦。那咱们的想全部子进程异步执行,而后再执行主进程的这个需求怎么解决啊 print('不要钱~~~~~~~~~~~~~~~~!')
模拟两个应用场景:一、同时对一个文件进行写操做 二、同时建立多个文件
import time import os import re from multiprocessing import Process #多进程同时对一个文件进行写操做 def func(x,y,i): with open(x,'a',encoding='utf-8') as f: print('当前进程%s拿到的文件的光标位置>>%s'%(os.getpid(),f.tell())) f.write(y) #多进程同时建立多个文件 # def func(x, y): # with open(x, 'w', encoding='utf-8') as f: # f.write(y) if __name__ == '__main__': p_list= [] for i in range(10): p = Process(target=func,args=('can_do_girl_lists.txt','姑娘%s'%i,i)) # p = Process(target=func,args=('can_do_girl_info%s.txt'%i,'姑娘电话0000%s'%i)) p_list.append(p) p.start() [ap.join() for ap in p_list] #这就是个for循环,只不过用列表生成式的形式写的 with open('can_do_girl_lists.txt','r',encoding='utf-8') as f: data = f.read() all_num = re.findall('\d+',data) #打开文件,统计一下里面有多少个数据,每一个数据都有个数字,因此re匹配一下就好了 print('>>>>>',all_num,'.....%s'%(len(all_num))) #print([i in in os.walk(r'你的文件夹路径')]) print('不要钱~~~~~~~~~~~~~~~~!')
Process类中自带封装的各属性的介绍
1 p.daemon:默认值为False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设定为True后,p不能建立本身的新进程,必须在p.start()以前设置 2 p.name:进程的名称 3 p.pid:进程的pid 4 p.exitcode:进程在运行时为None、若是为–N,表示被信号N结束(了解便可) 5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络链接的底层进程间通讯提供安全性,这类链接只有在具备相同的身份验证键时才能成功(了解便可)
2.Process类的使用
注意:在windows中Process()必须放到# if __name__ == '__main__':下
Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). This is the reason for hiding calls to Process() inside if __name__ == "__main__" since statements inside this if-statement will not get called upon import. 因为Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 若是在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。
进程的建立第二种方法(继承)
class MyProcess(Process): #本身写一个类,继承Process类 #咱们经过init方法能够传参数,若是只写一个run方法,那么无法传参数,由于建立对象的是传参就是在init方法里面,面向对象的时候,咱们是否是学过 def __init__(self,person): super().__init__() self.person=person def run(self): print(os.getpid()) print(self.pid) print(self.pid) print('%s 正在和女主播聊天' %self.person) # def start(self): # #若是你非要写一个start方法,能够这样写,而且在run方法先后,能够写一些其余的逻辑 # self.run() if __name__ == '__main__': p1=MyProcess('Jedan') p2=MyProcess('太白') p3=MyProcess('alexDSB') p1.start() #start内部会自动调用run方法 p2.start() # p2.run() p3.start() p1.join() p2.join() p3.join()
进程之间的数据是隔离的:
#咱们说进程之间的数据是隔离的,也就是数据不共享,看下面的验证 from multiprocessing import Process n=100 #首先我定义了一个全局变量,在windows系统中应该把全局变量定义在if __name__ == '__main__'之上就能够了 def work(): global n n=0 print('子进程内: ',n) if __name__ == '__main__': p=Process(target=work) p.start() p.join() #等待子进程执行完毕,若是数据共享的话,我子进程是否是经过global将n改成0了,可是你看打印结果,主进程在子进程执行结束以后,仍然是n=100,子进程n=0,说明子进程对n的修改没有在主进程中生效,说明什么?说明他们之间的数据是隔离的,互相不影响的 print('主进程内: ',n) #看结果: # 子进程内: 0 # 主进程内: 100
练习:咱们以前学socket的时候,知道tcp协议的socket是不能同时和多个客户端进行链接的,(这里先不考虑socketserver那个模块),对不对,那咱们本身经过多进程来实现一下同时和多个客户端进行链接通讯。
服务端代码示例:(注意一点:经过这个是不能作qq聊天的,由于qq聊天是qq的客户端把信息发给另一个qq的客户端,中间有一个服务端帮你转发消息,而不是咱们这样的单纯的客户端和服务端对话,而且子进程开启以后我们是无法操做的,而且没有为子进程input输入提供控制台,全部你再在子进程中写上了input会报错,EOFError错误,这个错误的意思就是你的input须要输入,可是你输入不了,就会报这个错误。而子进程的输出打印之类的,是pycharm作了优化,将全部子进程中的输出结果帮你打印出来了,但实质仍是不一样进程的。)
from socket import * from multiprocessing import Process def talk(conn,client_addr): while True: try: msg=conn.recv(1024) print('客户端消息>>',msg) if not msg:break conn.send(msg.upper()) #在这里有同窗可能会想,我能不能在这里写input来本身输入内容和客户端进行对话?朋友,是这样的,按说是能够的,可是须要什么呢?须要你像咱们用pycharm的是同样下面有一个输入内容的控制台,当咱们的子进程去执行的时候,咱们是没有地方能够显示可以让你输入内容的控制台的,因此你没办法输入,就会给你报错。 except Exception: break if __name__ == '__main__': #windows下start进程必定要写到这下面 server = socket(AF_INET, SOCK_STREAM) # server.setsockopt(SOL_SOCKET, SO_REUSEADDR,1) # 若是你将若是你将bind这些代码写到if __name__ == '__main__'这行代码的上面,那么地址重用必需要有,由于咱们知道windows建立的子进程是对整个当前文件的内容进行的copy,前面说了就像import,若是你开启了子进程,那么子进程是会执行bind的,那么你的主进程bind了这个ip和端口,子进程在进行bind的时候就会报错。 server.bind(('127.0.0.1', 8080)) #有同窗可能还会想,我为何多个进程就能够链接一个server段的一个ip和端口了呢,我记得当时说tcp的socket的时候,我是不能在你这个ip和端口被链接的状况下再链接你的啊,这里是由于当时咱们就是一个进程,一个进程里面是只能一个链接的,多进程是能够多链接的,这和进程之间是单独的内存空间有关系,先这样记住他,好吗? server.listen(5) while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr)) p.start()
客户端代码示例:
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
上面咱们经过多进程实现了并发,可是有个问题
每来一个客户端,都在服务端开启一个进程,若是并发来一个万个客户端,要开启一万个进程吗,你本身尝试着在你本身的机器上开启一万个,10万个进程试一试。 解决方法:进程池,本篇博客后面会讲到,你们继续学习呀
Process对象的其余方法或属性(简单了解一下就能够啦)
#进程对象的其余方法一:terminate,is_alive from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is 打飞机' %self.name) # s = input('???') #别忘了再pycharm下子进程中不能input输入,会报错EOFError: EOF when reading a line,由于子进程中没有像咱们主进程这样的在pycharm下的控制台能够输入东西的地方 time.sleep(2) print('%s is 打飞机结束' %self.name) if __name__ == '__main__': p1=Piao('太白') p1.start() time.sleep(5) p1.terminate()#关闭进程,不会当即关闭,有个等着操做系统去关闭这个进程的时间,因此is_alive马上查看的结果可能仍是存活,可是稍微等一会,就被关掉了 print(p1.is_alive()) #结果为True print('等会。。。。') time.sleep(1) print(p1.is_alive()) #结果为False
from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): # self.name=name # super().__init__() #Process的__init__方法会执行self.name=Piao-1, # #因此加到这里,会覆盖咱们的self.name=name #为咱们开启的进程设置名字的作法 super().__init__() self.name=name def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.start() print('开始') print(p.pid) #查看pid
僵尸进程与孤儿进程(简单了解 一下就能够啦)
参考博客:http://www.cnblogs.com/Anker/p/3271773.html 一:僵尸进程(有害) 僵尸进程:一个进程使用fork建立子进程,若是子进程退出,而父进程并无调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程。详解以下 咱们知道在unix/linux中,正常状况下子进程是经过父进程建立的,子进程在建立新的进程。子进程的结束和父进程的运行是一个异步过程,即父进程永远没法预测子进程到底何时结束,若是子进程一结束就马上回收其所有资源,那么在父进程内将没法获取子进程的状态信息。 所以,UNⅨ提供了一种机制能够保证父进程能够在任意时刻获取子进程结束时的状态信息: 一、在每一个进程退出的时候,内核释放该进程全部的资源,包括打开的文件,占用的内存等。可是仍然为其保留必定的信息(包括进程号the process ID,退出状态the termination status of the process,运行时间the amount of CPU time taken by the process等) 二、直到父进程经过wait / waitpid来取时才释放. 但这样就致使了问题,若是进程不调用wait / waitpid的话,那么保留的那段信息就不会释放,其进程号就会一直被占用,可是系统所能使用的进程号是有限的,若是大量的产生僵死进程,将由于没有可用的进程号而致使系统不能产生新的进程. 此即为僵尸进程的危害,应当避免。 任何一个子进程(init除外)在exit()以后,并不是立刻就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程处理。这是每一个子进程在结束时都要通过的阶段。若是子进程在exit()以后,父进程没有来得及处理,这时用ps命令就能看到子进程的状态是“Z”。若是父进程能及时 处理,可能用ps命令就来不及看到子进程的僵尸状态,但这并不等于子进程不通过僵尸状态。 若是父进程在子进程结束以前退出,则子进程将由init接管。init将会以父进程的身份对僵尸状态的子进程进行处理。 二:孤儿进程(无害) 孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工做。 孤儿进程是没有父进程的进程,孤儿进程这个重任就落到了init进程身上,init进程就好像是一个民政局,专门负责处理孤儿进程的善后工做。每当出现一个孤儿进程的时候,内核就把孤 儿进程的父进程设置为init,而init进程会循环地wait()它的已经退出的子进程。这样,当一个孤儿进程凄凉地结束了其生命周期的时候,init进程就会表明党和政府出面处理它的一切善后工做。所以孤儿进程并不会有什么危害。 咱们来测试一下(建立完子进程后,主进程所在的这个脚本就退出了,当父进程先于子进程结束时,子进程会被init收养,成为孤儿进程,而非僵尸进程),文件内容 import os import sys import time pid = os.getpid() ppid = os.getppid() print 'im father', 'pid', pid, 'ppid', ppid pid = os.fork() #执行pid=os.fork()则会生成一个子进程 #返回值pid有两种值: # 若是返回的pid值为0,表示在子进程当中 # 若是返回的pid值>0,表示在父进程当中 if pid > 0: print 'father died..' sys.exit(0) # 保证主线程退出完毕 time.sleep(1) print 'im child', os.getpid(), os.getppid() 执行文件,输出结果: im father pid 32515 ppid 32015 father died.. im child 32516 1 看,子进程已经被pid为1的init进程接收了,因此僵尸进程在这种状况下是不存在的,存在只有孤儿进程而已,孤儿进程声明周期结束天然会被init来销毁。 三:僵尸进程危害场景: 例若有个进程,它按期的产 生一个子进程,这个子进程须要作的事情不多,作完它该作的事情以后就退出了,所以这个子进程的生命周期很短,可是,父进程只管生成新的子进程,至于子进程 退出以后的事情,则一律漠不关心,这样,系统运行上一段时间以后,系统中就会存在不少的僵死进程,假若用ps命令查看的话,就会看到不少状态为Z的进程。 严格地来讲,僵死进程并非问题的根源,罪魁祸首是产生出大量僵死进程的那个父进程。所以,当咱们寻求如何消灭系统中大量的僵死进程时,答案就是把产生大 量僵死进程的那个元凶枪毙掉(也就是经过kill发送SIGTERM或者SIGKILL信号啦)。枪毙了元凶进程以后,它产生的僵死进程就变成了孤儿进 程,这些孤儿进程会被init进程接管,init进程会wait()这些孤儿进程,释放它们占用的系统进程表中的资源,这样,这些已经僵死的孤儿进程 就能瞑目而去了。 四:测试 #一、产生僵尸进程的程序test.py内容以下 #coding:utf-8 from multiprocessing import Process import time,os def run(): print('子',os.getpid()) if __name__ == '__main__': p=Process(target=run) p.start() print('主',os.getpid()) time.sleep(1000) #二、在unix或linux系统上执行 [root@vm172-31-0-19 ~]# python3 test.py & [1] 18652 [root@vm172-31-0-19 ~]# 主 18652 子 18653 [root@vm172-31-0-19 ~]# ps aux |grep Z USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND root 18653 0.0 0.0 0 0 pts/0 Z 20:02 0:00 [python3] <defunct> #出现僵尸进程 root 18656 0.0 0.0 112648 952 pts/0 S+ 20:02 0:00 grep --color=auto Z [root@vm172-31-0-19 ~]# top #执行top命令发现1zombie top - 20:03:42 up 31 min, 3 users, load average: 0.01, 0.06, 0.12 Tasks: 93 total, 2 running, 90 sleeping, 0 stopped, 1 zombie %Cpu(s): 0.0 us, 0.3 sy, 0.0 ni, 99.7 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st KiB Mem : 1016884 total, 97184 free, 70848 used, 848852 buff/cache KiB Swap: 0 total, 0 free, 0 used. 782540 avail Mem PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND root 20 0 29788 1256 988 S 0.3 0.1 0:01.50 elfin #三、 等待父进程正常结束后会调用wait/waitpid去回收僵尸进程 但若是父进程是一个死循环,永远不会结束,那么该僵尸进程就会一直存在,僵尸进程过多,就是有害的 解决方法一:杀死父进程 解决方法二:对开启的子进程应该记得使用join,join会回收僵尸进程 参考python2源码注释 class Process(object): def join(self, timeout=None): ''' Wait until child process terminates ''' assert self._parent_pid == os.getpid(), 'can only join a child process' assert self._popen is not None, 'can only join a started process' res = self._popen.wait(timeout) if res is not None: _current_process._children.discard(self) join方法中调用了wait,告诉系统释放僵尸进程。discard为从本身的children中剔除 解决方法三:http://blog.csdn.net/u010571844/article/details/50419798
3.守护进程
以前咱们讲的子进程是不会随着主进程的结束而结束,子进程所有执行完以后,程序才结束,那么若是有一天咱们的需求是个人主进程结束了,由我主进程建立的那些子进程必须跟着结束,怎么办?守护进程就来了!
主进程建立守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内没法再开启子进程,不然抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
import os import time from multiprocessing import Process class Myprocess(Process): def __init__(self,person): super().__init__() self.person = person def run(self): print(os.getpid(),self.name) print('%s正在和女主播聊天' %self.person) time.sleep(3) if __name__ == '__main__': p=Myprocess('太白') p.daemon=True #必定要在p.start()前设置,设置p为守护进程,禁止p建立子进程,而且父进程代码执行结束,p即终止运行 p.start() # time.sleep(1) # 在sleep时linux下查看进程id对应的进程ps -ef|grep id print('主')
4.进程同步(锁)
经过刚刚的学习,咱们想方设法实现了程序的异步,让多个任务能够同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受咱们控制。尽管并发编程让咱们能更加充分的利用IO资源,可是也给咱们带来了新的问题:进程之间数据不共享,可是共享同一套文件系统,因此访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。
import os import time import random from multiprocessing import Process def work(n): print('%s: %s is running' %(n,os.getpid())) time.sleep(random.random()) print('%s:%s is done' %(n,os.getpid())) if __name__ == '__main__': for i in range(5): p=Process(target=work,args=(i,)) p.start() # 看结果:经过结果能够看出两个问题:问题一:每一个进程中work函数的第一个打印就不是按照咱们for循环的0-4的顺序来打印的 #问题二:咱们发现,每一个work进程中有两个打印,可是咱们看到全部进程中第一个打印的顺序为0-2-1-4-3,可是第二个打印没有按照这个顺序,变成了2-1-0-3-4,说明咱们一个进程中的程序的执行顺序都混乱了。 #问题的解决方法,第二个问题加锁来解决,第一个问题是没有办法解决的,由于进程开到了内核,有操做系统来决定进程的调度,咱们本身控制不了 # 0: 9560 is running # 2: 13824 is running # 1: 7476 is running # 4: 11296 is running # 3: 14364 is running # 2:13824 is done # 1:7476 is done # 0:9560 is done # 3:14364 is done # 4:11296 is done
#由并发变成了串行,牺牲了运行效率,但避免了竞争 from multiprocessing import Process,Lock import os,time def work(n,lock): #加锁,保证每次只有一个进程在执行锁里面的程序,这一段程序对于全部写上这个锁的进程,你们都变成了串行 lock.acquire() print('%s: %s is running' %(n,os.getpid())) time.sleep(1) print('%s:%s is done' %(n,os.getpid())) #解锁,解锁以后其余进程才能去执行本身的程序 lock.release() if __name__ == '__main__': lock=Lock() for i in range(5): p=Process(target=work,args=(i,lock)) p.start() #打印结果: # 2: 10968 is running # 2:10968 is done # 0: 7932 is running # 0:7932 is done # 4: 4404 is running # 4:4404 is done # 1: 12852 is running # 1:12852 is done # 3: 980 is running # 3:980 is done #结果分析:(本身去屡次运行一下,看看结果,我拿出其中一个结果来看)经过结果咱们能够看出,多进程刚开始去执行的时候,每次运行,首先打印出来哪一个进程的程序是不固定的,可是咱们解决了上面打印混乱示例代码的第二个问题,那就是同一个进程中的两次打印都是先完成的,而后才切换到下一个进程去,打印下一个进程中的两个打印结果,说明咱们控制住了同一进程中的代码执行顺序,若是涉及到多个进程去操做同一个数据或者文件的时候,就不担忧数据算错或者文件中的内容写入混乱了。
上面这种状况虽然使用加锁的形式实现了顺序的执行,可是程序又从新变成串行了,这样确实会浪费了时间,却保证了数据的安全。
接下来,咱们以模拟抢票为例,来看看数据安全的重要性。
#注意:首先在当前文件目录下建立一个名为db的文件 #文件db的内容为:{"count":1},只有这一行数据,而且注意,每次运行完了以后,文件中的1变成了0,你须要手动将0改成1,而后在去运行代码。 #注意必定要用双引号,否则json没法识别 #并发运行,效率高,但竞争写同一文件,数据写入错乱 from multiprocessing import Process,Lock import time,json,random #查看剩余票数 def search(): dic=json.load(open('db')) #打开文件,直接load文件中的内容,拿到文件中的包含剩余票数的字典 print('\033[43m剩余票数%s\033[0m' %dic['count']) #抢票 def get(): dic=json.load(open('db')) time.sleep(0.1) #模拟读数据的网络延迟,那么进程之间的切换,致使全部人拿到的字典都是{"count": 1},也就是每一个人都拿到了这一票。 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模拟写数据的网络延迟 json.dump(dic,open('db','w')) #最终结果致使,每一个人显示都抢到了票,这就出现了问题~ print('\033[43m购票成功\033[0m') def task(): search() get() if __name__ == '__main__': for i in range(3): #模拟并发100个客户端抢票 p=Process(target=task) p.start() #看结果分析:因为网络延迟等缘由使得进程切换,致使每一个人都抢到了这最后一张票 # 剩余票数1 # 剩余票数1 # 剩余票数1 # 购票成功 # 购票成功 # 购票成功
进程锁总结:
#加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。 虽然能够用文件共享数据实现进程间通讯,但问题是: 1.效率低(共享数据基于文件,而文件是硬盘上的数据) 2.须要本身加锁处理 #所以咱们最好找寻一种解决方案可以兼顾:一、效率高(多个进程共享一块内存的数据)二、帮咱们处理好锁问题。这就是mutiprocessing模块为咱们提供的基于消息的IPC通讯机制:队列和管道。 队列和管道都是将数据存放于内存中 队列又是基于(管道+锁)实现的,可让咱们从复杂的锁问题中解脱出来, 咱们应该尽可能避免使用共享数据,尽量使用消息传递和队列,避免处理复杂的同步和锁问题,并且在进程数目增多时,每每能够得到更好的可获展性。 IPC通讯机制(了解):IPC是intent-Process Communication的缩写,含义为进程间通讯或者跨进程通讯,是指两个进程之间进行数据交换的过程。IPC不是某个系统所独有的,任何一个操做系统都须要有相应的IPC机制,
好比Windows上能够经过剪贴板、管道和邮槽等来进行进程间通讯,而Linux上能够经过命名共享内容、信号量等来进行进程间通讯。Android它也有本身的进程间通讯方式,Android建构在Linux基础上,继承了一
部分Linux的通讯方式。
次日进程的学习就到这里啦~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5.队列(推荐使用)
进程彼此之间互相隔离,要实现进程间通讯(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。队列就像一个特殊的列表,可是能够设置固定长度,而且从前面插入数据,从后面取出数据,先进先出。
Queue([maxsize]) 建立共享的进程队列。 参数 :maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。 底层队列使用管道和锁实现。
先看下面的代码示例,而后再看方法介绍。
queue的方法介绍
q = Queue([maxsize]) 建立共享的进程队列。maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还须要运行支持线程以便队列中的数据传输到底层管道中。 Queue的实例q具备如下方法: q.get( [ block [ ,timeout ] ] ) 返回q中的一个项目。若是q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 若是设置为False,将引起Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。若是在制定的时间间隔内没有项目变为可用,将引起Queue.Empty异常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 将item放入队列。若是队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。若是设置为False,将引起Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引起Queue.Full异常。 q.qsize() 返回队列中目前项目的正确数量。此函数的结果并不可靠,由于在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引起NotImplementedError异常。 q.empty() 若是调用此方法时 q为空,返回True。若是其余进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。 q.full() 若是q已满,返回为True. 因为线程的存在,结果也多是不可靠的(参考q.empty()方法)。。
queue的其余方法(了解)
q.close() 关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但还没有写入的数据,但将在此方法完成时立刻关闭。若是q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,若是某个使用者正被阻塞在get()操做上,关闭生产者中的队列不会致使get()方法返回错误。 q.cancel_join_thread() 不会再进程退出时自动链接后台线程。这能够防止join_thread()方法阻塞。 q.join_thread() 链接队列的后台线程。此方法用于在调用q.close()方法后,等待全部队列项被消耗。默认状况下,此方法由不是q的原始建立者的全部进程调用。调用q.cancel_join_thread()方法能够禁止这种行为。
咱们看一些代码示例:
from multiprocessing import Queue q=Queue(3) #建立一个队列对象,队列长度为3 #put ,get ,put_nowait,get_nowait,full,empty q.put(3) #往队列中添加数据 q.put(2) q.put(1) # q.put(4) # 若是队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。 # 若是队列中的数据一直不被取走,程序就会永远停在这里。 try: q.put_nowait(4) # 能够使用put_nowait,若是队列满了不会阻塞,可是会由于队列满了而报错。 except: # 所以咱们能够用一个try语句来处理这个错误。这样程序不会一直阻塞下去,可是会丢掉这个消息。 print('队列已经满了') # 所以,咱们再放入数据以前,能够先看一下队列的状态,若是已经满了,就不继续put了。 print(q.full()) #查看是否满了,满了返回True,不满返回False print(q.get()) #取出数据 print(q.get()) print(q.get()) # print(q.get()) # 同put方法同样,若是队列已经空了,那么继续取就会出现阻塞。 try: q.get_nowait(3) # 能够使用get_nowait,若是队列满了不会阻塞,可是会由于没取到值而报错。 except: # 所以咱们能够用一个try语句来处理这个错误。这样程序不会一直阻塞下去。 print('队列已经空了') print(q.empty()) #空了
#看下面的队列的时候,按照编号看注释 import time from multiprocessing import Process, Queue # 8. q = Queue(2) #建立一个Queue对象,若是写在这里,那么在windows还子进程去执行的时候,咱们知道子进程中还会执行这个代码,可是子进程中不可以再次建立了,也就是这个q就是你主进程中建立的那个q,经过咱们下面在主进程中先添加了一个字符串以后,在去开启子进程,你会发现,小鬼这个字符串还在队列中,也就是说,咱们使用的仍是主进程中建立的这个队列。 def f(q): # q = Queue() #9. 咱们在主进程中开启了一个q,若是咱们在子进程中的函数里面再开一个q,那么你下面q.put('姑娘,多少钱~')添加到了新建立的这q里里面了 q.put('姑娘,多少钱~') #4.调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。 # print(q.qsize()) #6.查看队列中有多少条数据了 def f2(q): print('》》》》》》》》') print(q.get()) #5.取数据 if __name__ == '__main__': q = Queue() #1.建立一个Queue对象 q.put('小鬼') p = Process(target=f, args=(q,)) #2.建立一个进程 p2 = Process(target=f2, args=(q,)) #3.建立一个进程 p.start() p2.start() time.sleep(1) #7.若是阻塞一点时间,就会出现主进程运行太快,致使咱们在子进程中查看qsize为1个。 # print(q.get()) #结果:小鬼 print(q.get()) #结果:姑娘,多少钱~ p.join()
接下来看一个稍微复杂一些的例子:
import os import time import multiprocessing # 向queue中输入数据的函数 def inputQ(queue): info = str(os.getpid()) + '(put):' + str(time.asctime()) queue.put(info) # 向queue中输出数据的函数 def outputQ(queue): info = queue.get() print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info)) # Main if __name__ == '__main__': #windows下,若是开启的进程比较多的话,程序会崩溃,为了防止这个问题,使用freeze_support()方法来解决。知道就行啦 multiprocessing.freeze_support() record1 = [] # store input processes record2 = [] # store output processes queue = multiprocessing.Queue(3) # 输入进程 for i in range(10): process = multiprocessing.Process(target=inputQ,args=(queue,)) process.start() record1.append(process) # 输出进程 for i in range(10): process = multiprocessing.Process(target=outputQ,args=(queue,)) process.start() record2.append(process) for p in record1: p.join() for p in record2: p.join()
队列是进程安全的:同一时间只能一个进程拿到队列中的一个数据,你拿到了一个数据,这个数据别人就拿不到了。
下面咱们来看一个叫作生产者消费者模型的东西:
在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生产线程和消费线程的工做能力来提升程序的总体处理数据的速度。
为何要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力,而且我能够根据生产速度和消费速度来均衡一下多少个生产者能够为多少个消费者提供足够的服务,就能够开多进程等等,而这些进程都是到阻塞队列或者说是缓冲区中去获取或者添加数据。
通俗的解释:看图说话。。背景有点乱,等我更新~~
那么咱们基于队列来实现一个生产者消费者模型,代码示例:
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print('主')
#生产者消费者模型总结 #程序中有两类角色 一类负责生产数据(生产者) 一类负责处理数据(消费者) #引入生产者消费者模型为了解决的问题是: 平衡生产者与消费者之间的工做能力,从而提升程序总体处理数据的速度 #如何实现: 生产者<-->队列<——>消费者 #生产者消费者模型实现类程序的解耦和
经过上面基于队列的生产者消费者代码示例,咱们发现一个问题:主进程永远不会结束,缘由是:生产者p在生产完后就结束了,可是消费者c在取空了q以后,则一直处于死循环中且卡在q.get()这一步。
解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就能够break出死循环
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(5): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) q.put(None) #在本身的子进程的最后加入一个结束信号 if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print('主')
注意:结束信号None,不必定要由生产者发,主进程里一样能够发,但主进程须要等生产者结束后才应该发送该信号
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(2): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() p1.join() #等待生产者进程结束 q.put(None) #发送结束信号 print('主')
但上述解决方式,在有多个生产者和多个消费者时,因为队列咱们说了是进程安全的,我一个进程拿走告终束信号,另一个进程就拿不到了,还须要多发送一个结束信号,有几个取数据的进程就要发送几个结束信号,咱们则须要用一个很low的方式去解决
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(name,q): for i in range(2): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨头',q)) p3=Process(target=producer,args=('泔水',q)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) #开始 p1.start() p2.start() p3.start() c1.start() p1.join() #必须保证生产者所有生产完毕,才应该发送结束信号 p2.join() p3.join() q.put(None) #有几个消费者就应该发送几回结束信号None q.put(None) #发送结束信号 print('主')
其实咱们的思路无非是发送结束信号而已,有另一种队列提供了这种机制
JoinableQueue([maxsize])
#JoinableQueue([maxsize]):这就像是一个Queue对象,但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 #参数介绍: maxsize是队列中容许最大项数,省略则无大小限制。 #方法介绍: JoinableQueue的实例p除了与Queue对象相同的方法以外还具备: q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常 q.join():生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止,也就是队列中的数据所有被get拿走了。
from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() # time.sleep(random.randint(1,3)) time.sleep(random.random()) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) q.task_done() #向q.join()发送一次信号,证实一个数据已经被取走并执行完了 def producer(name,q): for i in range(10): # time.sleep(random.randint(1,3)) time.sleep(random.random()) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) print('%s生产结束'%name) q.join() #生产完毕,使用此方法进行阻塞,直到队列中全部项目均被处理。 print('%s生产结束~~~~~~'%name) if __name__ == '__main__': q=JoinableQueue() #生产者们:即厨师们 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨头',q)) p3=Process(target=producer,args=('泔水',q)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True #若是不加守护,那么主进程结束不了,可是加了守护以后,必须确保生产者的内容生产完而且被处理完了,全部必须还要在主进程给生产者设置join,才能确保生产者生产的任务被执行完了,而且可以确保守护进程在全部任务执行完成以后才随着主进程的结束而结束。 c2.daemon=True #开始 p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() #我要确保你的生产者进程结束了,生产者进程的结束标志着你生产的全部的人任务都已经被处理完了 p2.join() p3.join() print('主') # 主进程等--->p1,p2,p3等---->c1,c2 # p1,p2,p3结束了,证实c1,c2确定全都收完了p1,p2,p3发到队列的数据 # 于是c1,c2也没有存在的价值了,不须要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,因此设置成守护进程就能够了。
6.管道(了解)
进程间通讯(IPC)方式二:管道(不推荐使用,了解便可),会致使数据不安全的状况出现,后面咱们会说到为何会带来数据 不安全的问题。
#建立管道的类: Pipe([duplex]):在进程之间建立一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的链接对象,强调一点:必须在产生Process对象以前产生管道 #参数介绍: dumplex:默认管道是全双工的,若是将duplex射成False,conn1只能用于接收,conn2只能用于发送。 #主要方法: conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接收,recv方法会一直阻塞。若是链接的另一端已经关闭,那么recv方法会抛出EOFError。 conn1.send(obj):经过链接发送对象。obj是与序列化兼容的任意对象 #其余方法: conn1.close():关闭链接。若是conn1被垃圾回收,将自动调用此方法 conn1.fileno():返回链接使用的整数文件描述符 conn1.poll([timeout]):若是链接上的数据可用,返回True。timeout指定等待的最长时限。若是省略此参数,方法将当即返回结果。若是将timeout射成None,操做将无限期地等待数据到达。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。若是进入的消息,超过了这个最大值,将引起IOError异常,而且在链接上没法进行进一步读取。若是链接的另一端已经关闭,不再存在任何数据,将引起EOFError异常。 conn.send_bytes(buffer [, offset [, size]]):经过链接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,而后调用c.recv_bytes()函数进行接收 conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或相似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。若是消息长度大于可用的缓冲区空间,将引起BufferTooShort异常。
from multiprocessing import Process, Pipe def f(conn): conn.send("Hello 妹妹") #子进程发送了消息 conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() #创建管道,拿到管道的两端,双工通讯方式,两端均可以收发消息 p = Process(target=f, args=(child_conn,)) #将管道的一段给子进程 p.start() #开启子进程 print(parent_conn.recv()) #主进程接受了消息 p.join()
应该特别注意管道端点的正确管理问题。若是是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为什么在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。若是忘记执行这些步骤,程序可能在消费者中的recv()操做上挂起(就是阻塞)。管道是由操做系统进行引用计数的,必须在全部进程中关闭管道的相同一端就会能生成EOFError异常。所以,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。
from multiprocessing import Process, Pipe def f(parent_conn,child_conn): #parent_conn.close() #不写close将不会引起EOFError while True: try: print(child_conn.recv()) except EOFError: child_conn.close() break if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(parent_conn,child_conn,)) p.start() child_conn.close() parent_conn.send('hello') parent_conn.close() p.join()
主进程将管道的两端都传送给子进程,子进程和主进程共用管道的两种报错状况,都是在recv接收的时候报错的:
1.主进程和子进程中的管道的相同一端都关闭了,出现EOFError;
2.若是你管道的一端在主进程和子进程中都关闭了,可是你还用这个关闭的一端去接收消息,那么就会出现OSError;
因此你关闭管道的时候,就容易出现问题,须要将全部只用这个管道的进程中的两端所有关闭才行。固然也能够经过异常捕获(try:except EOFerror)来处理。
虽然咱们在主进程和子进程中都打印了一下conn1一端的对象,发现两个再也不同一个地址,可是子进程中的管道和主进程中的管道仍是能够通讯的,由于管道是同一套,系统可以记录。
咱们的目的就是关闭全部的管道,那么主进程和子进程进行通讯的时候,能够给子进程传管道的一端就够了,而且用咱们以前学到的,信息发送完以后,再发送一个结束信号None,那么你收到的消息为None的时候直接结束接收或者说结束循环,就不用每次都关闭各个进程中的管道了。
from multiprocessing import Pipe,Process def func(conn): while True: msg = conn.recv() if msg is None:break print(msg) if __name__ == '__main__': conn1,conn2 = Pipe() p = Process(target=func,args=(conn1,)) p.start() for i in range(10): conn2.send('约吧') conn2.send(None)
from multiprocessing import Process,Pipe def consumer(p,name): produce, consume=p produce.close() while True: try: baozi=consume.recv() print('%s 收到包子:%s' %(name,baozi)) except EOFError: break def producer(seq,p): produce, consume=p consume.close() for i in seq: produce.send(i) if __name__ == '__main__': produce,consume=Pipe() c1=Process(target=consumer,args=((produce,consume),'c1')) c1.start() seq=(i for i in range(10)) producer(seq,(produce,consume)) produce.close() consume.close() c1.join() print('主进程')
关于管道会形成数据不安全问题的官方解释: The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
由Pipe方法返回的两个链接对象表示管道的两端。每一个链接对象都有send和recv方法(除其余以外)。注意,若是两个进程(或线程)试图同时从管道的同一端读取或写入数据,那么管道中的数据可能会损坏。固然,在使用管道的不一样端部的过程当中不存在损坏风险。
from multiprocessing import Process,Pipe,Lock def consumer(p,name,lock): produce, consume=p produce.close() while True: lock.acquire() baozi=consume.recv() lock.release() if baozi: print('%s 收到包子:%s' %(name,baozi)) else: consume.close() break def producer(p,n): produce, consume=p consume.close() for i in range(n): produce.send(i) produce.send(None) produce.send(None) produce.close() if __name__ == '__main__': produce,consume=Pipe() lock = Lock() c1=Process(target=consumer,args=((produce,consume),'c1',lock)) c2=Process(target=consumer,args=((produce,consume),'c2',lock)) p1=Process(target=producer,args=((produce,consume),10)) c1.start() c2.start() p1.start() produce.close() consume.close() c1.join() c2.join() p1.join() print('主进程')
管道能够用于双工通讯,一般利用在客户端/服务端中使用的请求/响应模型,或者远程过程调用,就能够使用管道编写与进程交互的程序,像前面将网络通讯的时候,咱们使用了一个叫subprocess的模块,里面有个参数是pipe管道,执行系统指令,并经过管道获取结果。
7.数据共享(了解)
展望将来,基于消息传递的并发编程是大势所趋
即使是使用线程,推荐作法也是将程序设计为大量独立的线程集合
经过消息队列交换数据。这样极大地减小了对使用锁定和其余同步手段的需求,还能够扩展到分布式系统中
进程间应该尽可能避免通讯,即使须要通讯,也应该选择进程安全的工具来避免加锁带来的问题,应该尽可能避免使用本节所讲的共享数据的方式,之后咱们会尝试使用数据库来解决进程之间的数据共享问题。
进程之间数据共享的模块之一Manager模块:
进程间数据是独立的,能够借助于队列或管道实现通讯,两者都是基于消息传递的 虽然进程间数据独立,但能够经过Manager实现数据共享,事实上Manager的功能远不止于此 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
多进程共同去处理共享数据的时候,就和咱们多进程同时去操做一个文件中的数据是同样的,不加锁就会出现错误的结果,进程不安全的,因此也须要加锁
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: #不加锁而操做共享的数据,确定会出现数据错乱 d['count']-=1 if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
总结一下,进程之间的通讯:队列、管道、数据共享也算
下面要讲的信号量和事件也至关于锁,也是全局的,全部进程都能拿到这些锁的状态,进程之间这些锁啊信号量啊事件啊等等的通讯,其实底层仍是socekt,只不过是基于文件的socket通讯,而不是跟上面的数据共享啊空间共享啊之类的机制,咱们以前学的是基于网络的socket通讯,还记得socket的两个家族吗,一个文件的一个网络的,因此未来若是说这些锁之类的报错,可能你看到的就是相似于socket的错误,简单知道一下就能够啦~~~
工做中经常使用的是锁,信号量和事件不经常使用,可是信号量和事件面试的时候会问到,你能知道就行啦~~~
8.信号量(了解)
互斥锁同时只容许一个线程更改数据,而信号量Semaphore是同时容许必定数量的线程更改数据 。 假设商场里有4个迷你唱吧,因此同时能够进去4我的,若是来了第五我的就要在外面等待,等到有人出来才能再进去玩。 实现: 信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。 信号量与进程池的概念很像,可是要区分开,信号量涉及到加锁的概念
好比大保健:提早设定好,一个房间只有4个床(计数器如今为4),那么同时只能四我的进来,谁先来的谁先占一个床(acquire,计数器减1),4个床满了以后(计数器为0了),第五我的就要等着,等其中一我的出来(release,计数器加1),他就去占用那个床了。
from multiprocessing import Process,Semaphore import time,random def go_ktv(sem,user): sem.acquire() print('%s 占到一间ktv小屋' %user) time.sleep(random.randint(0,3)) #模拟每一个人在ktv中待的时间不一样 sem.release() if __name__ == '__main__': sem=Semaphore(4) p_l=[] for i in range(13): p=Process(target=go_ktv,args=(sem,'user%s' %i,)) p.start() p_l.append(p) for i in p_l: i.join() print('============》')
9.事件(了解)
python线程的事件用于主线程控制其余线程的执行,事件主要提供了三个方法 set、wait、clear。 事件处理的机制:全局定义了一个“Flag”,若是“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,若是“Flag”值为True,那么event.wait 方法时便再也不阻塞。 clear:将“Flag”设置为False set:将“Flag”设置为True
from multiprocessing import Process,Semaphore,Event import time,random e = Event() #建立一个事件对象 print(e.is_set()) #is_set()查看一个事件的状态,默认为False,可经过set方法改成True print('look here!') # e.set() #将is_set()的状态改成True。 # print(e.is_set())#is_set()查看一个事件的状态,默认为False,可经过set方法改成Tr # e.clear() #将is_set()的状态改成False # print(e.is_set())#is_set()查看一个事件的状态,默认为False,可经过set方法改成Tr e.wait() #根据is_set()的状态结果来决定是否在这阻塞住,is_set()=False那么就阻塞,is_set()=True就不阻塞 print('give me!!') #set和clear 修改事件的状态 set-->True clear-->False #is_set 用来查看一个事件的状态 #wait 依据事件的状态来决定是否阻塞 False-->阻塞 True-->不阻塞
from multiprocessing import Process, Event import time, random def car(e, n): while True: if not e.is_set(): # 进程刚开启,is_set()的值是Flase,模拟信号灯为红色 print('\033[31m红灯亮\033[0m,car%s等着' % n) e.wait() # 阻塞,等待is_set()的值变成True,模拟信号灯为绿色 print('\033[32m车%s 看见绿灯亮了\033[0m' % n) time.sleep(random.randint(2,4)) if not e.is_set(): #若是is_set()的值是Flase,也就是红灯,仍然回到while语句开始 continue print('车开远了,car', n) break # def police_car(e, n): # while True: # if not e.is_set():# 进程刚开启,is_set()的值是Flase,模拟信号灯为红色 # print('\033[31m红灯亮\033[0m,car%s等着' % n) # e.wait(0.1) # 阻塞,等待设置等待时间,等待0.1s以后没有等到绿灯就闯红灯走了 # if not e.is_set(): # print('\033[33m红灯,警车先走\033[0m,car %s' % n) # else: # print('\033[33;46m绿灯,警车走\033[0m,car %s' % n) # break def traffic_lights(e, inverval): while True: time.sleep(inverval) if e.is_set(): print('######', e.is_set()) e.clear() # ---->将is_set()的值设置为False else: e.set() # ---->将is_set()的值设置为True print('***********',e.is_set()) if __name__ == '__main__': e = Event() for i in range(10): p=Process(target=car,args=(e,i,)) # 建立10个进程控制10辆车 time.sleep(random.random(1, 3)) #车不是一会儿全过来 p.start() # for i in range(5): # p = Process(target=police_car, args=(e, i,)) # 建立5个进程控制5辆警车 # p.start() #信号灯必须是单独的进程,由于它无论你车开到哪了,我就按照我红绿灯的规律来闪烁变换,对吧 t = Process(target=traffic_lights, args=(e, 5)) # 建立一个进程控制红绿灯 t.start() print('预备~~~~开始!!!')
为何要有进程池?进程池的概念。
在程序实际处理问题过程当中,忙时会有成千上万的任务须要被执行,闲时可能只有零星任务。那么在成千上万个任务须要被执行的时候,咱们就须要去建立成千上万个进程么?首先,建立进程须要消耗时间,销毁进程(空间,变量,文件信息等等的内容)也须要消耗时间。第二即使开启了成千上万的进程,操做系统也不能让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还须要进行切换而且记录每一个进程的执行节点,也就是记录上下文(各类变量等等乱七八糟的东西,虽然你看不到,可是操做系统都要作),这样反而会影响程序的效率。所以咱们不能无限制的根据任务开启或者结束进程。就看咱们上面的一些代码例子,你会发现有些程序是否是执行的时候比较慢才出结果,就是这个缘由,那么咱们要怎么作呢?
在这里,要给你们介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等处处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。若是有不少任务须要执行,池中的进程数量不够,任务就要等待以前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增长操做系统的调度难度,还节省了开闭进程的时间,也必定程度上可以实现并发效果
multiprocess.Poll模块
建立进程池的类:若是指定numprocess为3,则进程池会从无到有建立三个进程,而后自始至终使用这三个进程去执行全部任务(高级一些的进程池能够根据你的并发量,搞成动态增长或减小进程池中的进程数量的操做),不会开启其余进程,提升操做系统效率,减小空间的占用等。
概念介绍:
Pool([numprocess [,initializer [, initargs]]]):建立进程池
numprocess:要建立的进程数,若是省略,将默认使用cpu_count()的值 initializer:是每一个工做进程启动时要执行的可调用对象,默认为None initargs:是要传给initializer的参数组
p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。 '''须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async()''' p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。''' p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成 P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用
方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具备如下方法 obj.get():返回结果,若是有必要则等待结果到达。timeout是可选的。若是在指定时间内尚未到达,将引起一场。若是远程操做中引起了异常,它将在调用此方法时再次被引起。 obj.ready():若是调用完成,返回True obj.successful():若是调用完成且没有引起异常,返回True,若是在结果就绪以前调用此方法,引起异常 obj.wait([timeout]):等待结果变为可用。 obj.terminate():当即终止全部工做进程,同时不执行任何清理或结束任何挂起工做。若是p被垃圾回收,将自动调用此函数
import time from multiprocessing import Pool,Process #针对range(100)这种参数的 # def func(n): # for i in range(3): # print(n + 1) def func(n): print(n) # 结果: # (1, 2) # alex def func2(n): for i in range(3): print(n - 1) if __name__ == '__main__': #1.进程池的模式 s1 = time.time() #咱们计算一下开多进程和进程池的执行效率 poll = Pool(5) #建立含有5个进程的进程池 # poll.map(func,range(100)) #异步调用进程,开启100个任务,map自带join的功能 poll.map(func,[(1,2),'alex']) #异步调用进程,开启100个任务,map自带join的功能 # poll.map(func2,range(100)) #若是想让进程池完成不一样的任务,能够直接这样搞 #map只限于接收一个可迭代的数据类型参数,列表啊,元祖啊等等,若是想作其余的参数之类的操做,须要用后面咱们要学的方法。 # t1 = time.time() - s1 # # #2.多进程的模式 # s2 = time.time() # p_list = [] # for i in range(100): # p = Process(target=func,args=(i,)) # p_list.append(p) # p.start() # [pp.join() for pp in p_list] # t2 = time.time() - s2 # # print('t1>>',t1) #结果:0.5146853923797607s 进程池的效率高 # print('t2>>',t2) #结果:12.092015027999878s
有一点,map是异步执行的,而且自带close和join
通常约定俗成的是进程池中的进程数量为CPU的数量,工做中要看具体状况来考量。
实际应用代码示例:
同步与异步两种执行方式:
import os,time from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(1) return n**2 if __name__ == '__main__': p=Pool(3) #进程池中从无到有建立三个进程,之后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程当中可能有阻塞也可能没有阻塞 # 但无论该任务是否存在阻塞,同步调用都会在原地等着 res_l.append(res) print(res_l)
import os import time import random from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(random.random()) return n**2 if __name__ == '__main__': p=Pool(3) #进程池中从无到有建立三个进程,之后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行,而且能够执行不一样的任务,传送任意的参数了。 # 返回结果以后,将结果放入列表,归还进程,以后再执行新的任务 # 须要注意的是,进程池中的三个进程不会同时开启或者同时结束 # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。 res_l.append(res) # 异步apply_async用法:若是使用异步提交的任务,主进程须要使用join,等待进程池内任务都处理完,而后能够用get收集结果 # 不然,主进程结束,进程池可能还没来得及执行,也就跟着一块儿结束了 p.close() #不是关闭进程池,而是结束进程池接收任务,确保没有新任务再提交过来。 p.join() #感知进程池中的任务已经执行结束,只有当没有新的任务添加进来的时候,才能感知到任务结束了,因此在join以前必须加上close方法 for res in res_l: print(res.get()) #使用get来获取apply_aync的结果,若是是apply,则没有get方法,由于apply是同步执行,马上获取结果,也根本无需get
#一:使用进程池(异步调用,apply_async) #coding: utf-8 from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply_async(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 res_l.append(res) # s = res.get() #若是直接用res这个结果对象调用get方法获取结果的话,这个程序就变成了同步,由于get方法直接就在这里等着你建立的进程的结果,第一个进程建立了,而且去执行了,那么get就会等着第一个进程的结果,没有结果就一直等着,那么主进程的for循环是没法继续的,因此你会发现变成了同步的效果 print("==============================>") #没有后面的join,或get,则程序总体结束,进程池中的任务还没来得及所有执行完也都跟着主进程一块儿结束了 pool.close() #关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成 pool.join() #调用join以前,先调用close函数,不然会出错。执行完close后不会有新的进程加入到pool,join函数等待全部子进程结束 print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>对象组成的列表,而非最终的结果,但这一步是在join后执行的,证实结果已经计算完毕,剩下的事情就是调用每一个对象下的get方法去获取结果 for i in res_l: print(i.get()) #使用get来获取apply_aync的结果,若是是apply,则没有get方法,由于apply是同步执行,马上获取结果,也根本无需get #二:使用进程池(同步调用,apply) #coding: utf-8 from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(0.1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另一个 print("==============================>") pool.close() pool.join() #调用join以前,先调用close函数,不然会出错。执行完close后不会有新的进程加入到pool,join函数等待全部子进程结束 print(res_l) #看到的就是最终的结果组成的列表 for i in res_l: #apply是同步的,因此直接获得结果,没有get()方法 print(i)
进程池版的socket并发聊天代码示例:
#Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count()) #开启6个客户端,会发现2个客户端处于等待状态 #在每一个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程 from socket import * from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): print('进程pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p=Pool(4) while True: conn,*_=server.accept() p.apply_async(talk,args=(conn,)) # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
发现:并发开启多个客户端,服务端同一时间只有4个不一样的pid,只能结束一个客户端,另一个客户端才会进来.
同时最多和4我的进行聊天,由于进程池中只有4个进程可供调用,那有同窗会问,咱们这么多人想同时聊天怎么办,又不让用多进程,进程池也不能开太多的进程,那咋整啊,后面咱们会学到多线程,到时候你们就知道了,如今大家先这样记住就好啦
而后咱们再提一个回调函数
须要回调函数的场景:进程池中任何一个任务一旦处理完了,就当即告知主进程:我好了额,你能够处理个人结果了。主进程则调用一个函数去处理该结果,该函数即回调函数,这是进程池特有的,普通进程没有这个机制,可是咱们也能够经过进程通讯来拿到返回值,进程池的这个回调也是进程通讯的机制完成的。 咱们能够把耗时间(阻塞)的任务放到进程池中,而后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果
回调函数在写的时候注意一点,回调函数的形参执行有一个,若是你的执行函数有多个返回值,那么也能够被回调函数的这一个形参接收,接收的是一个元祖,包含着你执行函数的全部返回值。
使用进程池来搞爬虫的时候,最耗时间的是请求地址的网络请求延迟,那么若是咱们在将处理数据的操做加到每一个子进程中,那么全部在进程池后面排队的进程就须要等更长的时间才能获取进程池里面的执行进程来执行本身,因此通常咱们就将请求做成一个执行函数,经过进程池去异步执行,剩下的数据处理的内容放到另一个进程或者主进程中去执行,将网络延迟的时间也利用起来,效率更高。
requests这个模块的get方法请求页面,就和咱们在浏览器上输入一个网址而后回车去请求别人的网站的效果是同样的。安装requests模块的指令:在cmd窗口执行pip install requests。
import requests response = requests.get('http://www.baidu.com') print(response) print(response.status_code) #200正常,404找不到网页,503等5开头的是人家网站内部错误 print(response.content.decode('utf-8'))
from multiprocessing import Pool import requests import json import os def get_page(url): print('<进程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<进程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) #拿到的是get_page的结果,其实彻底不必拿该结果,该结果已经传给回调函数处理了 ''' 打印结果: <进程3388> get https://www.baidu.com <进程3389> get https://www.python.org <进程3390> get https://www.openstack.org <进程3388> get https://help.github.com/ <进程3387> parse https://www.baidu.com <进程3389> get http://www.sina.com.cn/ <进程3387> parse https://www.python.org <进程3387> parse https://help.github.com/ <进程3387> parse http://www.sina.com.cn/ <进程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] '''
from multiprocessing import Pool import time,random import requests import re def get_page(url,pattern): response=requests.get(url) if response.status_code == 200: return (response.text,pattern) def parse_page(info): page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } print(dic) if __name__ == '__main__': pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get() # res=requests.get('http://maoyan.com/board/7') # print(re.findall(pattern,res.text))
若是在主进程中等待进程池中全部任务都执行完毕后,再统一处理结果,则无需回调函数
from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待进程池中全部进程执行完毕 nums=[] for res in res_l: nums.append(res.get()) #拿到全部结果 print(nums) #主进程拿到全部的处理结果,能够在主进程中进行统一进行处理
进程池和信号量的区别:
进程池是多个须要被执行的任务在进程池外面排队等待获取进程对象去执行本身,而信号量是一堆进程等待着去执行一段逻辑代码。
信号量不能控制建立多少个进程,可是能够控制同时多少个进程可以执行,可是进程池能控制你能够建立多少个进程。
举例:就像那些开大车拉煤的,信号量是什么呢,就比如我只有五个车道,你每次只能过5辆车,可是不影响你建立100辆车,可是进程池至关于什么呢?至关于你只有5辆车,每次5个车拉东西,拉完你再把车放回来,给别的人拉煤用。
其余语言里面有更高级的进程池,在设置的时候,能够将进程池中的进程动态的建立出来,当需求增大的时候,就会自动在进程池中添加进程,需求小的时候,自动减小进程,而且能够设置进程数量的上线,最多为多,python里面没有。
进程池的其余实现方式:https://docs.python.org/dev/library/concurrent.futures.html