网络编程6_multiprocess模块.锁.队列

一. multiprocess模块 仔细说来,multiprocess不是一个模块而是python中一个操做、管理进程的包。 之因此叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的全部子模块。因为提供的子模块很是多,为了方便你们归类记忆,我将这部分大体分为四个部分:建立进程部分,进程同步部分,进程池部分,进程之间数据共享。重点强调:进程没有任何共享状态,进程修改的数据,改动仅限于该进程内,可是经过一些特殊的方法,能够实现进程之间数据的共享。 1. Process模块 Process是穿件进程的模块, 借助这个模块, 能够实现进程的建立 Process([group [, target [, name [, args [, kwargs]]]]])由该类实例化一个对象, 表示一个子进程中的任务(还没有启动) 强调: (1). 须要使用关键字的方式来指定参数 (2). args指定的为传给target函数的位置参数, 是一个元祖形式, 必须有逗号 (1). 看一个程序实例: from multiprocessing import Process def func(): print(12345) # 当咱们运行当前这个test.py文件的时候, 就产生了进程, 这个进程咱们称之为主进程 if __name__ == '__main__': # 将函数注册到一个进程中, p是一个进程对象, 此时尚未启动进程, 只是建立了一个进程对象, 而且func是不加括号的的, 由于加上括号就直接运行了 p = Process(target=func, ) # 告诉操做系统, 给我开启一个进程, func这个函数就被咱们新开的这个进程执行了, 而这个进程是我主程序建立出来的因此称这个新建立的进程为主进程的子进程, 而主进程又能够称之为这个新进程的父进程 # 而这个子进程中执行的程序,至关于将如今这个test.py文件中的程序copy到一个你看不到的python文件中去执行了,就至关于当前这个文件,被另一个py文件import过去并执行了。 # start并非直接就去执行了,咱们知道进程有三个状态,进程会进入进程的三个状态,就绪,(被调度,也就是时间片切换到它的时候)执行,阻塞,而且在这个三个状态之间不断的转换,等待cpu执行时间片到了。 p.start() # 这是主进程的程序,上面开启的子进程的程序是和主进程的程序同时运行的,咱们称为异步 print("*"*10) (2). 上面说了,咱们经过主进程建立的子进程是异步执行的,那么咱们就验证一下,而且看一会儿进程和主进程(也就是父进程)的ID号(讲一下pid和ppid,使用pycharm举例),来看看是不是父子关系。 import time import os # os.getpid() # 获取本身的进程号 # os.getppid() # 获取本身进程的父进程的ID号 from multiprocessing import Process def func(): print("aaaaa") time.sleep(1) print("子进程>>>", os.getpid()) print("该子进程的父进程>>>", os.getppid()) print(12345) if __name__ == '__main__': p = Process(target=func, ) p.start() print("*"*10) print("父进程>>>", os.getpid()) print("父进程的父进程>>>", os.getppid()) # ********** # 首先打印出来了主进程的程序,而后打印的是子进程的,也就是子进程是异步执行的,至关于主进程和子进程同时运行着,若是是同步的话,咱们先执行的是func(),而后再打印主进程最后的10个*号。 # 父进程>>> 9044 # 父进程的父进程>>> 9528 #我运行的test.py文件的父进程号,它是pycharm的进程号 # aaaaa # 子进程>>> 10476 # 该子进程的父进程>>> 9044 #是我主进程的ID号,说明主进程为它的父进程 # 12345 (3). 看一个问题,说明linux和windows两个不一样的操做系统建立进程的不一样机制致使的不一样结果: import time import os from multiprocessing import Process def func(): print('aaaa') time.sleep(1) print('子进程>>',os.getpid()) print('该子进程的父进程>>',os.getppid()) print(12345) print('太白老司机') """若是我在这里加了一个打印,你会发现运行结果中会出现两次打印出来的太白老司机,由于咱们在主进程中开了一个子进程,子进程中的程序至关于import的主进程中的程序,那么import的时候会不会执行你import的那个文件的程序啊,前面学的,是会执行的,因此出现了两次打印, 实际上是由于windows开起进程的机制决定的,在linux下是不存在这个效果的,由于windows使用的是process方法来开启进程,他就会拿到主进程中的全部程序,而linux下只是去执行我子进程中注册的那个函数,不会执行别的程序,这也是为何在windows执行程序的时候,要加上if __name__ == '__main__':,不然会出现子进程中运行的时候还开启子进程,那就出现无限循环的建立进程了,就报错了""" if __name__ == '__main__': p = Process(target=func, ) p.start() print("*"*10) print("父进程>>>", os.getpid()) print("父进程的父进程>>>", os.getppid()) # 太白老司机 # ********** # 父进程>>> 11204 # 父进程的父进程>>> 9528 # 太白老司机 # aaaa # 子进程>> 6644 # 该子进程的父进程>> 11204 # 12345 (4). 一个进程的生命周期:若是子进程的运行时间长,那么等到子进程执行结束程序才结束,若是主进程的执行时间长,那么主进程执行结束程序才结束,实际上咱们在子进程中打印的内容是在主进程的执行结果中看不出来的,可是pycharm帮咱们作了优化,由于它会识别到你这是开的子进程,帮你把子进程中打印的内容打印到了显示台上。 若是说一个主进程运行完了以后,咱们把pycharm关了,可是子进程尚未执行结束,那么子进程还存在吗?这要看你的进程是如何配置的,若是说咱们没有配置说我主进程结束,子进程要跟着结束,那么主进程结束的时候,子进程是不会跟着结束的,他会本身执行完,若是我设定的是主进程结束,子进程必须跟着结束,那么就不会出现单独的子进程(孤儿进程)了,具体如何设置,看下面的守护进程的讲解。好比说,咱们未来启动项目的时候,可能经过cmd来启动,那么我cmd关闭了你的项目就会关闭吗,不会的,由于你的项目不能中止对外的服务,对吧 (5). Process类中参数的介绍: 参数介绍: ①. group参数未使用,值始终为None ②. target表示调用对象,即子进程要执行的任务 ③. args表示调用对象的位置参数元组,args=(1,2,'egon',) ④. kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} ⑤. name为子进程的名称 给要执行的函数传参数: def func(x,y): print(x) time.sleep(1) print(y) if __name__ == '__main__': p = Process(target=func,args=('姑娘','来玩啊!'))#这是func须要接收的参数的传送方式。 p.start() print('父进程执行结束!') #执行结果: 父进程执行结束! 姑娘 来玩啊! (6). Process类中各方法的介绍: ①. p.start():启动进程,并调用该子进程中的p.run() ②. p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法 ③. p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁 ④. p.is_alive():若是p仍然运行,返回True ⑤. p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程 jion方法的例子: 让主进程加上join的地方等待(也就是阻塞住), 等待子程序执行完以后, 在继续往下执行主进程, 不少时候, 咱们主进程须要子进程的执行结果, 因此必需要等待, join有点像把子程序和主程序拼接起来, 将异步改成同步执行 import time from multiprocessing import Process def func(x, y): print(x) time.sleep(1) print(y) if __name__ == "__main__": p = Process(target=func, args=("姑娘", "来玩啊")) p.start() print("这里是异步的") p.join() print("父程序执行结束") # 打印结果 这里是异步的 姑娘 来玩啊 父程序执行结束 用for循环开启多个线程: import time import os from multiprocessing import Process def func(x, y): print(x) # time.sleep(1) print(y) if __name__ == '__main__': p_list = [] for i in range(10): p = Process(target=func, args=("姑娘%s"%i, "来玩啊")) p_list.append(p) p.start() # 一、若是加到for循环里面,那么全部子进程包括父进程就所有变为同步了,由于for循环也是主进程的,循环第一次的时候,一个进程去执行了,而后这个进程就join住了,那么for循环就不会继续执行了,等着第一个子进程执行结束才会继续执行for循环去建立第二个子进程。 # 二、若是我不想这样的,也就是我想全部的子进程是异步的,而后全部的子进程执行完了再执行主进程 p.join() # 四、这是解决办法,前提是咱们的子进程所有都已经去执行了,那么我在一次给全部正在执行的子进程加上join,那么主进程就须要等着全部子进程执行结束才会继续执行本身的程序了,而且保障了全部子进程是异步执行的。 [ap.join() for ap in p_list] # 三、若是这样写的话,屡次运行以后,你会发现会出现主进程的程序比一些子进程先执行完,由于咱们p.join()是对最后一个子进程进行了join,也就是说若是这最后一个子进程先于其余子进程执行完,那么主进程就会去执行,而此时若是还有一些子进程没有执行完,而主进程执行完了,那么就会先打印主进程的内容了,这个cpu调度进程的机制有关系,由于咱们的电脑可能只有4个cpu,个人子进程加上住进程有11个,虽然我for循环是按顺序起进程的,可是操做系统必定会按照顺序给你执行你的进程吗,答案是不会的,操做系统会按照本身的算法来分配进程给cpu去执行,这里也解释了咱们打印出来的子进程中的内容也是没有固定顺序的缘由,由于打印结果也须要调用cpu,能够理解成进程在争抢cpu,若是同窗你想问这是什么算法,这就要去研究操做系统啦。那咱们的想全部子进程异步执行,而后再执行主进程的这个需求怎么解决啊 p.join() print("不要钱") 模拟两个应用场景, 1. 同时对一个文件进行写操做, 2. 同时建立多个文件 import time import os import re from multiprocessing import Process # 多进程同时对一个文件进行写操做 # def func(x, y, i): # with open(x, "a", encoding = "utf-8") as f: # print("当前进程%s拿到的文件的光标位置>>%s" % (os.getpid(), f.tell())) # f.write("%s\n"%y) # 多线程同时建立多个文件 def func(x, y): with open(x, "w", encoding="utf-8") as f: f.write(y) if __name__ == '__main__': p_list = [] for i in range(10): # p = Process(target=func, args=("can_do_girl_lists.txt", "姑娘%s"%(i+1), i+1)) p = Process(target=func, args=("girl/can_do_girl_lists_%s.txt"%(i+1), "姑娘%s"%(i+1))) p_list.append(p) p.start() [ap.join() for ap in p_list] # with open("can_do_girl_lists.txt", "r", encoding="utf-8") as f: # data = f.read() # all_num = re.findall("\d+", data) # print(">>>>>", all_num, ".....%s"%(len(all_num))) # print([i for i in os.walk(r"D:/1PY/Day30")]) print("不要钱") (7). Process类中自带封装的各属性的介绍 ①. p.daemon:默认值为False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设定为True后,p不能建立本身的新进程,必须在p.start()以前设置 ②. p.name:进程的名称 ③. p.pid:进程的pid ④. p.exitcode:进程在运行时为None、若是为–N,表示被信号N结束(了解便可) ⑤. p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络链接的底层进程间通讯提供安全性,这类链接只有在具备相同的身份验证键时才能成功(了解便可) 2. Process类的使用 注意: 在windows中Process()必须放到 if __name__ == "__main__": 下 因为Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 若是在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。 进程建立的第二种方法(继承): (1). 进程建立的第二种方法: import os from multiprocessing import Process class MyProcess(Process): def __init__(self, person): super().__init__() # 必须执行父类的__init__方法 self.person = person def run(self): print(os.getpid()) print(self.pid) print("%s正在和女主播聊天" % self.person) # def start(self): # self.run() # print("我是%s" % self.person) if __name__ == '__main__': p1 = MyProcess("Jedan") p2 = MyProcess("太白") p3 = MyProcess("alexDSB") p1.start() p2.start() p2.run() p3.start() p1.join() p2.join() p3.join() (2). 进程之间的数据是隔离的 from multiprocessing import Process n = 100 def work(): global n n = 0 print("子进程内:", n) # print(n) if __name__ == '__main__': p = Process(target=work) p.start() p.join() #等待子进程执行完毕,若是数据共享的话,我子进程是否是经过global将n改成0了,可是你看打印结果,主进程在子进程执行结束以后,仍然是n=100,子进程n=0,说明子进程对n的修改没有在主进程中生效,说明什么?说明他们之间的数据是隔离的,互相不影响的 print("主进程内:", n) # 子进程内: 0 # 主进程内: 100 (3). 多线程实现多个客户端通讯 (4). is_alive(), terminate() import time from multiprocessing import Process class Piao(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print("%s is 打飞机" % self.name) s = input("!!!") # 在pycharm下子进程中不能input输入, 会报错 EOFError: EOF when reading a line, 由于子进程中没有像咱们主进程这样的在pycharm下的控制台能够输入东西的地方 time.sleep(2) print("%s is 打飞机结束" % self.name) if __name__ == '__main__': p1 = Piao("太白") p1.start() p1.join() # time.sleep(5) p1.terminate() # 关闭进程, 不会当即关闭, 有个等着操做系统去关闭这个进程的时间, 因此is_alive马上查看的结果可能仍是存活, 可是稍微等一会就关闭了 print(p1.is_alive()) # 查看子程序是否还存活 print("等会...") time.sleep(1) print(p1.is_alive()) (5). 僵尸进程(有害)和孤儿进程(无害) 僵尸进程: 一个进程使用fork建立子进程, 若是子程序退出, 而父程序没有调用wait或waitpid获取子进程的状态消息, 那么子进程的进程描述符任然保存在系统中, 这个进程称之为僵尸进程 任何一个子进程(init除外)在exit()以后,并不是立刻就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程处理。这是每一个子进程在结束时都要通过的阶段。若是子进程在exit()以后,父进程没有来得及处理,这时用ps命令就能看到子进程的状态是“Z”。若是父进程能及时 处理,可能用ps命令就来不及看到子进程的僵尸状态,但这并不等于子进程不通过僵尸状态。 若是父进程在子进程结束以前退出,则子进程将由init接管。init将会以父进程的身份对僵尸状态的子进程进行处理。 孤儿进程: 一个父进程退出, 而它的一个或多个子进程还在运行, 那么那些子进程将会成为孤儿进程, 孤儿进程将会被init进程(进程号为1)所收养, 并由init进程对他们完成状态收集工做 僵尸进程的危害场景: 例若有个进程,它按期的产 生一个子进程,这个子进程须要作的事情不多,作完它该作的事情以后就退出了,所以这个子进程的生命周期很短,可是,父进程只管生成新的子进程,至于子进程 退出以后的事情,则一律漠不关心,这样,系统运行上一段时间以后,系统中就会存在不少的僵死进程,假若用ps命令查看的话,就会看到不少状态为Z的进程。 严格地来讲,僵死进程并非问题的根源,罪魁祸首是产生出大量僵死进程的那个父进程。所以,当咱们寻求如何消灭系统中大量的僵死进程时,答案就是把产生大 量僵死进程的那个元凶枪毙掉(也就是经过kill发送SIGTERM或者SIGKILL信号啦)。枪毙了元凶进程以后,它产生的僵死进程就变成了孤儿进 程,这些孤儿进程会被init进程接管,init进程会wait()这些孤儿进程,释放它们占用的系统进程表中的资源,这样,这些已经僵死的孤儿进程 就能瞑目而去了。 3. 守护进程 若是主进程结束了, 由主进程建立的子进程必须跟着结束, 这时就须要守护进程 主进程建立守护进程: 其一. 守护进程会在主进程代码执行结束后就终止 其二. 守护进程内没法再开启子进程, 不然会抛出异常AssertionError: daemonic processes are not allowed to have children 注意: 进程之间是相互独立的, 主进程代码运行结束, 守护进程随即终止 import os, time from multiprocessing import Process class MyProcess(Process): def __init__(self, person): super().__init__() self.name = name deg run(self): print(os.getpid(), self.name) print("%s正在和女主播聊天" % self.name) time.sleep(3) if __name__ == "__mian__": p = MyProcess("太白") p.daemon = True # 必定要在p.start()以前设置p为守护进程, 禁止p建立子进程, 而且父进程代码执行结束, p即终止运行 p.start() print("宝") 4. 进程同步(锁) 利用并发编程能够更加充分的利用io资源, 但也带来了新的问题: 进程之间的数据不共享, 可是共享同一套文件系统, 因此访问同一个文件或者同一个打印终端, 是没有问题的, 而共享带来的是竞争, 竞争带来的是错乱, 如何控制, 就是枷锁 (1). 多进程抢占输出资源, 致使打印混乱的示例: import os, time, random from multiprocessing import Process def work(n): print("%s:%s is running" % (n, os.getpid())) time.sleep(random.randint(1,3)) print("%s:%s is done" % (n, os.getpid())) if __name__ == '__main__': for i in range(5): p = Process(target = work, args = (i,)) p.start() # 3:6716 is running # 4:10220 is running # 0:5524 is running # 1:3164 is running # 2:10036 is running # 3:6716 is done # 4:10220 is done # 1:3164 is done # 0:5524 is done # 2:10036 is done 两个问题: 一. 每一个进程中work函数的第一个打印就不是按照咱们for循环的顺序来打印的 二. 每一个work都要两个打印, 但时第一个打印的顺序是3-4-0-1-2, 而第二个打印的顺序是3-4-1-0-2, 说明咱们一个进程中的程序顺序都乱了 第二个问题能够经过枷锁来解决, 第一个问题是无法解决的, 由于进程开到了内核, 由操做系统来决定进程的调度, 没法控制 (2). 加锁, 由并发改成了串行 import os, time from multiprocessing import Process, Lock def work(n, l): # 加锁, 保证每一次只有一个进程在执行锁里面的程序, 这一段程序对于全部写上这个锁的进程, 你们都变成了串行 lock.acquire() print("%s:%s is running" % (n, os.getpid())) time.sleep(1) print("%s:%s is done" % (n, os.getpid())) # 解锁, 解锁以后其余进程才能去执行本身的程序 lock.release() if __name_ == "__main__": lock = Lock() for i in range(5): p = Process(target=work, args=(i, l)) p.start() # 2:4032 is running # 2:4032 is done # 0:8444 is running # 0:8444 is done # 4:2872 is running # 4:2872 is done # 3:7480 is running # 3:7480 is done # 1:5196 is running # 1:5196 is done 结果分析:(本身去屡次运行一下,看看结果,我拿出其中一个结果来看)经过结果咱们能够看出,多进程刚开始去执行的时候,每次运行,首先打印出来哪一个进程的程序是不固定的,可是咱们解决了上面打印混乱示例代码的第二个问题,那就是同一个进程中的两次打印都是先完成的,而后才切换到下一个进程去,打印下一个进程中的两个打印结果,说明咱们控制住了同一进程中的代码执行顺序,若是涉及到多个进程去操做同一个数据或者文件的时候,就不担忧数据算错或者文件中的内容写入混乱了。 上面这种状况虽然使用加锁的形式实现了顺序的执行, 可是程序又从新变成串行了, 这样确实会浪费了时间, 可是却保证了数据的安全 (3). 模拟抢票 import json, time from multiprocessing import Process, Lock def check(n): ticket_dic = json.load(open("ticketinfo.json", "r", encoding="utf-8")) print(ticket_dic) print("%s查看了余票数, 尚有余票%s张" % (n, ticket_dic["余票"])) def buy(n): ticket_dic = json.load(open("ticketinfo.json", "r", encoding="utf-8")) if ticket_dic["余票"]>0: time.sleep(1) ticket_dic["余票"] -= 1 json.dump(ticket_dic, open("ticketinfo.json", "w", encoding="utf-8"), ensure_ascii=False) print("%s购票成功" % n) else: print("没票了") def task(n, lock): # def task(n): check(n) lock.acquire() buy(n) lock.release() if __name__ == '__main__': lock = Lock() for i in range(5): p = Process(target=task, args=(i, lock)) # p = Process(target=task, args=(i, )) p.start() # {'余票': 2} # 1查看了余票数, 尚有余票2张 # {'余票': 2} # 2查看了余票数, 尚有余票2张 # {'余票': 2} # 0查看了余票数, 尚有余票2张 # {'余票': 2} # 3查看了余票数, 尚有余票2张 # {'余票': 2} # 4查看了余票数, 尚有余票2张 # 1购票成功 # 2购票成功 # 没票了 # 没票了 # 没票了 进程锁总结: 加锁能够保证多个进程修改同一块数据时, 同一时间只能有一个任务能够进行修改, 即串行的修改, 没错, 速度是慢了, 但保证了数据安全. 虽然能够用文件共享数据实现进程间通讯, 但问题是: 1效率低(共享数据基于文件, 而文件是硬盘上的数据). 2须要本身加锁处理 所以咱们最好找一种解决方案可以兼顾: 1效率高(多个进程共享一块内存的数据) 2帮咱们处理好问题, 这就是multiprocessing模块为咱们提供的基于消息的IPC通讯机制: 队列和管道 队列和管道都是将数据存放于内存中 队列又是基于(管道+锁)实现的,可让咱们从复杂的锁问题中解脱出来, 咱们应该尽可能避免使用共享数据,尽量使用消息传递和队列,避免处理复杂的同步和锁问题,并且在进程数目增多时,每每能够得到更好的可获展性。 IPC通讯机制(了解):IPC是intent-Process Communication的缩写,含义为进程间通讯或者跨进程通讯,是指两个进程之间进行数据交换的过程。IPC不是某个系统所独有的,任何一个操做系统都须要有相应的IPC机制, 好比Windows上能够经过剪贴板、管道和邮槽等来进行进程间通讯,而Linux上能够经过命名共享内容、信号量等来进行进程间通讯。Android它也有本身的进程间通讯方式,Android建构在Linux基础上,继承了一 部分Linux的通讯方式。 5. 队列 进程彼此之间互相隔离,要实现进程间通讯(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。队列就像一个特殊的列表,可是能够设置固定长度,而且从前面插入数据,从后面取出数据,先进先出。 Queue([maxsize]) 建立共享的进程队列。 参数 :maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。 底层队列使用管道和锁实现。 (1). 方法介绍 q = Queue([maxsize]) 建立共享的进程队列。maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还须要运行支持线程以便队列中的数据传输到底层管道中。 Queue的实例q具备如下方法: q.get( [ block [ ,timeout ] ] ) 返回q中的一个项目。若是q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 若是设置为False,将引起Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。若是在制定的时间间隔内没有项目变为可用,将引起Queue.Empty异常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 将item放入队列。若是队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。若是设置为False,将引起Queue.Full 异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引起Queue.Full异常。 q.qsize() 返回队列中目前项目的正确数量。此函数的结果并不可靠,由于在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引起NotImplementedError异常。 q.empty() 若是调用此方法时 q为空,返回True。若是其余进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目 q.full() 若是q已满,返回为True. 因为线程的存在,结果也多是不可靠的(参考q.empty()方法)。。 q.close() 关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但还没有写入的数据,但将在此方法完成时立刻关闭。若是q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,若是某个使用者正被阻塞在get()操做上,关闭生产者中的队列不会致使get()方法返回错误。 q.cancel_join_thread() 不会再进程退出时自动链接后台线程。这能够防止join_thread()方法阻塞。 q.join_thread() 链接队列的后台线程。此方法用于在调用q.close()方法后,等待全部队列项被消耗。默认状况下,此方法由不是q的原始建立者的全部进程调用。调用q.cancel_join_thread()方法能够禁止这种行为。 (2). 队列是进程安全的, 同一时间只能一个进程拿到队列中的数据 例子: 批量生产输入放入队列, 再批量的获取结果 import os, time import multiprocessing # 向queue中输入数据的函数 def inputQ(queue): info = str(os.getpid()) + "(put):" + str(time.asctime()) queue.put(info) # 向queue中输出数据的函数 def outputQ(queue): info = queue.get() print("%s%s %s" % (str(os.getpid()), "(get):", info)) if __name__ == '__main__': # windows下, 若是开启进程较多的话, 程序会崩溃, 为了防止这个问题, 使用freeze_support()方法来解决 multiprocessing.freeze_support() record1 = [] # store input processes record2 = [] # store output processes queue = multiprocessing.Queue(3) # 输入进程 for i in range(10): process = multiprocessing.Process(target=inputQ, args=(queue,)) time.sleep(0.2) process.start() record1.append(process) # 输出进程 for i in range(10): process = multiprocessing.Process(target=outputQ, args=(queue,)) process.start() record2.append(process) # for p in record1: # p.join() [pp.join() for pp in record1] # for p in record2: # p.join() [pp.join() for pp in record2] # 6248(get): 200(put):Wed Jan 9 23:05:38 2019 # 1568(get): 11692(put):Wed Jan 9 23:05:38 2019 # 9292(get): 6736(put):Wed Jan 9 23:05:38 2019 # 3452(get): 12136(put):Wed Jan 9 23:05:38 2019 # 6676(get): 3400(put):Wed Jan 9 23:05:39 2019 # 372(get): 2904(put):Wed Jan 9 23:05:39 2019 # 1396(get): 6352(put):Wed Jan 9 23:05:39 2019 # 1532(get): 4156(put):Wed Jan 9 23:05:39 2019 # 6868(get): 9528(put):Wed Jan 9 23:05:40 2019 # 10832(get): 8336(put):Wed Jan 9 23:05:41 2019 (3). 生产者消费者模型 在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生产线程和消费线程的工做能力来提升程序的总体处理数据的速度。 为何要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。 什么是生产者消费者模式 生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力,而且我能够根据生产速度和消费速度来均衡一下多少个生产者能够为多少个消费者提供足够的服务,就能够开多进程等等,而这些进程都是到阻塞队列或者说是缓冲区中去获取或者添加数据。 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print('主') (3). 生产者消费者模型总结 #程序中有两类角色 一类负责生产数据(生产者) 一类负责处理数据(消费者) #引入生产者消费者模型为了解决的问题是: 平衡生产者与消费者之间的工做能力,从而提升程序总体处理数据的速度 #如何实现: 生产者队列<——>消费者 #生产者消费者模型实现类程序的解耦和 (4). 经过上面基于队列的生产者消费者代码示例,咱们发现一个问题:主进程永远不会结束,缘由是:生产者p在生产完后就结束了,可是消费者c在取空了q以后,则一直处于死循环中且卡在q.get()这一步。 解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就能够break出死循环 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(5): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) q.put(None) #在本身的子进程的最后加入一个结束信号 if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print('主') 注意:结束信号None,不必定要由生产者发,主进程里一样能够发,但主进程须要等生产者结束后才应该发送该信号 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(2): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() p1.join() #等待生产者进程结束 q.put(None) #发送结束信号 print('主') (5). 但上述解决方式,在有多个生产者和多个消费者时,因为队列咱们说了是进程安全的,我一个进程拿走告终束信号,另一个进程就拿不到了,还须要多发送一个结束信号,有几个取数据的进程就要发送几个结束信号,咱们则须要用一个很low的方式去解决 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(name,q): for i in range(2): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨头',q)) p3=Process(target=producer,args=('泔水',q)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) #开始 p1.start() p2.start() p3.start() c1.start() p1.join() #必须保证生产者所有生产完毕,才应该发送结束信号 p2.join() p3.join() q.put(None) #有几个消费者就应该发送几回结束信号None q.put(None) #发送结束信号 print('主') (6). 其实咱们的思路无非是发送结束信号而已,有另一种队列提供了这种机制 #JoinableQueue([maxsize]):这就像是一个Queue对象,但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 #参数介绍: maxsize是队列中容许最大项数,省略则无大小限制。   #方法介绍: JoinableQueue的实例p除了与Queue对象相同的方法以外还具备: q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常 q.join():生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止,也就是队列中的数据所有被get拿走了。 from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() # time.sleep(random.randint(1,3)) time.sleep(random.random()) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) q.task_done() #向q.join()发送一次信号,证实一个数据已经被取走并执行完了 def producer(name,q): for i in range(10): # time.sleep(random.randint(1,3)) time.sleep(random.random()) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) print('%s生产结束'%name) q.join() #生产完毕,使用此方法进行阻塞,直到队列中全部项目均被处理。 print('%s生产结束~~~~~~'%name) if __name__ == '__main__': q=JoinableQueue() #生产者们:即厨师们 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨头',q)) p3=Process(target=producer,args=('泔水',q)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True #若是不加守护,那么主进程结束不了,可是加了守护以后,必须确保生产者的内容生产完而且被处理完了,全部必须还要在主进程给生产者设置join,才能确保生产者生产的任务被执行完了,而且可以确保守护进程在全部任务执行完成以后才随着主进程的结束而结束。 c2.daemon=True #开始 p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() #我要确保你的生产者进程结束了,生产者进程的结束标志着你生产的全部的人任务都已经被处理完了 p2.join() p3.join() print('主') # 主进程等--->p1,p2,p3等---->c1,c2 # p1,p2,p3结束了,证实c1,c2确定全都收完了p1,p2,p3发到队列的数据 # 于是c1,c2也没有存在的价值了,不须要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,因此设置成守护进程就能够了。python

相关文章
相关标签/搜索