一、系统进行资源分配和调度的基本单位,一个具备必定独立功能的程序关于某个数据集合的一次运行活动;python
二、它是一个动态的概念,一个活动的实体;算法
an instance of a computer program that is being executed
即正在运行的程序的实例化对象。注:其概念的关键点在于安全
1)、进程是一个实体(动态的),具备本身独立的地址空间,包括: 文本区域(text region):存储处理器执行的代码; 数据区域(data region):存储变量与进程执行期间使用的动态分配的内存; 堆栈(stack region):存储的是程序执行过程当中调用的指令与本地变量; 注:正是因为每一个进程是一个独立的实体,其中以上所述的三个区域,即每一个进程的数据区域以及堆栈是独立的,相互隔离的,因此在多进程中能够保证数据的安全性
2)、编写完的代码,没有运行时,称为程序, 正在运行的代码,称为进程 程序是死的(静态的),进程是活的(动态的)
三、进程的三大状态bash
如图所示
服务器
导入multiprocessing模块中的Process类
以供后续建立类的时候直接调用并发
p = Process(target = func, name = process01, args=(5,))
实例化进程对象app
- target = func 表示调用对象,即子进程要执行的任务 func
- args 表示任务 func 的位置参数元组,args=(5, )
- name = process01 为子进程的名称
- p.start( ): 启动进程,并调用该子进程中的p.run( )
- p.run( ): 进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要写入该方法
- p.terminate( ): 强制终止进程p,不会进行任何清理操做
- p.is_alive( ): 若是p仍然运行,返回True。用来判断进程是否还在运行
- p.join([timeout]): 主进程等待子进程p终止,timeout是可选的等待时间
# 主进程速度快于子进程,join方法可使得子进程执行结束后,再继续执行主进程中的代码,能够用来同步代码的一致性 import multiprocessing def func(): print("发送第一份邮件") if __name__ == "__main__": p = multiprocessing.Process(target=func) p.start() p.join() print("发送第二份邮件") # 发送第一份邮件 # 发送第二份邮件
# 多个子进程配合 join 方法实现异步并发 import multiprocessing def func(index): print(f"发送第{index}封邮件") if __name__ == "__main__": process_list = [] for i in range(10): p = multiprocessing.Process(target=func, args=(i, )) p.start() process_list.append(p) # p.join() 程序会变成同步阻塞 for i in process_list: i.join() # 异步并发 print("主进程发最后一封邮件!")
- name: 当前进程实例别名, 默认为Process-N, N为从1开始递增的整
数- pid: 当前进程实例的ID值
# 建立进程的方法一: # 利用multiprocessing模块提供一个Process类来建立一个进程对象 from multiprocessing import Process import time def func(n): while n > 0: print(n) time.sleep(3) n -= 1 if __name__ == "__main__": p = Process(target = func, args=(5,)) p.start() p.join()
# 建立进程的方法二: # 建立新的进程能够自定义一个类去继承Process类,每次实例化这个类的时候,就等同于实例化一个进程对象 import multiprocessing import time class ClockProcess(multiprocessing.Process): def run(self): n = 5 while n > 0: print(n) time.sleep(3) n -= 1 if __name__ == "__main__": p = ClockProcess() p.start() p.join()
- 守护 主进程 时,若是主进程执行结束了,意味着守护进程的寿命马上终止.马上杀死
- 语法:
- 进程对象.daemon = True 设置当前进程为守护进程
- 必须写在start( )调用进程以前进行设置
- 默认状况下,主进程会等待全部子进程执行完毕以后,关闭程序,释放资源。若不等待,子进程并不方便管理,容易形成僵尸进程,在后台不停的占用系统的资源(cpu和内存),不清楚进程的来源。
- 守护主进程即在主进程代码执行结束以后,无需等待子进程执行,当即杀死程序
import multiprocessing def func(): print("start 当前子进程") print("end 当前子进程") if __name__ == "__main__": p = multiprocessing.Process(target=func) p.daemon = True p.start() print("主进程执行结束 ... ") # 主进程执行结束 ...
多个子进程下,未守护主进程,主进程仍会等待子进程执行结束dom
- 守护进程的实际用途:监控报活
import time # 监控报活 def alive(): while True: print("给监控服务器发消息, 当前5号服务器功能正常 i am ok ~") time.sleep(1) # 当前服务器正常完成的功能 def func(): time.sleep(5) print("当前5号服务器功能,统计财务报表~") if __name__ == "__main__": p1 = Process(target=func) p2 = Process(target=alive) # 守护p2进程 p2.daemon = True p1.start() p2.start() # 等待p1子进程执行结束以后,下面的主程序的代码才会放行; p1.join() # 未守护主进程,主进程会默认等待 print("当前服务器状态:统计财务报表功能异常.....") # 给监控服务器发消息, 当前5号服务器功能正常 i am ok ~ # 给监控服务器发消息, 当前5号服务器功能正常 i am ok ~ # 给监控服务器发消息, 当前5号服务器功能正常 i am ok ~ # 给监控服务器发消息, 当前5号服务器功能正常 i am ok ~ # 给监控服务器发消息, 当前5号服务器功能正常 i am ok ~ # 当前5号服务器功能,统计财务报表~ # 给监控服务器发消息, 当前5号服务器功能正常 i am ok ~ # 当前服务器状态:统计财务报表功能异常.....
# 手动建立 from multiprocessing import Process num = 1 def run1(): global num num += 5 print("子进程1运行中,num = %d" % (num)) def run2(): global num num += 10 print("子进程2运行中,num = %d" % (num)) if __name__ == "__main__": print("父进程启动") p1 = Process(target=run1) p2 = Process(target=run2) print("子进程将要执行") p1.start() p2.start() p1.join() p2.join() print("子进程结束")
# 借助旧版进程池建立多进程 from multiprocessing import Pool import random import time def work(num): print(random.random() * num) time.sleep(3) if __name__ == "__main__": # 实例化进程池对象,设置同一时间内最多能够执行的进程数为3个 # 题中的10个任务都由进程池中的这三个进程轮询执行,不会建立额外 的进程数 # 若不指定则同一时间内能够执行的进程个数默认为cpu逻辑核心数 p = Pool(3) for i in range(10): # apply_async 选择要调用的任务,每次循环出来的任务会用闲下来的子进程去执行 # 使⽤⾮阻塞⽅式调⽤func(并⾏执⾏,阻塞⽅式必须为等待上⼀个进程退出后才能执⾏下⼀个进程), args为传递给func的参数列表,kwargs为传递给func的关键字参数列表; p.apply_async(work, (i,)) # 进程池关闭以后不会再接受新的请求 p.close() # 等待进程池中的全部子进程都结束 p.join() # 多进程中,主进程通常用来等待子进程执行完毕,真正的任务都由子进程中执行
# 借助新版进程池建立多进程 from concurrent.futures import ProcessPoolExecutor import os import time def func(i): print("任务执行中... start", os.getpid()) time.sleep(10) print("任务结束... end", i) return i # ProcessPoolExecutor 进程池基本使用 """ 默认若是一个进程短期内能够完成更多的任务,就不会建立额外的新的进程,以节省资源 """ if __name__ == "__main__": lst = [] print(os.cpu_count()) # cpu逻辑核心数 # 建立进程池对象 """进程池中默认最多建立cpu这么多个进程,全部任务全由这几个进程完成,不会额外建立进程""" p = ProcessPoolExecutor() # 异步提交任务 for i in range(10): res = p.submit(func, i) lst.append(res) # 获取当前进程池返回值 for i in lst: print(i.result()) # 等待全部子进程执行结束 p.shutdown() # join print("主程序执行结束....")
进程间数据不共享,他们之间进行数据传递即为通讯异步
from multiprocessing import Queue
async
借助进程队列
Queue
完成进程间的通讯
- 消息队列遵循 先进先出 的原则
- 初始化Queue()对象时(q=Queue()),若括号中没有指定最⼤可接收
的消息数量, 或数量为负值, 那么就表明可接受的消息数量没有上限
q = Queue()
q.put(item, [block[, timeout]])
:将item消息写⼊队列- block 默认值为True
- 若是block 使⽤默认值,且没有设置timeout(单位秒)时,若消息列队已经没有空间可写⼊,此时程序将被阻塞(停在写⼊状态) ,直到从消息列队腾出空间为⽌,若是设置了True和timeout,则会等待timeout秒,若还没空间,则抛 出"q.Full"的异常信息
- 若是block值为False, 消息列队若是出现没有空间可写⼊的状况, 则会⽴刻抛出"q.Full"满了异常
q.put_nowait(item)
: 至关q.put(item, False)
;
q.get([block[, timeout]])
:获取队列中的⼀条消息, 而后将其从列队中移除block默认值为True
若是block使⽤默认值,且没有设置timeout(单位秒),消息列队若是为空, 此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为⽌,
若是设置了timeout, 则会等待timeout秒, 若还没读取到任何消息, 则抛
出"q.Empty"异常若是block值为False,消息列队若是为空,则会⽴刻抛出“q.Empty”空的异常
q.get_nowait()
:至关q.get(False)
- q = Queue()
- q.qsize(): 返回当前队列包含的消息数量
- q.empty(): 若是队列为空, 返回True, 反之False
- q.full(): 若是队列满了, 返回True,反之False
from multiprocessing import Queue, Process import time def write(q): for value in ["a", "b", "c"]: print("开始写入:", value) q.put(value) time.sleep(2) def read(q): while True: if not q.empty(): print("读取到的是", q.get()) time.sleep(2) else: break if __name__ == "__main__": q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pw.join() #等待接收完毕 pr.start() pr.join() print("接受完毕!")
# 三个进程间通讯 from multiprocessing import Process from multiprocessing import Queue def func1(q1): q1.put("你好!") print(f"子进程p1往队列q1中放入的数据为:你好!") def func2(q1, q2): msg = q1.get() print(f"子进程p2从队列q1中取出的数据为:{msg}") q2.put(msg) print(f"子进程p2往队列q2中放入的数据为:{msg}") def func3(q2): msg = q2.get() print(f"子进程p3从队列q2中取出的数据为:{msg}") if __name__ == "__main__": q1 = Queue() q2 = Queue() p1 = Process(target=func1, args=(q1,)) p2 = Process(target=func2, args=(q1, q2)) p3 = Process(target=func3, args=(q2,)) p1.start() p2.start() p3.start()
# put 存储 # get 获取 # task_done 队列计数减1 # join 阻塞 # task_done 配合 join 一块儿使用 # [1,2,3,4,5] # 队列计数5 # put 一次 每存放一个值,队列计数器加1 # get 一次 经过task_done让队列计数器减1 # join 函数,会根据队列中的计数器来断定是阻塞仍是放行 # 若是计数器变量是0,意味着放行,其余状况阻塞; from multiprocessing import Process,JoinableQueue jq = JoinableQueue() # put 会让队列计数器加1 jq.put("a") print(jq.get()) # 经过task_done,让队列计数器减1 jq.task_done() # 只有队列计数器是0的时,才会放行 jq.join() # 队列.join print("finish")
# 消费者模型 def consumer(q, name): while True: food = q.get() if food is None: break time.sleep(random.uniform(0.1, 1)) print("%s 吃了一个%s" % (name, food)) # 生产者模型 def producer(q, name, food): for i in range(5): time.sleep(random.uniform(0.1, 1)) print("%s 生产了 %s%s" % (name, food, i)) q.put(food + str(i)) if __name__ == "__main__": q = Queue() # 消费者1 p1 = Process(target=consumer, args=(q, "张三")) p1.start() # 消费者2 a2 = Process(target=consumer, args=(q, "李四")) a2.start() # 生产者1 p2 = Process(target=producer, args=(q, "王五", "黄金")) p2.start() # 生产者2 b2 = Process(target=producer, args=(q, "小明", "钻石")) b2.start() # 在生产完全部的数据以后,在队列的末尾塞入一个None p2.join() b2.join() # 消费者模型若是获取的是None,表明中止消费 q.put(None) q.put(None)
from multiprocessing import Process,JoinableQueue # 消费者模型 def consumer(q, name): while True: food = q.get() time.sleep(random.uniform(0.1, 1)) print("%s 吃了一个%s" % (name, food)) q.task_done() # 生产者模型 def producer(q, name, food): for i in range(5): time.sleep(random.uniform(0.1, 1)) print("%s 生产了 %s%s" % (name, food, i)) q.put(food + str(i)) if __name__ == "__main__": q = JoinableQueue() # 消费者1 p1 = Process(target=consumer, args=(q, "张三")) p1.daemon = True p1.start() # 生产者1 p2 = Process(target=producer, args=(q, "李四", "黄金")) p2.start() # 把生产者全部的数据都装载到队列中 p2.join() # 当队列计数器减到0的时候,会马上放行 # 必须等待消费者模型中全部的数据都task_done以后,变成0了就表明消费结束. q.join() print("程序结束....")
from multiprocessing import Manager, Pool import time def write(q): for i in "welcome": print("开始写入", i) q.put(i) def read(q): time.sleep(2) for i in range(q.qsize()): # q.qsize()获取到当前队列的消息数量! print("获得消息", q.get()) if __name__ == "__main__": print("主进程启动!") q = Manager().Queue() po = Pool() po.apply_async(write, (q,)) po.apply_async(read, (q,)) po.close() po.join()