Celery是异步消息队列, 能够在不少场景下进行灵活的应用.消息中包含了执行任务所需的的参数,用于启动任务执行, suoy因此消息队列也能够称做html
在web应用开发中, 用户触发的某些事件须要较长事件才能完成. 能够将任务交给celery去执行, 待任务完成后再将结果返回给用户. 用户同步请求触发的其它任务, 如发送邮件,请求云服务等也能够交由celery来完成.python
celery的另外一个重要应用场景则是各类计划任务.git
celery由5个主要组件组成:github
producer: 任务发布者, 经过调用API向celery发布任务的程序, 如web后端的控制器.web
celery beat: 任务调度, 根据配置文件发布定时任务redis
worker: 实际执行任务的程序数据库
broker: 消息代理, 接受任务消息,存入队列再按顺序分发给worker执行.django
result backend: 存储结果的服务器, 通常为各类数据库服务json
总体结构如图所示:后端
broker是celery的关键组件, 目前的可靠选择有RabbitMQ和Redis, 出于稳定性等缘由咱们选择官方推荐的RabbitMQ做为broker.顺便安装librabbitmq做为RabbitMQ的python客户端.
消息的发送接受过程须要对序列进行序列化和反序列化, 从celery3.2开始官方出于安全性缘由拒绝使用python内置的pickle做为序列化方案, 目前celery支持的序列化方案包括:
json: 跨语言的序列化方案
yaml: 拥有更多数据类型, 但python客户端性能不如json
msgpack: 二进制序列化方案, 比json更小更快
若对可读性有要求能够采用json方案, 若追求更高的性能则能够选用msgpack.
result backend用于存储异步任务的结果和状态, 目前可用的有Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等.
可使用boundless方式安装依赖:
pip install "celery[librabbitmq,redis,msgpack]"
建立tasks.py
文件, 并写入:
from celery import Celery app = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/1') @app.task def add(x, y): return x + y
这样咱们建立了celery实例, Celery()的第一个参数为当前module的名称(py文件名或包名).
在终端执行命令以启动服务器:
celery -A tasks worker -l info
-A tasks
参数指定app为模块tasks,-l info
参数指定log级别为info.
当看到这条log时说明celery已就绪:
[2016-09-11 18:04:43,758: WARNING/MainProcess] celery@finley-pc ready.
在python中导入任务并执行
>>> from tasks import add >>> result = add.delay(1,2) >>> result.result 3 >>> result.status 'SUCCESS' >>> result.successful() True
使用一个py文件做为module很是不便, 在更复杂的任务中能够采用python包做为module.
创建python包demo,创建下列文件:
app.py:
from celery import Celery app = Celery('demo', include=['demo.tasks']) app.config_from_object('demo.config') app.start()
config.py
BROKER_URL = 'redis://127.0.0.1:6379/0' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' CELERY_TASK_SERIALIZER = 'msgpack' CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
tasks.py
from demo.app import app @app.task def add(x, y): return x + y
在终端中启动:
celery -A demo.app worker -l info
若将-A
参数设为demo
则会默认尝试启动demo.celery
.由于该module与celery重名可能在导入时出现错误, 因此咱们没有采用这种作法.
celery还支持绑定,日志,重试等特性:
from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @app.task(bind=True) def div(self, x, y): logger.info('doing div') try: result = x / y except ZeroDivisionError as e: raise self.retry(exc=e, countdown=5, max_retries=3) return result
bind=true
将app对象做为self参数传给task函数.
前文的示例须要producer主动检查任务的状态,存在诸多不便. 咱们能够在task函数中主动通知producer:
from celery import Celery from demo.app import app from urllib.request import urlopen @app.task def add(x, y): result = x + y url = 'http://producerhost/callback/add?result=%d' % result urlopen(url) return result
上述示例中咱们使用GET请求将结果发送给了producer的回调API, 固然不少状况下能够直接调用回调函数.
Celery易于与Web框架集成, 做者常采用的交互逻辑是:
提供提交任务, 查询任务结果两个API, 由客户端决定什么时候查询结果
采用websocket等技术, 服务器主动向客户端发送结果
固然也能够采用异步IO模式, 这须要一些扩展包的协助:
安装tornado-celery: pip install torando-celery
编写handler:
import tcelery tcelery.setup_nonblocking_producer() from demo.tasks import add calss Users(RequestHandler): @asynchronous def get(): add.apply_async(args=[1,2], callback=self.on_success) def on_success(self, response): users = response.result self.write(users) self.finish()
其它的Web框架也有本身的扩展包:
Django: django-celery
Tornado: tornado-celery
web2py: web2py-celery
Pylons: celery-pylons
Pyramid: pyramid_celery
celery的计划任务有schedule和crontab两种方式.
在config.py中添加配置:
CELERYBEAT_SCHEDULE = { 'add': { 'task': 'demo.tasks.add', 'schedule': timedelta(seconds=10), 'args': (16, 16) } }
启动beat:
celery beat -A demo.app
而后启动worker:
celery -A demo.app worker -l info
或者与celery app一同启动:
celery -B -A demo.app worker -l info
'schedule'能够接受datetime, timedelta或crontab对象:
from celery.schedules import crontab { 'schedule': crontab(hour=7, minute=30, day_of_week=1), pass }
上文中咱们使用本地python函数做为worker, webhook机制容许使用远程的Web服务做为worker.
在使用webhook做为worker时, broker将消息封装为http请求发送给worker, 并按照协议解析返回值.
使用webhook须要在CELERY_IMPORTS
参数中包含celery.task.http
, 或者在启动参数中指定-I celery.task.http
.
broker使用GET或POST方法发送请求, 参数由调用时的关键字参数指定. worker返回json格式的响应:
{'status': 'success', 'retval': ...}
在失败时返回响应:
{'status': 'failure', 'reason': ...}
咱们用django做为worker:
from django.http import HttpResponse import json def add(request): x = int(request.GET['x']) y = int(request.GET['y']) result = x + y response = {'status': 'success', 'retval': result} return HttpResponse(json.dumps(response), mimetype='application/json')
配置django为http://cloudservice/webhook/add
提供Web服务.
从本地添加任务:
>>>from celery.task.http import URL >>>result = URL('http://cloudservice/webhook/add').get_async(x=10, y=10) >>>result.get() 20
URL是HttpDispatchTask的快捷方法(shortcut):
>>> from celery.task.http import HttpDispatchTask >>> res = HttpDispatchTask.delay( ... url='http://cloudservice/webhook/add', ... method='GET', x=10, y=10) >>> res.get() 20
更多关于celery的内容请参阅: