实现一个异步的任务队列,最简单来看须要如下几个部分:任务消息的队列,任务执行者,任务结果存储python
消息中间件web
消息中间件即用来提供消息队列功能的组件,celery自己不提供,可是支持不少的类型的中间人:RabbitMQ, Redis, Mongodb, Django ORM等引擎。redis
任务执行者sql
任务执行者即worker,由celery提供的任务执行单元,这里就能够分布式的放在系统的各个节点中。实现并发的特性。mongodb
任务结果存储shell
用来存储任务运行的结果的组件,默认是没有的,能够自定义引擎,支持SQLAlchemy,cache, mongodb, redis等数据库
使用的环境为python2.7, redis3.0.7,这里准备使用redis做为消息中间件和结果存储。
django
1. 先定义tasks.py 服务器
from celery import Celery app = Celery('tasks', backend="redis://127.0.0.1:6379/1", broker='redis://127.0.0.1:6379/0') #app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite' #app.conf.CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' @app.task def add(x, y): return x + y
Celery第一参数为Celery App的名字,backend为任务结果存储,broker为消息中间件,这里使用的redis引擎。
session
下面定义一个task,用来计算两个参数的和,并返回。
2. 启动worker
在tasks.py目录的命令行执行:
celery -A tasks worker --loglevel=info
能够看到以下信息:
能够看到worker的基本信息,app, transport, results ,能够看到默认经过prefork起了4个进程
接下来咱们经过命令行调用tasks里面的add任务:
咱们使用add.delay执行,传入参数5,6,经过返回值result的ready()方法能够获得是否执行成功,经过get获得任务执行的结果。
在worker那里,能够看到任务的添加和执行的过程:
基本python 的使用就这里了。
celery易于web框架集成,与不少web框架都有支持的第三方库:
这里使用django-celery, 经过pip安装好后,建立一个django项目后,在settings.py中增长以下代码:
import djcelery djcelery.setup_loader() BROKER_URL = 'redis://127.0.0.1:6379/1' #BORKER_URL = 'django://' INSTALLED_APPS = ( 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'djcelery', #"kombu.transport.django", ... )
settings.py中设置了broker的url,固然测试环境可使用django做为broker,若是食用django做为broker,则在installed_apps中增长kombu.transport.django。
接下来,咱们在本身的django的app里面建立一个tasks.py文件:
from celery import task @task def add(x, y): return x + y
能够直接使用celery的task装饰器装饰定义的task,这样就告诉Celery这是一个task。
在settings.py中的djcelery.setup_loader()运行时,Celery便会查看全部INSTALLED_APPS中的app目录中的tasks.py文件,将全部标记task的function注册成celery task。
接下来启动workder,由于咱们使用了django-celery,那么直接使用命令:
python manager.py celery worker --loglevel=info
便可启动worker,终端中的输出跟以前celery直接开启worker基本一致。
而后咱们能够经过python manager.py shell 去调用add这个task:
经过worker的终端也能看到task的处理。
若是在settings.py中设置了CELERY_ALWAYS_EAGER=True,则执行task时,便不需加delay了。
虽然使用crnotab能够在服务器上定时执行写好的脚本,celery提供了定时任务的功能,可以经过scheduler很好的跟django结合起来。
要是celery执行定时任务的方式不少种,咱们采用将定时任务存储在django数据库中,咱们须要在settings.py中配置:
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
而后就能够在django的admin管理页面看到以下:
能够经过配置Periodic tasks增长定时任务。
经过python manager.py celery beat开启定时任务。
参考连接:http://www.weiguda.com/blog/73/
http://my.oschina.net/zhangxu0512/blog/212447?fromerr=r098W69A