day29

进程互斥锁

加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。json

虽然能够用文件共享数据实现进程间通讯,但问题是:安全

  1. 效率低(共享数据基于文件,而文件是硬盘上的数据)
  2. 须要本身加锁处理

多进程同时抢购余票

# 文件db的内容为:{"count":1}
# 注意必定要用双引号,否则json没法识别
# 并发运行,效率高,但竞争写同一文件,数据写入错乱
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('剩余票数%s' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(0.1)  # 模拟读数据的网络延迟
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2)  # 模拟写数据的网络延迟
        json.dump(dic,open('db','w'))
        print('购票成功')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(100):  # 模拟并发100个客户端抢票
        p=Process(target=task)
        p.start()

使用锁来保证数据安全

# 文件db的内容为:{"count":5}
# 注意必定要用双引号,否则json没法识别
# 并发运行,效率高,但竞争写同一文件,数据写入错乱
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('剩余票数%s' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(random.random())  # 模拟读数据的网络延迟
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(random.random())  # 模拟写数据的网络延迟
        json.dump(dic,open('db','w'))
        print('购票成功')
    else:
        print('购票失败')

def task(lock):
    search()
    lock.acquire() #lock.acquire要放在打开文件以前
    get()
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(100):  # 模拟并发100个客户端抢票
        p=Process(target=task,args=(lock,))
        p.start()

队列

队列和管道都是将数据存放于内存中,队列又是基于(管道+锁)实现的,可让咱们从复杂的锁问题中解脱出来,咱们应该尽可能避免使用共享数据,尽量使用消息传递和队列,避免处理复杂的同步和锁问题,并且在进程数目增多时,每每能够得到更好的可获展性网络

建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。多线程

Queue([maxsize])建立共享的进程队列。
参数 :maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。并发

底层队列使用管道和锁定实现。app

from multiprocessing import Queue
q=Queue(3)
print(q.empty())
q.put(1)
q.put(2)
q.put(3)
print(q.full())
q.get()
q.get()
q.get()
print(q.empty())
print(q.put_niwait(4))
print(q._buffer)  #查看列表内容

子进程发送数据给父进程

mport time
from multiprocessing import Process, Queue

def f(q):
    #调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。
    q.put([time.asctime(), 'from Eva', 'hello'])  

if __name__ == '__main__':
    q = Queue() #建立一个Queue对象
    p = Process(target=f, args=(q,)) #建立一个进程
    p.start()
    print(q.get())
    p.join()

IPC(进程间通讯)

进程间数据是相互隔离的,若想实现进程间通讯,能够利用队列dom

基于队列实现生产者消费者模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        print(222)
        res=q.get()
        time.sleep(1)

        print('%s 吃 %s' %(os.getpid(),res))

def producer(q):
    for i in range(5):
        print(11111)
        time.sleep(2)
        res='包子%s' %i
        q.put(res)

        # print('生产了 %s' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print('主')
    
'''
此时的问题是主进程永远不会结束,缘由是:生产者p在生产完后就结束了,可是消费者c在取空了q以后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就能够break出死循环
'''

线程

进程是资源分配的最小单位,线程是CPU调度的最小单位。每个进程中至少有一个线程函数

进程: 资源单位 - 线程: 执行单位ui

线程与进程都是虚拟单位,目的是为了更好地描述某种事物

GIL锁

Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行。虽然 Python 解释器中能够“运行”多个线程,但在任意时刻只有一个线程在解释器中运行。

对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。

在多线程环境中,Python 虚拟机按如下方式执行:

  1. 设置 GIL;
  2. 切换到一个线程去运行;
  3. 运行指定数量的字节码指令或者线程主动让出控制(能够调用 time.sleep(0));
  4. 把线程设置为睡眠状态;
  5. 解锁 GIL;
  6. 再次重复以上全部步骤。

线程和进程的区别

  1. 地址空间和其余资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其余进程不可见
  2. 通讯:进程间通讯IPC,线程间能够直接读写进程数据段(如全局变量)来通讯,须要进程同步和互斥手段的辅助,以保证数据的一致性
  3. 调度和切换:线程上下文切换比进程上下文切换要快的多
  4. 在多线程操做系统中,进程不是一个可执行的实体
  5. 注意:线程不能实现并行, 线程只能实现并发, 进程能够实现并行

建立线程的两种方式

from threading import Thread
import time


# 开启线程方式1:
def task():
    print('线程开启')
    time.sleep(1)
    print('线程结束')


# t = Thread()
if __name__ == '__main__':
    # 调用Thread线程类实例化获得线程对象
    t = Thread(target=task)
    t.start()
# 开启线程方式2:
class MyThread(Thread):
    def run(self):
        print('线程开启')
        time.sleep(1)
        print('线程结束')
        
if __name__ == '__main__':
    t = MyThread()
    t.start()

线程对象的属性

from threading import Thread
from threading import current_thread
import time


def task():
    print(f'线程开启{current_thread().name}')
    time.sleep(3)
    print(f'线程结束{current_thread().name}')
    
if __name__ == '__main__':
    t = Thread(target=task)
    t.daemon = True
    t.start()

    print('主')

线程互斥锁

#线程间数据共享

from threading import Thread
import time
x = 100


def task():
    print('开启线程...')
    time.sleep(1)
    global x
    x = 200
    print('关闭线程...')

if __name__ == '__main__':
    t = Thread(target=task)
    t.start()
    t.join()  # 关键
    print(x)  # x = 200
    print('主')
from threading import Thread, Lock
import time

mutex = Lock()

n = 100
def task(i):
    print(f'线程{i}启动...')
    global n
    mutex.acquire()
    temp = n
    # time.sleep(0.1)  # 一共等待10秒
    n = temp-1
    print(n)
    mutex.release()

if __name__ == '__main__':
    t_l=[]
    for i in range(5):
        t = Thread(target=task, args=(i, ))
        t_l.append(t)
        t.start()
        
    for t in t_l:
        t.join()
'''
>>>                 
线程0启动...        
99                 
线程1启动...            
98
线程2启动...
97
线程3启动...
96
线程4启动...
95
'''
相关文章
相关标签/搜索