复制代码 1 import os 2 import time 3 from multiprocessing import Process 4 5 6 def f(name): 7 print('in f', os.getpid(), os.getppid()) 8 print('i am is son process', name) 9 10 11 if __name__ == '__main__': 12 p = Process(target=f, args=('小青',)) 13 p.start() # start不是运行一个程序,而是调用操做系统的命令,要建立子进程 14 15 print('我是主程序……') 》》》结果: 我是主程序…… in f 6016 12312 i am is son process 小青 结论:在另外一个地方开辟内存空间,执行f函数,主进程不会阻塞等待
1 def f(args): 2 print('in func 2', args, os.getpid(), os.getppid()) 3 4 5 if __name__ == '__main__': 6 print('我在主进程中……') 7 p1 = Process(target=f, args=(666,)) 8 p1.start() 9 p2 = Process(target=f, args=(777,)) 10 p2.start() 11 print('我会在子进程前运行……,由于我和他们是隔离的,不会等他们') 12 13 ''' 14 结果: 15 我在主进程中…… 16 我会在子进程前运行……,由于我和他们是隔离的,不会等他们 17 in func 2 666 8144 7660 18 in func 2 777 10820 7660 19 '''
1 def fu(num): 2 print("我是进程 :%d " % num) 3 4 5 if __name__ == '__main__': 6 print('in main', os.getpid(), os.getppid()) 7 for i in range(10): 8 Process(target=fu, args=(i,)).start() 9 print('main 66666') 10 """ 11 结果: 12 in main 11984 2064 13 main 66666 14 我是进程 :1 15 我是进程 :0 16 我是进程 :8 17 我是进程 :7 18 我是进程 :4 19 我是进程 :2 20 我是进程 :9 21 我是进程 :3 22 我是进程 :6 23 我是进程 :5 24 p.start()并无当即执行,而是进入就绪队列,等带cpu调度,因此不是有序的 25 """
1 def g(args): 2 print("in g", args) 3 4 5 if __name__ == '__main__': 6 print('in main') 7 p = Process(target=g, args=(888,)) 8 p.start() 9 p.join() 10 print('i am in main process') 11 ''' 12 in main 13 in g 888 14 i am in main process 15 结:join老是等子进程执行完毕后再执行接下来的代码 16 '''
1 def pro(i): 2 print('in func', i, os.getpid(), os.getppid()) 3 4 5 if __name__ == '__main__': 6 print('in main', os.getpid(),os.getppid()) 7 p_list = [] 8 for i in range(10): 9 p = Process(target=pro, args=(i,)) 10 p.start() # 不是运行一个程序,而是调用操做系统命令,要建立子进程,等待操做系统做业,非阻塞 11 p_list.append(p) 12 print(p_list) 13 for p in p_list: # 遍历每一个子进程,每一个join一下,若是该子进程已经接收,join失效至关于pass,遍历完成就能保证每一个子进程都结束了 14 p.join() # 阻塞,直到p这个子进程执行完毕以后再继续执行 15 print('主进程……') 16 ''' 17 in main 1480 2064 18 [<Process(Process-1, started)>, <Process(Process-2, started)>, <Process(Process-3, started)>, <Process(Process-4, started)>, <Process(Process-5, started)>, <Process(Process-6, started)>, <Process(Process-7, started)>, <Process(Process-8, started)>, <Process(Process-9, started)>, <Process(Process-10, started)>] 19 in func 3 6108 1480 20 in func 7 13756 1480 21 in func 5 12548 1480 22 in func 8 12116 1480 23 in func 4 10948 1480 24 in func 6 11744 1480 25 in func 9 11244 1480 26 in func 1 3968 1480 27 in func 2 9412 1480 28 in func 0 14024 1480 29 主进程…… 30 '''
1 # p.is_alive方法:查看子进程生存状态 2 # p.terminate() 强制结束子进程--非阻塞 3 def gro(i): 4 time.sleep(1) 5 print('in func', i, os.getpid(), os.getppid()) 6 7 8 if __name__ == '__main__': 9 print("in main") 10 p1 = Process(target=gro, args=(1,)) 11 p1.start() 12 # time.sleep(2) # 若是等待一下子,就会执行函数,若是不等,就无论操做系统去建子进程,而直接执行后面的代码,因此可能比建立子进程前就执行了 13 print(p1.is_alive()) # 检测子进程是否还在执行任务 14 p1.terminate() # 强制结束子进程,非阻塞,不会等待状态改变,会立刻执行后面代码 15 print(p1.is_alive()) 16 print('主进程的代码执行结束了……') 17 ''' 18 in main 19 True 20 True 21 主进程的代码执行结束了…… 22 23 结:由于直接执行,主进程执行快些,子进程函数不会执行 24 '''
1 class Myprocess(Process): 2 def __init__(self, name): 3 super().__init__(self) # 需继承父类的init方法 4 self.name = name # 添加须要本身的属性 5 6 def run(self): 7 print(self.name) # 只有重写run方法才能将参数传入 8 print(os.getppid(), os.getpid()) 9 10 11 if __name__ == '__main__': 12 p = Myprocess('小强') 13 p.start()
1 from multiprocessing import Process 2 3 n = 100 4 5 6 def func(): 7 global n 8 n = n - 1 9 return 111 10 11 12 if __name__ == '__main__': 13 n_l = [] 14 for i in range(100): 15 p = Process(target=func) 16 p.start() 17 n_l.append(p) 18 for p in n_l: p.join() 19 print(n) 20 21 结果为:100 22 23 总结:说明子进程没法改变主进程的全局变量,本质是没法自由通讯,但子进程中的n确定减小了,只是无法拿出来
1 from multiprocessing import Process 2 import time 3 def func1(): 4 print('begin') 5 time.sleep(3) 6 print('wawww') 7 8 # if __name__ == '__main__': 9 # p = Process(target=func1) 10 # # p.daemon = True 11 # p.start() 12 # time.sleep(1) 13 # print('in main') 14 ''' 15 结果: 16 begin 17 in main 18 19 结论:守护进程随着主进程结束而结束,那怕守护进程任务没有执行完毕 20 ''' 21 22 def f1(): 23 print('begin fun1') 24 time.sleep(3) 25 print('baidu') 26 27 def f2(): 28 while True: 29 print('in f2') 30 time.sleep(0.5) 31 32 if __name__ == '__main__': 33 Process(target=f1,).start() 34 p = Process(target=f2) 35 p.daemon = True 36 # 守护进程的属性,默认是False,若是设置成True,就表示设置这个子进程为一个守护进程 37 # 设置守护进程的操做应该在开启子进程以前 38 p.start() 39 time.sleep(1) 40 print('in main') # 主进程in main执行完后,守护进程就会结束,但主进程并无结束而是等另外一个子进程结束后才结束 41 42 43 # 设置成守护进程以后 会有什么效果呢? 44 # 守护进程会在主进程的代码执行完毕以后直接结束,不管守护进程是否执行完毕 45 46 # 应用 47 # 报活 主进程还活着 48 # 100台机器 100个进程 10000进程 49 # 应用是否在正常工做 - 任务管理器来查看 50 # 守护进程如何向监测机制报活???send/写数据库 51 # 为何要用守护进程来报活呢?为何不用主进程来工做呢??? 52 # 守护进程报活几乎不占用CPU,也不须要操做系统去调度 53 # 主进程不能严格的每60s就发送一条信息
1 import json 2 import time 3 from multiprocessing import Lock 4 from multiprocessing import Process 5 6 # 锁 7 lock = Lock() # 创造了一把锁 8 lock.acquire() # 获取了这把锁的钥匙 9 lock.release() # 归还这把钥匙,其余进程就能够拿锁了 10 11 12 # 抢票的故事 13 # 需求:每一个人都能查看余票、买相同车次票同一刻只能一人买完,另外一人才能买 14 15 16 def search(i): 17 with open('db', encoding='utf-8') as f: 18 count_dic = json.load(f) 19 time.sleep(0.2) # 模拟网络延迟 20 print('person %s 余票:%s 张' % (i, count_dic.get('count'))) 21 return count_dic.get('count'), count_dic # 返回余票数,及字典 22 23 24 def buy(i): 25 count, count_dict = search(i) 26 if count > 0: 27 count_dict['count'] -= 1 # 有票就能够买 28 print('person %s 买票成功'% i) 29 time.sleep(2) 30 with open('db', 'w', encoding='utf-8') as f: 31 json.dump(count_dict, f) # 更改余票额度 32 33 34 def task(i, lock): 35 search(i) 36 lock.acquire() # 若是以前已经被acquire了 且 没有被release 那么进程会在这里阻塞 37 buy(i) 38 lock.release() 39 40 41 if __name__ == '__main__': 42 lock = Lock() 43 for i in range(1, 11): 44 Process(target=task, args=(i, lock)).start() 45 46 # 当多个进程共享一段数据的时候,数据会出现不安全的现象, 47 # 须要加锁来维护数据的安全性 48 49 ''' 50 D:\install\Python36\python.exe D:/install/project/七、并发编程/三、锁.py 51 person 6 余票:5 张 52 person 5 余票:5 张 53 person 8 余票:5 张 54 person 2 余票:5 张 55 person 4 余票:5 张 56 person 10 余票:5 张 57 person 1 余票:5 张 58 person 9 余票:5 张 59 person 3 余票:5 张 60 person 7 余票:5 张 61 person 6 余票:5 张 62 person 6 买票成功 63 person 5 余票:4 张 64 person 5 买票成功 65 person 8 余票:3 张 66 person 8 买票成功 67 person 2 余票:2 张 68 person 2 买票成功 69 person 4 余票:1 张 70 person 4 买票成功 71 person 10 余票:0 张 72 person 1 余票:0 张 73 person 9 余票:0 张 74 person 3 余票:0 张 75 person 7 余票:0 张 76 77 Process finished with exit code 0 78 '''
注:进程间的数据交互,本质也用到了socket通讯,不过都是本地的,基于文件的,能够经过将py名写成socket来看报错得知。node
1 #加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
2 虽然能够用文件共享数据实现进程间通讯,但问题是: 3 1.效率低(共享数据基于文件,而文件是硬盘上的数据) 4 2.须要本身加锁处理 5
6 #所以咱们最好找寻一种解决方案可以兼顾:一、效率高(多个进程共享一块内存的数据)二、帮咱们处理好锁问题。这就是mutiprocessing模块为咱们提供的基于消息的IPC通讯机制:队列和管道。
7 队列和管道都是将数据存放于内存中 8 队列又是基于(管道+锁)实现的,可让咱们从复杂的锁问题中解脱出来, 9 咱们应该尽可能避免使用共享数据,尽量使用消息传递和队列,避免处理复杂的同步和锁问题,并且在进程数目增多时,每每能够得到更好的可获展性。
1 互斥锁同时只容许一个线程更改数据,而信号量Semaphore是同时容许必定数量的线程更改数据 。 2 假设商场里有4个迷你唱吧,因此同时能够进去4我的,若是来了第五我的就要在外面等待,等到有人出来才能再进去玩。 3 实现: 4 信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。 5 信号量与进程池的概念很像,可是要区分开,信号量涉及到加锁的概念 6 7 8 import time, random 9 from multiprocessing import Semaphore, Process 10 11 # ktv 只有4个房间,即同时只能四我的进去,其余人必须等其中的人出来才能进去 12 # 13 # sem = Semaphore(4) # 设置信号量个数,并发数 14 # sem.acquire() 15 # print('进去1我的,关门阻塞中') 16 # sem.acquire() 17 # print('进去第2我的,关门阻塞中') 18 # sem.acquire() 19 # print('进去第3我的,关门阻塞中') 20 # sem.acquire() 21 # print('进去第4我的,关门阻塞中') 22 # sem.release() # 必须归还一把,才能继续下面的代码,否则一直阻塞中 23 # sem.acquire() 24 # print(6666) 25 # sem.release() 26 ''' 27 D:\install\Python36\python.exe D:/install/project/七、并发编程/四、信号量.py 28 进去1我的,关门阻塞中 29 进去第2我的,关门阻塞中 30 进去第3我的,关门阻塞中 31 进去第4我的,关门阻塞中 32 6666 33 34 Process finished with exit code 0 35 ''' 36 37 38 def ktv(num, sem): 39 sem.acquire() 40 print('person %s 进入了ktv' % num) 41 time.sleep(random.randint(1, 4)) 42 print('person %s 进出了ktv' % num) 43 sem.release() 44 45 46 if __name__ == '__main__': 47 sem = Semaphore(4) 48 for i in range(10): 49 Process(target=ktv, args=(i, sem)).start() 50 51 ''' 52 最开始是4个同时进入,以后又人出,才能有人进 53 D:\install\Python36\python.exe D:/install/project/七、并发编程/四、信号量.py 54 person 2 进入了ktv 55 person 8 进入了ktv 56 person 9 进入了ktv 57 person 7 进入了ktv 58 person 2 进出了ktv 59 person 6 进入了ktv 60 person 7 进出了ktv 61 person 5 进入了ktv 62 person 8 进出了ktv 63 person 1 进入了ktv 64 person 9 进出了ktv 65 person 0 进入了ktv 66 person 1 进出了ktv 67 person 4 进入了ktv 68 person 6 进出了ktv 69 person 3 进入了ktv 70 person 0 进出了ktv 71 person 5 进出了ktv 72 person 4 进出了ktv 73 person 3 进出了ktv 74 75 Process finished with exit code 0 76 '''
事件:multiprocessing.Eventpython
定义:全局定义了一个‘flag’,若是标志为False,当程序执行event.wait方法时就会阻塞,若是为True,那么event.wait方法时便再也不阻塞。redis
1 from multiprocessing import Process, Event 2 import time, random 3 4 5 def car(e, n): 6 while True: 7 if not e.is_set(): # 进程刚开启,is_set()的值是Flase,模拟信号灯为红色 8 print('\033[31m红灯亮\033[0m,car%s等着' % n) 9 e.wait() # 阻塞,等待is_set()的值变成True,模拟信号灯为绿色 10 print('\033[32m车%s 看见绿灯亮了\033[0m' % n) 11 time.sleep(random.randint(3, 6)) 12 if not e.is_set(): #若是is_set()的值是Flase,也就是红灯,仍然回到while语句开始 13 continue 14 print('车开远了,car', n) 15 break 16 17 18 def police_car(e, n): 19 while True: 20 if not e.is_set():# 进程刚开启,is_set()的值是Flase,模拟信号灯为红色 21 print('\033[31m红灯亮\033[0m,car%s等着' % n) 22 e.wait(0.1) # 阻塞,等待设置等待时间,等待0.1s以后没有等到绿灯就闯红灯走了 23 if not e.is_set(): 24 print('\033[33m红灯,警车先走\033[0m,car %s' % n) 25 else: 26 print('\033[33;46m绿灯,警车走\033[0m,car %s' % n) 27 break 28 29 30 31 def traffic_lights(e, inverval): 32 while True: 33 time.sleep(inverval) 34 if e.is_set(): 35 print('######', e.is_set()) 36 e.clear() # ---->将is_set()的值设置为False 37 else: 38 e.set() # ---->将is_set()的值设置为True 39 print('***********',e.is_set()) 40 41 42 if __name__ == '__main__': 43 e = Event() 44 for i in range(10): 45 p=Process(target=car,args=(e,i,)) # 建立是个进程控制10辆车 46 p.start() 47 48 for i in range(5): 49 p = Process(target=police_car, args=(e, i,)) # 建立5个进程控制5辆警车 50 p.start() 51 t = Process(target=traffic_lights, args=(e, 10)) # 建立一个进程控制红绿灯 52 t.start() 53 54 print('============》')
from queue import Queue # 队列 先进先出FIFO,有序 # 应用:维护秩序的时候用的比较多,买票,抢票 q = Queue(5) # 设置队列的长度,即元素个数,即只能放入5个元素 ret = q.qsize() # 得到当前队列中的元素个数,此方法不许,在多进程中,此刻获取结果时,也许其余进程向里面加入了元素 q.put(1111) # 向队列中放入对象,若是队列已满,则阻塞等待,一直到空间可用为止 ''' 参数: item:项目、元素、对象 block:默认True,队列满一直等待阻塞,False则为不阻塞,满则直接主动自定义报错 timeout:阻塞等待的时间,时间到了,还不能放,则报错Queue.Empty异常 ''' q.put_nowait(2222) # 放入元素,满了,不等直接报错 q.get() # 返回q即队列中的元素,队列中为空,则阻塞一直等待有值为止,通向可设置timeout q.get_nowait() # 队列为空时,直接报错 q.empty() # 判断是否为空,空则返回True,一样在多进程中不许 q.full() # 判断是否满了,满了则返回True,多进程中不许,主要是进程是异步操做
# 例子1 一进程放 一进程取 from multiprocessing import Queue, Process def con(q): print(q.get()) # 从队列中拿,没有直到等到有,因此那么它比其余进程快,最后也能拿到数据 def pro(q): q.put(112) # 向队列中放入112 if __name__ == '__main__': q = Queue() p = Process(target=con, args=(q,)) p.start() p = Process(target=pro, args=(q,)) p.start() print('我在主进程中……') ''' 看出队列能够实现进程间的通讯 ''' # 主放, 子取 from multiprocessing import Queue, Process def f(q): print(q.get()) if __name__ == '__main__': q = Queue() Process(target=f, args=(q,)).start() # create son_process q.put(666) '''看出主进程可和子进程通讯'''
1 import time 2 import random 3 from multiprocessing import Process, Queue 4 5 6 def consumer(q,name): 7 while 1: 8 food = q.get() 9 print('%s 吃了 %s ' % (name, food)) 10 time.sleep(random.random()) 11 12 13 def producer(q,name,food,n=10): 14 for i in range(1, n): # 定义生产10个食物 15 time.sleep(random.random()) # 模拟生产慢,消费快 16 fd = food + str(i) 17 print('%s 生产了 %s' %(name,fd)) 18 q.put(fd) 19 20 if __name__ == '__main__': 21 q = Queue(10) 22 for person in range(6): # 定义消费者多 23 Process(target=consumer, args=(q, 'person'+ str(person))).start() 24 Process(target=producer, args=(q,'小强','米饭')).start() 25 Process(target=producer, args=(q,'小东','面条')).start() 26 Process(target=producer, args=(q,'小hua','面条')).start() 27 Process(target=producer, args=(q,'小cai','面条')).start()
消费者一直在等着拿数据,生产者生产完了就结束了,生产者须要告诉消费者生产完了才合理,即向队列中放入stop信号
1 def consumer(q,name): 2 while 1: 3 food = q.get() 4 if food == 'stop':break 5 print('%s 吃了 %s ' % (name, food)) 6 time.sleep(random.random()) 7 8 9 def producer(q,name,food,n=10): 10 for i in range(1, n): # 定义生产10个食物 11 time.sleep(random.random()) # 模拟生产慢,消费快 12 fd = food + str(i) 13 print('%s 生产了 %s' %(name,fd)) 14 q.put(fd) 15 q.put('stop') 16 17 if __name__ == '__main__': 18 q = Queue(10) 19 for person in range(4): # 定义消费者多 20 Process(target=consumer, args=(q, 'person'+ str(person))).start() 21 Process(target=producer, args=(q,'小强','米饭')).start() 22 Process(target=producer, args=(q,'小东','面条')).start() 23 Process(target=producer, args=(q,'小hua','面条')).start() 24 Process(target=producer, args=(q,'小cai','面条')).start() 25 26 # 生产者加入结束信号,消费者收到后就结束,不存在还有进程在使用数据,由于队列是先进先出的 27 28 # 且有多少个消费者就须要发多少个stop信号,否则就会致使有的的进程还在等待中
1 from multiprocessing import Queue, Process 2 import random, time 3 4 5 def consumer(q, name): 6 while 1: 7 food = q.get() 8 if food == 'stop': break 9 print('%s 吃了 %s ' % (name, food)) 10 time.sleep(random.random()) 11 12 13 def producer(q, name, food, n=10): 14 for i in range(1, n): # 定义生产10个食物 15 time.sleep(random.random()) # 模拟生产慢,消费快 16 fd = food + str(i) 17 print('%s 生产了 %s' % (name, fd)) 18 q.put(fd) 19 20 21 if __name__ == '__main__': 22 q = Queue(10) 23 for person in range(4): # 定义消费者4个 24 Process(target=consumer, args=(q, 'person'+ str(person))).start() 25 p1 = Process(target=producer, args=(q,'小强','米饭')) 26 p1.start() 27 p2 = Process(target=producer, args=(q,'小东','面条')) 28 p2.start() 29 p1.join() # 保证p生产者结束 30 p2.join() 31 q.put('stop') # 必须得发四个stop信号 32 q.put('stop') 33 q.put('stop') 34 q.put('stop')
1 JoinableQueue的实例p除了与Queue对象相同的方法以外,还具备如下方法: 2 3 q.task_done() 4 使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。若是调用此方法的次数大于从队列中删除的项目数量,将引起ValueError异常。 5 6 q.join() 7 生产者将使用此方法进行阻塞,直到队列中全部项目均被处理。阻塞将持续到为队列中的每一个项目均调用q.task_done()方法为止。 8 下面的例子说明如何创建永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。
1 from multiprocessing import Process,JoinableQueue 2 import time,random,os 3 def consumer(q): 4 while True: 5 res=q.get() 6 time.sleep(random.randint(1,3)) 7 print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) 8 q.task_done() #向q.join()发送一次信号,证实一个数据已经被取走了 9 10 def producer(name,q): 11 for i in range(10): 12 time.sleep(random.randint(1,3)) 13 res='%s%s' %(name,i) 14 q.put(res) 15 print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) 16 q.join() #生产完毕,使用此方法进行阻塞,直到队列中全部项目均被处理。 17 18 19 if __name__ == '__main__': 20 q=JoinableQueue() 21 #生产者们:即厨师们 22 p1=Process(target=producer,args=('包子',q)) 23 p2=Process(target=producer,args=('骨头',q)) 24 p3=Process(target=producer,args=('泔水',q)) 25 26 #消费者们:即吃货们 27 c1=Process(target=consumer,args=(q,)) 28 c2=Process(target=consumer,args=(q,)) 29 c1.daemon=True 30 c2.daemon=True 31 32 #开始 33 p_l=[p1,p2,p3,c1,c2] 34 for p in p_l: 35 p.start() 36 37 p1.join() 38 p2.join() 39 p3.join() 40 print('主') 41 42 #主进程等--->p1,p2,p3等---->c1,c2 43 #p1,p2,p3结束了,证实c1,c2确定全都收完了p1,p2,p3发到队列的数据 44 #于是c1,c2也没有存在的价值了,不须要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,因此设置成守护进程就能够了。
注:栈 ==> 先进后出----算法算法
管道:multiprocessing.Pipe
数据库
答:不安全是因多个进程可能会在同一刻同时去取同一个数据,也可能同一刻拿走一个数据,位置就空了,别的应该放在空位置上,
而因同一刻时,另外一个进程会认为该位置上有数据,就放在后面了,这就会致使数据异常,解决方法就是在拿数据前加锁,拿完归还锁。django
1 from multiprocessing import Pipe, Process 2 3 4 # 管道 5 # 队列是基于管道实现的 6 # 队列 进程间数据安全的 7 # 管道 进程间数据不安全的 8 # 队列 = 管道 + 锁 9 10 # left, right = Pipe() 11 # print(right.recv()) 12 # (<multiprocessing.connection.PipeConnection object at 0x000002817A7FB128>, <multiprocessing.connection.PipeConnection object at 0x000002817A65C128>) 13 # 管道对象返回的是一个元组 14 15 16 # 全部的IPC通讯都是经过socket实现的 17 18 # 左边放,右边出,一样能够左收,右发,全双工模式 19 20 # 管道必须在建立进程前建立 21 def consumer(left, right): 22 left.close() # 消费者用右边接那么就把左边关闭 23 while 1: 24 try: 25 print(right.recv()) 26 except EOFError: # 再也接不到数据了,从而报错,才能退出 27 break 28 29 30 if __name__ == '__main__': 31 left, right = Pipe() 32 p = Process(target=consumer, args=(left, right)) 33 p.start() 34 right.close() # 用右边发送,那么左边就关闭 35 for i in range(1, 11): 36 left.send(3333) 37 left.close() # 不用了就关闭 38 39 # EOF异常的触发 40 # 在这一个进程中 若是不在用这个端点了,应该close 41 # 这一在recv的时候,若是其余端点都被关闭了,就可以知道不会在有新的消息传进来 42 # 此时就不会在这里阻塞等待,而是抛出一个EOFError 43 # * close并非关闭了整个管道,而是修改了操做系统对管道端点的引用计数的处理
进程之间数据共享
消息传递的并发是趋势
线程是经过线程集合,用消息队列来交换数据
进程间应尽可能避免通讯,由于可能不安全,想安全就必须加锁,加锁就会影响效率。
redis分布式、数据库解决进程之间数据共享问题
'''
进程间数据是独立的,能够借助于队列或管道实现通讯,两者都是基于消息传递的
虽然进程间数据独立,但能够经过Manager实现数据共享,事实上Manager的功能远不止于此
'''
# Manager模块
# 全部的数据类型 都可以进行数据共享
# 一部分都是不加锁 不支持数据进程安全
# 不安全的解决办法 加锁
1 from multiprocessing import Manager,Process,Lock 2 def work(d,lock): 3 with lock: 4 d['count'] -= 1 5 6 if __name__ == '__main__': 7 lock = Lock() 8 m = Manager() 9 dic = m.dict({'count':100}) 10 p_l = [] 11 for i in range(100): # 开了100进程 12 p = Process(target=work, args=(dic, lock)) 13 p_l.append(p) 14 p.start() 15 for p in p_l: 16 p.join() 17 print(dic) 18 ''' 19 结果:{'count':0} 20 ''' 21 22 # with as 的机制 23 # __enter__ 24 # __exit__
1 import os 2 import time 3 from multiprocessing import Pool 4 print(os.cpu_count()) # 获取cpu个数 5 6 7 def wahaha(): 8 time.sleep(1) 9 print(os.getpid()) 10 return True 11 12 13 if __name__ == '__main__': 14 p = Pool(5) # 进程池中进程数通常为cpu个数或者cpu+1,不要超过10个 15 for i in range(20): 16 # p.apply(func=wahaha) # 同步,通常不用,还不如一个进程去循环作,进程池有返回值,基于ipc通讯,本身能够经过q来通讯 17 p.apply_async(func=wahaha) # async 异步的 18 p.close() # 关闭进程池,进程池中的进程不工做了,让任务不能再继续提交了, 19 p.join() # 等待这个池中提交的任务都执行完,就结束
异步提交,不获取返回值编程
def wahaha(): time.sleep(1) print(os.getpid()) if __name__ == '__main__': p = Pool(5) # CPU的个数 或者 +1 ret_l = [] for i in range(20): ret = p.apply_async(func = wahaha) # async 异步的 ret_l.append(ret) p.close() # 关闭 并非进程池中的进程不工做了 # 而是关闭了进程池,让任务不能再继续提交了 p.join() # 等待这个池中提交的任务都执行完 # # 表示等待全部子进程中的代码都执行完 主进程才结束
异步提交,得到返回值,等待全部任务都执行完毕以后再统一获取结果json
1 # 异步提交,获取返回值,等待全部任务都执行完毕以后再统一获取结果 2 def wahaha(): 3 time.sleep(1) 4 print(os.getpid()) 5 return True 6 7 if __name__ == '__main__': 8 p = Pool(5) # CPU的个数 或者 +1 9 ret_l = [] 10 for i in range(20): 11 ret = p.apply_async(func = wahaha) # async 异步的 12 ret_l.append(ret) 13 p.close() # 关闭 不是进程池中的进程不工做了 14 # 而是关闭了进程池,让任务不能再继续提交了 15 p.join() # 等待这个池中提交的任务都执行完 16 for ret in ret_l: 17 print(ret.get())
异步提交,得到返回值,一个任务执行完毕以后就能够获取到一个结果(顺序是按照提交任务的顺序)【用在任务关联不大的时候】flask
1 def wahaha(): 2 time.sleep(1) 3 print(os.getpid()) 4 return True 5 6 if __name__ == '__main__': 7 p = Pool(5) # CPU的个数 或者 +1 8 ret_l = [] 9 for i in range(20): 10 ret = p.apply_async(func = wahaha) # async 异步的 11 ret_l.append(ret) 12 for ret in ret_l: 13 print(ret.get())
总结bootstrap
1 # 异步的 apply_async 2 # 1.若是是异步的提交任务,那么任务提交以后进程池和主进程也异步了, 3 #主进程不会自动等待进程池中的任务执行完毕 4 # 2.若是须要主进程等待,须要p.join 5 # 可是join的行为是依赖close 6 # 3.若是这个函数是有返回值的 7 # 也能够经过ret.get()来获取返回值 8 # 可是若是一边提交一遍获取返回值会让程序变成同步的 9 # 因此要想保留异步的效果,应该讲返回对象保存在列表里,全部任务提交完成以后再来取结果 10 # 这种方式也能够去掉join,来完成主进程的阻塞等待池中的任务执行完毕
进程池解决原生socket,同一时刻只能和一个客户端链接【这种方式的弊端是同时最多只能和进程池中的数量相同,其它用户等待】
1 import socket 2 from multiprocessing import Pool 3 4 5 def talk(conn): 6 try: 7 while 1: 8 msg = conn.recv(1024).decode('utf-8') 9 print(msg) 10 conn.send(b'hello') 11 finally: 12 conn.close() 13 14 15 if __name__ == '__main__': 16 sk = socket.socket() 17 sk.bind(('127.0.0.1', 9999)) 18 sk.listen() 19 pool = Pool(5) 20 try: 21 while 1: 22 conn, addr = sk.accept() 23 pool.apply_async(talk, args=(conn,)) 24 finally: 25 conn.close() 26 sk.close()
1 import socket 2 3 ip_port = ('127.0.0.1', 9999) 4 sk = socket.socket() 5 sk.connect(ip_port) 6 7 while 1: 8 msg = input('>>>>:').strip() 9 if len(msg) == 0: continue 10 sk.send(msg.encode('utf-8')) 11 content = sk.recv(1024).decode('utf-8') 12 print(content)
Pool中的回调函数callback
1 import os 2 import time 3 import random 4 from multiprocessing import Pool 5 6 # 异步提交,获取返回值,从头至尾一个任务执行完毕以后就能够获取到一个结果 7 def wahaha(num): 8 time.sleep(random.random()) 9 print('pid : ',os.getpid(),num) 10 return num 11 12 def back(arg): 13 print('call_back : ',os.getpid(),arg) 14 15 if __name__ == '__main__': 16 print('主进程',os.getpid()) 17 p = Pool(5) # CPU的个数 或者 +1 18 for i in range(20): 19 ret = p.apply_async(func = wahaha,args=(i,),callback=back) # async 异步的 20 p.close() 21 p.join() 22 23 # 回调函数 _ 在主进程中执行 24 # 在发起任务的时候 指定callback参数 25 # 在每一个进程执行完apply_async任务以后,返回值会直接做为参数传递给callback的函数,执行callback函数中的代码 26 27 28 # 北京 30min 30min +5min 5min + 5min 29 # 建设 20min 直接办 + 5min 5min 30 # 中国 1h 20min +5 25min + 5min 31 # 农业 2h 55min + 5min 55min + 5min 32 # 工商 15min 5min 15min + 5min 33 34 # 2h10min 35 # 2h05min
1 程序:是指令的集合,它是进程运行的静态描述文本 2 进程:是程序的一次执行活动,是动态概念,计算机中最小资源分配单位 3 线程:cpu调度的最小单位,从属于进程,任务的实际执行者
1 # 经过类Thread 2 import os 3 import time 4 from threading import Thread,get_ident 5 def func(name): 6 time.sleep(0.1) 7 print('线程:%s,该线程的进程id为:%s,线程id为:%s' %(name,os.getpid(),get_ident())) # get_ident()为类get_ident中的方法,做用是获取线程id 8 for i in range(10): 9 t = Thread(target=func, args=(i,)) 10 t.start() 11 12 》》》》结果: 13 线程:3,该线程的进程id为:14140,线程id为:13352 14 线程:0,该线程的进程id为:14140,线程id为:1672 15 线程:1,该线程的进程id为:14140,线程id为:15808 16 线程:2,该线程的进程id为:14140,线程id为:10136 17 线程:7,该线程的进程id为:14140,线程id为:4508 18 线程:6,该线程的进程id为:14140,线程id为:15068 19 线程:4,该线程的进程id为:14140,线程id为:13532 20 线程:5,该线程的进程id为:14140,线程id为:10836 21 线程:9,该线程的进程id为:14140,线程id为:9520 22 线程:8,该线程的进程id为:14140,线程id为:15568
1 # 自定义类建立多线程 【继承Thread类,同时重些run方法,如需添加本身的属性或者说加入参数继承父类init方法】 2 import os 3 from threading import Thread, get_ident 4 5 6 class Mythread(Thread): 7 def __init__(self, args): 8 super().__init__() 9 self.args = args 10 11 def run(self): 12 """ 13 执行函数 14 :return: 15 """ 16 print(self.args) # 就能够调用本身自定义的属性了 17 print('in thread 子线程Id:', get_ident(), '进程Id:', os.getpid()) 18 19 20 print("父进程python解释器:", os.getppid()) 21 print('进程即执行py文件:', os.getpid()) 22 print('主线程:', get_ident()) 23 t_obj = Mythread('china') 24 t_obj.start() 25 ''' 26 结果: 27 父进程python解释器: 1168 28 进程及执行py文件: 2724 29 主线程: 7468 30 china 31 in thread 子线程Id: 8180 进程Id: 2724 32 '''
1 '''效率:多线程开闭切开销远远小于进程隔了大几百倍''' 2 3 def func(a): 4 a += 1 5 6 7 if __name__ == '__main__': 8 start = time.time() 9 t_lis = [] 10 for i in range(50): 11 t = Thread(target=func, args=(i,)) 12 t.start() 13 t_lis.append(t) 14 for t in t_lis:t.join() 15 print('主线程') 16 print('时间:%s' % str(time.time() - start)) 17 18 start = time.time() 19 t_lis = [] 20 for i in range(50): 21 t = Process(target=func, args=(i,)) 22 t.start() 23 t_lis.append(t) 24 for t in t_lis: t.join() 25 print('主进程') 26 print('时间:%s' % str(time.time() - start)) 27 28 ''' 29 效率测试: 30 主线程 31 时间:0.008229732513427734 32 主进程 33 时间:5.307320833206177 34 '''
1 '''线程间数据共享''' 2 from threading import Thread 3 4 n = 100 # 全局变量,存在主进程内存空间中 5 6 7 def func(): 8 global n 9 n -= 1 10 print(n) 11 12 13 if __name__ == '__main__': 14 t_li = [] 15 for i in range(100): 16 t = Thread(target=func) # 始终在同一个内存空间中 17 t.start() 18 t_li.append(t) 19 for t in t_li: 20 t.join() 21 print("线程:", n) 22 ''' 23 结果: 24 n = 0 25 每一个线程中的n从99,开始递减 26 ''' 27 28 p_li = [] 29 for i in range(100): 30 p = Process(target=func,) # 分别建立100个独立的内存空间,且相互独立 31 p.start() 32 p_li.append(p) 33 for p in p_li: 34 p.join() 35 print('进程:',n) 36 ''' 37 结果: 38 n = 100 39 每一个进程中的n为99 40 '''
1 '''守护线程等待主线程结束而结束, 2 # 主线程结束,必须等全部非守护线程结束,才能结束 3 # 主线程结束,进程就结束了,进程必须保证全部非守护线程结束才行''' 4 import os 5 import time 6 from threading import Thread 7 8 def f1(): 9 print(True) 10 time.sleep(0.5) 11 print(os.getpid()) 12 13 def f2(): 14 print('in f2 start') 15 time.sleep(3) 16 print('in f2 end') 17 print(os.getpid()) 18 19 t = Thread(target=f1) 20 t.setDaemon(True) 21 t.start() 22 23 t2 = Thread(target=f2) 24 t2.start() 25 print('主线程',os.getpid()) 26 27 ''' 28 True 29 主线程 1440 30 in f2 start 31 1440 32 in f2 end 33 1440 34 '''
场景: 多我的围着桌子吃一盘面,必须拿到叉子和面才能吃 若是一我的拿到了叉子,另外一我的拿到了面,就不能吃,就会致使僵直在一块儿 import time from threading import Thread,Lock lock = Lock() noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print('%s拿到了面' % name) fork_lock.acquire() print('%s拿到了叉子' % name) print('%s在吃面'%name) time.sleep(0.5) fork_lock.release() # 0.01 noodle_lock.release() # 0.01 def eat2(name): fork_lock.acquire() # 0.01 print('%s拿到了叉子' % name) # 0.01 noodle_lock.acquire() print('%s拿到了面' % name) print('%s在吃面'%name) time.sleep(0.5) noodle_lock.release() fork_lock.release() eat_lst = ['alex','wusir','太白','yuan'] for name in eat_lst: # 8个子线程 7个线程 3个线程eat1,4个线程eat2 Thread(target=eat1,args=(name,)).start() Thread(target=eat2,args=(name,)).start() 结果: china拿到了面 china拿到了叉子 china在吃面 china拿到了叉子 usa拿到了面 --------程序开始卡死
1 # 递归锁解决死锁问题 2 import time 3 from threading import Thread, RLock 4 5 lock = RLock() 6 7 8 def eat1(name): 9 lock.acquire() 10 print('%s拿到了面' % name) 11 lock.acquire() 12 print('%s拿到了叉子' % name) 13 print('%s在吃面' % name) 14 time.sleep(0.5) 15 lock.release() # 0.01 16 lock.release() # 0.01 17 18 19 def eat2(name): 20 lock.acquire() # 0.01 21 print('%s拿到了叉子' % name) # 0.01 22 lock.acquire() 23 print('%s拿到了面' % name) 24 print('%s在吃面' % name) 25 time.sleep(0.5) 26 lock.release() 27 lock.release() 28 29 30 eat_lst = ['china', 'beijing', 'shanghai', 'shenzhen'] 31 for name in eat_lst: # 8个子线程 7个线程 3个线程eat1,4个线程eat2 32 Thread(target=eat1, args=(name,)).start() 33 Thread(target=eat2, args=(name,)).start()
1 import time 2 from threading import Thread,Lock 3 lock = Lock() 4 def eat1(name): 5 lock.acquire() 6 print('%s拿到了面' % name) 7 print('%s拿到了叉子' % name) 8 print('%s在吃面'%name) 9 time.sleep(0.5) 10 lock.release() # 0.01 11 12 def eat2(name): 13 lock.acquire() # 0.01 14 print('%s拿到了叉子' % name) # 0.01 15 print('%s拿到了面' % name) 16 print('%s在吃面'%name) 17 time.sleep(0.5) 18 lock.release() 19 20 eat_lst = ['china', 'beijing', 'shanghai', 'shenzhen'] 21 for name in eat_lst: # 8个子线程 7个线程 3个线程eat1,4个线程eat2 22 Thread(target=eat1,args=(name,)).start() 23 Thread(target=eat2,args=(name,)).start() 24 25 26 》》》: 27 china拿到了面 28 china拿到了叉子 29 china在吃面 30 china拿到了叉子 31 china拿到了面 32 china在吃面 33 beijing拿到了面 34 beijing拿到了叉子 35 beijing在吃面 36 beijing拿到了叉子 37 beijing拿到了面 38 beijing在吃面 39 shanghai拿到了面 40 shanghai拿到了叉子 41 shanghai在吃面 42 shanghai拿到了叉子 43 shanghai拿到了面 44 shanghai在吃面 45 shenzhen拿到了面 46 shenzhen拿到了叉子 47 shenzhen在吃面 48 shenzhen拿到了叉子 49 shenzhen拿到了面 50 shenzhen在吃面
1 # 进程中建立信号量,与开启进程池效率对比 2 # 池效率高于信号量 3 def ktv1(sem, i): 4 sem.acquire() 5 i += 1 6 sem.release() 7 8 9 def ktv2(i): 10 i += 1 11 12 13 # process 14 if __name__ == '__main__': 15 sem = Semaphore(5) # 同时只执行5个任务 16 start_time = time.time() 17 p_list = [] 18 for i in range(20): # 开启20个任务 19 p = Process(target=ktv1, args=(sem, i)) # 开启进程20个 20 p.start() 21 p_list.append(p) 22 for p in p_list: p.join() 23 print('process_semaphore:', time.time() - start_time) 24 25 pool = Pool(5) # 开启进程池,同一时间执行5个任务 26 start_time1 = time.time() 27 pool_list = [] 28 for i in range(20): # 开启20个任务 29 ret = pool.apply_async(func=ktv2, args=(i,)) # 异步 30 pool_list.append(ret) 31 pool.close() # 关闭进程池,再也不受理任务 32 pool.join() # 等待全部进程池中的任务结束 33 print('process_pool:', time.time() - start_time1) 34 ''' 35 process_semaphore: 2.2986388206481934 36 process_pool: 0.5303816795349121 37 ''' 38 39 ========================================== 40 # thread 类中没有线程池 41 # 但concurrent.futures中有 42 from threading import Thread,Semaphore,currentThread 43 from concurrent.futures import ThreadPoolExecutor 44 def f(sem,i): 45 sem.acquire() 46 i += 1 47 # print("sem",currentThread().getName()) # 获取线程名 48 sem.release() 49 50 def f2(i): 51 i += 1 52 # print("pool",currentThread().getName()) 53 54 start = time.time() 55 t_sem = Semaphore(5) # 线程信号量, 同时5个任务 56 t_list = [] 57 for i in range(20): # 20个任务,开启20个线程 58 t = Thread(target=f, args=(t_sem, i)) 59 t.start() 60 t_list.append(t) 61 for t in t_list:t.join() 62 print("in thread sem:" , time.time() - start) 63 64 start = time.time() 65 t_pool = ThreadPoolExecutor(5) 66 67 for i in range(20): 68 ret = t_pool.submit(f2,i) 69 t_pool.shutdown() 70 end = time.time() 71 print("in thread pool:", end- start) 72 ''' 73 in thread: 0.00498652458190918 74 in thread pool: 0.001001596450805664 75 '''
1 '''事件:event,一个任务依赖另外一个任务的状态才进行下一步 2 wait 等待事件内部的信号变成True就不阻塞了 3 set 将标志改成True 4 clear 改为False 5 is_set 查看标志是否为True 6 ''' 7 # 数据库链接 8 import time 9 import random 10 from threading import Event,Thread 11 12 13 def check(e): 14 '''检测是否可以连通数据库,网络''' 15 print('正在检测两台机器之间的网络状况……') 16 time.sleep(random.randint(2,5)) 17 e.set() # 改为True,非阻塞 18 19 20 def connet_db(e): 21 print("status:", e.is_set()) 22 e.wait() 23 print("status:", e.is_set()) 24 print('链接数据库……') 25 print('链接数据库成功~~~') 26 27 # e = Event() 28 # Thread(target=connet_db, args=(e,)).start() 29 # Thread(target=check, args=(e,)).start() 30 ''' 31 status: False 32 正在检测两台机器之间的网络状况…… 33 status: True 34 链接数据库…… 35 链接数据库成功~~~ 36 ''' 37 38 39 def check(e): 40 '''检测是否可以连通数据库,网络''' 41 print('正在检测两台机器之间的网络状况……') 42 time.sleep(random.randint(2,5)) 43 e.set() # 改为True,非阻塞 44 45 def connet_db(e): 46 '''3次链接不上就退出''' 47 n = 0 48 while n < 3: 49 if e.is_set(): 50 break # 退出循环,执行链接库 51 else: 52 e.wait(timeout=0.5) 53 n += 1 54 if n == 3: 55 raise TimeoutError 56 print('链接数据库……') 57 print('链接数据库成功~~~') 58 59 60 e = Event() 61 Thread(target=connet_db, args=(e,)).start() 62 Thread(target=check, args=(e,)).start() 63 64 ''' 65 正在检测两台机器之间的网络状况…… 66 Exception in thread Thread-1: 67 Traceback (most recent call last): 68 File "D:\install\Python36\lib\threading.py", line 916, in _bootstrap_inner 69 self.run() 70 File "D:\install\Python36\lib\threading.py", line 864, in run 71 self._target(*self._args, **self._kwargs) 72 File "D:/install/project/七、并发编程/十一、threading_信号量.py", line 140, in connet_db 73 raise TimeoutError 74 TimeoutError 75 '''
1 # from multiprocessing import Queue,JoinableQueue # 进程IPC队列 2 from queue import Queue # 线程队列 先进先出的 3 from queue import LifoQueue # 后进先出的 4 #方法: put get put_nowait get_nowait full empty qsize 5 # 队列Queue 6 # 先进先出 7 # 自带锁 数据安全 8 # 栈 LifoQueue 9 # 后进先出 10 # 自带锁 数据安全 11 # lq = LifoQueue(5)
1 Python提供的Condition对象提供了对复杂线程同步问题的支持。 2 Condition被称为条件变量,除了提供与Lock相似的acquire和release方法外,还提供了wait和notify方法。 3 线程首先acquire一个条件变量,而后判断一些条件。 4 若是条件不知足则wait; 5 若是条件知足,进行一些处理改变条件后,经过notify方法通知其余线程,其余处于wait状态的线程接到通知后会从新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
1 from threading import Condition 2 # acquire 3 # release 4 # wait 阻塞 5 # notify 让wait解除阻塞的工具 6 # wait仍是notify在执行这两个方法的先后 必须执行acquire和release 7 from threading import Condition,Thread 8 def func(con,i): 9 con.acquire() 10 # 判断某条件 11 con.wait() 12 print('threading : ',i) 13 con.release() 14 15 con = Condition() 16 for i in range(20): 17 Thread(target=func,args=(con,i)).start() 18 con.acquire() 19 # 帮助wait的子线程处理某个数据直到知足条件 20 con.notify_all() 21 con.release() 22 while True: 23 num = int(input('num >>>')) 24 con.acquire() 25 con.notify(num) 26 con.release()
1 from threading import Timer 2 3 4 def func(): 5 print('执行我啦') 6 7 8 # interval 时间间隔 9 Timer(0.2, func).start() # 定时器 10 # 建立线程的时候,就规定它多久以后去执行
1 #1 介绍
2 concurrent.futures模块提供了高度封装的异步调用接口 3 ThreadPoolExecutor:线程池,提供异步调用 4 ProcessPoolExecutor: 进程池,提供异步调用 5 Both implement the same interface, which is defined by the abstract Executor class. 6
7 #2 基本方法
8 #submit(fn, *args, **kwargs)
9 异步提交任务 10
11 #map(func, *iterables, timeout=None, chunksize=1)
12 取代for循环submit的操做 13
14 #shutdown(wait=True)
15 至关于进程池的pool.close()+pool.join()操做 16 wait=True,等待池内全部任务执行完毕回收完资源后才继续 17 wait=False,当即返回,并不会等待池内的任务执行完毕 18 但无论wait参数为什么值,整个程序都会等到全部任务执行完毕 19 submit和map必须在shutdown以前 20
21 #result(timeout=None)
22 取得结果 23
24 #add_done_callback(fn)
25 回调函数