Distributed processes in Python still rely on the multiprocessing module. multiprocessing not only supports multiple processes; its managers submodule also supports distributing those processes across multiple machines. You can write one service process to act as the scheduler, distribute tasks to the other processes, and rely on network communication to manage everything.

An example: when writing a crawler that fetches all the images on a site, a typical multi-process design has one process fetch the image link addresses and put them on a queue, while other processes take link addresses off the queue, download the images, and store them locally. To turn this into a distributed setup, a process on one machine fetches the link addresses, while processes on other machines handle the downloading and storage. The main problem then is exposing the queue over the network so that processes on other machines can access it. Distributed processes encapsulate exactly this step, which we can describe as networking the local queue.
# coding: utf-8
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support, Queue

# Number of tasks
task_number = 10
# Send and receive queues
task_queue = Queue(task_number)
result_queue = Queue(task_number)

def get_task():
    return task_queue

def get_result():
    return result_queue

# Create a QueueManager subclass
class QueueManager(BaseManager):
    pass

def win_run():
    # Register on the network; callable associates each name with a Queue
    # object, exposing the Queue over the network.
    # On Windows the bound callable cannot be a lambda, so named functions
    # are defined above and bound here.
    QueueManager.register('get_task_queue', callable=get_task)
    QueueManager.register('get_result_queue', callable=get_result)
    # Bind the port and set the auth key
    manager = QueueManager(address=('127.0.0.1', 8001), authkey=b'qiye')
    # Start the manager and listen on the channel
    manager.start()
    try:
        # Obtain the task and result queues over the network
        task = manager.get_task_queue()
        result = manager.get_result_queue()
        # Add tasks
        for url in ["ImageUrl_" + str(i) for i in range(10)]:
            print('url is %s' % url)
            task.put(url)
        print('try get result')
        for i in range(10):
            print('result is %s' % result.get(timeout=10))
    except Exception:
        print('Manager error')
    finally:
        manager.shutdown()

if __name__ == '__main__':
    # Multiprocessing can be problematic on Windows; this call mitigates it
    freeze_support()
    win_run()
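The named get_task/get_result functions exist only because of the Windows restriction noted in the comments: with the spawn start method, the registered callables must be picklable. As a minimal sketch (not part of the original), on Linux or macOS, where the manager's server process is forked and inherits the registry, plain lambdas can be bound directly:

# Sketch for fork-based platforms (Linux/macOS); assumes the same
# queues, port, and auth key as the service process above.
from multiprocessing.managers import BaseManager
from multiprocessing import Queue

task_queue = Queue(10)
result_queue = Queue(10)

class QueueManager(BaseManager):
    pass

# Under fork the registered callables are never pickled,
# so lambdas capturing the queues work here.
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)

manager = QueueManager(address=('127.0.0.1', 8001), authkey=b'qiye')
manager.start()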
# coding: utf-8
import time
from multiprocessing.managers import BaseManager

# Create a similar QueueManager:
class QueueManager(BaseManager):
    pass

# Step 1: register the names of the methods that fetch the Queues
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# Step 2: connect to the server:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# The port and auth key must match the service process exactly:
m = QueueManager(address=(server_addr, 8001), authkey=b'qiye')
# Connect over the network:
m.connect()

# Step 3: obtain the Queue objects:
task = m.get_task_queue()
result = m.get_result_queue()

# Step 4: take tasks from the task queue and write results to the result queue:
while not task.empty():
    image_url = task.get(True, timeout=5)
    print('run task download %s...' % image_url)
    time.sleep(1)
    result.put('%s--->success' % image_url)

# Done:
print('worker exit.')
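To run the example, start taskManager.py first so the manager is listening, then start taskWorker.py. Moving the worker to another machine only requires changing the server address; a hypothetical sketch, where 192.168.1.100 stands in for the manager host's LAN address (not from the original):

# Hypothetical remote worker: point at the manager machine instead
# of localhost; port and auth key stay the same.
server_addr = '192.168.1.100'
m = QueueManager(address=(server_addr, 8001), authkey=b'qiye')
m.connect()

Several workers can connect to the same manager at once, since they all pull from the one shared task queue.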
C:\Python27\python.exe F:/python_scrapy/python_study/taskManager.py
url is ImageUrl_0
url is ImageUrl_1
url is ImageUrl_2
url is ImageUrl_3
url is ImageUrl_4
url is ImageUrl_5
url is ImageUrl_6
url is ImageUrl_7
url is ImageUrl_8
url is ImageUrl_9
try get result
result is ImageUrl_0--->success
result is ImageUrl_1--->success
result is ImageUrl_2--->success
result is ImageUrl_3--->success
result is ImageUrl_4--->success
result is ImageUrl_5--->success
result is ImageUrl_6--->success
result is ImageUrl_7--->success
result is ImageUrl_8--->success
result is ImageUrl_9--->success

Process finished with exit code 0
C:\Python27\python.exe F:/python_scrapy/python_study/taskWorker.py
Connect to server 127.0.0.1...
run task download ImageUrl_0...
run task download ImageUrl_1...
run task download ImageUrl_2...
run task download ImageUrl_3...
run task download ImageUrl_4...
run task download ImageUrl_5...
run task download ImageUrl_6...
run task download ImageUrl_7...
run task download ImageUrl_8...
run task download ImageUrl_9...
worker exit.

Process finished with exit code 0