1、介绍html
celery是一个基于python开发的分布式异步消息任务队列,用于处理大量消息,同时为操做提供维护此类系统所需的工具。
它是一个任务队列,专一于实时处理,同时还支持任务调度。若是你的业务场景中须要用到异步任务,就能够考虑使用celerypython
2、实例场景redis
一、你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只须要拿着这个任务id就能够拿到任务执行结果, 在任务执行ing进行时,你能够继续作其它的事情。
二、你想作一个定时任务,好比天天检测一下大家全部客户的资料,若是发现今天 是客户的生日,就给他发个短信祝福django
3、优势json
4、入门后端
celery 须要一个解决方案来发送和接受消息,一般,这是以称为消息代理的单独服务的形式出现的
有如下几种解决方案,包括:
一:RabbitMQ(消息队列,一种程序之间的通讯方式)
rabbitmq 功能齐全,稳定,耐用且易于安装。它是生产环境的绝佳选择。
若是您正在使用Ubuntu或Debian,请执行如下命令安装RabbitMQ:服务器
$ sudo apt-get install rabbitmq-server
命令完成后,代理已经在后台运行,准备为您移动消息:。Starting rabbitmq-server: SUCCESS
2、redisapp
redis功能齐全,但在忽然停止或者电源故障时更容易丢失数据异步
5、安装async
$ pip install celery
6、应用
建立一个tasks.py文件
from celery import Celery app = Celery('tasks', broker='pyamqp://guest@localhost//') @app.task def add(x, y): return x + y
第一个参数Celery是当前模块的名称。只有在__main__模块中定义任务时才能自动生成名称。
第二个参数是broker关键字参数,指定要使用的消息代理的URL。这里使用RabbitMQ(也是默认选项)。
您可使用RabbitMQ amqp://localhost,或者您可使用Redis redis://localhost。
您定义了一个名为add的任务,返回两个数字的总和。
1 from __future__ import absolute_import 2 import os 3 from celery import Celery 4 from django.conf import settings 5 # set the default Django settings module for the 'celery' program. 6 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'saruman_server.settings') 7 app = Celery('saruman_server') 8 9 # Using a string here means the worker will not have to 10 # pickle the object when using Windows. 11 app.config_from_object('django.conf:settings') 12 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 13 14 @app.task(bind=True) 15 def debug_task(self): 16 print('Request: {0!r}'.format(self.request))
7、运行celery工做服务器
您如今能够经过使用worker 参数执行咱们的程序来运行worker :
celery -A tasks worker --loglevel=info
有关可用命令行选项的完整列表,请执行如下操做:
$ celery worker --help
还有其余几个可用的命令,也能够提供帮助:
$ celery help
8、调用任务
要调用咱们的任务,您可使用该delay()方法。
apply_async() 能够更好地控制任务执行
>>> from tasks import add >>> add.delay(4, 4)
调用任务会返回一个AsyncResult实例。这可用于检查任务的状态,等待任务完成,或获取其返回值(或者若是任务失败,则获取异常和回溯)。
9、保持结果
若是您想跟踪任务的状态,Celery须要在某处存储或发送状态。有几个内置的结果后端可供选择:SQLAlchemy / Django ORM, Memcached,Redis,RPC(RabbitMQ / AMQP),以及 - 或者您能够定义本身的。
在本例中,咱们使用rpc结果后端,它将状态做为瞬态消息发回。后端经过backend参数 指定Celery
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
或者,若是您想使用Redis做为结果后端,但仍然使用RabbitMQ做为消息代理(一种流行的组合):
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')
如今配置告终果后端,让咱们再次调用该任务。此次你将保持AsyncResult调用任务时返回的实例:
>>> result = add.delay(4, 4)
该ready()方法返回任务是否已完成处理:
>>> result.ready()
False
10、配置
与消费类电器同样,celery不须要太多配置便可运行。它有一个输入和一个输出。输入必须链接代理,输出能够
选择到结果后端。
能够直接在应用程序上或使用专用配置模块设置配置。例如,您能够经过更改task_serializer设置来配置用于序列化任务有效负载的默认序列化程序:
app.conf.task_serializer = 'json'
若是您一次配置了许多设置,则可使用update:
app.conf.update( task_serializer='json', accept_content=['json'], # Ignore other content result_serializer='json', timezone='Europe/Oslo', enable_utc=True, )
对于大型项目,建议使用专用配置模块。不鼓励硬编码周期性任务间隔和任务路由选项。将它们保存在集中位置要好得多。对于库来讲尤为如此,由于它使用户可以控制其任务的行为方式。集中配置还容许您的SysAdmin在发生系统故障时进行简单的更改。
您能够经过调用app.config_from_object()方法告诉Celery实例使用配置模块:
app.config_from_object('celeryconfig')
此模块一般称为“ celeryconfig”,但您可使用任何模块名称。
在上面的例子中,一个名为的模块celeryconfig.py必须能够从当前目录或Python路径加载。它可能看起来像这样:
celeryconfig.py:
broker_url = 'pyamqp://' result_backend = 'rpc://' task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = 'Europe/Oslo' enable_utc = True
1 from datetime import timedelta 2 3 import djcelery 4 5 djcelery.setup_loader() 6 BROKER_URL = 'amqp://guest@localhost//' #输入 7 CELERY_RESULT_BACKEND = 'amqp://guest@localhost//' #返回的结果 8 9 #导入指定的任务模块 10 CELERY_IMPORTS = ( 11 'fir.app.fir.tasks', 12 ) 13 14 CELERYBEAT_SCHEDULE = { 15 'receive_mail': { 16 "task": "fir.app.fir.tasks.receive_mail", 17 "schedule": timedelta(seconds=5), 18 "args": (), 19 }, 20 }
要验证配置文件是否正常工做且不包含任何语法错误,您能够尝试导入它:
####################################################
python -m celeryconfig
为了演示配置文件的强大功能,您能够将行为不当的任务路由到专用队列:
celeryconfig.py:
task_routes = {
'tasks.add': 'low-priority',
}
或者不是路由它,而是能够对任务进行速率限制,这样在一分钟(10 / m)内只能处理10种此类任务:
celeryconfig.py:
task_annotations = {
'tasks.add': {'rate_limit': '10/m'}
}
若是您使用RabbitMQ或Redis做为代理,那么您还能够指示工做人员在运行时为任务设置新的速率限制:
$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
new rate limit set successfully
11、在项目中如何使用celery
一、能够把celery配置成一个应用
二、目录结构以下:
proj/__init__.py /celery.py /tasks.py
三、proj/celery.py内容
from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('proj', broker='amqp://', backend='amqp://', include=['proj.tasks']) # Optional configuration, see the application user guide. app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start()
四、proj/tasks.py中的内容
from __future__ import absolute_import, unicode_literals from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
五、启动worker
$ celery -A proj worker -l info
输出
-------------- celery@Alexs-MacBook-Pro.local v4.0.2 (latentcall) ---- **** ----- --- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-26 21:50:24 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: proj:0x103a020f0 - ** ---------- .> transport: redis://localhost:6379// - ** ---------- .> results: redis://localhost/ - *** --- * --- .> concurrency: 8 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery
django 中使用celery:参考连接:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django
12、监控工具flower
若是有些任务出现问题,能够用flower工具监控(基于tornado)
安装:pip install flower
使用:
三种启动方式
celery flower celery flower --broker python manage.py celery flower #就能读取到配置里的broker_url 默认是rabbitmq
打开运行后的连接 打开worker python manage.py celery worker -l info