并发编程之多进程

 

目录python

 

一 multiprocessing模块介绍                                                          

二 Process类的介绍

三 开启进程的两种方式

四 僵尸进程与孤儿进程

五 Process的join方法和其余属性方法介绍

六 守护进程

七 互斥锁

八 队列介绍

九 生产者消费者模型介绍

十 练习题

 

 


一 multiprocessing模块介绍

python中的多线程没法利用多核优点,若是想要充分地使用多核CPU的资源(os.cpu\_count\(\)查看),在python中大部分状况须要使用多进程。编程

Python提供了multiprocessing。 multiprocessing模块用来开启子进程,并在子进程中执行咱们定制的任务(好比函数),该模块与多线程模块threading的编程接口相似。multiprocessing模块的功能众多:支持子进程、通讯和共享数据、执行不一样形式的同步,>提供了Process、Queue、Pipe、Lock等组件。json

须要再次强调的一点是:与线程不一样,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。windows

二 Process类的介绍

建立进程的类:安全

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化获得的对象,可用来开启一个子进程

强调:
1. 须要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:数据结构

group参数未使用,值始终为None

target表示调用对象,即子进程要执行的任务

args表示调用对象的位置参数元组,args=(1,2,'egon',)

kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}

name为子进程的名称

方法介绍:多线程

p.start():启动进程,并调用该子进程中的p.run() 
p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法  

p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁
p.is_alive():若是p仍然运行,返回True

p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间。

属性介绍:并发

p.daemon:默认值为False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设定为True后,p不能建立本身的新进程,必须在p.start()以前设置

p.name:进程的名称

p.pid:进程的pid

三 开启进程的两种方式

注意:在windows中Process()必须放到# if __name__ == '__main__':下dom

建立并开启子进程的方式一socket

 1 from multiprocessing import Process  2 import time  3 
 4 def task(name):  5     print("%s is running" % name)  6     time.sleep(3)  7     print("%s is done" % name)  8 
 9 
10 if __name__ == "__main__": 11     p = Process(target=task, kwargs={"name": "子进程1"}) 12  p.start() 13     print("")
View Code

建立并开启子进程的方式二

 1 from multiprocessing import Process  2 import time  3 
 4 class MyProcess(Process):  5     def __init__(self, name):  6         super().__init__()  7         self.name = name  8 
 9     def run(self): 10         """
11  默认方法run 12  :return: 13         """
14         print("%s is running" % self.name) 15         time.sleep(3) 16         print("%s is done " % self.name) 17         
18         
19 if __name__ == "__main__": 20     p = MyProcess("子进层1") 21     p.start()
View Code

 

四僵尸进程与孤儿进程

僵尸进程:(父进程没结束,子进程提早结束(清理内存空间保留状态),父进程没有处理子进程的状态)(有害,应当避免) 
一个进程使用fork建立子进程,若是子进程退出,而父进程没有调用wait或waitpid获取进程的状态信息,那么子进程的进程描述符仍保存在系统中,这种进程称为僵死进程。

孤儿进程:(父进程提早退出,子进程还没结束,子进程成为孤儿进程)(无害) 
一个父进程退出,而它的一个或着多个子进程还在运行,那么这些子进程将称为孤儿进程。孤儿进程将被init进程(进程号1)所收养, 并由init进程对它们完成状态收集工做

任何一个子进程(init除外)在exit()以后,并非立刻就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程去处理。这是每一个子进程结束时都必须通过的阶段。若是子进程在exit()以后,父进程没有来的及处理,此时用ps命令查看状态的话就时Z状态。若是父进程能及时处理掉,则用ps就查看不到子进程的状态。

僵尸进程的危害

unix提供了一种机制去获取子进程结束时的状态信息。在每一个进程退出的时候,内核释放全部的资源,包括打开的文件,占用的内存等。可是仍然为其保留必定的信息(包括进程号processid,退出状态the termination status of the process,运行时间the amount of CPU time taken by the process等)。直到父进程经过wait/waitpid来取时才释放。若是父进程不调用wait/waitpid的话,那么保留的那段信息就不会释放,其进程号就会一直被占用,可是系统所能使用的进程号是有限的,若是大量的产生僵死进程,将由于没有可用的进程号而致使系统不能产生新的进程,这就是讲师进程的危害,应当避免

五Process的join方法和其余属性方法介绍

join定义:

p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间。

在主进程运行过程当中若是想并发地执行其余的任务,咱们能够开启子进程,此时主进程的任务与子进程的任务分两种状况

状况一:在主进程的任务与子进程的任务彼此独立的状况下,主进程的任务先执行完毕后,主进程还须要等待子进程执行完毕,而后统一回收资源。

状况二:若是主进程的任务在执行到某一个阶段时,须要等待子进程执行完毕后才能继续执行,就须要有一种机制可以让主进程检测子进程是否运行完毕,在子进程执行完毕后才继续执行,不然一直在原地阻塞,这就是join方法的做用

 1 from multiprocessing import Process  2 import time  3 
 4 def task(name, n):  5     print("%s is running" % name)  6  time.sleep(n)  7 
 8 
 9 if __name__ == "__main__": 10     start_time = time.time() 11     p = Process(target=task, kwargs={"name": "子进程", "n": 5}) 12     p1 = Process(target=task, kwargs={"name": "子进程1", "n": 4}) 13     p2 = Process(target=task, kwargs={"name": "子进程2", "n": 3}) 14     p3 = Process(target=task, kwargs={"name": "子进程3", "n": 2}) 15 
16     p_l = [p, p1, p2, p3] 17     for p in p_l: 18  p.start() 19     for p in p_l: 20  p.join() 21 
22     print("") 23     # 同时发送p,p1,p2,p3,主进程结束时间是,等待子进程运行时间最长的那个:
24     print(time.time()-start_time)
View Code
p.is_alive()定义:
若是p仍然运行,返回True
 1 from multiprocessing import Process  2 
 3 def task(name):  4     print("%s is running" % name)  5 
 6 if __name__ == "__main__":  7     p = Process(target=task, kwargs={"name": "子进程1"})  8  p.start()  9  p.join() 10     # p.is_alive():若是p仍然运行,返回True
11     print(p.is_alive()) 12     print("")
View Code
p.terminate()定义:
强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁
 1 from multiprocessing import Process  2 import time  3 def task(name):  4     print("%s is running" % name)  5 
 6 if __name__ == "__main__":  7     p = Process(target=task, kwargs={"name": "子进程1"})  8  p.start()  9  p.terminate() 10     # 若是不睡上2秒,只是发送p.terminate(),还没运行,p.is_alive()返回的是TRUE
11     time.sleep(2) 12     print(p.is_alive()) 13     print("")
View Code
p.pid定义:进程的pid

使用os模块的os.getpid()和os.getppid()方法来查看:

 
 
View Code
 
 

查看运行软件的进程方法

 
 

wimdows系统在cmd中用:tasklist | findstr pycharm

 
 

mac系统:ps aux | grep pycharm

p.name定义:进程的名称
 
 
 1 from multiprocessing import Process  2 def task(name):  3     print("%s is running" % name)  4 
 5 
 6 if __name__ == "__main__":  7     p = Process(target=task, kwargs={"name": "子进程1"}, name="revenge")  8  p.start()  9     print("") 10     print(p.name)
View Code
 
  
 

六守护进程

关于守护进程须要强调两点:

其一:守护进程会在主进程代码执行结束后就终止

其二:守护进程内没法再开启子进程,不然抛出异常:AssertionError: daemonic processes are not allowed to have children

 
 
 1 from multiprocessing import Process  2 import time  3 
 4 def task(name):  5     print("%s is running" % name)  6     time.sleep(3)  7     print("%s is done" % name)  8 
 9 
10 if __name__ == "__main__": 11     p = Process(target=task, kwargs={"name": "子进程1"}) 12     # 必定要在p.start()前设置,设置p为守护进程,禁止p建立子进程,而且父进程代码执行结束,p即终止运行
13     p.daemon = True 14  p.start() 15     # 只要终端打印出这一行内容,那么守护进程p也就跟着结束掉了
16     print("")
View Code

七互斥锁

进程之间数据不共享,可是共享同一套文件系统,因此访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,以下

 1 from multiprocessing import Process  2 import time  3 
 4 def task(name):  5     print("%s 1" % name)  6     time.sleep(1)  7     print("%s 2" % name)  8     time.sleep(1)  9     print("%s 3" % name) 10 
11 
12 if __name__ == "__main__": 13     for i in range(3): 14         p = Process(target=task, args=("进程%s" % i)) 15         p.start()
View Code 并发运行,效率高,但竞争同一打印终端,带来了打印错乱

 如何控制,就是加锁处理。而互斥锁的意思就是互相排斥,若是把多个进程比喻为多我的,互斥锁的工做原理就是多我的都要去争抢同一个资源:卫生间,一我的抢到卫生间后上一把锁,其余人都要等着,等到这个完成任务后释放锁,其余人才有可能有一个抢到......因此互斥锁的原理,就是把并发改为穿行,下降了效率,但保证了数据安全不错乱

 1 from multiprocessing import Process,Lock  2 import time  3 
 4 def task(name, mutex):  5  mutex.acquire()  6     print("%s 1" % name)  7     time.sleep(1)  8     print("%s 2" % name)  9     time.sleep(1) 10     print("%s 3" % name) 11  mutex.release() 12 
13 
14 if __name__ == "__main__": 15     mutex = Lock() 16     for i in range(3): 17         p = Process(target=task, args=("进程%s" % i, mutex)) 18         p.start()
View Code

互斥锁与join

使用join能够将并发变成串行,互斥锁的原理也是将并发变成穿行,那咱们直接使用join就能够了啊,为什么还要互斥锁?

答:join是将一个任务总体串行,而互斥锁的好处则是能够将一个任务中的某一段代码串行,好比只让task函数中的get任务串行

总结:

加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

虽然能够用文件共享数据实现进程间通讯,但问题是:

一、效率低(共享数据基于文件,而文件是硬盘上的数据)

二、须要本身加锁处理

所以咱们最好找寻一种解决方案可以兼顾:

一、效率高(多个进程共享一块内存的数据)

二、帮咱们处理好锁问题。

这就是mutiprocessing模块为咱们提供的基于消息的IPC通讯机制:队列和管道。

队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可让咱们从复杂的锁问题中解脱出来,于是队列才是进程间通讯的最佳选择。

咱们应该尽可能避免使用共享数据,尽量使用消息传递和队列,避免处理复杂的同步和锁问题,并且在进程数目增多时,每每能够得到更好的可获展性。

加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

八队列介绍

进程彼此之间互相隔离,要实现进程间通讯(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

建立队列的类(底层就是以管道和锁定的方式实现):

Queue([maxsize]):建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。

参数介绍:

maxsize是队列中容许最大项数,省略则无大小限制。

但须要明确:

一、队列内存放的是消息而非大数据

二、队列占用的是内存空间,于是maxsize即使是无大小限制也受限于内存大小

主要方法介绍:

q.put方法用以插入数据到队列中。

q.get方法能够从队列读取而且删除一个元素。

九生产者消费者模型介绍

为何要使用生产者消费者模型

生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。

什么是生产者和消费者模式

生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的

JoinableQueue([maxsize])

这就像是一个Queue对象,但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

参数介绍

maxsize是队列中容许最大项数,省略则无大小限制。

方法介绍

JoinableQueue的实例p除了与Queue对象相同的方法以外还具备:

q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常

q.join():生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止

 1 from multiprocessing import Process,JoinableQueue  2 import time  3 
 4 def producer(q):  5     for i in range(5):  6         res = "内松%s" % i  7         time.sleep(2)  8         print("生产者生产了%s" % res)  9  q.put(res) 10  q.join() 11 
12 def consumer(q): 13     while True: 14         res = q.get() 15         time.sleep(1) 16         print("消费者吃了%s" % res) 17  q.task_done() 18 
19 
20 if __name__ == "__main__": 21     q = JoinableQueue() 22     p = Process(target=producer, args=(q,)) 23     p1 = Process(target=producer, args=(q,)) 24     c = Process(target=consumer, args=(q,)) 25     c1 = Process(target=consumer, args=(q,)) 26     c.daemon = True 27     c1.daemon = True 28  c.start() 29  c1.start() 30  p.start() 31  p1.start() 32  p.join() 33  p1.join() 34     print("")
View Code

生产者消费者模型总结

一、程序中有两类角色

一类负责生产数据(生产者)
一类负责处理数据(消费者)

二、引入生产者消费者模型为了解决的问题是

平衡生产者与消费者之间的速度差
程序解开耦合

三、如何实现生产者消费者模型

生产者<--->队列<--->消费者

 

 

十练习题

一、思考开启进程的方式一和方式二各开启了几个进程?

答:各开启了两进程

二、进程之间的内存空间是共享的仍是隔离的?下述代码的执行结果是什么?

 1 from multiprocessing import Process  2 
 3 #在windows系统中应该把全局变量定义在if __name__ == '__main__'之上就能够了
 4 n=100 
 5 
 6 def work():  7     global n  8     n=0  9     print('子进程内: ',n) 10 
11 
12 if __name__ == '__main__': 13     p=Process(target=work) 14  p.start() 15     print('主进程内: ',n)
View Code

答:内存空间是隔离的。

代码运行结果:

主进程内: 100
子进程内: 0

三、基于多进程实现并发的套接字通讯?

服务端:

 1 import socket  2 from multiprocessing import Process  3 def talk(conn):  4     while True:  5         try:  6             res = conn.recv(1024)  7             if not res:  8                 continue
 9  conn.send(res.upper()) 10         except Exception as e: 11             print('\033[1;31m客户端出现错误,关闭链接!---->\033[0m', e) 12             break
13  conn.close() 14 
15 def serve(ip, port): 16     server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 17     server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 18  server.bind((ip, port)) 19     server.listen(5) 20 
21     while True: 22         conn, client_addr = server.accept() 23         p = Process(target=talk, args=(conn,)) 24  p.start() 25 
26 
27 if __name__ == "__main__": 28     serve("127.0.0.1", 8080)
View Code

客户端:

 1 import socket  2 
 3 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  4 
 5 client.connect(("127.0.0.1", 8080))  6 while True:  7     cmd = input(">>>:").strip()  8     if not cmd:continue
 9     client.send(cmd.encode("utf-8")) 10     res = client.recv(1024) 11     print(res.decode("utf-8"))
View Code

思考每来一个客户端,服务端就开启一个新的进程来服务它,这种实现方式有无问题?

答:有问题,若是客户端愈来愈多,多道必定的数目,服务端会奔溃:

四、改写下列程序,分别别实现下述打印效果

 1 from multiprocessing import Process  2 import time  3 import random  4 
 5 def task(n):  6     time.sleep(random.randint(1,3))  7     print('-------->%s' %n)  8 
 9 if __name__ == '__main__': 10     p1=Process(target=task,args=(1,)) 11     p2=Process(target=task,args=(2,)) 12     p3=Process(target=task,args=(3,)) 13 
14  p1.start() 15  p2.start() 16  p3.start() 17 
18     print('-------->4')
View Code

保证最后输出-------->4

 1 from multiprocessing import Process  2 import time  3 import random  4 
 5 def task(n):  6     time.sleep(random.randint(1, 3))  7     print('-------->%s' % n)  8 
 9 
10 if __name__ == '__main__': 11     p1 = Process(target=task, args=(1,)) 12     p2 = Process(target=task, args=(2,)) 13     p3 = Process(target=task, args=(3,)) 14     p_l = [p1, p2, p3] 15     for p in p_l: 16  p.start() 17     for p in p_l: 18         p.join()
View Code

保证按顺序输出(串行了)

def task(n): time.sleep(random.randint(1, 3)) print('-------->%s' % n) if __name__ == '__main__': p1 = Process(target=task, args=(1,)) p2 = Process(target=task, args=(2,)) p3 = Process(target=task, args=(3,)) p_l = [p1, p2, p3] for p in p_l: p.start() p.join() print('-------->4')

五、思考下列代码的执行结果有可能有哪些状况?为何?

from multiprocessing import Process

import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    p1=Process(target=foo)
    p2=Process(target=bar)

    p1.daemon=True
    p1.start()
    p2.start()
    print("main-------")

 

六、模拟抢票练习

 1 # 文件"db.json"
 2 from multiprocessing import Process,Lock  3 import time  4 import json  5 def search(name):  6     dic = json.load(open("db.json", "r", encoding="utf-8"))  7     time.sleep(1)  8     print("<%s>剩余票数%s" % (name, dic["count"]))  9 
10 def get(name): 11     dic = json.load(open("db.json", "r", encoding="utf-8")) 12     time.sleep(1) 13     if dic["count"] > 0: 14         dic["count"] -= 1
15         time.sleep(2) 16         json.dump(dic, open("db.json", "w", encoding="utf-8")) 17         print("<%s>购票成功" % name) 18 
19 
20 def task(name, mutex): 21  search(name) 22  mutex.acquire() 23  get(name) 24     mutex.release()
View Code
相关文章
相关标签/搜索