python之路-day32-管道、数据共享、进程池

 

 

1、管道(不推荐使用,了解便可)数据库

  进程间通讯(IPC)方式二:管道(不推荐使用,了解便可),会致使数据不安全的状况出现,后面还会提到为何编程

会带来数据不安全的问题。数组

 

 1 #建立管道的类:
 2 Pipe([duplex]):在进程之间建立一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的链接对象,强调一点:必须在产生Process对象以前产生管道
 3 #参数介绍:
 4 dumplex:默认管道是全双工的,若是将duplex射成False,conn1只能用于接收,conn2只能用于发送。
 5 #主要方法:
 6     conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接收,recv方法会一直阻塞。若是链接的另一端已经关闭,那么recv方法会抛出EOFError。
 7     conn1.send(obj):经过链接发送对象。obj是与序列化兼容的任意对象
 8  #其余方法:
 9 conn1.close():关闭链接。若是conn1被垃圾回收,将自动调用此方法
10 conn1.fileno():返回链接使用的整数文件描述符
11 conn1.poll([timeout]):若是链接上的数据可用,返回True。timeout指定等待的最长时限。若是省略此参数,方法将当即返回结果。若是将timeout射成None,操做将无限期地等待数据到达。
12  
13 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。若是进入的消息,超过了这个最大值,将引起IOError异常,而且在链接上没法进行进一步读取。若是链接的另一端已经关闭,不再存在任何数据,将引起EOFError异常。
14 conn.send_bytes(buffer [, offset [, size]]):经过链接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,而后调用c.recv_bytes()函数进行接收    
15  
16 conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或相似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。若是消息长度大于可用的缓冲区空间,将引起BufferTooShort异常。
管道介绍
 1 from multiprocessing import Process, Pipe
 2 
 3 def f(conn):
 4     conn.send("Hello 妹妹") #子进程发送了消息
 5     conn.close()
 6 
 7 if __name__ == '__main__':
 8     parent_conn, child_conn = Pipe() #创建管道,拿到管道的两端,双工通讯方式,两端均可以收发消息
 9     p = Process(target=f, args=(child_conn,)) #将管道的一段给子进程
10     p.start() #开启子进程
11     print(parent_conn.recv()) #主进程接受了消息
12     p.join()
管道初使用
1 应该特别注意管道端点的正确管理问题。若是是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为什么在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。若是忘记执行这些步骤,程序可能在消费者中的recv()操做上挂起(就是阻塞)。管道是由操做系统进行引用计数的,必须在全部进程中关闭管道的相同一端就会能生成EOFError异常。所以,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点
管道使用注意须知
from multiprocessing import Process, Pipe

def f(parent_conn,child_conn):
    #parent_conn.close() #不写close将不会引起EOFError
    while True:
        try:
            print(child_conn.recv())
        except EOFError:
            child_conn.close()
            break

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(parent_conn,child_conn,))
    p.start()
    child_conn.close()
    parent_conn.send('hello')
    parent_conn.close()
    p.join()            
引起EOFError

主进程将管道的两端都传送给子进程,子进程和主进程共用管道的两种报错状况,都是在recv接收的时候报错的:安全

    1.主进程和子进程中的管道的相同一端都关闭了,出现EOFError;并发

    2.若是你管道的一端在主进程和子进程中都关闭了,可是你还用这个关闭的一端去接收消息,那么就会出现OSError;app

 

2、数据共享(了解便可)异步

  基于消息传递的并发编程是大势所趋async

  即使是使用线程,推荐作法也是将程序设计为大量独立的线程集合分布式

  经过消息队列交换数据,这样极大的减小了对使用锁定和其余同步手段的需求,还能够扩展到分布式系统中ide

注意:进程之间应当是尽可能避免通讯,即便须要通讯,也应该选择进程安全的工具来避免加锁带来的问题。应该尽可能避免使用

共享数据的方法,之后会尝试使用数据库来解决进程之间的数据共享问题

  进程之间数据共享模块之一Manager模块:

1 进程间数据是独立的,能够借助于队列或管道实现通讯,两者都是基于消息传递的
2 虽然进程间数据独立,但能够经过Manager实现数据共享,事实上Manager的功能远不止于此
3 
4 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
5 
6 A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
Manager模块介绍

  多进程共同去处理共享数据的时候,就和咱们多进程同时去操做一个文件中的数据同样,不加锁就会出现错误的结果,进程

不安全的,因此也须要加锁

 1 from multiprocessing import Manager,Process,Lock
 2 def work(d,lock):
 3     with lock: #不加锁而操做共享的数据,确定会出现数据错乱
 4         d['count']-=1
 5 
 6 if __name__ == '__main__':
 7     lock=Lock()
 8     with Manager() as m:
 9         dic=m.dict({'count':100})
10         p_l=[]
11         for i in range(100):
12             p=Process(target=work,args=(dic,lock))
13             p_l.append(p)
14             p.start()
15         for p in p_l:
16             p.join()
17         print(dic)
Manager模块使用

  总结下,进程之间的通讯:队列、管道、数据共享

 

3、进程池

  multiprocess.pool 模块

  建立进程池的类:若是指定numprocess为3,则进程池会从无到有建立三个进程,而后自始至终使用这三个

进程去执行使用全部的任务(高级一些的进程池能够根据你的并发量,搞成动态增长或减小进程池中进程数量的操做),

不会开启其余进程,提升操做系统效率,减小空间的占用等

 1 numprocess:要建立的进程数,若是省略,将默认使用cpu_count()的值
 2 initializer:是每一个工做进程启动时要执行的可调用对象,默认为None
 3 initargs:是要传给initializer的参数组
 4 
 5 p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。
 6 '''须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async()'''
 7 
 8 p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。
 9 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。'''
10     
11 p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成
12 
13 P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用
14 
15 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具备如下方法
16 obj.get():返回结果,若是有必要则等待结果到达。timeout是可选的。若是在指定时间内尚未到达,将引起一场。若是远程操做中引起了异常,它将在调用此方法时再次被引起。
17 obj.ready():若是调用完成,返回True
18 obj.successful():若是调用完成且没有引起异常,返回True,若是在结果就绪以前调用此方法,引起异常
19 obj.wait([timeout]):等待结果变为可用。
20 obj.terminate():当即终止全部工做进程,同时不执行任何清理或结束任何挂起工做。若是p被垃圾回收,将自动调用此函数
Manager的方法
 1 import time
 2 from multiprocessing import Pool,Process
 3 
 4 #针对range(100)这种参数的
 5 # def func(n):
 6 #     for i in range(3):
 7 #         print(n + 1)
 8 
 9 def func(n):
10     print(n)
11     # 结果:
12     #     (1, 2)
13     #     alex
14 def func2(n):
15     for i in range(3):
16         print(n - 1)
17 if __name__ == '__main__':
18     #1.进程池的模式
19     s1 = time.time()  #咱们计算一下开多进程和进程池的执行效率
20     poll = Pool(5) #建立含有5个进程的进程池
21     # poll.map(func,range(100)) #异步调用进程,开启100个任务,map自带join的功能
22     poll.map(func,[(1,2),'alex']) #异步调用进程,开启100个任务,map自带join的功能
23     # poll.map(func2,range(100))  #若是想让进程池完成不一样的任务,能够直接这样搞
24     #map只限于接收一个可迭代的数据类型参数,列表啊,元祖啊等等,若是想作其余的参数之类的操做,须要用后面咱们要学的方法。
25     # t1 = time.time() - s1
26     #
27     # #2.多进程的模式
28     # s2 = time.time()
29     # p_list = []
30     # for i in range(100):
31     #     p = Process(target=func,args=(i,))
32     #     p_list.append(p)
33     #     p.start()
34     # [pp.join() for pp in p_list]
35     # t2 = time.time() - s2
36     #
37     # print('t1>>',t1) #结果:0.5146853923797607s 进程池的效率高
38     # print('t2>>',t2) #结果:12.092015027999878s
进程池的简单应用及与进程池的效率对比
相关文章
相关标签/搜索