并发编程(五)

前言

上篇博客的内容是守护进程,对于操做系统来讲能够在后台执行一些程序.这篇的内容是互斥锁,在上上篇博客上说到进程内存空间互相隔离,因此能够经过共享文件来操做同一个文件,那么这样操做的话会发生什么呢?python

互斥锁

多个进程须要共享数据时,先将其锁定,此时资源状态为'锁定',其余进程不能更改;知道该进程释放资源,将资源的状态变成非'锁定',其余的线程才能再次锁定该资源.互斥锁保证了每次只有一个进程进入写入操做,从而保证了多进程状况下数据的正确性.json

咱们使用一个demo 来模拟多个进程操做同一个文件:安全

import json
import time,random
from multiprocessing import Process

def show_tickets(name):
    time.sleep(random.randint(1,3))
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        data = json.load(f)
        print('%s 查看 剩余票数: %s' % (name, data['count']))

def buy_ticket(name):
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        dic = json.load(f)

        if dic['count'] > 0:
            dic['count'] -= 1

            time.sleep(random.randint(1,3))

            with open('ticket.json', 'wt', encoding='utf-8') as f:
                json.dump(dic, f)
                print('%s: 购票成功' % name)

def task(name):
    show_tickets(name)
    buy_ticket(name)

if __name__ == '__main__':
    for i in range(1,11):
        p = Process(target=task, args=(i,))
        p.start()

运行结果:并发

在 ticket.json 里面只有一张票,结果却形成多个用户购买成功,这很显然是不符合实际状况的.\dom

那么怎么解决呢?若是多个进程对同一个文件进行读操做能够不进行限制,可是对同一个文件进行写操做就必要要进行限制,不能够同时多我的对同一个文件进行写操做.python 在多进程模块里提供一个类, Lock 类,当进程获取到锁的时候其余的进程就必需要等待锁释放才能够进行争抢,在这个例子里面就能够加上一把锁来保护数据安全.函数

from multiprocessing import Process, Lock
import json,time,random


def show_tickets(name):
    time.sleep(random.randint(1,3))
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        data = json.load(f)
        print('%s 查看 剩余票数: %s' % (name, data['count']))

def buy_ticket(name):
    time.sleep(random.randint(1,3))
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        dic = json.load(f)

        if dic['count'] > 0:
            dic['count'] -= 1

            time.sleep(random.randint(1,3))

            with open('ticket.json', 'wt', encoding='utf-8') as f:
                json.dump(dic, f)
                print('%s: 购票成功' % name)

def task(name,lock):
    show_tickets(name)
    lock.acquire()
    buy_ticket(name)
    lock.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(1,11):
        p = Process(target=task, args=(i,mutex))
        p.start()

运行结果:性能

这样加了锁(互斥锁)就能够解决同时操做同一个文件形成的数据混乱问题了.ui

当使用多进程开发时,若是多个进程同时读写同一个资源,可能会形成数据的混乱,为了防止发生问题,使用锁,或者使用 Process 的方法 join 将并行变为串行.spa

join 和锁的区别操作系统

  1. join 人为控制进程的执行顺序
  2. join 把整个进程所有串行,而锁能够指定部分代码串行

一旦串行,效率就会下降,一旦并行,数据就可能会出错.

进程间通讯

进程间通讯( internal-process communication),咱们在开启子进程是但愿子进程帮助完成任务,不少状况下须要将数据返回给父进程,然而进程间内存是物理隔离的.

解决办法:

  1. 将共享数据放到文件中
  2. 管道 多进程模块中的一个类,须要有父子关系
  3. 共享一快内存区域 须要操做系统分配

管道通讯

Pipe类返回一个由管道链接的链接对象,默认状况下为双工:

from multiprocessing import Process,Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()
    
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()

运行结果:

[42, None, 'hello']

实例化 Pipe 类会返回两个链接对象表示管道的两端.每一个链接对象都有 send() 和 recv() 方法(及其余).请注意,若是两个进程同时尝试读写管道的同一端,则管道中的数据可能会损坏.固然,同时使用管道的不一样端部的过程不存在损坏的风险.

共享内存通讯

Queue 通讯

Queue类会生成一个先进先出的容器,经过往队列中存取数据而进行进程间通讯.

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])
    
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())
    p.join()

运行结果:

[42, None, 'hello']

队列其余特性

# 阻塞操做 必须掌握
q = Queue(3)
# # 存入数据
q.put("hello",block=False)
q.put(["1","2","3"],block=False)
q.put(1,block=False)
# 当容量满的时候 再执行put 默认会阻塞直到执行力了get为止
# 若是修改block=False 直接报错 由于没地方放了
# q.put({},block=False)

# # # 取出数据
print(q.get(block=False))
print(q.get(block=False))
print(q.get(block=False))
# 对于get   当队列中中没有数据时默认是阻塞的  直达执行了put
# 若是修改block=False 直接报错 由于没数据可取了
print(q.get(block=False))



# 了解
q = Queue(3)
q.put("q",timeout=3)
q.put("q2",timeout=3)
q.put("q3",timeout=3)
# 若是满了 愿意等3秒  若是3秒后还存不进去 就炸
# q.put("q4",timeout=3)

print(q.get(timeout=3))
print(q.get(timeout=3))
print(q.get(timeout=3))
# 若是没了 愿意等3秒  若是3秒后还取不到数据 就炸
print(q.get(timeout=3))

Manager 通讯

demo

from multiprocessing import  Process,Manager
import time

def task(dic):
    print("子进程xxxxx")
    # li[0] = 1
    # print(li[0])
    dic["name"] = "xx"

if __name__ == '__main__':
    m = Manager()
    # li = m.list([100])
    dic = m.dict({})
    # 开启子进程
    p = Process(target=task,args=(dic,))
    p.start()
    time.sleep(3)

能够建立一片共享内存区域用来存取数据.

生产者消费者模型

什么是生产者消费者模型

在软件开发过程当中,常常碰到这样的场景:

某些模块负责生产数据,这些数据由其余模块来负责处理(此处的模块多是:函数,线程,进程等).生产数据的模块称为生产者,而处理数据的模块称为消费者.在生产者与消费者之间的缓冲区称之为仓库.生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模型.

结构图以下:

为了便于理解,咱们举一个寄信的例子。假设你要寄一封信,大体过程以下:

  1. 你把信写好——至关于生产者生产数据;
  2. 你把信放入邮箱——至关于生产者把数据放入缓冲区;
  3. 邮递员把信从邮箱取出,作相应处理——至关于消费者把数据取出缓冲区,处理数据.

生产者消费者模型的优势

  • 解耦

假设生产者和消费者分别是两个线程.若是让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(耦合).若是将来消费者的代码发生改变,可能会影响到生产者的代码.而若是二者都依赖于某个缓冲区,二者之间不直接依赖,耦合也就相应下降了.

举个例子,咱们去邮局投递信件,若是不使用邮箱(也就是缓冲区,你必须得把信直接交给邮递员.有同窗会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须 得认识谁是邮递员,才能把信给他.这就产生了你和邮递员之间的依赖(至关于生产者和消费者的强耦合).万一哪天邮递员换人了,你还要从新认识一下(至关于消费者变化致使修改生产者代码).而邮箱相对来讲比较固定,你依赖它的成本就比较低(至关于和缓冲区之间的弱耦合).

  • 并发

因为生产者与消费者是两个独立的并发体,它们之间是使用缓冲区通讯的,生产者只须要往缓冲区里丢数据,就能够接着生产下一个数据了,而消费者只须要从缓冲区拿数据便可,这样就不会由于彼此的处理速度而发生阻塞.

继续上面的例子,若是没有邮箱,就得在邮局等邮递员,知道他回来,把信交给他,这期间咱们什么事都干不了(生产者阻塞).或者邮递员挨家挨户问,谁要寄信(消费者阻塞).

  • 支持忙闲不均

当生产者制造数据快的时候,消费者来不及处理,为处理的数据能够暂时存在缓冲区中,慢慢处理,而不至于由于消费者的性能过慢形成数据丢失或影响生产者生产数据.

再拿寄信的例子,假设邮递员一次只能带走1000封信,万一碰上情人节或者其余的紧急任务,须要寄出的信超过了1000封,这个时候邮箱做为缓冲区就派上用场了.邮递员把来不及带走的信暂存在邮箱中,等下次过来时在拿走.

使用

from multiprocessing import Process, Queue
import time, random

def producer(name, food, q):
    for i in range(10):
        res = '%s %s' % (food, i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print('%s 生产了 %s' % (name, res))
        
def consumer(name, q):
    while True:
        res = q.get()
        time.sleep(random.randint(1,3))
        print('%s 消费了 %s' % (name, res))

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=('musibii', '🍔', q))
    c = Process(target=consumer, args=('thales', q))
    p.start()
    c.start()
    
    p.join()
    c.join()
    print('主进程')

运行结果:

这样的话该进程并不会结束,由于 get 方法是阻塞的,数据消费完就会一直等待知道生产者生产新的数据,而生产者只能生产9个数据.因此会一直阻塞.

改进使用

咱们须要在消费者消费的时候知道队列里面有多少数据,应该何时消费完了,因此能够在生产者里面生产结束后添加一个标志,好比 None.

import time, random
from multiprocessing import Process, Queue


# 制做热狗
def make_hotdog(queue, name):
    for i in range(1, 4):
        time.sleep(random.randint(1, 2))
        print("%s 制做了一个🌭 %s" % (name, i))
        # 生产获得的数据
        data = "%s 生产的🌭%s" % (name, i)
        # 存到队列中
        queue.put(data)
    # 装入一个特别的数据 告诉消费方 没有了
    # queue.put(None)


# 吃热狗
def eat_hotdog(queue, name):
    while True:
        data = queue.get()
        if not data: break
        time.sleep(random.randint(1, 2))
        print("%s 吃了 %s" % (name, data))


if __name__ == '__main__':
    # 建立队列
    q = Queue()
    p1 = Process(target=make_hotdog, args=(q, "musibii的热狗店"))
    p2 = Process(target=make_hotdog, args=(q, "egon的热狗店"))
    p3 = Process(target=make_hotdog, args=(q, "eureka的热狗店"))

    c1 = Process(target=eat_hotdog, args=(q, "thales"))
    c2 = Process(target=eat_hotdog, args=(q, "maffia"))

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    # 让主进程等三家店全都作完后....
    p1.join()
    p2.join()
    p3.join()

    # 添加结束标志   注意这种方法有几个消费者就加几个None 不太合适 不清楚未来有多少消费者
    q.put(None)
    q.put(None)

    # 如今 须要知道何时作完热狗了 生产者不知道  消费者也不知道
    # 只有队列知道

    print("主进程over")

    # 生产方不生产了 然而消费方不知道 因此已知等待  get函数阻塞
    # 三家店都放了一个空表示没热狗了  可是消费者只有两个 他们只要看见None 就认为没有了
    # 因而进程也就结束了  形成一些数据没有被处理
    # 等待作有店都作完热狗在放None

运行结果:

这样就解决了最第一版本消费之由于没有数据而阻塞的问题了,可是这里仍是有问题,由于不知道到底有多少消费者,由于想让消费者知道数据已经结束了的话,须要给每一个消费者一个标志位,这样是不现实的.

完美使用

python 多进程模块提供了一个JoinableQueue类,追根溯源继承于 Queue,源码看的头疼.

import time, random
from multiprocessing import Process, JoinableQueue


# 制做热狗
def make_hotdog(queue, name):
    for i in range(1,4):
        time.sleep(random.randint(1, 2))
        print("%s 制做的🌭 %s" % (name, i))
        # 生产获得的数据
        data = "%s 生产的🌭 %s" % (name, i)
        # 存到队列中
        queue.put(data)
    # 装入一个特别的数据 告诉消费方 没有了
    # queue.put(None)


# 吃热狗
def eat_hotdog(queue, name):
    while True:
        data = queue.get()
        time.sleep(random.randint(1, 2))
        print("%s 吃了%s" % (name, data))
        # 该函数就是用来记录一共给消费方多少数据了 就是get次数
        queue.task_done()


if __name__ == '__main__':
    # 建立队列
    q = JoinableQueue()
    p1 = Process(target=make_hotdog, args=(q, "musibii的热狗店"))
    p2 = Process(target=make_hotdog, args=(q, "egon的热狗店"))
    p3 = Process(target=make_hotdog, args=(q, "eureka的热狗店"))

    c1 = Process(target=eat_hotdog, args=(q, "thales"))
    c2 = Process(target=eat_hotdog, args=(q, "maffia"))

    p1.start()
    p2.start()
    p3.start()

    # 将消费者做为主进程的守护进程
    c1.daemon = True
    c2.daemon = True

    c1.start()
    c2.start()

    # 让主进程等三家店全都作完后....
    p1.join()
    p2.join()
    p3.join()

    # 如何知道生产方生产完了 而且 消费方也吃完了
    # 方法一:等待作有店都作完热狗在放None
    # # 添加结束标志   注意这种方法有几个消费者就加几个None 不太合适 不清楚未来有多少消费者
    # q.put(None)
    # q.put(None)

    # 主进程等到队列结束时再继续  那队列何时算结束? 生产者已经生产完了 而且消费者把数据全取完了
    q.join()  # 已经明确生产放一共有多少数据

    # 如今 须要知道何时作完热狗了 生产者不知道  消费者也不知道
    # 只有队列知道

    print("主进程over")
    # 生产方不生产了 然而消费方不知道 因此一直等待  get函数阻塞
    # 三家店都放了一个空表示没热狗了  可是消费者只有两个 他们只要看见None 就认为没有了
    # 因而进程也就结束了  形成一些数据没有被处理

运行结果:

查看 JoinableQueue 类方法 task_done 源码:

看不懂.........

相关文章
相关标签/搜索