一:多进程方法汇总安全
# 多进程代码 # from multiprocessing import Process # 方法 # 进程对象.start() 开启一个子进程 # 进程对象.join() 感知一个子进程的结束 # 进程对象.terminate() 结束一个子进程 # 进程对象.is_alive() 查看某个子进程是否还在运行 # 属性 # 进程对象.name 进程名 # 进程对象.pid 进程号 # 进程对象.daemon 值为True的时候,表示新的子进程是一个守护进程 # 守护进程 随着主进程代码的执行结束而结束 # 必定在start以前设置 # from multiprocessing import Lock # l = Lock() # l.acquire() # 拿钥匙 # 会形成数据不安全的操做 # l.release() # 还钥匙
二:多进程补充知识:服务器
from multiprocessing import Process def func(): num = input('>>>') print(num) if __name__ == '__main__': Process(target=func).start() # 多进程中server端子进程不能直接input。不然报错。
三:dom
信号量 —— multiprocess.Semaphore
互斥锁同时只容许一个线程更改数据,而信号量Semaphore是同时容许必定数量的线程更改数据 。 假设商场里有4个迷你唱吧,因此同时能够进去4我的,若是来了第五我的就要在外面等待, 等到有人出来才能再进去玩。 实现: 信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时, acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。 信号量同步机制适用于访问像服务器这样的有限资源。 信号量与进程池的概念很像,可是要区分开,信号量涉及到加锁的概念
理解:就好像四我的一块儿去ktv,屋里只能有四人,必须有人出去他才能出去。用代码怎么实现呢?函数
import time import random from multiprocessing import Process from multiprocessing import Semaphore
也能够简写,如import tinme,random这样 下面也能够那样简写。ui
def ktv(i,sem): sem.acquire() #获取钥匙 print('%s走进ktv'%i) time.sleep(random.randint(1,5)) print('%s走出ktv'%i) sem.release() #还钥匙 if __name__ == '__main__' : sem = Semaphore(4) #总共钥匙总数为4 for i in range(20): p = Process(target=ktv,args=(i,sem)) p.start()
解读:总共 sem = Semaphore(4) 表示总共钥匙总数为4,而后把他进程实例化的参数,传到上面,上面才能用sem.acquire()拿钥匙这个和 sem.release() 表示还钥匙。spa
注意:sem = Semaphore() 什么都不写,默认一把钥匙。也就是一我的走进,一个出去才能下一我的,至关于买火车票上锁了。线程
2 什么是事件?为何要引入事件。server
当咱们须要经过一个信号来控制多个进程阻塞或者执行就引入事件。对象
代码表示:from multiprocessing import Event队列
怎样建立事件?
e=Event() 当一个事件建立时,默认是阻塞状态。
怎么查看是不是阻塞状态呢?
print(e.is_set()) 查看进程的状态,默认是阻塞状态。
怎么改变他状态呢?
e.set() 将事件改变为非阻塞,也就是True
e.clear() 将事件改变为阻塞,也就是False
e.wait()又是怎么用的呢?
与e.is_set()连用,当判断是阻塞状态时,e.wait()起到阻塞做用,后面的代码不执行。
也便是这样:
# set 和 clear # 分别用来修改一个事件的状态 True或者False # is_set 用来查看一个事件的状态 # wait 是依据事件的状态来决定本身是否在wait处阻塞 # False阻塞 True不阻塞
例题:红绿灯
import time import random from multiprocessing import Event,Process def cars(e,i): if not e.is_set(): print('car%i在等待'%i) e.wait() # 阻塞 直到获得一个 事件状态变成 True 的信号 print('\033[0;32;40mcar%i经过\033[0m' % i) def light(e): while True: if e.is_set(): e.clear() print('\033[31m红灯亮了\033[0m') else: e.set() print('\033[32m绿灯亮了\033[0m') time.sleep(2) if __name__ == '__main__': e = Event() traffic = Process(target=light,args=(e,)) traffic.start() for i in range(20): car = Process(target=cars, args=(e,i)) car.start() time.sleep(random.random())
解读:
首先理解,灯和车是同时执行的,两个进程相互不影响。
而后if e.is_set():
e.clear()
不执行红灯,而是执行绿灯,下一行也是这样。
打印结果:
绿灯亮了
car0经过
car1经过
car2经过
car3经过
红灯亮了
car4在等待
car5在等待
car6在等待
car7在等待
car8在等待
绿灯亮了
car4经过
car7经过
car8经过
car5经过
car6经过
car9经过
car10经过
car11经过
红灯亮了
car12在等待
car13在等待
绿灯亮了
car13经过
car12经过
car14经过
car15经过
car16经过
还在循环。
3 什么是队列?(先进先出)
实现多进程之间的通讯,也叫 IPC(Inter-Process Communication) 。
建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递 。
Queue([maxsize]) 建立共享的进程队列。 参数 :maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。 底层队列使用管道和锁定实现。
方法介绍:
Queue([maxsize]) 建立共享的进程队列。maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。底层队列使用管道和锁定实现。 另外,还须要运行支持线程以便队列中的数据传输到底层管道中。 Queue的实例q具备如下方法: q.get( [ block [ ,timeout ] ] ) 返回q中的一个项目。若是q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 若是设置为False,将引起Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。 若是在制定的时间间隔内没有项目变为可用,将引起Queue.Empty异常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 将item放入队列。若是队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。若是设置为False, 将引起Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。 超时后将引起Queue.Full异常。 q.qsize() 返回队列中目前项目的正确数量。此函数的结果并不可靠,由于在返回结果和在稍后程序中使用结果之间, 队列中可能添加或删除了项目。在某些系统上,此方法可能引起NotImplementedError异常。
例题2:
from multiprocessing import Queue,Process def produce(q): q.put('hello') 把hello放入队列 def consume(q): print(q.get()) 返回队列中的项 也就是得到hello if __name__ == '__main__': q = Queue() p = Process(target=produce,args=(q,)) p.start() c = Process(target=consume, args=(q,)) c.start()
打印结果:hello
4 生产者模型:使用queue模块。
# 队列 # 生产者消费者模型 # 生产者 进程 # 消费者 进程 import time import random from multiprocessing import Process,Queue def consumer(q,name): while True: food = q.get() if food is None: print('%s获取到了一个空'%name) break print('\033[31m%s消费了%s\033[0m' % (name,food)) time.sleep(random.randint(1,3)) def producer(name,food,q): for i in range(4): time.sleep(random.randint(1,3)) f = '%s生产了%s%s'%(name,food,i) print(f) q.put(f) if __name__ == '__main__': q = Queue(20) p1 = Process(target=producer,args=('Egon','包子',q)) p2 = Process(target=producer, args=('wusir','泔水', q)) c1 = Process(target=consumer, args=(q,'alex')) c2 = Process(target=consumer, args=(q,'jinboss')) p1.start() p2.start() c1.start() c2.start() p1.join() p2.join() q.put(None) q.put(None)
升级版本:模块JoinableQueue
import time import random from multiprocessing import Process,JoinableQueue 用的这个 def consumer(q,name): while True: food = q.get() print('\033[31m%s消费了%s\033[0m' % (name,food)) time.sleep(random.randint(1,3)) q.task_done() # count - 1 def producer(name,food,q): for i in range(4): time.sleep(random.randint(1,3)) f = '%s生产了%s%s'%(name,food,i) print(f) q.put(f) q.join() # 阻塞 直到一个队列中的全部数据 所有被处理完毕 if __name__ == '__main__': q = JoinableQueue(20) p1 = Process(target=producer,args=('Egon','包子',q)) p2 = Process(target=producer, args=('wusir','泔水', q)) c1 = Process(target=consumer, args=(q,'alex')) c2 = Process(target=consumer, args=(q,'jinboss')) p1.start() p2.start() c1.daemon = True # 设置为守护进程 主进程中的代码执行完毕以后,子进程自动结束 c2.daemon = True c1.start() c2.start() p1.join() p2.join() # 感知一个进程的结束 # 在消费者这一端: # 每次获取一个数据 # 处理一个数据 # 发送一个记号 : 标志一个数据被处理成功 # 在生产者这一端: # 每一次生产一个数据, # 且每一次生产的数据都放在队列中 # 在队列中刻上一个记号 # 当生产者所有生产完毕以后, # join信号 : 已经中止生产数据了 # 且要等待以前被刻上的记号都被消费完 # 当数据都被处理完时,join阻塞结束 # consumer 中把全部的任务消耗完 # producer 端 的 join感知到,中止阻塞 # 全部的producer进程结束 # 主进程中的p.join结束 # 主进程中代码结束 # 守护进程(消费者的进程)结束