celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢?redis
celery能够支持多台不一样的计算机执行不一样的任务或者相同的任务。vim
若是要说celery的分布式应用的话,就要提到celery的消息路由机制,提到AMQP协议。服务器
具体能够查看AMQP文档详细了解。app
简单理解:分布式
能够有多个"消息队列"(message Queue),不一样的消息能够指定发送给不一样的Message Queue,code
而这是经过Exchange来实现的,发送消息到"消息队列"中时,能够指定routiing_key,Exchange经过routing_key来吧消息路由(routes)到不一样的"消息队列"中去。对象
如图:blog
exchange 对应 一个消息队列(queue),即:经过"消息路由"的机制使exchange对应queue,每一个queue对应每一个worker队列
写个例子:ip
vim demon3.py
from celery import Celery app = Celery() app.config_from_object("celeryconfig") @app.task def taskA(x, y): return x * y @app.task def taskB(x, y, z): return x + y + z @app.task def add(x, y): return x + y
vim celeryconfig.py
from kombu import Queue BORKER_URL = "redis://192.168.48.131:6379/1" #1库 CELERY_RESULT_BACKEND = "redis://192.168.48.131:6379/2" #2库 CELERY_QUEUES = { Queue("default", Exchange("default"), routing_key = "default"), Queue("for_task_A", Exchange("for_task_A"), routing_key = "for_task_A"), Queue("for_task_B", Exchange("for_task_B"), routing_key = "for_task_B") } #路由 CELERY_ROUTES = { "demon3.taskA":{"queue": "for_task_A", "routing_key": "for_task_A"}, "demon3.taskB":{"queue": "for_task_B", "routing_key": "for_task_B"} }
下面把两个脚本导入服务器:
指定taskA启动一个worker:
# celery -A demon3 worker -l info -n workerA.%h -Q for_task_A
同理:
# celery -A demon3 worker -l info -n workerB.%h -Q for_task_B
下面远程客户端调用:新文件
vim remote.py
from demon3 import * r1 = taskA.delay(10, 20) print (r1.result) print (r1.status) r2 = taskB.delay(10, 20, 30) time.sleep(1) prnit (r2.result) print (r2.status) #print (dir(r2)) r3 = add.delay(100, 200) print (r3.result) print (r3.status) #PENDING
看到状态是PENDING,表示没有执行,这个是由于没有celeryconfig.py文件中指定改route到哪个Queue中,因此会被发动到默认的名字celery的Queue中,可是咱们尚未启动worker执行celery中的任务。
下面,咱们来启动一个worker来执行celery队列中的任务
# celery -A tasks worker -l info -n worker.%h -Q celery ##默认的
能够看到这行的结果为success
print(re3.status) #SUCCESS
定时任务:
Celery 与 定时任务
在celery中执行定时任务很是简单,只须要设置celery对象中的CELERYBEAT_SCHEDULE属性便可。
下面咱们接着在配置文件:celeryconfig.py,添加关于 CELERYBEAT_SCHEDULE 变量到脚本中去:
CELERY_TIMEZONE = 'UTC' CELERYBEAT_SCHEDULE = { 'taskA_schedule' : { 'task':'tasks.taskA', 'schedule':20, 'args':(5,6) }, 'taskB_scheduler' : { 'task':"tasks.taskB", "schedule":200, "args":(10,20,30) }, 'add_schedule': { "task":"tasks.add", "schedule":10, "args":(1,2) } }
注意格式,不然会有问题
启动:
celery -A demon3 worker -l info -n workerA.%h -Q for_task_A
celery -A demon3 worker -l info -n workerB.%h -Q for_task_B
celery -A tasks worker -l info -n worker.%h -Q celery
celery -A demon3 beat