Celery是Python开发的分布式任务调度模块。分为任务分发,任务队列,worker3个部分。celery的出现,解决了python运行后台任务的需求。html
这篇文章介绍的celery版本是3.1.18python
celery架构git
+------------------+ +------> | celery worker.1 | +-----------------+ +-----------------------+ | +------------------+ | web service +-----> | job queue(redis or ..)+----+ +-----------------+ +-----------------------+ | +------------------+ +------> | celery worker.2 | | +------------------+ | | +------------------+ +------> | celery worker.[n]| +------------------+
任务队列,支持如redis,RabbitMQ甚至数据库。一般redis是最好的选择,不过数据库在本地使用的时候,也是不错的。github
使用douban的pypi镜像,安装会快一点。web
pip install -i http://pypi.douban.com/simple celery
使用Redis做为Broker时,再安装一个celery-with-redis
redis
开始编写tasks.py:sql
import time from celery import Celery celery = Celery('tasks', broker='redis://localhost:6379/0') @celery.task def sendmail(mail): print('sending mail to %s...' % mail['to']) time.sleep(2.0) print('mail sent.') if __name__ == '__main__': sendmail.delay(dict(to='myemail@gogs.io'))
启动worker数据库
celery -A tasks worker
运行任务json
python tasks.py
这里普通的调用方式是sendmail(...)
若是改为后台运行就变成了sendmail.delay(...)
flask
Celery默认设置就能知足基本要求。Worker以Pool模式启动,默认大小为CPU核心数量,缺省序列化机制是pickle,但能够指定为json。因为Python调用UNIX/Linux程序实在太容易,因此,用Celery做为异步任务框架很是合适。
web中使用celery是常有的事情。heroku官方就推荐python程序使用celery。 代码,我放到github上一份https://github.com/codeskyblue/celery-examples
首先建立一个web.py文件
from flask import Flask from celery import Celery def make_celery(app): celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL']) celery.conf.update(app.config) TaskBase = celery.Task class ContextTask(TaskBase): abstract = True def __call__(self, *args, **kwargs): with app.app_context(): return TaskBase.__call__(self, *args, **kwargs) celery.Task = ContextTask return celery app = Flask(__name__) app.config.update( CELERY_BROKER_URL='redis://localhost:6379', CELERY_RESULT_BACKEND='redis://localhost:6379' ) celery = make_celery(app)
编写任务函数
@celery.task() def add_together(a, b): res = a + b print res return res
而后是一个主页, 请求主页的时候,就会自动调用add_together这个函数,交给celery运行
@app.route('/') def homepage(): a = random.randint(0, 10) b = random.randint(0, 10) add_together.delay(a, b) return 'Create new task {} + {}'.format(a, b)
而后还须要编写一个run.py
文件(这里须要分开两个文件)
from web import app if __name__ == '__main__': app.run()
启动web应用python web.py
,启动worker的方法celery -A web.celery worker
若是须要用sqlite做为后端的话,也是能够的,首先须要安装pip install sqlalchemy
flask的app配置稍微改下
app.config.update( CELERY_BROKER_URL='sqla+sqlite:///queue.db', CELERY_RESULT_BACKEND='db+sqlite:///queue.db', CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] )
而后,celery就会用本地的queue.db文件,做为队列了。