day 035 管道 和数据共享

管道安全

 

#建立管道的类:
Pipe([duplex]):在进程之间建立一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的链接对象,强调一点:必须在产生Process对象以前产生管道
#参数介绍:
dumplex:默认管道是全双工的,若是将duplex射成False,conn1只能用于接收,conn2只能用于发送。
#主要方法:
    conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接收,recv方法会一直阻塞。若是链接的另一端已经关闭,那么recv方法会抛出EOFError。
    conn1.send(obj):经过链接发送对象。obj是与序列化兼容的任意对象
 #其余方法:
conn1.close():关闭链接。若是conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回链接使用的整数文件描述符
conn1.poll([timeout]):若是链接上的数据可用,返回True。timeout指定等待的最长时限。若是省略此参数,方法将当即返回结果。若是将timeout射成None,操做将无限期地等待数据到达。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。若是进入的消息,超过了这个最大值,将引起IOError异常,而且在链接上没法进行进一步读取。若是链接的另一端已经关闭,不再存在任何数据,将引起EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):经过链接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,而后调用c.recv_bytes()函数进行接收   
 
conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或相似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。若是消息长度大于可用的缓冲区空间,将引起BufferTooShort异常。

 

管道报错

主进程将管道的两端都传送给子进程,子进程和主进程共用管道的两种报错状况,都是在recv接收的时候报错的:并发

    1.主进程和子进程中的管道的相同一端都关闭了,出现EOFError;app

    2.若是你管道的一端在主进程和子进程中都关闭了,可是你还用这个关闭的一端去接收消息,那么就会出现OSError;异步

 

    因此你关闭管道的时候,就容易出现问题,须要将全部只用这个管道的进程中的两端所有关闭才行。固然也能够经过异常捕获(try:except EOFerror)来处理。async

    虽然咱们在主进程和子进程中都打印了一下conn1一端的对象,发现两个再也不同一个地址,可是子进程中的管道和主进程中的管道仍是能够通讯的,由于管道是同一套,系统可以记录。    函数

 

    咱们的目的就是关闭全部的管道,那么主进程和子进程进行通讯的时候,能够给子进程传管道的一端就够了,而且用咱们以前学到的,信息发送完以后,再发送一个结束信号None,那么你收到的消息为None的时候直接结束接收或者说结束循环,就不用每次都关闭各个进程中的管道了。spa

数据共享:操作系统

Manager模块介绍线程

多进程共同去处理共享数据的时候,就和咱们多进程同时去操做一个文件中的数据是同样的,不加锁就会出现错误的结果,进程不安全的,因此也须要加锁code

 

from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加锁而操做共享的数据,确定会出现数据错乱
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()    
    with Manager() as m:
        dic=m.dict({'count':100})   #字典形式
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)

 

进程池和multiprocess Pool 模块

建立进程池的类:若是指定numprocess为3,则进程池会从无到有建立三个进程,而后自始至终使用这三个进程去执行全部任务(高级一些的进程池能够根据你的并发量,搞成动态增长或减小进程池中的进程数量的操做),不会开启其余进程,提升操做系统效率,减小空间的占用等。

定义池子将建立定量的进程:去完成任务,3个出去再回到池子中但进程不关闭,在出去执行任务,直到将任务执行完成结束

进程池的主要方法:

p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。
'''须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async()'''
p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。  获得的是返回结果也能够不返回结果
'''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。'''
   
p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成
P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用
 
进程池的实例:
进程池的效比进程的效率高不少
map方法是异步执行而且自带close和join方法
from multiprocessing import Process,Pool
import time
def func(n):
    for i in range(5):
        n=n=i
    print(n)
if __name__ == '__main__':
    pool_start_time=time.time()
    pool= Pool(4)
    pool.map(func,range(100))
    pool_end_time=time.time()
    pool_dif_time=pool_end_time-pool_start_time
    p_s_time=time.time()
    p_list=[]
    for i in range(100):
        p2=Process(target=func,args=(i,))
        p2.start()
        p_list.append(p2)
    [p.join() for p in p_list]
    p_e_time=time.time()
    p_dif_time=p_e_time-p_s_time
    print("进程池时间",pool_dif_time)
    print("多进程时间",p_dif_time)

进程池的同步异步:

同步:

from multiprocessing import Process,Pool
import time
def func(i):
    time.sleep(1)
    return i**2
if __name__ == '__main__':
    p=Pool(4)
    for i in range(10):
        res=p.apply(func,args=(i,))   #同步的方法多个进程完成任务  能够获得返回值也能够不提取返回值
        print(res)

异步:

from multiprocessing import Process,Pool
import time
import os
def func(i):
    time.sleep(2)
    # print(os.getpid())
    return i**2
if __name__ == '__main__':
    p=Pool(4)
    res_list=[]
    for i in range(10):
        res=p.apply_async(func,args=(i,))   #异步的方法  获得返回值
        res_list.append(res)
    for i in res_list:
        print(i.get())

回调函数:

进程池中任何一个任务一旦处理完了,就当即告知主进程:我好了额,你能够处理个人结果了。主进程则调用一个函数去处理该结果,该函数即回调函数,这是进程池特有的,普通进程没有这个机制,可是咱们也能够经过进程通讯来拿到返回值,进程池的这个回调也是进程通讯的机制完成的。

import os
from multiprocessing import Pool

def func1(n):
    print('func1>>',os.getpid())
    print('func1')
    return n*n

def func2(nn):
    print('func2>>',os.getpid())
    print('func2')
    print(nn)
    # import time
    # time.sleep(0.5)
if __name__ == '__main__':
    print('主进程:',os.getpid())
    p = Pool(5)
    #args里面的10给了func1,func1的返回值做为回调函数的参数给了callback对应的函数,不能直接给回调函数直接传参数,他只能是你任务函数func1的函数的返回值
    # for i in range(10,20): #若是是多个进程来执行任务,那么当全部子进程将结果给了回调函数以后,回调函数又是在主进程上执行的,那么就会出现打印结果是同步的效果。咱们上面func2里面注销的时间模块打开看看
    #     p.apply_async(func1,args=(i,),callback=func2)
    p.apply_async(func1,args=(10,),callback=func2)

    p.close()
    p.join()

#结果
# 主进程: 11852  #发现回调函数是在主进程中完成的,其实若是是在子进程中完成的,那咱们直接将代码写在子进程的任务函数func1里面就好了,对不对,这也是为何称为回调函数的缘由。
# func1>> 17332
# func1
# func2>> 11852
# func2
# 100
相关文章
相关标签/搜索