Python3 并发编程2

进程互斥锁

基本概念

  • 临界资源: 一次仅容许一个进程使用的资源称为临界资源, 进程间采起互斥的方式, 共享临界资源
  • 进程互斥: 一个进程正在访问临界资源, 另外一个要访问该资源的进程必须等待
  • 让并发变成串形, 牺牲了执行效率, 保证了数据的安全
  • 在程序并发执行时, 须要修改时使用

互斥锁的使用

# base_data--->{"ticket_num": 1}
# 模拟抢票软件
import json
import time
from multiprocessing import Process


# 查看余票
def search(user):
    with open('base_data', 'r', encoding='utf-8') as f:
        dic = json.load(f)
        ticket_num = dic.get('ticket_num')
        print(f'用户{user}正在查看余票, 当前余票{ticket_num}张...')


# 购买车票
def buy(user):
    with open('base_data', 'r', encoding='utf-8') as f:
        dic = json.load(f)
        
        # 阻塞
        time.sleep(1)

        if dic.get('ticket_num') > 0:
            dic['ticket_num'] -= 1
            with open('base_data', 'w', encoding='utf-8') as f1:
                json.dump(dic, f1)
            print(f'用户[{user}]抢票成功!')

        else:
            print(f'用户[{user}]抢票失败!')


# 开始抢票
def run(user):
    search(user)
    buy(user)


if __name__ == '__main__':
    for i in range(10):
        # 并发开启10个子进程
        p = Process(target=run, args=(f'{i}',))
        p.start()
'''
用户8正在查看余票, 当前余票1张...
用户6正在查看余票, 当前余票1张...
用户3正在查看余票, 当前余票1张...
用户0正在查看余票, 当前余票1张...
用户4正在查看余票, 当前余票1张...
用户2正在查看余票, 当前余票1张...
用户1正在查看余票, 当前余票1张...
用户7正在查看余票, 当前余票1张...
用户5正在查看余票, 当前余票1张...
用户9正在查看余票, 当前余票1张...
用户[8]抢票成功!
用户[6]抢票成功!
用户[3]抢票成功!
用户[4]抢票成功!
用户[1]抢票成功!
用户[5]抢票成功!
用户[2]抢票成功!
用户[0]抢票成功!
用户[7]抢票成功!
用户[9]抢票成功!
'''

使用进程锁将并发变成串行json

# 模拟抢票软件
import json
import time
from multiprocessing import Process
from multiprocessing import Lock


# 查看余票
def search(user):
    with open('base_data', 'r', encoding='utf-8') as f:
        dic = json.load(f)
        ticket_num = dic.get('ticket_num')
        print(f'用户[{user}]正在查看余票, 当前余票{ticket_num}张...')


# 购买车票
def buy(user):
    with open('base_data', 'r', encoding='utf-8') as f:
        dic = json.load(f)

        time.sleep(1)

        if dic.get('ticket_num') > 0:
            dic['ticket_num'] -= 1
            with open('base_data', 'w', encoding='utf-8') as f1:
                json.dump(dic, f1)
            print(f'用户[{user}]抢票成功!')

        else:
            print(f'用户[{user}]抢票失败!')


# 开始抢票
def run(user, mutex):
    # 上锁
    mutex.acquire()
    search(user)
    buy(user)
    # 解锁
    mutex.release()


if __name__ == '__main__':
    # 调用Lock()类获得一个锁对象
    mutex = Lock()
    for i in range(10):
        # 并发开启10个子进程
        p = Process(target=run, args=(f'{i}', mutex))
        p.start()
                
'''
用户[0]正在查看余票, 当前余票1张...
用户[0]抢票成功!
用户[3]正在查看余票, 当前余票0张...
用户[3]抢票失败!
用户[2]正在查看余票, 当前余票0张...
用户[2]抢票失败!
用户[4]正在查看余票, 当前余票0张...
用户[4]抢票失败!
用户[8]正在查看余票, 当前余票0张...
用户[8]抢票失败!
用户[6]正在查看余票, 当前余票0张...
用户[6]抢票失败!
用户[7]正在查看余票, 当前余票0张...
用户[7]抢票失败!
用户[5]正在查看余票, 当前余票0张...
用户[5]抢票失败!
用户[1]正在查看余票, 当前余票0张...
用户[1]抢票失败!
用户[9]正在查看余票, 当前余票0张...
用户[9]抢票失败!

'''

IPC

基本概念

  • inter-process communication 进程间通讯
  • 进程间的数据是相互隔离的, 要想进行进程间的通讯能够使用队列

队列

  • 进程间通讯的一种方式, 支持多进程传入和取出数据
  • 遵循先进先出的原则
from multiprocessing import Queue

q = Queue(5)  # 队列中最多存放5个数据

# 填入数据
q.put('数据1')
q.put('数据2')
q.put('数据3')
q.put('数据4')
q.put('数据5')
# q.put('数据6')  # 数据填满了继续存放, 程序会被卡住

# 查看队列是否填满
print(q.full())

# 队列满了, 则会会报错
# q.put_nowait('数据6')


# 获取数据, 若队列中无数据可获取, 程序会卡住
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())

# 队列中没有, 则会报错
# print(q.get_nowait())

# 判断队列是否为空
print(q.empty())


'''
True
数据1
数据2
数据3
数据4
数据5
True
'''

生产者消费者模型

基本概念

  • 生产者: 生产数据的
  • 消费者: 使用数据
  • 生产者消费者模型: 经过容器来解决生产者和消费者的之间的强耦合问题

代码实现

from multiprocessing import Process, Queue
import time


# 定义生产者
def producer(q):
    for i in range(5):
        data = f'包子{i}'
        q.put(data)
        print(f'生产了{data}')
        time.sleep(0.1)


# 定义生产者
def consumer(q):
    while True:
        data = q.get()
        print(f'吃了{data}')


if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))

    p1.start()
    p2.start()
    print('主')

    
'''
主
生产了包子0
吃了包子0
生产了包子1
吃了包子1
生产了包子2
吃了包子2
生产了包子3
吃了包子3
生产了包子4
吃了包子4
'''

线程

基本概念

  • 进程是资源单位, 线程才是CPU的执行单位, 进行运算调度的最小单位
  • 线程包含在进程之中, 是进程中的实际运做单位
  • 线程开销要远小于进程, 能够节省内存资源
  • 线程之间共享进程中的数据
  • 线程pid为主进程pid

建立线程

from threading import Thread
import time


# 方式一
def task():
    print('线程开启')
    time.sleep(1)
    print('线程结束')


if __name__ == '__main__':
    t = Thread(target=task)
    t.start()


# 方式二
class MyThread(Thread):
    def run(self):
        print('线程开启')
        time.sleep(1)
        print('线程结束')


if __name__ == '__main__':
    t = MyThread()
    t.start()

线程互斥锁

from threading import Thread, Lock
import time

n = 100


def task(i):
    print(f'线程{i}启动...')
    global n
    temp = n
    time.sleep(0.1)
    n = temp - 1
    print(n)


if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=task, args=(i + 1,))
        t.start()

'''
线程1启动...
线程2启动...
线程3启动...
线程4启动...
线程5启动...
线程6启动...
线程7启动...
线程8启动...
线程9启动...
线程10启动...
99
99
99
99
99
99
99
99
99
99
'''
from threading import Thread, Lock
import time

mutex = Lock()

n = 100


def task(i):
    mutex.acquire()
    print(f'线程{i}启动...')
    global n
    temp = n
    time.sleep(0.1)
    n = temp - 1
    print(n)
    mutex.release()


if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=task, args=(i + 1,))
        t.start()

'''
线程1启动...
99
线程2启动...
98
线程3启动...
97
线程4启动...
96
线程5启动...
95
线程6启动...
94
线程7启动...
93
线程8启动...
92
线程9启动...
91
线程10启动...
90
'''