python并发编程之多进程2--------数据共享及进程池和回调函数

1、数据共享 python

1.进程间的通讯应该尽可能避免共享数据的方式 git

2.进程间的数据是独立的,能够借助队列或管道实现通讯,两者都是基于消息传递的。 github

虽然进程间数据独立,但能够用过Manager实现数据共享,事实上Manager的功能远不止于此。编程

 1 命令就是一个程序,按回车就会执行(这个只是在windows状况下)
 2 tasklist 查看进程
 3 tasklist | findstr  pycharm   #(findstr是进行过滤的),|就是管道(tasklist执行的内容就放到管道里面了,
 4 管道后面的findstr  pycharm就接收了)

3.(IPC)进程之间的通讯有两种实现方式:管道和队列windows

 1 from multiprocessing import Manager,Process,Lock
 2 def work(dic,mutex):
 3     # mutex.acquire()
 4     # dic['count']-=1
 5     # mutex.release()
 6     # 也能够这样加锁
 7     with mutex:
 8         dic['count'] -= 1
 9 if __name__ == '__main__':
 10     mutex = Lock()
 11     m = Manager()  #实现共享,因为字典是共享的字典,因此得加个锁
 12     share_dic = m.dict({'count':100})
 13     p_l = []
 14     for i in range(100):
 15         p = Process(target=work,args=(share_dic,mutex))
 16         p_l.append(p)  #先添加进去
 17         p.start()
 18     for i in p_l:
 19         i.join()
 20     print(share_dic)
 21 # 共享就意味着会有竞争,
 22 
 23 
数据共享 View Code

2、进程池 数组

在利用Python进行系统管理的时候,特别是同时操做多个文件目录,或者远程控制多台主机,并行操做能够节约大量的时间。多进程是实现并发的手段之一,须要注意的问题是: 网络

  1. 很明显须要并发执行的任务一般要远大于核数
  2. 一个操做系统不可能无限开启进程,一般有几个核就开几个进程
  3. 进程开启过多,效率反而会降低(开启进程是须要占用系统资源的,并且开启多余核数目的进程也没法作到并行)

例如当被操做对象数目不大时,能够直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但若是是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时能够发挥进程池的功效。 并发

那么什么是进程池呢?进程池就是控制进程数目app

 1 ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool能够提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,若是池尚未满,那么就会建立一个新的进程用来执行该请求;但若是池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

 

进程池的结构: 异步

建立进程池的类:若是指定numprocess为3,则进程池会从无到有建立三个进程,而后自始至终使用这三个进程去执行全部任务,不会开启其余进程

1.建立进程池

 1 Pool([numprocess  [,initializer [, initargs]]]):建立进程池

2.参数介绍

 1 numprocess:要建立的进程数,若是省略,将默认为cpu_count()的值,可os.cpu_count()查看
 2 initializer:是每一个工做进程启动时要执行的可调用对象,默认为None
 3 initargs:是要传给initializer的参数组

3.方法介绍

 1 p.apply(func [, args [, kwargs]]):在一个池工做进程中执行
 2 func(*args,**kwargs),而后返回结果。
 3 须要强调的是:此操做并不会在全部池工做进程中并执行func函数。
 4 若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()
 5 函数或者使用p.apply_async()
 6 
 7 
 8 p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。此方法的结果是AsyncResult类的实例,
 9 callback是可调用对象,接收输入参数。当func的结果变为可用时,
 10 将理解传递给callback。callback禁止执行任何阻塞操做,
 11 不然将接收其余异步操做中的结果。
 12 
 13 p.close():关闭进程池,防止进一步操做。禁止往进程池内在添加任务(须要注意的是必定要写在close()的上方)
 14 
 1 P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用

应用1:

 1 from multiprocessing import Pool
 2 import os,time
 3 def task(n):
 4     print('[%s] is running'%os.getpid())
 5     time.sleep(2)
 6     print('[%s] is done'%os.getpid())
 7     return n**2
 8 if __name__ == '__main__':
 9     # print(os.cpu_count())  #查看cpu个数
 10     p = Pool(4) #最大四个进程
 11     for i in range(1,7):#开7个任务
 12         res = p.apply(task,args=(i,))  #同步的,等着一个运行完才执行另外一个
 13         print('本次任务的结束:%s'%res)
 14     p.close()#禁止往进程池内在添加任务
 15     p.join() #在等进程池
 16     print('主')
apply同步进程池(阻塞)(串行) View Code

 

 1 # ----------------
 2 # 那么咱们为何要用进程池呢?这是由于进程池使用来控制进程数目的,
 3 # 咱们须要几个就开几个进程。若是不用进程池实现并发的话,会开不少的进程
 4 # 若是你开的进程特别多,那么你的机器就会很卡,因此咱们把进程控制好,用几个就
 5 # 开几个,也不会太占用内存
 6 from multiprocessing import Pool
 7 import os,time
 8 def walk(n):
 9     print('task[%s] running...'%os.getpid())
 10     time.sleep(3)
 11     return n**2
 12 if __name__ == '__main__':
 13      p = Pool(4)
 14      res_obj_l = []
 15      for i in range(10):
 16          res = p.apply_async(walk,args=(i,))
 17          # print(res)  #打印出来的是对象
 18          res_obj_l.append(res)  #那么如今拿到的是一个列表,怎么获得值呢?咱们用个.get方法
 19      p.close() #禁止往进程池里添加任务
 20      p.join()
 21      # print(res_obj_l)
 22      print([obj.get() for obj in res_obj_l])  #这样就获得了
 23 
apply_async异步进程池(非阻塞)(并行) View Code

 

那么什么是同步,什么是异步呢?

同步就是指一个进程在执行某个请求的时候,若该请求须要一段时间才能返回信息,那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去

异步是指进程不须要一直等下去,而是继续执行下面的操做,无论其余进程的状态。当有消息返回时系统会通知进程进行处理,这样能够提升执行的效率。

什么是串行,什么是并行呢?

举例:能并排开几辆车的就能够说是“并行”,只能一辆一辆开的就属于“串行”了。很明显,并行的速度要比串行的快得多。(并行互不影响,串行的等着一个完了才能接着另外一个)

应用2:

使用进程池维护固定数目的进程(之前客户端和服务端的改进

 1 from socket import *
 2 from multiprocessing import Pool
 3 s = socket(AF_INET,SOCK_STREAM)
 4 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #端口重用
 5 s.bind(('127.0.0.1',8081))
 6 s.listen(5)
 7 print('start running...')
 8 def talk(coon,addr):
 9     while True:  # 通讯循环
 10         try:
 11             cmd = coon.recv(1024)
 12             print(cmd.decode('utf-8'))
 13             if not cmd: break
 14             coon.send(cmd.upper())
 15             print('发送的是%s'%cmd.upper().decode('utf-8'))
 16         except Exception:
 17             break
 18     coon.close()
 19 if __name__ == '__main__':
 20     p = Pool(4)
 21     while True:#连接循环
 22         coon,addr = s.accept()
 23         print(coon,addr)
 24         p.apply_async(talk,args=(coon,addr))
 25     s.close()
 26 #由于是循环,因此就不用p.join了
 27 
 28 
服务端 View Code

 

 1 from socket import *
 2 c = socket(AF_INET,SOCK_STREAM)
 3 c.connect(('127.0.0.1',8081))
 4 while True:
 5     cmd = input('>>:').strip()
 6     if not cmd:continue
 7     c.send(cmd.encode('utf-8'))
 8     data = c.recv(1024)
 9     print('接受的是%s'%data.decode('utf-8'))
 10 c.close()
 11 
客户端 View Code

 

3、回调函数

 1 回调函数何时用?(回调函数在爬虫中最经常使用)
 2 造数据的很是耗时
 3 处理数据的时候不耗时
 4 
 5 你下载的地址若是完成了,就自动提醒让主进程解析
 6 谁要是好了就通知解析函数去解析(回调函数的强大之处)

须要回调函数的场景:进程池中任何一个任务一旦处理完了,就当即告知主进程:我好了额,你能够处理个人结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

咱们能够把耗时间(阻塞)的任务放到进程池中,而后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

 1 from  multiprocessing import Pool
 2 import requests
 3 import os
 4 import time
 5 def get_page(url):
 6     print('<%s> is getting [%s]' %(os.getpid(),url))
 7     response = requests.get(url)  #获得地址
 8     time.sleep(2)
 9     print('<%s> is  done [%s]'%(os.getpid(),url))
 10     return {'url':url,'text':response.text}
 11 def parse_page(res):
 12     '''解析函数'''
 13     print('<%s> parse [%s]'%(os.getpid(),res['url']))
 14     with open('db.txt','a') as f:
 15         parse_res = 'url:%s size:%s\n' %(res['url'],len(res['text']))
 16         f.write(parse_res)
 17 if __name__ == '__main__':
 18     p = Pool(4)
 19     urls = [
 20         'https://www.baidu.com',
 21         'http://www.openstack.org',
 22         'https://www.python.org',
 23         'https://help.github.com/',
 24         'http://www.sina.com.cn/'
 25     ]
 26     for url in urls:
 27         obj = p.apply_async(get_page,args=(url,),callback=parse_page)
 28     p.close()
 29     p.join()
 30     print('主',os.getpid())  #都不用.get()方法了
 31 
 32 
回调函数(下载网页的小例子) View Code

 

若是在主进程中等待进程池中全部任务都执行完毕后,再统一处理结果,则无需回调函数

 1 from  multiprocessing import Pool
 2 import requests
 3 import os
 4 def get_page(url):
 5     print('<%os> get [%s]' %(os.getpid(),url))
 6     response = requests.get(url)  #获得地址  response响应
 7     return {'url':url,'text':response.text}
 8 if __name__ == '__main__':
 9     p = Pool(4)
 10     urls = [
 11         'https://www.baidu.com',
 12         'http://www.openstack.org',
 13         'https://www.python.org',
 14         'https://help.github.com/',
 15         'http://www.sina.com.cn/'
 16     ]
 17     obj_l= []
 18     for url in urls:
 19         obj = p.apply_async(get_page,args=(url,))
 20         obj_l.append(obj)
 21     p.close()
 22     p.join()
 23     print([obj.get() for obj in obj_l])
 24 
 25 
下载网页小例子(无需回调函数) View Code

 

 

归类: 网络编程socket

相关文章
相关标签/搜索