from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() 上锁 print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) lock.release() 解锁 if __name__ == '__main__': lock=Lock() 造锁(实例化一个锁) for i in range(3): p=Process(target=work,args=(lock,)) p.start()
三台电脑同一时刻共同调用打印机完成打印任务,即三个进程同一时刻共抢同一个资源(输出平台)json
多个进程共抢一个资源,应结果第一位,效率第二位,因此应该牺牲效率,保求结果(串行)安全
以上的代码虽然完成串行结果,但没有实现公平,执行的顺序都是人为写好的,应该作到公平的抢占资源,谁先抢到就执行谁网络
from multiprocessing import Process from multiprocessing import Lock import time import random def task1(lock): print('task1') # 验证cpu遇到io切换了 lock.acquire() print('task1: 开始打印') time.sleep(random.randint(1, 3)) print('task1: 打印完成') lock.release() def task2(lock): print('task2') # 验证cpu遇到io切换了 lock.acquire() print('task2: 开始打印') time.sleep(random.randint(1, 3)) print('task2: 打印完成') lock.release() def task3(lock): print('task3') # 验证cpu遇到io切换了 lock.acquire() print('task3: 开始打印') time.sleep(random.randint(1, 3)) print('task3: 打印完成') lock.release() if __name__ == '__main__': lock = Lock() p1 = Process(target=task1, args=(lock,)) p2 = Process(target=task2, args=(lock,)) p3 = Process(target=task3, args=(lock,)) p1.start() p2.start() p3.start()
上锁时必定要给进程上同一把锁,并且上锁一次,就必定要解锁一次并发
都是完成了进程之间的串行dom
join是经过人为控制使进程串行ide
互斥锁是随机抢占资源,保证了公平性ui
业务需求分析: 买票以前先要查票,必须经历的流程: 你在查票的同时,100我的也在查本此列票. 买票时,你要先从服务端获取到票数,票数>0 ,买票,而后服务端票数减一. 中间确定有网络延迟. from multiprocessing import Process from multiprocessing import Lock import time import json import os import random 多进程原则上是不能互相通讯的,它们在内存级别数据隔离的.不表明磁盘上数据隔离. 它们能够共同操做一个文件. def search(): time.sleep(random.random()) with open('db.json',encoding='utf-8') as f1: dic = json.load(f1) print(f'剩余票数{dic["count"]}') def get(): with open('db.json',encoding='utf-8') as f1: dic = json.load(f1) time.sleep(random.randint(1,3)) if dic['count'] > 0: dic['count'] -= 1 with open('db.json', encoding='utf-8', mode='w') as f1: json.dump(dic,f1) print(f'{os.getpid()}用户购买成功') else: print('没票了.....') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock = Lock() for i in range(5): p = Process(target=task,args=(lock,)) p.start()
进程之间的通讯最好的方式是基于队列spa
from multiprocessing import Process from multiprocessing import Queue import time import os import random # 多进程原则上是不能互相通讯的,它们在内存级别数据隔离的.不表明磁盘上数据隔离. # 它们能够共同操做一个文件. def search(ticket): time.sleep(random.random()) print(f'剩余票数{ticket}') def _get(q): dic = q.get() time.sleep(random.randint(1,3)) if dic['count'] > 0: dic['count'] -= 1 print(f'{os.getpid()}用户购买成功') else: print('没票了.....') q.put(dic) def task(ticket,q): search(ticket) _get(q) if __name__ == '__main__': q = Queue(1) q.put({'count': 1}) ticket = 1 for i in range(5): p = Process(target=task,args=(ticket,q)) p.start()
队列是存在与内存中的一个容器,最大的特色:先进先出(FIFO)3d
利用队列进行进程之间的通讯,简单,方便,不用本身手动加锁,队列自带优质阻塞,可持续化取数据code
from multiprocessing import Queue q = Queue(3) # 能够设置元素个数 def func(): print('in func') q.put('alex') q.put({'count': 1}) q.put(func) q.put(666) # 当队列数据已经达到上限,在插入数据的时候,程序就会夯住. 当你将数据所有取完继续再取值时,程序也会夯住 put中有block参数 默认为True 当你插入的数据超过最大限度,默认阻塞.改为False 数据超过最大限度,不阻塞了直接报错. put中有timeout参数 延时报错,好比q.put(3,timeout = 3) 超过三秒再put不进,程序就会报错 get也有这两个参数
# 小米:抢手环4.预期发售10个. # 有100我的去抢. import os from multiprocessing import Queue from multiprocessing import Process def task(q): try: q.put(f'{os.getpid()}',block=False) except Exception: return if __name__ == '__main__': q = Queue(10) for i in range(100): p = Process(target=task,args=(q,)) p.start() for i in range(1,11): print(f'排名第{i}的用户: {q.get()}',)
栈与队列性质相似,特色与队列相反:先进后出(FILO)
import queue q = queue.LifoQueue() q.put(1) q.put(3) q.put('barry') print(q.get()) print(q.get()) print(q.get()) print(q.get())
须要经过元组的形式放入,(int,数据) int 表明优先级,数字越低,优先级越高
import queue q = queue.PriorityQueue(3) q.put((10, '垃圾消息')) q.put((-9, '紧急消息')) q.put((3, '通常消息')) print(q.get()) print(q.get()) print(q.get())
生产者:生产数据进程
消费者:对生产者生产出来的数据作进一步处理进程
from multiprocessing import Process from multiprocessing import Queue import time import random def producer(name,q): for i in range(1,6): time.sleep(random.randint(1,3)) res = f'{i}号包子' q.put(res) print(f'\033[0;32m 生产者{name}: 生产了{res}\033[0m') def consumer(name,q): while 1: try: time.sleep(random.randint(1,3)) ret = q.get(timeout=5) print(f'消费者{name}: 吃了{ret}') except Exception: return if __name__ == '__main__': q = Queue() p1 = Process(target=producer, args=('alex',q)) p2 = Process(target=consumer, args=('barry',q)) p1.start() p2.start()
模型中有三个主体:生产者,消费者,容器队列
合理的调控多个进程去生成数据以及提取数据,中间有个必不可少的环节就是容器队列
若是没有容器,生产者与消费者造成强耦合性,不合理,因此要有一个缓冲区(容器),平衡了生产力与消费力
基于文件 + 锁 效率低,麻烦
基于队列 推荐
基于管道 管道本身加锁,底层可能会出现数据丢失损坏
多个进程抢占一个资源
串行,有序以及数据安全
多个进程实现并发效果
生产者消费者模型