Python并发编程之进程

1、理论概念

  一、定义

    进程(Process 也能够称为重量级进程)是程序的一次执行。在每一个进程中都有本身的地址空间、内存、数据栈以及记录运行的辅助数据,它是系统进行资源分配和调度的一个独立单位。python

  二、并行和并发

    并行:并行是指多个任务同一时间执行;linux

    并发:是指在资源有限的状况下,两个任务相互交替着使用资源;编程

  三、同步和异常

    同步是指多个任务在执行时有一个前后的顺序,必须是一个任务执行完成另一个任务才能执行;json

    异步是指多个任务在执行时没有前后顺序,多个任务能够同时执行;
  四、同步/异步/阻塞/非阻塞/windows

    同步阻塞:这个阻塞的造成效率是最低的;好比你在下载一个东西是,你一直盯着下载进度条,到达100%时下载完成;安全

        同步体如今:你等待下载完成通知;网络

        阻塞体如今:等待下载的过程当中,不能作别的事情数据结构

    同步非阻塞:你在下载东西时,你把任务提交后就去干别的事情了,只是每过一段时间就看一下是否是下载完成;多线程

        同步体如今:等待下载完成通知;并发

        非阻塞提如今:等待下载完成通知过程当中,去干别的事情了,只是时不时会瞄一眼进度条;

    异步阻塞:你在下载东西时换了一个如今使用的客户端好比迅雷,下载完成后会有一个提示音,可是这时候你仍然一直在等待那个完成后的提示音;

        异步体如今:下载完成时有提示音;

        阻塞体如今:等待下载完成提示音时,不作任何事情;

    异步非阻塞:你然然使用的是迅雷下载软件,这时候你把下载任务提交后就去干别的事情去了,当你听到‘叮’之后就知道下载完成;

        异步体如今:下载完成叮一声完成通知

        非阻塞体如今:等待下载完成“叮”一声通知过程当中,去干别的任务了,只须要接收“叮”声通知便可;

2、进程的建立与结束

  multiprocessing模块:multiprocess不是一个模块而是python中一个操做、管理进程的包。 之因此叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的全部子模块。因为提供的子模块很是多,为了方便你们归类记忆,我将这部分大体分为四个部分:建立进程部分,进程同步部分,进程池部分,进程之间数据共享。

  Process模块的各类方法介绍

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化获得的对象,表示一个子进程中的任务(还没有启动) 强调: 1. 须要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号 参数介绍: group参数未使用,值始终为None target表示调用对象,即子进程要执行的任务 args表示调用对象的位置参数元组,args=(1,2,'egon',) kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} name为子进程的名称 p.start():启动进程,并调用该子进程中的p.run() p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法 p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁 p.is_alive():若是p仍然运行,返回True p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

  在windows中使用process注意事项:

    在Windows操做系统中因为没有fork(linux操做系统中建立进程的机制),在建立子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。所以若是将process()直接写在文件中就会无限递归建立子进程报错。因此必须把建立子进程的部分使用if __name__ ==‘__main__’ 判断保护起来,import 的时候 ,就不会递归运行了。

  process模块建立进程:

#!/usr/bin/env python # -*- coding:utf-8 -*-  #Author: caoyf
import time from multiprocessing import Process def func(name): print('hello %s'%name) print('我是子进程') if __name__ == '__main__': p = Process(target=func,args=('caoyf',))  #在实例化时候,args的参数必须是一个元祖形式(注册一个子进程)
    p.start() #启动一个子进程
    time.sleep(3) print('执行主进程内容了')
建立第一个进程

  多个进程同时运行,子进程的执行顺序不是根据启动的顺序来决定的;

#!/usr/bin/env python # -*- coding:utf-8 -*- #Author: caoyf
import time from multiprocessing import Process def func(name): print('hello %s'%name) time.sleep(2) if __name__ == '__main__': p_lst = [] for i in range(10): p = Process(target=func, args=('caoyf',)) p.start() p_lst.append(p) for p in p_lst: p.join()  # 是感知一个子进程的结束,将异步的程序改成同步
    print('父进程在运行')
多个进程同时运行

  另外一种开启进程的方法,继承process的形式

#!/usr/bin/env python # -*- coding:utf-8 -*- #Author: caoyf
import time import os from multiprocessing import Process class Func(Process): def __init__(self,name): super().__init__() self.name = name def run(self): print(os.getpid()) print('%s正在和小明聊天'%self.name) if __name__ == '__main__': p1 = Func('caoyf') p2 = Func('Zhao') p1.start() p2.start() p1.join() p2.join()
继承的方式开启进程

  守护进程:会随着主进程的结束而结束,进程之间是相互独立的,主进程的代码运行结束,守护进程也会随即结束

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author: caoyf
import  time
import os
from multiprocessing import  Process
def foo():
    print('start123')
    time.sleep(2)
    print('end123')

def func():
    print('start456')
    time.sleep(5)
    print('end456')
if __name__ == '__main__':
    p1 = Process(target=foo)
    p2 = Process(target=func)
    p1.daemon = True
    p1.start()
    p2.start()
    time.sleep(0.1)
    print('main------------')#打印该行则主进程代码结束,则守护进程p1应该被终止.#可能会有p1任务执行的打印信息123,由于主进程打印main---
    # -时,p1也执行了,可是随即被终止.
守护进程

 

3、进程同步(multiprocessing.Lock\Spemaphore\Event)

  锁(Lock):

    资源是有限的,多个进程若是对同一个对象进行操做,则有可能形成资源的争用,甚至致使死锁,在并发进程中就能够利用锁进行操做来避免访问的冲突;

    加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改,可是速度就变慢了,但牺牲了速度却保证了数据安全。

    虽然能够用文件共享数据实现进程间通讯,但问题是:

      1.效率低(共享数据基于文件,而文件是硬盘上的数据)

      2.须要本身加锁处理

 

    咱们能够模拟一个火车抢票的过程,当过个客户同时对一个程序发起访问时,假设此时有5张票,有10我的抢

from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩余票数%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(random.random()) #模拟读数据的网络延迟
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(random.random()) #模拟写数据的网络延迟
        json.dump(dic,open('db','w'))
        print('\033[32m购票成功\033[0m')
    else:
        print('\033[31m购票失败\033[0m')

def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(100): #模拟并发100个客户端抢票
        p=Process(target=task,args=(lock,))
        p.start()
抢火车票

  信号量:

    信号量Semaphore是同时容许必定数量的线程更改数据 。

#!/usr/bin/env python
# -*- coding:utf-8 -*- 
#Author: caoyf
import  time
import  random
from multiprocessing import Semaphore
from multiprocessing import Process
def f(i,a):
    a.acquire()
    print('%s走进了房间'%i)
    time.sleep(random.randint(1,5))
    print('%s走出了房间'%i)
    a.release()
if __name__ == '__main__':
    a = Semaphore(5)
    for i in range(10):
        p = Process(target=f,args=(i,a))
        p.start()
信号量

  事件:

用于主线程控制其余线程的执行,事件主要提供了三个方法 set、wait、clear。

    事件处理的机制:全局定义了一个“Flag”,若是“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,若是“Flag”值为True,那么event.wait 方法时便再也不阻塞。
            clear:将“Flag”设置为False
            set:将“Flag”设置为True
#!/usr/bin/env python
# -*- coding:utf-8 -*- 
#Author: caoyf
from multiprocessing import Event,Process
import random
import time
def cars(a,i):
    if not a.is_set():
        print('car%s在等待'%i)
        a.wait()
    print('\033[31mcar%s经过\033[0m' % i)
def f(a):
    while True:
        if a.is_set():
            a.clear()
            print('\033[31m红灯亮了\033[0m')

        else:
            a.set()
            print('\033[32m绿灯亮了\033[0m')
        time.sleep(2)
if __name__ == '__main__':
    a = Event()
    p = Process(target=f,args=(a,))
    p.start()
    for i in range(20):
        car = Process(target=cars,args=(a,i))
        car.start()
        time.sleep(random.random())
事件/红绿灯实例

4、进程间通讯---队列和管道

  队列Queue:适用于多线程编程的先进先出数据结构,能够用来安全的传递多线程信息。

  经过队列实现了 主进程与子进程的通讯 子进程与子进程之间的通讯

q=Queue(10)     #实例化一个对象,容许队列对多10个元素
q.put() #放入队列
q.get() #从队列中取出

  假设如今有一个队伍,队伍里最多只能站5我的,可是有15我的想要进去

#!/usr/bin/env python
# -*- coding:utf-8 -*- 
#Author: caoyf
from multiprocessing import Process
from multiprocessing import Queue
def getin(q):     #进入队伍的子进程
    for i in range(15):
        q.put(i)
        # print(q)
def getout(q):    #离开队伍的子进程
    for i in range(6):
        print(q.get())
if __name__=='__main__':
    q=Queue(5)      #队伍内最多能够容纳的人数
    p=Process(target=getin,args=(q,))     #进入队伍的进程
    p.start()
    p2=Process(target=getout,args=(q,))   #离开队伍的进程
    p2.start()
队列实例

  管道(Pipes)

#!/usr/bin/env python
# -*- coding:utf-8 -*- 
#Author: caoyf
from multiprocessing import Process,Pipe,Manager,Lock
import time
import  random
# 管道 进程之间建立的一条管道,默认是全双工模式,两头均可以进和出,
# 注意 必须在产生Process对象以前产生管道
# 若是在Pipe括号里面填写False后就变成了单双工,
# 左边的只能收,右边的只能发,recv(接收),send(发送)
#若是没有消息能够接收,recv会一直阻塞,若是链接的另一段关闭后,
#recv会抛出EOFError错误
# close 关闭链接
#下面的实例是在Pipe的括号里填写和不填写False的区别
# from multiprocessing import Process,Pipe
# def func(pro):
#     pro.send('hello')
#     pro.close()
#
# if __name__=='__main__':
#     con,pro =  Pipe(False)
#     p = Process(target=func,args=(pro,))
#     p.start()
#     print(con.recv())
#     p.join()
# 模拟recv阻塞状况
# def func(con,pro):
#     con.close()
#     while 1:
#         try:
#             print(pro.recv())
#         except EOFError:
#             pro.close()
#             break
#
#
# if __name__=='__main__':
#     con,pro =  Pipe()
#     p = Process(target=func,args=(con,pro,))
#     p.start()
#     pro.close()
#     con.send('aaaaa')
#     con.close()
#     p.join()

# 利用管道实现生产者和消费者
# def sc(con,pro,name,food):
#     con.close()
#     for i in range(5):
#         time.sleep(random.random())
#         f = '%s生产了%s%s'%(name,food,i)
#         print(f)
#         pro.send(f)
# def xf(con,pro,name):
#     pro.close()
#     while 1:
#         try:
#             baozi = con.recv()
#             print('%s消费了%s'%(name,baozi))
#         except EOFError:
#             break
# if __name__=='__main__':
#     con,pro = Pipe()
#     p1 = Process(target=sc,args=(con,pro,'caoyf','包子'))
#     c1 = Process(target=xf,args=(con,pro,'zhoaf'))
#     p1.start()
#     c1.start()
#     con.close()
#     pro.close()
#     p1.join()
管道

  数据共享:

    队列和管道只是实现了数据的传递,尚未实现数据的共享,如实现数据共享,就要用到Managers(注:进程间通讯应该尽可能避免使用共享数据的方式

 
 

from multiprocessing import Process,Manager
import os

 
 

def f(dict1,list1):
dict1[os.getpid()] = os.getpid() # 往字典里放当前PID
list1.append(os.getpid()) # 往列表里放当前PID
print(list1)

 
 

if __name__ == "__main__":
with Manager() as manager:
d = manager.dict() #生成一个字典,可在多个进程间共享和传递
l = manager.list(range(5)) #生成一个列表,可在多个进程间共享和传递
p_list = []
for i in range(10):
p = Process(target=f,args=(d,l))
p.start()
p_list.append(p) # 存进程列表
for res in p_list:
res.join()
print('\n%s' %d) #若要保证数据安全,须要加锁lock=Lock()

 

  进程池

    对于须要使用几个甚至十几个进程时,咱们使用Process仍是比较方便的,可是若是要成百上千个进程,用Process显然太笨了,multiprocessing提供了Pool类,即如今要讲的进程池,可以将众多进程放在一块儿,设置一个运行进程上限,每次只运行设置的进程数,等有进程结束,再添加新的进程

  • Pool(processes =num):设置运行进程数,当一个进程运行完,会添加新的进程进去
  • apply_async:异步,串行
  • apply:同步,并行
  • close():关闭pool,不能再添加新的任务
复制代码
import os
import time
import random
from multiprocessing import Pool
from multiprocessing import Process
def func(i):
    i += 1

if __name__ == '__main__':
    p = Pool(5)          # 建立了5个进程
    start = time.time()
    p.map(func,range(1000))  
    p.close()                        # 是不容许再向进程池中添加任务
    p.join()                        #阻塞等待 执行进程池中的全部任务直到执行结束
    print(time.time() - start)
    start = time.time()
    l = []
    for i in range(1000):
        p = Process(target=func,args=(i,))  # 建立了一百个进程
        p.start()
        l.append(p)
    [i.join() for i in l]
    print(time.time() - start)

回调函数:
 
 
复制代码
import os
import time
from multiprocessing import Pool
# 参数 概念 回调函数
def func(i):    # 多进程中的io多,分出去一部分
    print('子进程%s:%s'%(i,os.getpid()))
    return i*'*'

def call(arg):   # 回调函数是在主进程中完成的,不能传参数,只能接受多进程中函数的返回值
    print('回调 :',os.getpid())
    print(arg)

if __name__ == '__main__':
    print('主进程',os.getpid())
    p = Pool(5)
    for i in range(10):
        p.apply_async(func,args=(i,),callback=call)  #callback 回调函数 :主进程执行 参数是子进程执行的函数的返回值
    p.close()
    p.join()
相关文章
相关标签/搜索