本文对Celery进行了研究,因为其实现相对比较复杂没有足够的时间和精力对各方各面的源码进行分析,所以本文根据Celery的使用方法以及实际行为分析其运行原理,并根据查阅相关代码进行了必定程度的验证。
但愿本文能有助于读者理解celery是如何工做的,从而可以更好地使用这个任务框架,而不只仅是复制官网上的例子来配置。python
Celery是Python中任务队列的事实标准。其特色在于:redis
下面咱们结合Celery的基本使用来分析一下Celery是怎么工做的。本文以Python2为例。django
首先,咱们须要定义咱们的Celery进程访问哪一个Redis进程(假设咱们使用Redis做为message backend,在celery的术语中叫作broker)。
Celery提供的方式是建立一个celery instance。咱们假设文件目录以下:服务器
lab - play - __init__.py - celery.py - tasks.py
而后建立lab/play/celery.py
文件:app
from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('play', broker='redis://127.0.0.1:6379', include=['play.tasks']) if __name__ == '__main__': app.start()
因为可能会有多个celery进程访问同一个redis,为了让它们之间隔离开就须要给每一个celery实例一个名字,咱们这里就叫play
。
除了name和broker参数之外,还使用了include
参数来告诉全部的works到哪里去import tasks的代码,由于workers才是真正执行全部这些任务的单位。框架
好了,接下来就能够启动celery了。在lab
目录下执行:函数
celery -A play.celery worker -l info
便可启动celery进程。Python的路径和模块系统仍是比较复杂的,所以在指定包名的时候要注意。code
除了使用celery命令之外,因为咱们再celery.py中已经加了
if __name__ == '__main__':
部分代码,所以也能够在lab
下直接执行:
python -m play.celery -A play.celery worker -l info
对象
在启动了celery之后,celery进程监听redis消息,并fork出多个worker进程准备将监听到的消息分发给它们执行。队列
如今执行的部分有了,咱们开始定义真正须要执行的部分。
咱们能够专门写一个文件来存听任务代码(也能够直接写在celery.py里面):
# lab/play/tasks.py from __future__ import absolute_import, unicode_literals import time from celery import Celery app = Celery('play', broker='redis://127.0.0.1:6379') @app.task def say_hi(): print 'hi!'
使用另外一个Python进程(也可使用交互式python或者ipython),在lab
下执行:
>>> from play.tasks import say_hi >>> say_hi.delay() >>> <AsyncResult: db6737ba-ecee-4fd2-8227-a76c594ba338> >>>
结果就是say_hi
函数向消息队列中发出了一个调用请求由某个worker执行。Celery进程会输出:
[2017-09-03 13:49:57,340: INFO/MainProcess] Received task: play.tasks.say_hi[85ff01ca-d7c9-4401-bfa3-0a9ad96c7192] [2017-09-03 13:49:57,343: WARNING/ForkPoolWorker-1] hi! [2017-09-03 13:49:57,344: INFO/ForkPoolWorker-1] Task play.tasks.say_hi[85ff01ca-d7c9-4401-bfa3-0a9ad96c7192] succeeded in 0.0016004400095s: None
如今咱们来分析一下tasks.py这个文件。很奇怪的一点是,一上来咱们又建立了一个app实例。当咱们import了task文件后会不会又建立了一个celery进程呢?答案是不会的,由于只有调用了app.start()才会启动。这只有手动调用或者借助celery命令执行后才会发生。若是只是new了一个instance出来,至关于建立了一个配置文件,不会发生任何重要的实质性的操做。
可是这个app对象也不是什么都不干的。接下来咱们定义了两个task函数,并将这个两个函数使用@app.task
包装了起来。这样的效果是把这两个普通函数包装成了celery的task对象,这样他们就有了delay
方法。当咱们执行delay
方法时,这些task会找本身所属的那个celery instance,从中获取配置信息(主要是broker的地址)后将调用请求发往消息队列。
不过,这样定义task的方法并非很好,由于须要在代码中就显式将task函数和一个具体的celery instance绑定了起来。这就使得咱们没法复用这些tasks。所以咱们可使用celery的另外一种定义tasks的方式来重写咱们现有的代码(这也是推荐给django使用的方案):
from __future__ import absolute_import, unicode_literals import time from celery import shared_task @shared_task def say_hi(): print 'hi!'
这里咱们再也不建立app实例,而是直接使用@shared_task
来包装。这样就没有绑定哪一个app的问题了。可是正如咱们以前所说,在调用tasks的时候,task仍是会去寻找本身属于哪一个celery instance从而获取配置信息。若是你都不绑定app instance,配置信息哪里来呢?
答案是,tasks和celery instance之间仍然具备绑定或关联的关系,只不过再也不是显式的了。简单来讲,每一个celery instance被建立之后,它就会被自动的注册到某个全局的位置。当一个shared task被执行时,这个task就会本身去这个全局的位置找有哪些celery instances能够从中获取配置信息。若是有多个celery instance都注册了,那么可能它们的消息队列都会被这个task发消息(没有确认过,只是猜想。但这可能就是shared_task的来源)。这就意味着,只要在咱们Python进程的任何一个地方(对Django服务器进程也是如此),只要随便哪一个地方建立一个celery instance就能够,而后只要import tasks而后使用delay执行便可。这样就解决了celery tasks复用的问题。代码之间的耦合也更小。
更进一步,在咱们的python进程中,甚至都不用再手写一遍celery instance的建立调用。直接import play.celery 就能够了,这个文件虽然被celery进程用做了配置文件,但这不妨碍咱们在本身的进程中也用这个文件。不如说这是更好的一种解决方案。