gearman 分布式系统学习 python

前段时间遇到了这样一个需求html

某一客户端向一个服务器提交任务python

服务器再将分发下去由对于的工做人员来完成linux

前辈告诉我用gearman搭建一个分布式系统。gearman有三个部分,client、service和worker
服务器

client:提交任务多线程

service:分配任务分布式

worker:执行任务函数

它能够实现的效果就是一台机器上搭建好了服务器测试

另外能够多台机器做为客户端,能够一块儿提交任务,而后服务器会用个队列来存起来spa

而后就是起多台机器做为worker去服务器要任务线程

把每一个步骤都分布到了不一样的主机上,这就是典型的分布式系统。

(感受这个这就是一个增强版的多线程机制,一个线程提交任务到队列,起多个线程去队列中去。就是一个网游一个是单机)

 

详细介绍能够看官网Gearman,官网上还有各类语言的例子

配置过程的话跟着网上来就能够,前辈说linux就是配置特别麻烦,等配置好了以后,用起来就方便了

 

gearman client

 提交任务用法很简单,如下是用例

gm_client = gearman.GearmanClient(['localhost:4730'])
    
gm_client.submit_job(GRARMAN_TASK_NAME, data, priority=gearman.PRIORITY_HIGH, background=True)
  

记得先引包 import gearman

['localhost:4730'] 就是gearman服务器的位置,端口默认是4730

而后client有一个submit_job的方法,下面有该函数的源码,有一堆参数意思就和名字同样。如background 参数就是提交后台任务False就是等待返回结果,适用于大量提交任务。

其中最重要的参数 就是 task  在测试代码中我填写是 GRARMAN_TASK_NAME 。这个是任务的惟一标识,就是能够有多个client提交任务,但服务器怎么识别任务呢,就是靠这参数,固然worker也是靠这个参数识别。(我第一次写的时候把这个参数写成了“echo”结果提交了一坨任务,但我本身的worker只收到几个,我调试了很久才发现。。。)

gearman.client.submit_job 源码

 

 

1     def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None):
2         """Submit a single job to any gearman server"""
3         job_info = dict(task=task, data=data, unique=unique, priority=priority)
4         completed_job_list = self.submit_multiple_jobs([job_info], background=background, wait_until_complete=wait_until_complete, max_retries=max_retries, poll_timeout=poll_timeout)
5         return gearman.util.unlist(completed_job_list)

 

gearman worker

worker里面的话,除了GRARMAN_TASK_NAME  和 ['localhost:4730']值得注意意外,还有一个地方就是每一个worker都须要注册一下本身要作的任务,就是下面代码中的

gm_worker.register_task(GRARMAN_TASK_NAME, task_listener_reverse)

注册的名字须要和对应client中的一致,后面一个参数是一个方法,写的格式也是代码中的那样,至关于就是拿到了任务怎么干,这个怎么干的过程就写到方法里面

 

 

gm_worker = gearman.GearmanWorker(['localhost:4730'])
    
def task_listener_reverse(gearman_worker, gearman_job):
     
     #gearman_job 就是client端传过来的数据

     print "这里是想要干的事"
    
     return gearman_job.data[::-1]#返回数据的逆序
 

#GRARMAN_TASK_NAME  这个名字须要是任务的惟一标识
gm_worker.register_task(GRARMAN_TASK_NAME, task_listener_reverse)

 

 

 


 

GearmanAdminClient

今天有一个需求

获得gearman服务上有多少个job,又有多少worker正在工做

而后根据job和worker的数量进行一些相应的调整工做

忽然发现gearman中GearmanAdminClient有如下两个方法,瞬间完成任务

 

def get_workers(self):
    """Retrieves a list of workers and reports what tasks they're operating on"""
    self.establish_admin_connection()
    self.current_handler.send_text_command

(GEARMAN_SERVER_COMMAND_WORKERS)
    return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_WORKERS)


def get_status(self):
    """Retrieves a list of all registered tasks and reports how many items/workers are in 

the queue"""
    self.establish_admin_connection()
    self.current_handler.send_text_command(GEARMAN_SERVER_COMMAND_STATUS)
    return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_STATUS)

测试代码

    ad_client=gearman.GearmanAdminClient(['localhost:4730'])
    
    list=ad_client.get_workers()
    for row in list:
        print row
      
    print "\n"
    
    list=ad_client.get_status()
    for row in list:
        print row

部分结果展现

{'file_descriptor': '34', 'tasks': (), 'client_id': '-', 'ip': '127.0.0.1'}
{'file_descriptor': '50', 'tasks': ('resize', 'like', 'dislike'), 'client_id': '-', 'ip': '127.0.0.1'}
{'file_descriptor': '46', 'tasks': ('resize', 'like', 'dislike'), 'client_id': '-', 'ip': '127.0.0.1'}
{'file_descriptor': '59', 'tasks': ('add_phone_info',), 'client_id': '-', 'ip': '127.0.0.1'}
{'file_descriptor': '55', 'tasks': ('add_phone_info',), 'client_id': '-', 'ip': '127.0.0.1'}

{'workers': 0, 'running': 0, 'task': 'apkcrawler', 'queued': 22028}
{'workers': 0, 'running': 0, 'task': 'reverse', 'queued': 0}
{'workers': 1, 'running': 0, 'task': 'echo', 'queued': 0}
{'workers': 10, 'running': 0, 'task': 'add_phone_info', 'queued': 0}
{'workers': 0, 'running': 0, 'task': 'write_hbase', 'queued': 0}
{'workers': 0, 'running': 0, 'task': 'write_amazon', 'queued': 0}
{'workers': 10, 'running': 0, 'task': 'dislike', 'queued': 0}
相关文章
相关标签/搜索