Celery是一个专一于实时处理和任务调度的分布式任务队列。所谓任务就是消息,消息中的有效载荷中包含要执行任务须要的所有数据。html
使用Celery常见场景:python
Celery特性:redis
Celery架构图: 数据库
产生任务的方式有两种:django
Celery组件介绍:json
Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、Zookeeper、SQLAlchemy等做为消息代理,但用于生产环境只有RabbitMQ和Redis,官方推荐RabbitMQ来做为Celery的消息代理。后端
在客户端和消费者之间传输数据须要序列化和反序列化,Celery支持的序列化方案有pickle,json,yaml,msgpack,通常使用json浏览器
为了提供更高的性能,采用以下方案:缓存
sudo apt-get install rabbitmq-server sudo apt-get install redis-server sudo pip install "celery[librabbitmq,redis,msgpack]"
项目目录结构服务器
tree project project ├── celeryconfig.py ├── celery.py ├── __init__.py └── tasks.py
主程序celery.py:
#!/usr/bin/env python # -*- coding:utf-8 -*- # 拒绝隐式引入,由于celery.py的名字和celery的包名冲突,须要使用这条语句让程序正确运行 from __future__ import absolute_import from celery import Celery # app是 Celery类的实例,建立的时候添加了project.tasks这个模块,也就是包含了project/tasks.py这个文件 app = Celery('project', include=['project.tasks']) # 把Celery配置存放进project/celeryconfig文件,使用app.config_from_object加载配置 app.config_from_object('project.celeryconfig') if __name__ == '__main__': app.start()
任务函数文件tasks.py:
#!/usr/bin/env python # -*- coding:utf-8 -*- from __future__ import absolute_import from project.celery import app # 让任务函数生效的方法是添加app.task装饰器 @app.task def add(x, y): return x + y
配置文件celeryconfig.py:
# -*- coding:utf-8 -*- BROKER_URL = 'amqp://guest:guest@localhost:5672//' # 使用RabbitMQ做为消息代理 CELERY_TASK_PROTOCOL = 1 # 如今celery升级到了4.0,是老版本的librabbitmq与最新的celery4.0 Message Protocol协议不兼容,celery4.0默认使用Task messages Version 2 ,而librabbitmq使用Task messages Version 1 CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把结果存在Redis CELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化肯反序列化使用msgpack方案 CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果通常性能要求不高,因此使用可读性更好的json CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过时时间 CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接收的内容类型
启动消费者:
celery -A project.celery worker -l info
终端界面提供了消息代理和存储结果的地址、并发数量、任务列表、交换类型等
开启另外一个终端,用ipython调用add函数
In [1]: from project.tasks import add In [2]: r = add.delay(1,3) In [3]: r Out[3]: <AsyncResult: a14d2045-ad40-4240-bbcf-1a8f07899485> In [4]: r.result Out[4]: 4 In [5]: r.status Out[5]: u'SUCCESS' In [6]: r.successful() Out[6]: True In [7]: r.backend Out[7]: <celery.backends.redis.RedisBackend at 0x7faae433a450> # 保存在redis中
任务的task_id根据上面提到的task_id得到,能够用下面方法得到结果
# 方法一: In [9]: task_id = 'a14d2045-ad40-4240-bbcf-1a8f07899485' In [10]: add.AsyncResult(task_id).get() Out[10]: 4 # 方法二: In [12]: from celery.result import AsyncResult In [13]: AsyncResult(task_id).get() Out[13]: 4
下面在django中模拟一下如何用celery:
首先先看没有用celery的程序
这就形成不良好的用户体验了,那你先给用户返回程序执行成功,再在后台执行这5秒
下面就用selery
1.安装selery ,不写版本号默认最新,新版本只须要安装一个celery
pip install celery==3.1.25
pip install celery-with-redis==3.0
pip install django-celery==3.2.1
2.在应用目录下建立名为task.py的文件,该文件用于封装耗时任务的
3.配置setting.py
3.1注册进app
INSTALLED_APP=[
. . . . +
‘djcelery’]
3.2 再加上,
import djcelery
djcelery.setup_loader()
BROKER_URL='redis://:密码@数据库ip地址:6379/0' #格式不能错 ,注意分号
CELERY_IMPORTS=(‘myApp.task’) #任务文件
4.迁移,生成celery须要的表
python manage.migrate
5.将名为celery.py的文件加入到同工程目录同名的目录下,这个文件是官网给咱们写好的
from __future__ import absolute_import import os from celery import Celery from django.conf import settings os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'whthas_home.settings') app = Celery('project') #这里改为本身工程名 app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
6.修改与工程目录同名目录下的__init__.py文件,上图红色箭头指处
from project.celery import app as celery_app
7.将耗时程序封装成任务,在task.py文件里
在view.py文件里
这时你去跑程序,会发现很快,但却没有打印开始与结束这两句,其实它没有执行到耗时程序,还缺最后一步
8.启动redis,确保服务器启动
9.启动worker,在黑窗口进入工程目录下,执行命令
python manage.py celery worker --loglevel=info
访问浏览器,就能够看到很快的返回程序执行成功,而耗时程序在黑窗口进行打印:开始程序执行,等待5秒,结束程序执行
相互不影响
浏览器返回
黑窗口打印