celery使用在django下,官方4.2文档 http://docs.celeryproject.org/en/latest/django/index.html, github文档:https://github.com/celery/celery/tree/master/examples/django/html
首先生成celery的apppython
from __future__ import absolute_import, unicode_literals import os from celery import Celery # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') # 导入django环境 app = Celery('proj') # 生成app # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks() # 自动到每一个app下边寻找tasks文件,并导入 @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
使用@shared_task装饰器,为了使用它,首先在项目的__init__.py文件导入appmysql
from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ('celery_app',)
建立tasks模块:git
# Create your tasks here from __future__ import absolute_import, unicode_literals from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y
运行celerygithub
celery -A proj worker -l info
window 10下运行报错,弄了很久,最后看别人的博客解决了:https://blog.csdn.net/qq_30242609/article/details/79047660, 解决办法:redis
celery -A proj worker -l info -P eventlet
把celery的并行执行方式改成 eventlet, 而不是默认的prefork,须要安装sql
pip install eventlet数据库
把保存结果后端设为django的结果后端,须要使用django-celery-resultsdjango
$ pip install django-celery-results
INSTALLED_APPS{
django_celery_results,
}
python manage.py migrate django_celery_results
数据库json
CELERY_RESULT_BACKEND = 'django-db'
缓存
CELERY_RESULT_BACKEND = 'django-cache'
帮助:
celery help
序列化配置,因为4.0默认是json序列化,序列化有时候会不成功,须要进行配置
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERY_TASK_SERIALIZER = 'pickle' # 'json'默认, 改成pickle
CELERY_RESULT_SERIALIZER= 'json' # 结果序列化,默认json能够改成pickle
使用redis做为中间人
CELERY_BROKER_URL = 'redis://:password@host:port/db'
'redis://localhost/0'
简写:'redis://'
RabbitMQ: 'amqp://guest:guest@localhost//'
其余后端
sqlite (filename) CELERY_RESULT_BACKEND = ‘db+sqlite:///results.sqlite’
# mysql CELERY_RESULT_BACKEND = ‘db+mysql://scott:tiger@localhost/foo’
# postgresql CELERY_RESULT_BACKEND = ‘db+postgresql://scott:tiger@localhost/mydatabase’
# oracle CELERY_RESULT_BACKEND = ‘db+oracle://scott:tiger@127.0.0.1:1521/sidname’
配置时区app.conf.timezone = 'Asia/Shanghai'
运行周期性任务:
进行配置
from celery import Celery from celery.schedules import crontab app = Celery() @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # 调用 test('hello') 每10秒运行 sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # test('world') 每30秒 sender.add_periodic_task(30.0, test.s('world'), expires=10) # 每一个星期一早上 7:30 a.m. sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), test.s('Happy Mondays!'), ) @app.task def test(arg): print(arg)
动态配置
app.conf.beat_schedule = { 'add-every-30-seconds': { 'task': 'tasks.add', 'schedule': 30.0, 'args': (16, 16) }, } app.conf.timezone = 'UTC'
args位置参数 元组 ,列表
kwargs关键字参数, 字典类型
定时任务示例
from celery.schedules import solar app.conf.beat_schedule = { # Executes at sunset in Melbourne 'add-at-melbourne-sunset': { 'task': 'tasks.add', 'schedule': solar('sunset', -37.81753, 144.96715), 'args': (16, 16), }, }
文档连接:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules