信号量,事件,队列

一:多进程方法汇总安全

# 多进程代码
# from multiprocessing import Process
# 方法
    # 进程对象.start()     开启一个子进程
    # 进程对象.join()      感知一个子进程的结束
    # 进程对象.terminate() 结束一个子进程
    # 进程对象.is_alive()  查看某个子进程是否还在运行
# 属性
    # 进程对象.name        进程名
    # 进程对象.pid         进程号
    # 进程对象.daemon      值为True的时候,表示新的子进程是一个守护进程
            # 守护进程 随着主进程代码的执行结束而结束
            # 必定在start以前设置


# from multiprocessing import Lock
# l = Lock()
# l.acquire()   # 拿钥匙
# 会形成数据不安全的操做
# l.release()   # 还钥匙

二:多进程补充知识:服务器

from multiprocessing import Process
def func():
    num = input('>>>')
    print(num)

if __name__ == '__main__':
    Process(target=func).start()

    # 多进程中server端子进程不能直接input。不然报错。

 

三:dom

信号量 —— multiprocess.Semaphore
互斥锁同时只容许一个线程更改数据,而信号量Semaphore是同时容许必定数量的线程更改数据 。
假设商场里有4个迷你唱吧,因此同时能够进去4我的,若是来了第五我的就要在外面等待,
等到有人出来才能再进去玩。
实现:
信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,
acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。
信号量同步机制适用于访问像服务器这样的有限资源。
信号量与进程池的概念很像,可是要区分开,信号量涉及到加锁的概念

理解:就好像四我的一块儿去ktv,屋里只能有四人,必须有人出去他才能出去。用代码怎么实现呢?函数

import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore

也能够简写,如import tinme,random这样  下面也能够那样简写。ui

def ktv(i,sem):
    sem.acquire()    #获取钥匙
    print('%s走进ktv'%i)
    time.sleep(random.randint(1,5))
    print('%s走出ktv'%i)
    sem.release()   #还钥匙


if __name__ == '__main__' :
    sem = Semaphore(4)  #总共钥匙总数为4
    for i in range(20):
        p = Process(target=ktv,args=(i,sem))
        p.start()

解读:总共 sem = Semaphore(4) 表示总共钥匙总数为4,而后把他进程实例化的参数,传到上面,上面才能用sem.acquire()拿钥匙这个和 sem.release() 表示还钥匙。spa

注意:sem = Semaphore() 什么都不写,默认一把钥匙。也就是一我的走进,一个出去才能下一我的,至关于买火车票上锁了。线程

 

2 什么是事件?为何要引入事件。server

当咱们须要经过一个信号来控制多个进程阻塞或者执行就引入事件。对象

代码表示:from multiprocessing import Event队列

怎样建立事件?

e=Event()   当一个事件建立时,默认是阻塞状态。

怎么查看是不是阻塞状态呢?

print(e.is_set())  查看进程的状态,默认是阻塞状态。

 

怎么改变他状态呢?

e.set()  将事件改变为非阻塞,也就是True

e.clear() 将事件改变为阻塞,也就是False

 

e.wait()又是怎么用的呢?

与e.is_set()连用,当判断是阻塞状态时,e.wait()起到阻塞做用,后面的代码不执行。

也便是这样:

# set 和 clear
    #  分别用来修改一个事件的状态 True或者False
# is_set 用来查看一个事件的状态
# wait 是依据事件的状态来决定本身是否在wait处阻塞
    #  False阻塞 True不阻塞

例题:红绿灯

import time
import random
from multiprocessing import Event,Process
def cars(e,i):
    if not e.is_set():
        print('car%i在等待'%i)
        e.wait()    # 阻塞 直到获得一个 事件状态变成 True 的信号
    print('\033[0;32;40mcar%i经过\033[0m' % i)

def light(e):
    while True:
        if e.is_set():
            e.clear()
            print('\033[31m红灯亮了\033[0m')
        else:
            e.set()
            print('\033[32m绿灯亮了\033[0m')
        time.sleep(2)

if __name__ == '__main__':
    e = Event()
    traffic = Process(target=light,args=(e,))
    traffic.start()
    for i in range(20):
        car = Process(target=cars, args=(e,i))
        car.start()
        time.sleep(random.random())

解读:

首先理解,灯和车是同时执行的,两个进程相互不影响。

而后if e.is_set():

         e.clear()

      不执行红灯,而是执行绿灯,下一行也是这样。

打印结果:

绿灯亮了
car0经过
car1经过
car2经过
car3经过
红灯亮了
car4在等待
car5在等待
car6在等待
car7在等待
car8在等待
绿灯亮了
car4经过
car7经过
car8经过
car5经过
car6经过
car9经过
car10经过
car11经过
红灯亮了
car12在等待
car13在等待
绿灯亮了
car13经过
car12经过
car14经过
car15经过
car16经过

还在循环。

 

3 什么是队列?(先进先出)

实现多进程之间的通讯,也叫 IPC(Inter-Process Communication) 。

建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递 。

Queue([maxsize]) 
建立共享的进程队列。
参数 :maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。
底层队列使用管道和锁定实现。

方法介绍:

Queue([maxsize]) 
建立共享的进程队列。maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。底层队列使用管道和锁定实现。
另外,还须要运行支持线程以便队列中的数据传输到底层管道中。 
Queue的实例q具备如下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一个项目。若是q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 
若是设置为False,将引起Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。
若是在制定的时间间隔内没有项目变为可用,将引起Queue.Empty异常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
将item放入队列。若是队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。若是设置为False,
将引起Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。
超时后将引起Queue.Full异常。

q.qsize() 
返回队列中目前项目的正确数量。此函数的结果并不可靠,由于在返回结果和在稍后程序中使用结果之间,
队列中可能添加或删除了项目。在某些系统上,此方法可能引起NotImplementedError异常。

例题2:

from multiprocessing import Queue,Process
def produce(q):
    q.put('hello') 把hello放入队列

def consume(q):
    print(q.get())  返回队列中的项   也就是得到hello

if __name__ == '__main__':
    q = Queue()
    p = Process(target=produce,args=(q,))
    p.start()
    c = Process(target=consume, args=(q,))
    c.start()

打印结果:hello

 

4 生产者模型:使用queue模块。

# 队列
# 生产者消费者模型

# 生产者 进程
# 消费者 进程
import time
import random
from multiprocessing import Process,Queue
def consumer(q,name):
    while True:
        food = q.get()
        if food is None:
            print('%s获取到了一个空'%name)
            break
        print('\033[31m%s消费了%s\033[0m' % (name,food))
        time.sleep(random.randint(1,3))

def producer(name,food,q):
    for i in range(4):
        time.sleep(random.randint(1,3))
        f = '%s生产了%s%s'%(name,food,i)
        print(f)
        q.put(f)

if __name__  == '__main__':
    q = Queue(20)
    p1 = Process(target=producer,args=('Egon','包子',q))
    p2 = Process(target=producer, args=('wusir','泔水', q))
    c1 = Process(target=consumer, args=(q,'alex'))
    c2 = Process(target=consumer, args=(q,'jinboss'))
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    q.put(None)
    q.put(None)

 

升级版本:模块JoinableQueue

import time
import random
from multiprocessing import Process,JoinableQueue   用的这个
def consumer(q,name):
    while True:
        food = q.get()
        print('\033[31m%s消费了%s\033[0m' % (name,food))
        time.sleep(random.randint(1,3))
        q.task_done()     # count - 1

def producer(name,food,q):
    for i in range(4):
        time.sleep(random.randint(1,3))
        f = '%s生产了%s%s'%(name,food,i)
        print(f)
        q.put(f)
    q.join()    # 阻塞  直到一个队列中的全部数据 所有被处理完毕

if __name__  == '__main__':
    q = JoinableQueue(20)
    p1 = Process(target=producer,args=('Egon','包子',q))
    p2 = Process(target=producer, args=('wusir','泔水', q))
    c1 = Process(target=consumer, args=(q,'alex'))
    c2 = Process(target=consumer, args=(q,'jinboss'))
    p1.start()
    p2.start()
    c1.daemon = True   # 设置为守护进程 主进程中的代码执行完毕以后,子进程自动结束
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join()      # 感知一个进程的结束

#  在消费者这一端:
    # 每次获取一个数据
    # 处理一个数据
    # 发送一个记号 : 标志一个数据被处理成功

# 在生产者这一端:
    # 每一次生产一个数据,
    # 且每一次生产的数据都放在队列中
    # 在队列中刻上一个记号
    # 当生产者所有生产完毕以后,
    # join信号 : 已经中止生产数据了
                # 且要等待以前被刻上的记号都被消费完
                # 当数据都被处理完时,join阻塞结束

# consumer 中把全部的任务消耗完
# producer 端 的 join感知到,中止阻塞
# 全部的producer进程结束
# 主进程中的p.join结束
# 主进程中代码结束
# 守护进程(消费者的进程)结束
相关文章
相关标签/搜索