Celery 是一个“自带电池”的的任务队列。它易于使用,因此你能够无视其所解决问题的复杂程度而轻松入门。它遵守最佳实践设计,因此你的产品能够扩展,或与其余语言集成,而且它自带了在生产环境中运行这样一个系统所需的工具和支持。html
Celery 的最基础部分。包括:redis
选择和安装消息传输方式(中间人)----broker,如RabbitMQ,redis等。sql
pip install celery
tasks.pydjango
from celery import Celery #第一个参数是你的celery名称 #backen 用于存储结果 #broker 用于存储消息队列 app = Celery('tasks',backend='redis://:password@host:port/db', broker='redis://:password@host:port/db') @app.task def add(x, y): return x + y
Celery 的第一个参数是当前模块的名称,这个参数是必须的,这样的话名称能够自动生成。第二个参数是中间人关键字参数,指定你所使用的消息中间人的 URL,此处使用了 RabbitMQ,也是默认的选项。更多可选的中间人见上面的 选择中间人 一节。例如,对于 RabbitMQ 你能够写 amqp://localhost ,而对于 Redis 你能够写 redis://localhost .json
你定义了一个单一任务,称为 add ,返回两个数字的和。后端
步骤:缓存
启动一个工做者,建立一个任务队列app
// -A 指定celery名称,loglevel制定log级别,只有大于或等于该级别才会输出到日志文件 celery -A tasks worker --loglevel=info
若是你没有安装redis库,请先pip install redisnosql
如今咱们已经有一个celery队列了,我门只须要将工做所需的参数放入队列便可分布式
from tasks import add #调用任务会返回一个 AsyncResult 实例,可用于检查任务的状态,等待任务完成或获取返回值(若是任务失败,则为异常和回溯)。 #但这个功能默认是不开启的,你须要设置一个 Celery 的结果后端(即backen,咱们在tasks.py中已经设置了,backen就是用来存储咱们的计算结果) result=add.delay(4, 4) #若是任务已经完成 if(result.ready()): #获取任务执行结果 print(result.get(timeout=1))
经常使用接口
config.py
#broker BROKER_URL = 'redis://:password@host:port/db' #backen CELERY_RESULT_BACKEND = 'redis://:password@host:port/db' #导入任务,如tasks.py CELERY_IMPORTS = ('tasks', ) #列化任务载荷的默认的序列化方式 CELERY_TASK_SERIALIZER = 'json' #结果序列化方式 CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT=['json'] #时间地区与形式 CELERY_TIMEZONE = 'Europe/Oslo' #时间是否使用utc形式 CELERY_ENABLE_UTC = True #设置任务的优先级或任务每分钟最多执行次数 CELERY_ROUTES = { # 若是设置了低优先级,则可能好久都没结果 #'tasks.add': 'low-priority', #'tasks.add': {'rate_limit': '10/m'}, #'tasks.add': {'rate_limit': '10/s'}, #'*': {'rate_limit': '10/s'} } #borker池,默认是10 BROKER_POOL_LIMIT = 10 #任务过时时间,单位为s,默认为一天 CELERY_TASK_RESULT_EXPIRES = 3600 #backen缓存结果的数目,默认5000 CELERY_MAX_CACHED_RESULTS = 10000
celery.py
from celery import Celery #指定名称 app = Celery('mycelery') #加载配置模块 app.config_from_object('config') if __name__=='__main__': app.start()
tasks.py
from .celery import app @app.task def add(a, b): return a + b
// -l 是 --loglevel的简写 celery -A mycelery worker -l info
from tasks import add #调用任务会返回一个 AsyncResult 实例,可用于检查任务的状态,等待任务完成或获取返回值(若是任务失败,则为异常和回溯)。 #但这个功能默认是不开启的,你须要设置一个 Celery 的结果后端(即backen,咱们在tasks.py中已经设置了,backen就是用来存储咱们的计算结果) result=add.delay(4, 4) #若是任务已经完成 if(result.ready()): #获取任务执行结果 print(result.get(timeout = 1))
启动多个celery worker,这样即便一个worker挂掉了其余worker也能继续提供服务
// 启动三个worker:w1,w2,w3 celery multi start w1 -A project -l info celery multi start w2 -A project -l info celery multi start w3 -A project -l info // 当即中止w1,w2,即使如今有正在处理的任务 celery multi stop w1 w2 // 重启w1 celery multi restart w1 -A project -l info // celery multi stopwait w1 w2 w3 # 待任务执行完,中止
// 启动多个worker,可是不指定worker名字 // 你能够在同一台机器上运行多个worker,但要为每一个worker指定一个节点名字,使用--hostname或-n选项 // concurrency指定处理进程数,默认与cpu数量相同,所以通常无需指定 $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h
celery能够指定在发生错误的状况下进行自定义的处理
config.py
def my_on_failure(self, exc, task_id, args, kwargs, einfo): print('Oh no! Task failed: {0!r}'.format(exc)) // 对全部类型的任务,当发生执行失败的时候所执行的操做 CELERY_ANNOTATIONS = {'*': {'on_failure': my_on_failure}}