管道:html
Pipepython
Conn1,conn2 = Pipe()git
数据共享:github
M = Manager()面试
Dic = m.dict({‘name’:sbalex})数据库
数据安全的问题编程
加锁json
进程池 *****数组
Map:异步提交任务,参数是可迭代对象,自带close + join浏览器
Apply :同步提交任务,直接能够收到返回值
Apply_async() 异步提交任务:res.get() 阻塞效果
Close join
回调函数:callback=
1、管道
进程间通讯(IPC)方式二:管道(不推荐使用,了解便可),会致使数据不安全的状况出现,后面咱们会说到为何会带来数据 不安全的问题。
#建立管道的类: Pipe([duplex]):在进程之间建立一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的链接对象,强调一点:必须在产生Process对象以前产生管道 #参数介绍: dumplex:默认管道是全双工的,若是将duplex射成False,conn1只能用于接收,conn2只能用于发送。 #主要方法: conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接收,recv方法会一直阻塞。若是链接的另一端已经关闭,那么recv方法会抛出EOFError。 conn1.send(obj):经过链接发送对象。obj是与序列化兼容的任意对象 #其余方法: conn1.close():关闭链接。若是conn1被垃圾回收,将自动调用此方法 conn1.fileno():返回链接使用的整数文件描述符 conn1.poll([timeout]):若是链接上的数据可用,返回True。timeout指定等待的最长时限。若是省略此参数,方法将当即返回结果。若是将timeout射成None,操做将无限期地等待数据到达。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。若是进入的消息,超过了这个最大值,将引起IOError异常,而且在链接上没法进行进一步读取。若是链接的另一端已经关闭,不再存在任何数据,将引起EOFError异常。 conn.send_bytes(buffer [, offset [, size]]):经过链接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,而后调用c.recv_bytes()函数进行接收 conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或相似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。若是消息长度大于可用的缓冲区空间,将引起BufferTooShort异常。
from multiprocessing import Process, Pipe def f(conn): conn.send("Hello 妹妹") #子进程发送了消息 conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() #创建管道,拿到管道的两端,双工通讯方式,两端均可以收发消息 p = Process(target=f, args=(child_conn,)) #将管道的一段给子进程 p.start() #开启子进程 print(parent_conn.recv()) #主进程接受了消息 p.join()
应该特别注意管道端点的正确管理问题。若是是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为什么在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。若是忘记执行这些步骤,程序可能在消费者中的recv()操做上挂起(就是阻塞)。管道是由操做系统进行引用计数的,必须在全部进程中关闭管道的相同一端就会能生成EOFError异常。所以,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。
from multiprocessing import Process, Pipe def f(parent_conn,child_conn): #parent_conn.close() #不写close将不会引起EOFError while True: try: print(child_conn.recv()) except EOFError: child_conn.close() break if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(parent_conn,child_conn,)) p.start() child_conn.close() parent_conn.send('hello') parent_conn.close() p.join()
主进程将管道的两端都传送给子进程,子进程和主进程共用管道的两种报错状况,都是在recv接收的时候报错的:
1.主进程和子进程中的管道的相同一端都关闭了,出现EOFError;
2.若是你管道的一端在主进程和子进程中都关闭了,可是你还用这个关闭的一端去接收消息,那么就会出现OSError;
因此你关闭管道的时候,就容易出现问题,须要将全部只用这个管道的进程中的两端所有关闭才行。固然也能够经过异常捕获(try:except EOFerror)来处理。
虽然咱们在主进程和子进程中都打印了一下conn1一端的对象,发现两个再也不同一个地址,可是子进程中的管道和主进程中的管道仍是能够通讯的,由于管道是同一套,系统可以记录。
咱们的目的就是关闭全部的管道,那么主进程和子进程进行通讯的时候,能够给子进程传管道的一端就够了,而且用咱们以前学到的,信息发送完以后,再发送一个结束信号None,那么你收到的消息为None的时候直接结束接收或者说结束循环,就不用每次都关闭各个进程中的管道了。
from multiprocessing import Pipe,Process def func(conn): while True: msg = conn.recv() if msg is None:break print(msg) if __name__ == '__main__': conn1,conn2 = Pipe() p = Process(target=func,args=(conn1,)) p.start() for i in range(10): conn2.send('约吧') conn2.send(None)
from multiprocessing import Process,Pipe def consumer(p,name): produce, consume=p produce.close() while True: try: baozi=consume.recv() print('%s 收到包子:%s' %(name,baozi)) except EOFError: break def producer(seq,p): produce, consume=p consume.close() for i in seq: produce.send(i) if __name__ == '__main__': produce,consume=Pipe() c1=Process(target=consumer,args=((produce,consume),'c1')) c1.start() seq=(i for i in range(10)) producer(seq,(produce,consume)) produce.close() consume.close() c1.join() print('主进程')
关于管道会形成数据不安全问题的官方解释: The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
由Pipe方法返回的两个链接对象表示管道的两端。每一个链接对象都有send和recv方法(除其余以外)。注意,若是两个进程(或线程)试图同时从管道的同一端读取或写入数据,那么管道中的数据可能会损坏。固然,在使用管道的不一样端部的过程当中不存在损坏风险。
from multiprocessing import Process,Pipe,Lock def consumer(p,name,lock): produce, consume=p produce.close() while True: lock.acquire() baozi=consume.recv() lock.release() if baozi: print('%s 收到包子:%s' %(name,baozi)) else: consume.close() break def producer(p,n): produce, consume=p consume.close() for i in range(n): produce.send(i) produce.send(None) produce.send(None) produce.close() if __name__ == '__main__': produce,consume=Pipe() lock = Lock() c1=Process(target=consumer,args=((produce,consume),'c1',lock)) c2=Process(target=consumer,args=((produce,consume),'c2',lock)) p1=Process(target=producer,args=((produce,consume),10)) c1.start() c2.start() p1.start() produce.close() consume.close() c1.join() c2.join() p1.join() print('主进程')
管道能够用于双工通讯,一般利用在客户端/服务端中使用的请求/响应模型,或者远程过程调用,就可使用管道编写与进程交互的程序,像前面将网络通讯的时候,咱们使用了一个叫subprocess的模块,里面有个参数是pipe管道,执行系统指令,并经过管道获取结果。
7.数据共享(了解)
展望将来,基于消息传递的并发编程是大势所趋
即使是使用线程,推荐作法也是将程序设计为大量独立的线程集合
经过消息队列交换数据。这样极大地减小了对使用锁定和其余同步手段的需求,还能够扩展到分布式系统中
进程间应该尽可能避免通讯,即使须要通讯,也应该选择进程安全的工具来避免加锁带来的问题,应该尽可能避免使用本节所讲的共享数据的方式,之后咱们会尝试使用数据库来解决进程之间的数据共享问题。
进程之间数据共享的模块之一Manager模块:
进程间数据是独立的,能够借助于队列或管道实现通讯,两者都是基于消息传递的 虽然进程间数据独立,但能够经过Manager实现数据共享,事实上Manager的功能远不止于此 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
多进程共同去处理共享数据的时候,就和咱们多进程同时去操做一个文件中的数据是同样的,不加锁就会出现错误的结果,进程不安全的,因此也须要加锁
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: #不加锁而操做共享的数据,确定会出现数据错乱 d['count']-=1 if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
总结一下,进程之间的通讯:队列、管道、数据共享也算
下面要讲的信号量和事件也至关于锁,也是全局的,全部进程都能拿到这些锁的状态,进程之间这些锁啊信号量啊事件啊等等的通讯,其实底层仍是socekt,只不过是基于文件的socket通讯,而不是跟上面的数据共享啊空间共享啊之类的机制,咱们以前学的是基于网络的socket通讯,还记得socket的两个家族吗,一个文件的一个网络的,因此未来若是说这些锁之类的报错,可能你看到的就是相似于socket的错误,简单知道一下就能够啦~~~
工做中经常使用的是锁,信号量和事件不经常使用,可是信号量和事件面试的时候会问到,你能知道就行啦~~~
为何要有进程池?进程池的概念。
在程序实际处理问题过程当中,忙时会有成千上万的任务须要被执行,闲时可能只有零星任务。那么在成千上万个任务须要被执行的时候,咱们就须要去建立成千上万个进程么?首先,建立进程须要消耗时间,销毁进程(空间,变量,文件信息等等的内容)也须要消耗时间。第二即使开启了成千上万的进程,操做系统也不能让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还须要进行切换而且记录每一个进程的执行节点,也就是记录上下文(各类变量等等乱七八糟的东西,虽然你看不到,可是操做系统都要作),这样反而会影响程序的效率。所以咱们不能无限制的根据任务开启或者结束进程。就看咱们上面的一些代码例子,你会发现有些程序是否是执行的时候比较慢才出结果,就是这个缘由,那么咱们要怎么作呢?
在这里,要给你们介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等处处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。若是有不少任务须要执行,池中的进程数量不够,任务就要等待以前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增长操做系统的调度难度,还节省了开闭进程的时间,也必定程度上可以实现并发效果
multiprocess.Poll模块
建立进程池的类:若是指定numprocess为3,则进程池会从无到有建立三个进程,而后自始至终使用这三个进程去执行全部任务(高级一些的进程池能够根据你的并发量,搞成动态增长或减小进程池中的进程数量的操做),不会开启其余进程,提升操做系统效率,减小空间的占用等。
概念介绍:
Pool([numprocess [,initializer [, initargs]]]):建立进程池
numprocess:要建立的进程数,若是省略,将默认使用cpu_count()的值 initializer:是每一个工做进程启动时要执行的可调用对象,默认为None initargs:是要传给initializer的参数组
p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。 '''须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async()''' p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。''' p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成 P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用
方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具备如下方法 obj.get():返回结果,若是有必要则等待结果到达。timeout是可选的。若是在指定时间内尚未到达,将引起一场。若是远程操做中引起了异常,它将在调用此方法时再次被引起。 obj.ready():若是调用完成,返回True obj.successful():若是调用完成且没有引起异常,返回True,若是在结果就绪以前调用此方法,引起异常 obj.wait([timeout]):等待结果变为可用。 obj.terminate():当即终止全部工做进程,同时不执行任何清理或结束任何挂起工做。若是p被垃圾回收,将自动调用此函数
import time from multiprocessing import Pool,Process #针对range(100)这种参数的 # def func(n): # for i in range(3): # print(n + 1) def func(n): print(n) # 结果: # (1, 2) # alex def func2(n): for i in range(3): print(n - 1) if __name__ == '__main__': #1.进程池的模式 s1 = time.time() #咱们计算一下开多进程和进程池的执行效率 poll = Pool(5) #建立含有5个进程的进程池 # poll.map(func,range(100)) #异步调用进程,开启100个任务,map自带join的功能 poll.map(func,[(1,2),'alex']) #异步调用进程,开启100个任务,map自带join的功能 # poll.map(func2,range(100)) #若是想让进程池完成不一样的任务,能够直接这样搞 #map只限于接收一个可迭代的数据类型参数,列表啊,元祖啊等等,若是想作其余的参数之类的操做,须要用后面咱们要学的方法。 # t1 = time.time() - s1 # # #2.多进程的模式 # s2 = time.time() # p_list = [] # for i in range(100): # p = Process(target=func,args=(i,)) # p_list.append(p) # p.start() # [pp.join() for pp in p_list] # t2 = time.time() - s2 # # print('t1>>',t1) #结果:0.5146853923797607s 进程池的效率高 # print('t2>>',t2) #结果:12.092015027999878s
有一点,map是异步执行的,而且自带close和join
通常约定俗成的是进程池中的进程数量为CPU的数量,工做中要看具体状况来考量。
实际应用代码示例:
同步与异步两种执行方式:
import os,time from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(1) return n**2 if __name__ == '__main__': p=Pool(3) #进程池中从无到有建立三个进程,之后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程当中可能有阻塞也可能没有阻塞 # 但无论该任务是否存在阻塞,同步调用都会在原地等着 res_l.append(res) print(res_l)
import os import time import random from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(random.random()) return n**2 if __name__ == '__main__': p=Pool(3) #进程池中从无到有建立三个进程,之后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行,而且能够执行不一样的任务,传送任意的参数了。 # 返回结果以后,将结果放入列表,归还进程,以后再执行新的任务 # 须要注意的是,进程池中的三个进程不会同时开启或者同时结束 # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。 res_l.append(res) # 异步apply_async用法:若是使用异步提交的任务,主进程须要使用join,等待进程池内任务都处理完,而后能够用get收集结果 # 不然,主进程结束,进程池可能还没来得及执行,也就跟着一块儿结束了 p.close() #不是关闭进程池,而是结束进程池接收任务,确保没有新任务再提交过来。 p.join() #感知进程池中的任务已经执行结束,只有当没有新的任务添加进来的时候,才能感知到任务结束了,因此在join以前必须加上close方法 for res in res_l: print(res.get()) #使用get来获取apply_aync的结果,若是是apply,则没有get方法,由于apply是同步执行,马上获取结果,也根本无需get
#一:使用进程池(异步调用,apply_async) #coding: utf-8 from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply_async(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 res_l.append(res) # s = res.get() #若是直接用res这个结果对象调用get方法获取结果的话,这个程序就变成了同步,由于get方法直接就在这里等着你建立的进程的结果,第一个进程建立了,而且去执行了,那么get就会等着第一个进程的结果,没有结果就一直等着,那么主进程的for循环是没法继续的,因此你会发现变成了同步的效果 print("==============================>") #没有后面的join,或get,则程序总体结束,进程池中的任务还没来得及所有执行完也都跟着主进程一块儿结束了 pool.close() #关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成 pool.join() #调用join以前,先调用close函数,不然会出错。执行完close后不会有新的进程加入到pool,join函数等待全部子进程结束 print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>对象组成的列表,而非最终的结果,但这一步是在join后执行的,证实结果已经计算完毕,剩下的事情就是调用每一个对象下的get方法去获取结果 for i in res_l: print(i.get()) #使用get来获取apply_aync的结果,若是是apply,则没有get方法,由于apply是同步执行,马上获取结果,也根本无需get #二:使用进程池(同步调用,apply) #coding: utf-8 from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(0.1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另一个 print("==============================>") pool.close() pool.join() #调用join以前,先调用close函数,不然会出错。执行完close后不会有新的进程加入到pool,join函数等待全部子进程结束 print(res_l) #看到的就是最终的结果组成的列表 for i in res_l: #apply是同步的,因此直接获得结果,没有get()方法 print(i)
进程池版的socket并发聊天代码示例:
#Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count()) #开启6个客户端,会发现2个客户端处于等待状态 #在每一个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程 from socket import * from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): print('进程pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p=Pool(4) while True: conn,*_=server.accept() p.apply_async(talk,args=(conn,)) # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
发现:并发开启多个客户端,服务端同一时间只有4个不一样的pid,只能结束一个客户端,另一个客户端才会进来.
同时最多和4我的进行聊天,由于进程池中只有4个进程可供调用,那有同窗会问,咱们这么多人想同时聊天怎么办,又不让用多进程,进程池也不能开太多的进程,那咋整啊,后面咱们会学到多线程,到时候你们就知道了,如今大家先这样记住就好啦
而后咱们再提一个回调函数
须要回调函数的场景:进程池中任何一个任务一旦处理完了,就当即告知主进程:我好了额,你能够处理个人结果了。主进程则调用一个函数去处理该结果,该函数即回调函数,这是进程池特有的,普通进程没有这个机制,可是咱们也能够经过进程通讯来拿到返回值,进程池的这个回调也是进程通讯的机制完成的。 咱们能够把耗时间(阻塞)的任务放到进程池中,而后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果
import os from multiprocessing import Pool def func1(n): print('func1>>',os.getpid()) print('func1') return n*n def func2(nn): print('func2>>',os.getpid()) print('func2') print(nn) # import time # time.sleep(0.5) if __name__ == '__main__': print('主进程:',os.getpid()) p = Pool(5) #args里面的10给了func1,func1的返回值做为回调函数的参数给了callback对应的函数,不能直接给回调函数直接传参数,他只能是你任务函数func1的函数的返回值 # for i in range(10,20): #若是是多个进程来执行任务,那么当全部子进程将结果给了回调函数以后,回调函数又是在主进程上执行的,那么就会出现打印结果是同步的效果。咱们上面func2里面注销的时间模块打开看看 # p.apply_async(func1,args=(i,),callback=func2) p.apply_async(func1,args=(10,),callback=func2) p.close() p.join() #结果 # 主进程: 11852 #发现回调函数是在主进程中完成的,其实若是是在子进程中完成的,那咱们直接将代码写在子进程的任务函数func1里面就好了,对不对,这也是为何称为回调函数的缘由。 # func1>> 17332 # func1 # func2>> 11852 # func2 # 100
回调函数在写的时候注意一点,回调函数的形参执行有一个,若是你的执行函数有多个返回值,那么也能够被回调函数的这一个形参接收,接收的是一个元祖,包含着你执行函数的全部返回值。
使用进程池来搞爬虫的时候,最耗时间的是请求地址的网络请求延迟,那么若是咱们在将处理数据的操做加到每一个子进程中,那么全部在进程池后面排队的进程就须要等更长的时间才能获取进程池里面的执行进程来执行本身,因此通常咱们就将请求做成一个执行函数,经过进程池去异步执行,剩下的数据处理的内容放到另一个进程或者主进程中去执行,将网络延迟的时间也利用起来,效率更高。
requests这个模块的get方法请求页面,就和咱们在浏览器上输入一个网址而后回车去请求别人的网站的效果是同样的。安装requests模块的指令:在cmd窗口执行pip install requests。
import requests response = requests.get('http://www.baidu.com') print(response) print(response.status_code) #200正常,404找不到网页,503等5开头的是人家网站内部错误 print(response.content.decode('utf-8'))
from multiprocessing import Pool import requests import json import os def get_page(url): print('<进程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<进程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) #拿到的是get_page的结果,其实彻底不必拿该结果,该结果已经传给回调函数处理了 ''' 打印结果: <进程3388> get https://www.baidu.com <进程3389> get https://www.python.org <进程3390> get https://www.openstack.org <进程3388> get https://help.github.com/ <进程3387> parse https://www.baidu.com <进程3389> get http://www.sina.com.cn/ <进程3387> parse https://www.python.org <进程3387> parse https://help.github.com/ <进程3387> parse http://www.sina.com.cn/ <进程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] '''
from multiprocessing import Pool import time,random import requests import re def get_page(url,pattern): response=requests.get(url) if response.status_code == 200: return (response.text,pattern) def parse_page(info): page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } print(dic) if __name__ == '__main__': pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get() # res=requests.get('http://maoyan.com/board/7') # print(re.findall(pattern,res.text))
若是在主进程中等待进程池中全部任务都执行完毕后,再统一处理结果,则无需回调函数
from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待进程池中全部进程执行完毕 nums=[] for res in res_l: nums.append(res.get()) #拿到全部结果 print(nums) #主进程拿到全部的处理结果,能够在主进程中进行统一进行处理
进程池和信号量的区别:
进程池是多个须要被执行的任务在进程池外面排队等待获取进程对象去执行本身,而信号量是一堆进程等待着去执行一段逻辑代码。
信号量不能控制建立多少个进程,可是能够控制同时多少个进程可以执行,可是进程池能控制你能够建立多少个进程。
举例:就像那些开大车拉煤的,信号量是什么呢,就比如我只有五个车道,你每次只能过5辆车,可是不影响你建立100辆车,可是进程池至关于什么呢?至关于你只有5辆车,每次5个车拉东西,拉完你再把车放回来,给别的人拉煤用。
其余语言里面有更高级的进程池,在设置的时候,能够将进程池中的进程动态的建立出来,当需求增大的时候,就会自动在进程池中添加进程,需求小的时候,自动减小进程,而且能够设置进程数量的上线,最多为多,python里面没有。