目录css
1.进程就是一个运行中的程序(是对正在运行程序的一个抽象)。python
2.程序和进程之间的区别:linux
3.进程—是计算机中最小的资源分配单位
在操做系统中的惟一标识符 :pidios
注意:同一个程序执行两次,就会在操做系统中出现两个进程,因此咱们能够同时运行一个软件,分别作不一样的事情也不会混乱。web
4.进程是数据隔离的,进程的数据隔离是由操做系统完成的。redis
要想多个进程交替运行,操做系统必须对这些进程进行调度,这个调度也不是随即进行的,而是须要遵循必定的法则,由此就有了进程的调度算法。算法
进程的调度是由操做系统完成的(咱们不能干预)。数据库
操做系统调度进程的算法:json
并行:windows
两个程序、两个CPU,每一个程序分别占用一个CPU本身执行本身的
看起来是同时执行,实际在每个时间点上都在各自执行着
并发:
两个程序、一个cpu,每一个程序交替的在一个cpu上执行
看起来在同时执行,可是实际上仍然是串行
1.同步
同步就是一个任务的完成须要依赖另一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态能够保持一致。
简单说:调用一个方法,要等待这个方法结束。
如:烧水 和 吹头发 正在烧水 停下烧水这个动做去吹头发 吹完头发以后继续烧水
2.异步
异步是不须要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工做,依赖的任务也当即执行,只要本身完成了整个任务就算完成了
。至于被依赖的任务最终是否真正完成,依赖它的任务没法肯定,因此它是不可靠的任务序列
。
简单说:调用一个方法,不等待这个方法结束,也不关心这个方法作了什么。
如:烧水 和 吹头发 正在烧水 开始吹头发,但烧水也在继续进行
3.阻塞
cpu不工做
阻塞影响了程序运行的效率。
4.非阻塞
cpu工做
5.同步阻塞
效率最低
例1:conn.recv
socket 阻塞的tcp协议的时候
例2:你专心排队,什么别的事都不作。
6.同步非阻塞
其实是效率低下的
例1:func() 没有io操做
socket 非阻塞的tcp协议的时候
调用函数(这个函数内部不存在io操做)
例2:你一边打着电话一边还须要抬头看到底队伍排到你了没有,若是把打电话和观察排队的位置当作是程序的两个操做的话,这个程序须要在这两种不一样的行为之间来回的切换,效率可想而知是低下的。
7.异步非阻塞
效率更高
例:把func扔到其余任务里去执行了
我自己的任务和func任务各自执行各自的,没有io操做
8.异步阻塞
异步操做是能够被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。
若是在银行等待办理业务的人采用的是异步的方式去等待消息被触发(通知)
,也就是领了一张小纸条,假如在这段时间里他不能离开银行作其它的事情,那么很显然,这我的被阻塞在了这个等待的操做上面;
在了解其余概念以前,咱们首先要了解进程的几个状态。在程序运行的过程当中,因为被操做系统的调度算法控制,程序会进入几个状态:就绪,运行和阻塞。
1.就绪(Ready)状态
当进程已分配到除CPU之外的全部必要的资源,只要得到处理机即可当即执行,这时的进程状态称为就绪状态。
2.执行/运行(Running)状态当进程已得到处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。
3.阻塞(Blocked)状态正在执行的进程,因为等待某个事件发生而没法执行时,便放弃处理机而处于阻塞状态。引发进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能知足、等待信件(信号)等。
multiprocessing—— multi:multiple 多元的 processing 进程
import os import time print('start') time.sleep(20) print(os.getpid(),os.getppid(),'end')
os.getpid() 获取当前进程的pid
pid process id 子进程
ppid parent process id 父进程
父进程:在父进程中建立子进程
import os import time from multiprocessing import Process def func(): print('start',os.getpid()) time.sleep(1) print('end',os.getpid()) if __name__ == '__main__': p = Process(target=func) p.start() # 异步 调用开启进程的方法,可是并不等待这个进程真的开启 print('main :',os.getpid()) import os import time from multiprocessing import Process def eat(): print('start eating',os.getpid()) time.sleep(1) print('end eating',os.getpid()) def sleep(): print('start sleeping',os.getpid()) time.sleep(1) print('end sleeping',os.getpid()) if __name__ == '__main__': p1 = Process(target=eat) # 建立一个即将要执行eat函数的进程对象 p1.start() # 开启第一个子进程 p2 = Process(target=sleep) # 建立一个即将要执行sleep函数的进程对象 p2.start() # 开启第二个子进程 print('main :',os.getpid())
1.小知识补充:
if __name__ == '__main__'
:
__name__
有两种状况:
__name__ == '__main__'
执行的文件就是__name__
所在的文件
`name == '文件名'``
`__name__
所在的文件被导入执行的时候
2.操做系统建立进程的方式不一样:
windows操做系统执行开启进程的代码
实际上新的子进程须要经过import父进程的代码来完成数据的导入工做(再一次执行父进程文件中的代码来获取父进程中的数据),因此有一些内容咱们只但愿在父进程中完成,就写在if __name__ == '__main__'
:下面
ios linux操做系统建立进程 fork
正常的写就能够,不须要写if __name__ == '__main__'
3.主进程和子进程之间的关系:
主进程的结束逻辑:
主进程怎么知道子进程结束了的呢? —— 基于网络、文件
4.join方法
join方法:同步阻塞,直到子进程结束就结束
把一个进程的结束事件封装成一个join方法
执行join方法的效果:就是 阻塞,直到这个子进程执行结束就结束阻塞。
# 开起一个子进程 import time from multiprocessing import Process def send_mail(): time.sleep(3) print('发送了一封邮件') if __name__ == '__main__': p = Process(target=send_mail) p.start() # 异步 非阻塞 # time.sleep(5) print('join start') p.join() # 同步 阻塞,直到p对应的进程结束以后才结束阻塞 print('5000封邮件已发送完毕') # 开起多个子进程 # 开启10个进程,给公司的5000我的发邮件,发送完邮件以后,打印一个消息“5000封邮件已发送完毕” import time import random from multiprocessing import Process def send_mail(a): time.sleep(random.random()) print('发送了一封邮件',a) if __name__ == '__main__': l = [] for i in range(10): p = Process(target=send_mail,args=(i,)) p.start() l.append(p) print(l) for p in l:p.join() # 阻塞 直到上面的十个进程都结束 print('5000封邮件已发送完毕') # 全部的子进程都结束以后要执行的代码写在这里
1.有一个参数能够把一个子进程设置为一个守护进程
import time from multiprocessing import Process def son1(a,b): while True: print('is alive') time.sleep(0.5) def son2(): for i in range(5): print('in son2') time.sleep(1) if __name__ == '__main__': p = Process(target=son1,args=(1,2)) p.daemon = True p.start() # 把p子进程设置成了一个守护进程 p2 = Process(target=son2) p2.start() time.sleep(2) # son1只执行了2S
守护进程是随着主进程的代码结束而结束的
应用场景:
全部的子进程都必须在主进程结束以前结束,由主进程来负责回收资源
p.is_alive() 判断一个子进程是否还活着
p.terminate() 强制结束一个子进程
import time from multiprocessing import Process def son1(): while True: print('is alive') time.sleep(0.5) if __name__ == '__main__': p = Process(target=son1) p.start() # 异步 非阻塞 print(p.is_alive()) time.sleep(1) p.terminate() # 异步的 非阻塞 print(p.is_alive()) # 进程还活着 由于操做系统还没来得及关闭进程 time.sleep(0.01) print(p.is_alive()) # 操做系统已经响应了咱们要关闭进程的需求,再去检测的时候,获得的结果是进程已经结束了
什么是异步非阻塞?—— terminate
2.Process类使用面向对象的方式:
def start(self) ——> None :……开起了一个子进程,调用run。
def run(self) ——> None :……
import os import time from multiprocessing import Process class MyProcecss1(Process): def __init__(self,x,y): # 传参数,不要忘记写init方法 self.x = x self.y = y super().__init__() def run(self): # 子进程中要作的事情写在run方法中 print(self.x,self.y,os.getpid()) while True: print('is alive') time.sleep(0.5) class MyProcecss2(Process): def run(self): for i in range(5): print('in son2') time.sleep(1) if __name__ == '__main__': mp = MyProcecss1(1,2) mp.daemon = True mp.start() print(mp.is_alive()) mp.terminate() mp2 = MyProcecss2() mp2.start() print('main :',os.getpid()) time.sleep(1)
Process类的总结:
1.开启进程的方式
1.面向函数:
def 函数名:要在子进程中执行的代码
p = Process(target= 函数名,args=(参数1,))
2.面向对象:
class 类名(Process):
def init(self,参数1,参数2): # 若是子进程不须要参数能够不写
self.a = 参数1
self.b = 参数2
super().__init__()
def run(self): # 要在子进程中执行的代码
p = 类名(参数1,参数2)
2.Process提供的操做进程的方法
并发 可以作的事儿,如:
[TOC]
1.若是在一个并发的场景下,涉及到某部份内容,是须要修改一些全部进程共享的数据资源,须要加锁来维护数据的安全。
2.在数据安全的基础上,才考虑效率问题
3.同步存在的意义:数据的安全性
在主进程中实例化 lock = Lock(),把这把锁传递给子进程,在子进程中,对须要加锁的代码 进行 with lock:
import time import json from multiprocessing import Process,Lock def search_ticket(user): with open('ticket_count') as f: dic = json.load(f) print('%s查询结果 : %s张余票'%(user,dic['count'])) def buy_ticket(user,lock): # with lock: # 至关于lock.acquire() + lock.release() # lock.acquire() # 给这段代码加上一把锁 time.sleep(0.02) with open('ticket_count') as f: dic = json.load(f) if dic['count'] > 0: print('%s买到票了'%(user)) dic['count'] -= 1 else: print('%s没买到票' % (user)) time.sleep(0.02) with open('ticket_count','w') as f: json.dump(dic,f) # lock.release() # 给这段代码解锁 def task(user, lock): search_ticket(user) with lock: buy_ticket(user, lock) if __name__ == '__main__': lock = Lock() for i in range(10): p = Process(target=task,args=('user%s'%i,lock)) p.start()
在进程中须要加锁的场景:
加锁以后可以保证数据的安全性 可是也下降了程序的执行效率
进程之间的数据是隔离的
from multiprocessing import Process n = 100 def func(): global n n -= 1 if __name__ == '__main__': p_l = [] for i in range(10): p = Process(target=func) p.start() p_l.append(p) for p in p_l:p.join() print(n) # 主进程数据中的变量的值 不会随着 子进程数据中变量值的修改而改变
进程之间的通讯 - IPC(inter process communication)
Queue 队列
Queue 天生就是数据安全的
Queue 基于文件家族的socket执行的,基于 pickle操做的,基于 lock实现的。
from multiprocessing import Queue,Process # 先进先出 def func(exp,q): ret = eval(exp) q.put({ret,2,3}) q.put(ret*2) q.put(ret*4) if __name__ == '__main__': q = Queue() Process(target=func,args=('1+2+3',q)).start() print(q.get()) print(q.get()) print(q.get())
Pipe 管道
pipe 管道(不安全的) —— 基于文件家族的socket 、pickle
from multiprocessing import Pipe pip = Pipe() pip.send() pip.recv()
队列 = 管道 + 锁
import queue from multiprocessing import Queue q = Queue(5) q.put(1) q.put(2) q.put(3) q.put(4) q.put(5) # 当队列为满的时候再向队列中放数据 队列会阻塞 print('5555555') try: q.put_nowait(6) # 当队列为满的时候再向队列中放数据,会报错而且会丢失数据(不安全,不建议使用) except queue.Full: pass print('6666666') print(q.get()) print(q.get()) print(q.get()) # 在队列为空的时候会发生阻塞 print(q.get()) # 在队列为空的时候会发生阻塞 print(q.get()) # 在队列为空的时候会发生阻塞 try: print(q.get_nowait()) # 在队列为空的时候 直接报错 except queue.Empty:pass
IPC机制:
第三方IPC的优势:
队列:进程之间数据安全
管道:进程之间数据不安全
解耦 :修改 复用 可读性
程序的解耦:把写在一块儿的大的功能分开成多个小的功能处理(如:登录 注册)
生产者消费者模型:
什么是生产者消费者模型?
把一个产生数据而且处理数据的过程解耦,让生产数据的过程和处理数据的过程达到一个工做效率上的平衡。
中间的容器,在多进程中咱们使用队列或者可被join的队列,作到控制数据的量:
好比说:一个爬虫或者一个web程序的server端
爬虫:
请求网页的平均时间是0.3s
处理网页代码的时候是0.003s
100倍,每启动100个线程生产数据
启动一个线程来完成处理数据
web程序的server端:
每秒钟有6w条请求
一个服务每s中只能处理2000条
先写一个web程序,只负责一件事情,就是接收请求,而后把请求放到队列中
再写不少个server端,从队列中获取请求,而后处理,而后返回结果
import time import random from multiprocessing import Process,Queue def producer(q,name,food): for i in range(10): time.sleep(random.random()) fd = '%s%s'%(food,i) q.put(fd) print('%s生产了一个%s'%(name,food)) def consumer(q,name): while True: food = q.get() if not food:break time.sleep(random.randint(1,3)) print('%s吃了%s'%(name,food)) def cp(c_count,p_count): q = Queue(10) for i in range(c_count): Process(target=consumer, args=(q, 'alex')).start() p_l = [] for i in range(p_count): p1 = Process(target=producer, args=(q, 'wusir', '烧饼')) p1.start() p_l.append(p1) for p in p_l:p.join() for i in range(c_count): q.put(None) if __name__ == '__main__': cp(2,3)
# 用生产者消费者模式实现爬虫的例子 from multiprocessing import Process,Queue import requests import re import json def producer(q,url): response = requests.get(url) q.put(response.text) def consumer(q): while True: s = q.get() if not s:break com = re.compile( '<div class="item">.*?<div class="pic">.*?<em .*?>(?P<id>\d+).*?<span class="title">(?P<title>.*?)</span>' '.*?<span class="rating_num" .*?>(?P<rating_num>.*?)</span>.*?<span>(?P<comment_num>.*?)评价</span>', re.S) ret = com.finditer(s) for i in ret: print({ "id": i.group("id"), "title": i.group("title"), "rating_num": i.group("rating_num"), "comment_num": i.group("comment_num")} ) if __name__ == '__main__': count = 0 q = Queue(3) p_l = [] for i in range(10): url = 'https://movie.douban.com/top250?start=%s&filter='%count count+=25 p = Process(target=producer,args=(q,url,)).start() p_l.append(p) p = Process(target=consumer, args=(q,)).start() for p in p_l:p.join() q.put(None)
JoinableQueue 与 Queue使用成本同样,但JoinableQueue 更严紧一些
import time import random from multiprocessing import JoinableQueue,Process def producer(q,name,food): for i in range(10): time.sleep(random.random()) fd = '%s%s'%(food,i) q.put(fd) print('%s生产了一个%s'%(name,food)) q.join() def consumer(q,name): while True: food = q.get() time.sleep(random.random()) print('%s吃了%s'%(name,food)) q.task_done() if __name__ == '__main__': jq = JoinableQueue() p =Process(target=producer,args=(jq,'wusir','烧饼')) p.start() c = Process(target=consumer,args=(jq,'alex')) c.daemon = True c.start() p.join()
multiprocessing中有一个manager类,封装了全部和进程相关的(包括:数据共享、数据传递)、与共享相关的数据类型,可是对于 字典 、列表这一类的数据操做的时候会产生数据不安全,须要加锁解决问题,而且须要尽可能少的使用这种方式。
```python
from multiprocessing import Manager,Process,Lock
def func(dic,lock):
with lock:
dic['count'] -= 1
if name == 'main': # m = Manager() with Manager() as m: l = Lock() dic = m.dict({'count':100}) p_l = [] for i in range(100): p = Process(target=func,args=(dic,l)) p.start() p_l.append(p) for p in p_l:p.join() print(dic) ``