本文隶属于《Flask Web 开发实战》番外系列。这篇文章会介绍如何在 Flask 项目中集成 Celery。json
第一步是建立一个 Celery 程序实例。由于 Flask 程序实例一般会命名为 app,为了不冲突,咱们通常会把 Celery 实例命名为 celery 或 celery_app:flask
from celery import Celery
celery = Celery(__name__, broker='pyamqp://guest@localhost//')
@celery.task
def add(x, y):
return x + y复制代码
大多数教程,包括目前的 Flask 文档里都会介绍用下面的方式加载配置:app
celery.conf.update(app.config) # 这里的 app 是 Flask 程序实例复制代码
也就是把 Celery 配置和 Flask 配置写在一块儿,而后从 Flask 程序实例的配置字典里更新配置。函数
但问题是,Celery 从 4.0 开始启用新的小写配置名,某些配置被新的名称替换。虽然旧的大写配置仍然支持,但若是你打算使用小写配置名,或是打算在将来进行迁移,这里的配置加载方式就会失效,由于 Flask 在从文件或对象导入配置时只会导入大写形式的配置变量。url
所以,我建议将 Celery 配置写在单独的文件里,不要和 Flask 配置混在一块儿。按照 Celery 文档的示例,你能够在当前目录建立一个 celeryconfig.py 文件(或是其余名字)保存配置:spa
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True复制代码
而后使用下面的方法加载配置(使用其余模块名,或是在其余路径时,记得修改传入的字符串):code
celery.config_from_object('celeryconfig')复制代码
若是须要在建立 Celery 实例时传入 broker 和 backend 配置,能够直接写出或是从配置模块中导入:对象
from celeryconfig import broker_url
celery = Celery(__name__, broker=broker_url)复制代码
你能够单首创建 Celery 程序,但咱们一般会须要为它添加 Flask 程序上下文支持,由于有时候你的 Celery 任务函数会用到依赖于 Flask 程序上下文的某些变量。教程
下面咱们为 Celery 建立了一个工厂函数,在工厂函数中建立 Celery 实例,加载配置,并实现 Flask 程序上下文支持:开发
from flask import Flask
from celery import Celery
from celeryconfig import broker_url
def make_celery(app):
celery = Celery(__name__, broker=broker_url)
celery.config_from_object('celeryconfig')
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
app = Flask(__name__)
celery = make_celery(app)复制代码
在定义 Celery 任务的模块里,好比 tasks.py,你能够导入这个 Celery 程序实例:
from app import celery
@celery.task
def add(x, y):
return x + y复制代码
当 Flask 程序也使用工厂函数建立时,咱们能够全局建立 Celery 程序实例,而后在建立 Flask 程序实例的工厂函数里更新 Celery 程序配置并进行上下文设置:
from flask import Flask
from celery import Celery
from celeryconfig import broker_url
celery = Celery(__name__, broker=broker_url)
def create_app():
app = Flask(__name__)
register_celery(app)
return app
def register_celery(app):
celery.config_from_object('celeryconfig')
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask复制代码
一样直接导入 Celery 实例并建立任务:
from app import celery
@celery.task
def add(x, y):
return x + y复制代码
这原本是一个完整的 Celery 入门教程,但由于去年的一次硬盘损坏,对应的示例程序弄丢了,暂时没有毅力重写一遍,因此这篇文章只抽取了其中和 Flask 相关的内容。
由于距离初稿写做的时间已通过去半年多,Celery 的最新版本也已是 4.3.0,若是文中有什么疏漏,或是有更好的实现方式,欢迎评论指出。