在进入多进程的学习以前, 必定须要先了解一个应用程序是如何开启一个进程的, 以及操做系统对进程是如何进行分配资源的, 进程、线程、进程池、进程三态、同步、异步、并发、并行、串行的概念也要很是的明确, 下面将介绍 Python 并发编程之多进程html
ps : 值得注意的是 : 与线程不一样,进程没有任何共享状态,多个进程的内存空间相互物理隔离, 进程修改的数据,改动仅限于该进程内python
multiprocessing 模块提供了 Process 类,该类可用来在 Windows 平台上建立新进程编程
使用 Process 类建立实例化对象,其本质是调用该类的构造方法建立新进程json
Process([group [, target [, name [, args [, kwargs]]]]]) # 其实是调用了下面的构造方法 def __init__(self,group=None,target=None,name=None,args=(),kwargs={})
值得注意的是 :数组
参数的指定须要使用关键字的方式安全
args 指定的值是为 target 指定的函数的位置参数, 而且是一个元组形式, 一个值必须带逗号bash
参数名 | 说明 |
---|---|
group | 该参数未进行实现,不须要传参 |
target | 为新建进程指定执行任务,也就是指定一个函数 |
name | 为新建进程设置名称 |
args | 为 target 参数指定的参数传递非关键字参数 |
kwargs | 为 target 参数指定的参数传递关键字参数 |
方法 | 做用 |
---|---|
run( ) | 第 2 种建立进程的方式须要用到,继承类中须要对方法进行重写,该方法中包含的是新进程要执行的代码 |
start( ) | 和启动子线程同样,新建立的进程也须要手动启动,该方法的功能就是启动新建立的线程 |
join([timeout]) | 主线程等待子进程终止(强调:是主线程处于等的状态,而p是处于运行的状态),timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程 |
is_alive( ) | 判断当前进程是否还活着 |
terminate( ) | 中断该进程 |
属性 | 做用 |
---|---|
name | 能够为该进程重命名,也能够得到该进程的名称。 |
daemon | 和守护线程相似,经过设置该属性为 True,可将新建进程设置为“守护进程” |
pid | 返回进程的 ID 号。大多数操做系统都会为每一个进程配备惟一的 ID 号 |
exitcode | 进程在运行时为None、若是为–N,表示被信号N结束(了解便可) |
authkey | 进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网 |
在 Windows 中 Process( ) 必须放在 if __name__ == '__main__':
之下网络
ps : fork 是 OS提供的方法 os.fork()
, 该方法能够在当前程序中再建立出一个进程, 可是在 Windows 平台上无效, 只在 Linux, UNIX, Mac OSX上有效数据结构
from multiprocessing import Process import time,os def test(n): print(f"父进程{os.getppid()},紫禁城{os.getpid()}") time.sleep(n) print(f"父进程{os.getppid()},紫禁城{os.getpid()}") if __name__ == '__main__': p = Process(target=test,args=(2,)) p.start() # 作发起系统调用的活 print(f"当前执行文件{os.getpid()}") ''' 当前执行文件16860 父进程16860,紫禁城6404 父进程16860,紫禁城6404 '''
from multiprocessing import Process import time,os class MyProcess(Process): def __init__(self,n): super().__init__() self.n = n def run(self) -> None: print(f"父进程{os.getppid()},紫禁城{self.pid}") time.sleep(self.n) print(f"父进程{os.getppid()},紫禁城{os.getpid()}") if __name__ == '__main__': p = MyProcess(2) p.start() print(f"当前执行文件{os.getpid()}") ''' 当前执行文件8136 父进程8136,紫禁城1280 父进程8136,紫禁城1280 '''
from multiprocessing import Process import time x = 222 def test(): global x x = 111 if __name__ == '__main__': p = Process(target=test) p.start() # 发送系统调用 time.sleep(1) # 等待子进程运行完 print(x) # 222 (仍是原来的)
子进程 test 函数中声明全局变量 x, 并修改 x 的值, 等待子进程运行完毕, 最后打印 x , 发现 x 的值并无改变多线程
from multiprocessing import Process x = 222 def test(): global x x = 111 if __name__ == '__main__': p = Process(target=test) p.start() # 发送系统调用 p.join() # 等待子进程运行完(以前咱们使用sleep并不能精确的知道子进程结束运行的时间) print(x) # 222 (仍是原来的)
timeout
是可选的超时间, 等多久就不等了from multiprocessing import Process x = 222 def test(): global x x = 111 if __name__ == '__main__': p = Process(target=test) p.start() # 发送系统调用 p.join(0.001) # 等待 0.001 秒就不等了
start()
只是发起系统调用, 并非运行子进程, 当 start()
执行完后紧接着就执行后面的代码start()
发起调用以后, 是通知操做系统建立一个子进程, 操做系统须要申请一个内存空间, 将父进程的数据复制一份到子进程的内存空间中做为初始化用 (Linux是将父进程的数据原本来本的复制一份, 而Windows 稍有些不一样), 而后子进程才运行起来import time,os def test(n): time.sleep(n) print(f"父进程{os.getppid()} 子进程{os.getpid()}") if __name__ == '__main__': p1 = Process(target=test,args=(3,)) p2 = Process(target=test,args=(2,)) p3 = Process(target=test,args=(1,)) p1.start() # 用时 3 秒 p2.start() # 用时 2 秒 p3.start() # 用时 1 秒 start_time = time.time() p1.join() p2.join() p3.join() # 三个进程都在并发的运行, 主进程一共运行3秒多 stop_time = time.time() print(f'主进程{os.getpid()} 用时{stop_time-start_time}') ''' 父进程10888 子进程6792 父进程10888 子进程13368 父进程10888 子进程14800 主进程10888 用时3.131737470626831 '''
from multiprocessing import Process import time def test(): time.sleep(1) if __name__ == '__main__': p = Process(target=test,args=(2,)) p.start() p.terminate() # 只是发起系统调用, 通知操做系统关闭子进程 print(p.is_alive()) # True
由上面可知
terminate()
只是发起系统调用, 并非当即关闭子进程, 操做系统关闭子进程回收资源也要一小会, 咱们可使用sleep简单延时
from multiprocessing import Process import time def test(): time.sleep(1) if __name__ == '__main__': p = Process(target=test,args=(2,)) p.start() p.terminate() # 只是发起系统调用, 通知操做系统关闭子进程 time.sleep(0.1) # 稍微延时一点 print(p.is_alive()) # False
from multiprocessing import Process import time,os class MyProcess(Process): def __init__(self,n,name): super().__init__() self.n = n self.name = name def run(self) -> None: time.sleep(self.n) print(f"子进程pid:{self.pid}") # 子进程pid:14156 print(f"子进程模块名:{__name__}") # 子进程模块名:__mp_main__ print(f"子进程名:{self.name}") # 子进程名:aaaa if __name__ == '__main__': p = MyProcess(1,"aaaa") p.start() p.join() print(f"打印子进程pid:{p.pid}") # 打印子进程pid:14156 print(f"打印主进程pid:{os.getpid()}") # 打印主进程pid:16340 print(f"子进程名:{p.name}") # 子进程名:aaaa print(f"主进程模块名:{__name__}") # 主进程模块名:__main__
__name__
: Python中每一个模块都有本身的名字,__name__
是一个系统变量, 是模块的标识符, 值是模块的名称, 而且在自身模块中:__name__
的值等于__mian__
ps : 任何正常结束的子进程都会进入到僵尸状态, 而被强制终止的进程的全部信息将会被清除
这些软件在开启子进程时, 父进程内部会及时调用"wait" / "waitpid" 通知操做系统来回收僵尸进程
功底深厚,知道父进程要对子进程负责 会在父进程内部考虑到调用 "wait" / "waitpid" 通知操做系统回收僵尸进程 可是发起系统调用时间可能慢了一点 因而咱们就可使用 "ps aux | grep [z]+" 命令查看到僵尸进程
技术半吊子,只知道开子进程,父进程也不结束,并在那一直开子进程,不知道什么是僵尸进程 系统调用 "wait" / "waitpid" 也没有据说过 因而计算机会堆积许多的僵尸进程,占用着大量的"pid",(每启动一个进程就会分配一个"pid号") 计算机进入一个奇怪的现象: 内存够用,硬盘充足,CPU空闲,但新的程序没法启动 这就是由于"PID"不够用了
咱们能够手动发信号给父进程: "# kill -CHLD [父进程的PID]" 通知父进程快点向操做系统发起系统调用 "wait" / "waitpid" 来清理变成僵尸的儿子们
这种状况子下,咱们只能将父进程终结,由于你发给它的信号不会获得回应 父进程被杀死,"僵尸进程"将会变成"僵尸孤儿进程" 但凡是"孤儿进程"都会被Linux系统中"PID"为"1"的顶级进程"systemd"回收 "systemd"会发起系统调用 "wait" / "waitpid" 来通知操做系统清理僵尸进程 # Centos7 的顶级进程为 systemd # Centos6 的顶级进程为 init
本来 multiprocessing 模块在你发起系统调用 start()
开启子进程的时候会自动检测当前状态下是否存在僵尸进程, 并将其回收, join()
调用也是同样, 咱们能够查看这两个调用的源码进行查看 :
# coding:utf-8 from multiprocessing import Process import os,time def task(): print("子进程:%s"%os.getpid()) time.sleep(4) # 子进程 4 秒后结束变成僵尸进程 if __name__ == "__main__": for i in range(400): print("父进程:%s"%os.getpid()) p = Process(target=task) p.start() time.sleep(100000) # 让父进程停在原地什么也不作
使用 top 命令查看系统状态信息, 能够发现已经出现了 400 个僵尸进程
咱们能够经过 kill 刚运行的 py 文件将这些僵尸进程变成孤儿进程, 从而被 systemd 接管, systemd 再发起系统调用将其清除
由主进程建立, 并会随着主进程的结束而结束
进程之间是相互独立的, 守护进程会在主进程代码执行结束后就终止
守护进程内没法再次开启子进程, 不然会抛出异常 : AssertionError: daemonic processes are not allowed to have children
from multiprocessing import Process import os,time class MyProcess(Process): def __init__(self,n): super().__init__() self.n = n def run(self) -> None: print(f'子进程:{os.getpid()}开始') time.sleep(2) print(f"子进程:{os.getpid()}结束") if __name__ == '__main__': p = MyProcess(2) p.daemon = True # 须要在 strat() 以前设置 p.start() print(f"主进程:{os.getpid()}结束") # 在当前主进程的代码已经运行完毕, 守护进程就会终止, 甚至守护进程还没来的急启动 '''输出 主进程:16924结束 '''
咱们使用 sleep 让主进程简单延时一下好让子进程启动起来
from multiprocessing import Process import os,time class MyProcess(Process): def __init__(self,n): super().__init__() self.n = n def run(self) -> None: print(f'子进程:{os.getpid()}开始') time.sleep(2) print(f"子进程:{os.getpid()}结束") if __name__ == '__main__': p = MyProcess(2) p.daemon = True p.start() time.sleep(1) # 延时一秒, 足够操做系统将子进程开起来 print(f"主进程:{os.getpid()}结束") '''输出 子进程:8620开始 主进程:10480结束 '''
再次强调, 守护进程是在主进程的代码执行完毕终止
from multiprocessing import Process import os,time def Foo(): print(f"Foo:{os.getpid()}-->111") time.sleep(1) print(f"Foo--->222") def Bar(): print(f"Bar:{os.getpid()}-->333") time.sleep(2) print(f"Bar--->444") if __name__ == '__main__': p1 = Process(target=Foo) p2 = Process(target=Bar) p1.daemon = True # 将 p1 设置守护进程 p1.start() p2.start() print("------>end") # 当运行到这一行的时候主进程代码已经运行完了, 那么守护进程也已经终止了, 与主进程在等着 p2 运行无关, 这时操做系统还没来的急启动 p1 这个子进程 '''输出 ------>end Bar:18124-->333 Bar--->444 '''
上面咱们实现了进程的并发, 进程之间的数据是不共享的, 可是他们能够共享同一个文件(硬盘空间), 或者是同一个打印空间, 然而在共享的同时也带来了问题 : 进程的运行不是同时进行的, 它们没有前后顺序, 一旦开启也不受咱们的限制, 当多个进程使用同一份数据资源时, 就会引起数据安全或者数据混乱问题
咱们打个简单的比方, 公司里的一台打印机, 每一个人均可以使用, 但同事只能有一我的在使用, 否则就会形成打印错乱; 又好比合租房的卫生间, 合住的同伴均可以使用卫生间, 但每次只能一我的进去, 进去以后门就锁上了(至关于加锁 Lock( ).acquire
( )), 出来以后开门, 其余人又可使用卫生间了(至关于解锁Lock( ).release( )
)
🍓余票文件 "aaa.json" {"count": 1} # 剩一张票 🍓模拟多我的抢票 # coding:utf-8 from multiprocessing import Process import os,time,json def check(): # 先查票 time.sleep(1) # 模拟网络延迟 with open("aaa.json")as f: dic = json.load(f) print(f"剩余票数 : {dic['count']}") def get(): # 查完以后开始抢 time.sleep(1) # 模拟网络延迟 with open("aaa.json")as f: dic = json.load(f) if dic["count"] >0: dic["count"] -= 1 time.sleep(1) # 模拟网络延迟 with open("aaa.json","w")as f2: # 抢完以后修改数据并提交到服务端 json.dump(dic,f2) print(f"用户 : {os.getpid()} 抢票成功") else: print(f"用户 : {os.getpid()} 抢票失败") def run(): check() time.sleep(1) # 模拟网络延迟 get() if __name__ == "__main__": for i in range(4): p = Process(target=run) p.start() '''输出 剩余票数 : 1 剩余票数 : 1 剩余票数 : 1 剩余票数 : 1 用户 : 13116 抢票成功 用户 : 2364 抢票成功 用户 : 1796 抢票成功 用户 : 6228 抢票成功 '''
打印的结果发现只有一张票, 可是四我的都抢成功了, 这就很是不合理,形成了数据混乱
# coding:utf-8 from multiprocessing import Process,Lock import os,time,json def check(): # 先查票 time.sleep(1) # 模拟网络延迟 with open("aaa.json")as f: dic = json.load(f) print(f"剩余票数 : {dic['count']}") def get(): # 查完以后开始抢 time.sleep(1) # 模拟网络延迟 with open("aaa.json")as f: dic = json.load(f) if dic["count"] >0: dic["count"] -= 1 time.sleep(1) # 模拟网络延迟 with open("aaa.json","w")as f2: # 抢完以后修改数据并提交到服务端 json.dump(dic,f2) print(f"用户 : {os.getpid()} 抢票成功") else: print(f"用户 : {os.getpid()} 抢票失败") def run(lock): check() time.sleep(1) # 模拟网络延迟 lock.acquire() # 在抢票环节加锁 get() lock.release() # 抢完后解锁 if __name__ == "__main__": lock = Lock() for i in range(4): p = Process(target=run,args=(lock,)) p.start() '''输出 剩余票数 : 1 剩余票数 : 1 剩余票数 : 1 剩余票数 : 1 用户 : 432 抢票成功 用户 : 2636 抢票失败 用户 : 7772 抢票失败 用户 : 1272 抢票失败 '''
加锁以后, 一张票只有一我的能抢成功, 其实就是让抢票这个局部环节变成了串行, 谁抢到了就谁用, 牺牲了效率, 提高了数据安全性
一、共享的数据基于文件, 文件又属于硬盘, 效率就比较低 二、须要本身加锁和解锁操做, 这是一件很是危险的操做, 若是忘记解锁程序就停在原地
🍑需求 一、多个进程共享同一块内存数据, 实现高效率 二、找到一个能帮咱们处理好锁的问题的机制 : multiprocessing模块为咱们提供了IPC通讯机制:管道和队列 🍑介质 一、管道和队列, 基于内存中的空间存放数据 二、队列是基于管道和锁实现的, 可让咱们从复杂的锁问题中解脱出来
ps : 咱们应该尽可能避免使用共享数据, (好比一个文件的传递应该将文件保存到硬盘, 在管道中放的应该是一个路径, 而不该该是一个完整的文件), 尽量使用消息传递和队列, 避免处理复杂 的同步和锁问题, 并且在进程数目增多时, 每每能够得到更好的可扩展性
进程间通讯机制简称 IPC (Inter Process Communication)
进程间彼此隔离, 要实现 IPC, multiprocessing 模块为咱们提供了队列和管道这两种形式
🍑导入模块 from multiprocessing import Queue 🍑建立一个队列对象 q = Queue([maxsize]) # 多进程可使用Queue进行数据传递 🍑参数介绍 maxsize # 是队列中容许最大项, 省略则无大小限制
方法 | 功能 |
---|---|
q.put( ) | 向队列中传入数据,可选参数 : blocked(锁定状态)和timeout(超时时间)。若是blocked为True(默认值), 而且timeout为正值, 该方法会阻塞timeout指定的时间, 直到该队列有剩余的空间。若是超时,会抛出Queue.Full异常。若是blocked为False,但该Queue已满,会当即抛出Queue.Full异常 |
q.get( ) | 从队列读取走一个元素, 有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。若是blocked为False,有两种状况存在,若是Queue有一个值可用,则当即返回该值,不然,若是队列为空,则当即抛出Queue.Empty异常 |
q.get_nowait( ) | 同q.get(blocked=False) |
q.put_nowait( ) | 同q.put(blocked=False) |
q.empty( ) | 调用此方法时q为空则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中又加入了项目 |
q.full( ) | 调用此方法时q已满则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中的项目被取走 |
q.qsize( ) | 返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()同样 |
方法 | 做用 |
---|---|
q.cancel_join_thread( ) | 不会在进程退出时自动链接后台线程, 能够防止join_thread()方法阻塞 |
q.close( ) | 关闭队列, 防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但还没有写入的数据,但将在此方法完成时立刻关闭。若是q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,若是某个使用者正在被阻塞在get()操做上,关闭生产者中的队列不会致使get()方法返回错误 |
q.join_thread( ) | 链接队列的后台线程。此方法用于在调用q.close()方法以后,等待全部队列项被消耗。默认状况下,此方法由不是q的原始建立者的全部进程调用。调用q.cancel_join_thread方法能够禁止这种行为 |
from multiprocessing import Queue q = Queue(3) # 建立一个队列,设置最大项为3 q.put({"name":"ii"}) # 放入一个字典 q.put([1,2,3,4,5]) # 放入一个列表 q.put("shawn") # 放入一个字符串 try: # q.put(1777,block=True,timeout=3) q.put(1777,block=False) # 放入一个整形,并设置队列已满立马抛异常 except Exception: print("队列已满") print(q.get()) # 取一个值 print(q.get()) # 2 print(q.get()) # 3 try: # print(q.get(block=True,timeout=3)) print(q.get(block=False)) # 取一个值,队列为空立马抛出异常 except Exception: print("队列已空") '''输出 队列已满 {'namwe': 'ahsns'} [1, 2, 3, 4, 5] shawn 队列已空 '''
在并发编程中, 生产者消费者模式经过一个容器来解决生产者和消费者之间的强耦合性, 二者之间再也不是直接通讯, 而是经过堵塞队列来进行通讯, 生产者(生产速度快)没必要再等待消费者是否处理完数据, 消费者直接从队列中取, 该队列就至关于一个缓冲区, 平衡了生产者和消费者的工做能力, 从而提升了程序总体的数据处理速度
经过队列 : 生产者------>队列------->消费者
from multiprocessing import Process, Queue import time, random def producer(q, name, food): for i in range(3): res = f"{food}{i}" time.sleep(random.randint(1, 3)) # 模拟生产者数据产出时间 q.put(res) # 将产生的数据放入到队列中 print(f"\033[1;35m{name}:生产了:{res}\033[0m") def consumer(q, name): while True: res = q.get() # 取出数据 if res == None: break # 判断是否None, None表明队列取完了,结束 time.sleep(random.randint(1, 3)) # 模拟消费者处理数据时间 print(f"\033[1;36m{name}吃了{res}\033[0m") if __name__ == "__main__": q = Queue() # 建立队列 # 开启三个生产者进程 p1 = Process(target=producer, args=(q, "shawn", "香肠")) p2 = Process(target=producer, args=(q, "派大星", "热狗")) p3 = Process(target=producer, args=(q, "海绵宝宝", "鸡")) # 开启两个消费者进程 c1 = Process(target=consumer, args=(q, "章鱼哥")) c2 = Process(target=consumer, args=(q, "蟹老板")) p1.start() p2.start() p3.start() c1.start() c2.start() # 等待生产者所有生产完毕结束进程 p1.join() p2.join() p3.join() # 主进程再想队列里面放入两个None,当消费者拿到后表明取完了 q.put(None) q.put(None) print("痞老板:主") '''输出 shawn:生产了:香肠0 派大星:生产了:热狗0 章鱼哥吃了香肠0 蟹老板吃了热狗0 派大星:生产了:热狗1 shawn:生产了:香肠1 海绵宝宝:生产了:鸡0 章鱼哥吃了热狗1 海绵宝宝:生产了:鸡1 派大星:生产了:热狗2 章鱼哥吃了鸡0 蟹老板吃了香肠1 shawn:生产了:香肠2 海绵宝宝:生产了:鸡2 痞老板:主 蟹老板吃了热狗2 章鱼哥吃了鸡1 蟹老板吃了香肠2 章鱼哥吃了鸡2 Process finished with exit code 0 '''
q = JoinableQueue([maxsize])
: 与 Queue 的对象同样, 但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的
方法 | 做用 |
---|---|
q.task_done( ) | 使用者使用此方法发出信号,表示q.get( )的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常 |
q.join( ) | 生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止 |
from multiprocessing import Process, JoinableQueue import time, random def producer(q, name, food): for i in range(3): res = f"{food}{i}" q.put(res) time.sleep(random.randint(1, 3)) print(f"\033[1;35m{name}:生产了:{res}\033[0m") q.join() # 等待每一个生产者本身放入的数据被消费者取完才结束该进程 def consumer(q, name): while True: res = q.get() if res == None: break time.sleep(random.randint(1, 3)) print(f"\033[1;36m{name}吃了{res}\033[0m") q.task_done() # 消费者每次取走一个数据都发送一个task_done信号,生产者那边的计数相应减1 if __name__ == "__main__": q = JoinableQueue() # 建立一个对象 # 建立三个生产者 p1 = Process(target=producer, args=(q, "shawn", "香肠")) p2 = Process(target=producer, args=(q, "派大星", "热狗")) p3 = Process(target=producer, args=(q, "海绵宝宝", "鸡")) # 建立两个消费者 c1 = Process(target=consumer, args=(q, "章鱼哥")) c2 = Process(target=consumer, args=(q, "蟹老板")) # 将两个消费者设置成守护进程, 主进程代码结束,这两个消费者进程相应结束 c1.daemon = True c2.daemon = True p1.start() p2.start() p3.start() c1.start() c2.start() # 等待三个生产者进程结束 p1.join() p2.join() p3.join() 🔰#原理分析 : 生产者生产数据, 假设一个生产者生产3个数据带队列,每一个相应的计数为3 🔰#消费者从队列中取走数据的时候发送task_done信号给生产者,生产者的计数3-1,剩下两个 🔰#消费者继续取数据并发送信号,当生产者的计数为0,表明队列已经取完了,这时q.join()就再也不进行堵塞,生产者进程结束 🔰#而此时的消费者也已经没有做用了,将消费者进程设置成守护进程,主进程等待生产者进程结束就结束,消费者进程天然被带走 '''输出 shawn:生产了:香肠0 海绵宝宝:生产了:鸡0 章鱼哥吃了香肠0 派大星:生产了:热狗0 蟹老板吃了热狗0 shawn:生产了:香肠1 海绵宝宝:生产了:鸡1 章鱼哥吃了鸡0 蟹老板吃了香肠1 shawn:生产了:香肠2 派大星:生产了:热狗1 章鱼哥吃了鸡1 蟹老板吃了热狗1 海绵宝宝:生产了:鸡2 派大星:生产了:热狗2 章鱼哥吃了香肠2 蟹老板吃了鸡2 章鱼哥吃了热狗2 Process finished with exit code 0 '''
互斥锁同时只容许一个线程修改数据, 而 Semaphore 容许同时有必定数量的进程更改数据, 就像理发店, 好比只有3个托尼老师, 那最多只容许3我的同时理发, 后面的人只能等到有人理完了才能开始, 若是指定信号量为3, 那么来一我的得到一把锁, 计数加1, 当计数等于3时, 后面的人均须要等待 , 一旦释放, 就有人能够得到一把锁
from multiprocessing import Semaphore,Process import time,random def haircut(sem,name): start_time = time.time() sem.acquire() # 加锁 print(f"{name}开始理发") time.sleep(random.randint(2,3)) # 模拟理发时间 print(f"{name}理发加等待用时%.2f"%(time.time()-start_time)) sem.release() # 解锁 if __name__ == '__main__': sem = Semaphore(3) # 最大进程数为3 user_list = [] for i in range(8): p = Process(target=haircut,args=(sem,f"明星{i}")) p.start() user_list.append(p) for obj in user_list: obj.join() print("关门") '''输出 明星0开始理发 明星1开始理发 明星2开始理发 明星0理发加等待用时3.00 明星3开始理发 明星1理发加等待用时3.00 明星4开始理发 明星2理发加等待用时3.00 明星5开始理发 明星3理发加等待用时4.93 明星6开始理发 明星4理发加等待用时4.87 明星7开始理发 明星5理发加等待用时5.82 明星7理发加等待用时6.69 明星6理发加等待用时7.74 关门 Process finished with exit code 0 '''
进程死锁、递归锁与线程死锁、递归锁同样, 将统一放在线程一块儿讲, 请参见多线程
参见多线程
😮忙时会有成千上万的任务须要被执行,闲时可能只有零星任务。 😒那么在成千上万个任务须要被执行的时候,咱们就须要去建立成千上万个进程么? 😓首先,建立进程须要消耗时间,销毁进程也须要消耗时间。 😟第二即使开启了成千上万的进程,操做系统也不能让他们同时执行,这样反而会影响程序的效率。 😥所以咱们不能无限制的根据任务去开启或者结束进程。那么咱们要怎么作呢?
😺定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务
😸等处处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务
😹若是有不少任务须要执行,池中的进程数量不够,任务就要等待以前的进程执行任务完毕归来,拿到空闲进程才能继续执行。
😻也就是说,进池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行
😼这样不会增长操做系统的调度难度,还节省了开关进程的时间,也必定程度上可以实现并发效果。
👉管理进程负责建立资源进程,把工做交给空闲资源进程处理,回收已经处理完工做的资源进程。
😱管理进程如何有效的管理资源进程,分配任务给资源进程? 👉经过IPC,信号,信号量,消息队列,管道等进行交互。
咱们能够经过维护一个进程池来控制进程的数目, 好比使用httpd的进程模式, 能够规定最大进程数和最小进程数, multiprocessing模块Pool类能够提供指定数量的进程供用户调用
Pool([numprocess],[initializer],[initargs])
参数 | 做用 |
---|---|
numprocess | 要建立的进程数,若是省略,将默认使用cpu_count()的值 |
initializer | 每一个工做进程启动时要执行的可调用对象,默认为None |
initargs | 传给initializer的参数组 |
方法 | 做用 |
---|---|
p.apply(func,args,kwargs) | (同步调用)在进程池工做的进程中执行func函数,后面是参数,而后返回结果,若是想要传入不一样的参数并发的执行func, 就须要以不一样的线程去调用p.apply()函数或者使用p.apply_async() |
p.apply_async(func,args,kwargs) | (异步调用)在进程池工做的进程中执行func函数,后面是参数,而后返回结果, 结果是AsyncResult类的实例, 可使用回调函数callback, 将前面funct返回的结果单作参数传给回调函数 |
p.close( ) | 关闭进程池,防止进一步操做, 若是全部操做持续挂起,它们将在工做进程终止前完成 |
P.jion() | 等待全部工做进程退出。此方法只能在close()或teminate()以后调用,不然报错 |
如下方法运用于 pply_async()
和 map_async()
的返回值, 返回值是**AsyncResul **实例的对象, 也就是该对象的方法
方法 | 做用 |
---|---|
obj.get( ) | 返回结果,若是有必要则等待结果到达。timeout是可选的。若是在指定时间内尚未到达,将引起一场。若是远程操做中引起了异常,它将在调用此方法时再次被引起 |
obj.ready( ) | 若是调用完成,返回True |
obj.successful( ) | 若是调用完成且没有引起异常,返回True,若是在结果就绪以前调用此方法,引起异常 |
obj.wait([timeout]) | 等待结果变为可用, 参数是超时时间 |
obj.terminate( ) | 当即终止全部工做进程,同时不执行任何清理或结束任何挂起工做。若是对象被垃圾回收, 将自动调用这个方法 |
from multiprocessing import Pool import time,os,random def test(n): print(f"子进程:{os.getpid()}") time.sleep(2) return n*random.randint(2,9) if __name__ == '__main__': n = os.cpu_count() # 本机CPU个数,个人是4,进程池容量个数自定义,默认CPU核数 p = Pool(processes=n) # 设置进程池进程个数,从无到有,而且之后一直只有这四个进程在执行任务 li = [] start_time = time.time() for i in range(10): res = p.apply(test,args=(2,)) # 建立十个个任务, 使用同步调用的方式 li.append(res) p.close() # 先关闭进程池, 不会再有新的进程加入到pool中, 防止进一步的操做(同步调用能够不加此方法) p.join() # 必须在close调用以后执行, 不然报错, 执行后等待全部子进程结束(同步调用能够不加此方法) print(li) # 同步调用, 获得的就是最终结果,(异步调用获得的是对象, 须要使用get方法取值) print(f'使用时间:{time.time()-start_time}') '''输出 子进程:7768 子进程:16276 子进程:17544 子进程:15680 子进程:7768 子进程:16276 子进程:17544 子进程:15680 子进程:7768 子进程:16276 [4, 18, 14, 14, 12, 14, 16, 14, 6, 10] 使用时间:20.226498126983643 '''
从上面的输出结果能够看到,进程一直是那四个 : 776九、1627六、1754四、15680, 而且异步提交须要等待上一个任务结束拿到结果才能进行下一个任务, 因此用时 20 秒多一点
from multiprocessing import Pool import time,os,random def test(n): print(f"子进程:{os.getpid()}") time.sleep(2) return n*n*random.randint(2,9) if __name__ == '__main__': n = os.cpu_count() # 本机CPU个数,个人是4,进程池容量个数自定义,默认CPU核数 p = Pool(processes=n) # 设置进程池大小, 从无到有, 并以后只有这四个进程执行任务 li = [] start_time = time.time() for i in range(10): res = p.apply_async(test,args=(2,)) # 开启十个任务, 使用异步调用的方式 li.append(res) p.close() # 关闭进程池, 不会再有新的进程加入到pool中, 防止进一步的操做 p.join() # join必须在close函数以后进行, 不然报错, 执行后等待全部子进程结束 print(li) # 返回的是AsyncResul的对象[<multiprocessing.pool.ApplyResult object at 0x000002318511B408>,....] print([i.get() for i in li]) # 使用get方法来获取异步调用的值(同步调用没有该方法),并放入列表中打印 print(f"使用时间:{time.time()-start_time}") '''输出 子进程:8636 子进程:10828 子进程:7432 子进程:13976 子进程:8636 子进程:10828 子进程:7432 子进程:13976 子进程:8636 子进程:10828 [<multiprocessing.pool.ApplyResult object at 0x000001623059B308>,...省略] [16, 24, 24, 24, 16, 28, 36, 28, 8, 32] 使用时间:6.301024436950684 '''
从上面结果也能看出自始至终都只有四个进程在工做 : 863六、1082八、743二、13976,异步调用方式若是任务进行时遇到阻塞操做将立马接收其它异步操做中的结果, 若是进程池满了, 则只能等待任务进行完毕拿到结果, 拿到的结果是 AsyncResul 的对象, 须要使用 get 方法取值, 用时 6 秒多一点
from socket import * from multiprocessing import Pool import os s = socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # 重用IP和端口 s.bind(("127.0.0.1",8055)) s.listen(5) def connection(conn): print(f"当前进程:{os.getpid()}") while 1: try: date = conn.recv(1024) if len(date) == 0:break conn.send("阿巴阿巴".encode("utf-8")) except Exception: break if __name__ == '__main__': p = Pool(2) # 不指定,默认本机CPU核数 print("connection....") while 1: conn,addr = s.accept() print(f"已连上{addr}") p.apply_async(connection,args=(conn,))
from socket import * c = socket(AF_INET,SOCK_STREAM) c.connect(("127.0.0.1",8055)) while 1: msg = input("内容>>").strip() if len(msg) == 0:continue c.send(msg.encode("utf-8")) date = c.recv(1024) print(f"服务端的回复:{date.decode('utf-8')}")
四个客户端一个服务端 :
启动五台机器, 让四台客户端发送信息
前两台能发收消息, 后两台阻塞原地
服务端显示两个进程启动成功 : 892八、17584, 剩余两个阻塞
咱们将前面两个客户端进程关闭, 看看进程号是否变化
关闭前两个客户端进程以后, 后两个客户端进程立马启动起来了, 而且发现PID仍是原来的两个
将第一个函数的指针(也就是内存地址,Python中淡化了指针的概念)做为参数传给另外一个函数处理, 这第一个函数就称为回调函数
def foo(n): print(f"foo输出{n}") def Bar(i,func): func(i) for i in range(3): Bar(i,foo) '''输出 foo输出0 foo输出1 foo输出2 '''
当进程池中一个任务处理完以后, 它去通知主进程本身结束了, 让主进程处理本身的结果, 因而主进程去调用另外一个函数去处理该结果, 咱们能够将耗时间或者阻塞的任务放入进程池, 在主进程中指定回调函数, 并由主进程负责执行, 这样主进程在执行回调函数的时候就省去了I/O的过程, 直接拿到的就是任务的结果
from multiprocessing import Pool import os def get(n): print(f"get--->{os.getpid()}") return n # 返回任务执行的结果 def set(num): # 拿到回调函数的处理结果--->num print(f"set--->{os.getpid()} : {num**2}") if __name__ == '__main__': p = Pool(3) nums = [2,3,4,1] li = [] for i in nums: # 异步调用,并使用callback指定回调函数 res = p.apply_async(get,args=(i,),callback=set) li.append(res) p.close() # 关闭进程池 p.join() # 等待子进程结束 print([ii.get() for ii in li]) # 使用get方法拿到结果 '''输出 get--->8388 get--->8388 set--->8768 : 4 get--->8388 set--->8768 : 9 get--->8388 set--->8768 : 16 set--->8768 : 1 [2, 3, 4, 1] '''
from multiprocessing import Pool import requests,os def get_htm(url): print(f"进程:{os.getpid()}开始获取:{url}网页") response = requests.get(url) if response.status_code == 200: # 若是是200,则获取成功 return {'url':url,'text':response.text} else: return {'url':url,'text':''} # 有些网页获取不到,设置空 def parse_htm(htm_dic): print(f'进程:{os.getpid()}正在处理:{htm_dic["url"]}的text') parse_data = f"url:{htm_dic['url']} size:{len(htm_dic['text'])}" with open("./db.txt","a")as f: # 将URL和对应网页源码大小保存到文件 f.write(f"{parse_data}\n") if __name__ == '__main__': urls=[ 'https://zhuanlan.zhihu.com', 'https://www.cnblogs.com', 'https://www.python.org', 'https://blog.csdn.net', 'http://www.china.com.cn', ] p = Pool(3) # 设置进程池最大进程数为3 li = [] for url in urls: # 异步调用并指定回调函数 res = p.apply_async(get_htm,args=(url,),callback=parse_htm) li.append(res) p.close() # 关闭进程池 p.join() # 等待子进程结束 print([i.get() for i in li]) # 使用get方法获取结果 '''输出 进程:11484开始获取:https://zhuanlan.zhihu.com网页 进程:17344开始获取:https://www.cnblogs.com网页 进程:2688开始获取:https://www.python.org网页 进程:11484开始获取:https://blog.csdn.net网页 进程:3928正在处理:https://zhuanlan.zhihu.com的text 进程:17344开始获取:http://www.china.com.cn网页 进程:3928正在处理:https://www.cnblogs.com的text 进程:3928正在处理:https://blog.csdn.net的text 进程:3928正在处理:http://www.china.com.cn的text 进程:3928正在处理:https://www.python.org的text [{'url': 'https://zhuanlan.zhihu.com', 'text': ''},...一堆网页源码的bytes(省略)] '''
from multiprocessing import Pool import re import requests def get_htm(url,format1): response = requests.get(url) if response.status_code == 200: return (response.text,format1) else: return ('',format1) def parse_htm(res): text,format1 = res data_list = re.findall(format1,text) for data in data_list: with open("福布斯排行.txt","a",encoding="utf-8")as f: f.write(f"排名:{data[0]},名字:{data[1]},身价:{data[2]},公司:{data[3]},国家:{data[4]}\n") if __name__ == '__main__': url1 = "https://www.phb123.com/renwu/fuhao/shishi.html" # 使用正则匹配关键字 format1 = re.compile(r'<td.*?"xh".*?>(\d+)<.*?title="(.*?)".*?alt.*?<td>(.*?)</td>.*?<td>(.*?)<.*?title="(.*?)"', re.S) url_list = [url1] for i in range(2, 16): # 总共15页排行,将连接都加进列表里 url_list.append(f"https://www.phb123.com/renwu/fuhao/shishi_{i}.html") p = Pool() li = [] for url in url_list: res = p.apply_async(get_htm,args=(url,format1),callback=parse_htm) li.append(res) p.close() p.join() print("保存完成")
ps : 若是在主进程中等待进程池中全部任务都执行完毕后,再统一处理结果,则无需回调函数