原理:celery-beat做为任务调度,当达到定时时间时,beat将任务id装载进rabbitmq队列中,worker在队列的另外一端取出任务id,并匹配当前注册的任务。若是没有注册,那么会报错。除此以外,worker还会经过celery-once来尝试从redis中获取分布式锁,只有获取到锁的worker才会执行这个任务。worker执行成功或者失败经过flower监控
pip install celery pip install flower pip install celery_once
代码结构以下:html
任务函数sendDingTest.py(钉钉机器人):python
from dingtalkchatbot.chatbot import DingtalkChatbot from start import celery_app from datetime import datetime import socket from celery_once import QueueOnce WEB_HOOK_SPIDER = '在钉钉机器人页面获取web_hook' # 基于任务名及传递的参数值来确认是不是同一个任务 @celery_app.task(base=QueueOnce, once={'graceful': True}) def send_ding_test(arg1, arg2): dingding = DingtalkChatbot(WEB_HOOK_SPIDER) arg3 = arg1 + arg2 dingding.send_text(msg="城市数据定时任务测试-{},{} 执行时刻:{}".format(arg3, socket.gethostname(), datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
配置文件config.py:git
from celery.schedules import crontab class celeryConfig(object): # accept_content = ['json'] # 能够是set,list,tuple,pickle,yaml # result_accept_content = ['json'] timezone = 'Asia/Shanghai' # 中国只有两个时区,一个上海,一个乌鲁木齐 broker_url = "amqp://user:password@ip:port/vhost" backend = "" include = ['jobs.sendDingTest'] # worker启动时要导入的任务模块,须要在这里添加,以便worker可以找到咱们的任务 beat_schedule = { 'add-every-monday-morning': { 'task': 'jobs.sendDingTest.send_ding_test', # 这里要写全路径,不然worker找不到 'schedule': crontab(minute="*/2"), 'args': (16, 16), }, } # celery-once配置 ONCE = { 'backend': 'celery_once.backends.Redis', 'settings': { 'url': 'redis://ip:port/database', 'default_timeout': 60 * 60 # 分布式锁的默认超时时间 } }
启动函数start.py:github
from celery import Celery import os # windows平台须要设置, os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1') celery_app = Celery() celery_app.config_from_object('config.celeryConfig')
# 注意:beat,worker,flower均可以不在同一台服务器上,分布开的话,须要在其余服务器上copy一份代码 # 开启beat(start是文件名) celery -A start.celery_app beat # 开启worker celery -A start.celery_app worker -c 1 -l info # 开启flower(默认地址是localhost:5555) celery -A start flower
如下是flower页面,能够查看worker数量、消息队列(须要rabbitmq开启rabbitmq_management)和任务执行结果等。web
celery-once文档redis
flower文档shell
celery官方文档json