celery 是 python 中的经常使用的任务队列框架,常常用于异步调用、后台任务等工做。celery 自己以 python 写,但协议可在不一样的语言中实现,其它语言也能够用 celery 执行相应的任务。在 web 应用,为提升系统响应速度,发送邮件、数据整理等须要长时间执行的任务,一般以异步任务的方式执行,这时就须要用到像 celery 类的框架。另外一种常见的场景是大型系统的分布式处理,为了提高系统性能,各个组件一般以多个实例运行不一样主机上,而组件之间的调用就须要用到 celery 这样的框架。使用 celery (或消息队列),有助于下降系统组件之间的耦合,有助于实现灰度发布、实现服务的分布式、实现水平扩展,最终提高系统健壮性和处理性能。python
celery (和相似框架)的核心是任务队列。用户发起任务,celery 负责把任务排队和整理,而后交到任务执行器 worker 中。 worker 监视任务队列,获取新任务并执行。在 celery 内部,以消息机制协调各个组件工做,消息须要借助一个中间人 broker 进行,以下 ::web
client → celery task → broker → celery worker ↑ ↓ ← ← ← ← result backend
client 发起任务时,通常是以异步方式(除非必要的同步 rpc ),得到一个任务的 id 并保存下来,后续可经过 id 到 result backend 中查询任务执行结果。broker 是第三方组件,可以使用消息队列( rabbitmq 等)、redis、数据库等,只要能实现消息的存储和分发理论上都能使用。 worker 以线程或进程的形式运行,从 broker 中取任务执行,而后把结果保存到 result backend 。redis
目前 rabbitmq 的 broker 实现的功能最完备,在开发环境中也可使用 sqlite 等比较方便的方式,但性能会不好,不能用在生产环境上。sql
另外须要注意的是,因为不一样操做系统的进程模型的差别,celery 会在 windows 上产生一些配置方面的怪异问题。docker
celery 可直接经过 pip 安装,在 virtualenv 下,直接运行 ::数据库
pip install celery
再安装 broker 所须要的驱动,例如使用 rabbitmq ,则安装 ::json
pip install amqp
同时安装好 rabbitmq (建议经过 docker 安装,使用 rabbitmq:management 镜像,可在 15672 端口查看管理控制台)。canvas
而后使用下面的代码示例(摘录来自: Ask Solem. “Celery Manual, Version 3.1“) ::windows
# hello.py from celery import Celery app = Celery('hello', broker='amqp://guest:guest@localhost//') @app.task def hello(): return 'hello world' if __name__ == '__main__': r = hello.delay()
而后,启动 worker ::网络
celery -A hello worker --loglevel=info
client 执行任务 ::
python hello.py
app.task 装饰器标记一个函数为 celery 任务,client 用 delay 方法执行时。 delay 调用 apply_async() 进行异步执行, apply_async 还可配置如队列、countdown 等执行选项。 celery 返回一个 AsyncResult 对象,若是 result backend 配置正确,client 可暂时把对象中的任务 id 保存到数据库,后面再经过这个 id 获取异步执行的结果。
上面的简单例子是没有参数的,若是增长参数,以下 ::
# add.py from celery import Celery app = Celery('add', broker='amqp://guest:guest@localhost//', backend='db+sqlite:///celery_result.db') @app.task def add(x, y): return x+y if __name__ == '__main__': r = add.delay(1, 2) print(r.wait())
启动 worker ::
celery -A add worker --l info
调用 ::
python add.py
当任务结果用 amqp 保存时,结果只能取一次, 所以没法在后续调用中查询任务结果。这个例子用 sqlite 保存了任务执行结果,所以 client 可在 r.wait() 查询任务的结果、任务的状态等等不少信息,可把 r.id 保存到数据库,而后将来查询任务的 AsyncResult ::
r2 = app.AsyncResult(r.id) print(r2.wait()) print(r2.successful())
add.py 中使用了两个参数 x y ,而 celery 须要经过 broker 传递这两个参数,这时须要对数据进行序列化,将 x y 对象转换为无结构的数据,而后 worker 接收到后再把数据还原为 x y 对象。 celery 内置的序列化方法包括 pickle 、 json 等等,若是对象比较复杂,须要本身定义序列化方法。
若是不想当即执行任务,而是把任务传递到其它地方,经过 celery 的 subtask 支持。 subtask 是对 task 的调用参数和执行选项的一个封装,如 ::
add.subtask((2,2), countdown=10) add.s(2,2)
subtask 或 s 返回的是一个 task 的签名(celery.canvas.Signature),它可实现工做流、偏函数等效果。subtask 支持和 task 一样的调用方法,如 ::
s = add.s(2) # subtask ,partial s.delay(2) # 发送消息开始异步执行
在 celery 工做流中组织 subtask 的方式有 group / chain / chord 等等, group 中任务并发执行,chain 中任务顺序执行,chord 中进行回调。而这些组织方式自己也是 subtask ,可嵌套使用 ::
# workflow.py from celery import Celery, group, chain app = Celery('add', broker='amqp://guest:guest@localhost//', backend='db+sqlite:///celery_result.db') @app.task def add(x, y): return x+y if __name__ == '__main__': g = group((add.s(i, i) for i in range(10))) r = g.delay() print(r.get()) c = chain(add.s(1, 2) | add.s(3)) r2 = c.delay() print(r2.get())
celery 的任务调用经过网络发送任务的名字和参数,不发送任务代码, worker 收到任务后根据任务名和参数执行相应的代码。所以不一样 worker 中的代码版本不同时,会有不一样的处理结果。若是 worker 中不能处理相应的任务名,就会报错。