1、Celery使用指南
Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,而且提供维护这样一个系统的必需工具。python
它是一个专一于实时处理的任务队列,同时也支持任务调度。redis
任务队列是一种在线程或机器间分发任务的机制。数据库
消息队列的输入是工做的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有须要处理的新任务。django
Celery 用消息通讯,一般使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,以后中间人把消息派送给职程。json
Celery 系统可包含多个职程和中间人,以此得到高可用性和横向扩展能力。服务器
使用场景并发
将耗时的操做任务提交给 Celery 去异步执行,好比发送短信/邮件、消息推送、音视频处理等等app
相似于 crontab ,好比每日数据统计异步
Celery 扮演生产者和消费者的角色,分布式
Celery Beat : 任务调度器. Beat 进程会读取配置文件的内容, 周期性的将配置中到期须要执行的任务发送给任务队列.
Celery Worker : 执行任务的消费者, 一般会在多台服务器运行多个消费者, 提升运行效率.
Broker : 消息代理, 队列自己. 也称为消息中间件. 接受任务生产者发送过来的任务消息, 存进队列再按序分发给任务消费方(一般是消息队列或者数据库).
Producer : 任务生产者. 调用 Celery API , 函数或者装饰器, 而产生任务并交给任务队列处理的都是任务生产者.
Result Backend : 任务处理完成以后保存状态信息和结果, 以供查询.
结构图:
生产环境的消息代理有 RabbitMQ 和 Redis, 官方推荐 RabbitMQ.
安装celery相关库:
pip install celery==3.1.5 pip install django-celery==3.2.2 pip install redis==2.10.6
安装实时监控和管理Web界面工具:
pip install flower
若是安装最新django-celery3.2版本,则须要celery4版本小于4.0大于3。0的版本,且redis的版本必须小于3.0,所以各依赖库的版本号如上定义。
项目结构图以下:
INSTALLED_APPS = [ ... 'djcelery', #django-celery必须添加 'app', ] # 以下配置celery等信息 import djcelery # 当settings.py中的djcelery.setup_loader()运行时, # Celery便会查看全部INSTALLED_APPS中app目录中的tasks.py文件, 找到标记为task的function, # 并将它们注册为celery task. djcelery.setup_loader() #加载djcelery #并无北京时区,与下面TIME_ZONE应该一致 CELERY_TIMEZONE = 'Asia/Shanghai' # 消息队列 BROKER_URL = 'redis://127.0.0.1:6379' # 配置backend # CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend' # 设置worker的并发数量为2 CELERY_CONCURRENCY = 2 # 结果存储位置 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379' # 任务序列化和反序列化为json CELERY_TASK_SERIALIZER = 'json' # 存储结果序列化为json CELERY_RESULT_SERIALIZER = 'json'
在celery.py文件中定义:
from __future__ import absolute_import import os from celery import Celery from django.conf import settings # 设置环境变量 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dj_celery_demo.settings') # 实例化Celery app = Celery('task') # 使用django的settings文件配置celery app.config_from_object('django.conf.settings') # Celery加载全部注册的应用 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
在tasks.py文件中定义:
import time from dj_celery_demo.celery import app #调度任务 @app.task def add(x, y): # 模拟长时间耗时操做 print('============耗时操做=============') time.sleep(10) print('============耗时操做结束============') return x + y
在views.py文件中定义调用异步任务task,定义以下:
from django.http import HttpResponse from app.tasks import add def task_add(request): # delay() 方法调用任务 # delay 返回的是一个 AsyncResult 对象,里面存的就是一个异步的结果, # 当任务完成时result.ready() 为 true,而后用 result.get() 取结果便可 # 发出request后异步执行该task, 立刻返回response, 从而不阻塞该request, # 使用户有一个流畅的访问过程.那么, 咱们可使用.delay, add.delay(3, 5) # Celery会将task加入到queue中, 并立刻返回.而在一旁待命的worker看到该task后, 便会按照设定执行它, 并将他从queue中移除 return HttpResponse("伪装操做很耗时,不等了")
启动celery命令:
python manage.py celery worker -l info 命令中 -l info 表示将程序中的调试信息打印在控制台中。命令也可简化为python manage.py celery worker
启动flower命令:
python manage.py celery flower --address=127.0.0.1 --port=5555 命令中adderss和port参数能够不用填写,默认启动的IP地址为127.0.0.1,默认端口为5555
启动命令,控制台中信息的展现,以下图所示:
项目启动命令启动后,redis中执行任务状况以下图所示:
注意: 若任务被修改,Celery 须要从新启动,不然没法随之更改
flower任务窗口界面截图:
flower信息中间件界面截图:
flower信息消费状况界面截图: