python 3.5.2python
rabbitmqgit
pip install -r requirements.txt
github
建立一个名为proj的Django项目redis
django-admin startproject proj
sql
建立一个用于演示的django app,这里名为demo数据库
django-admin startapp demo
django
在建立的app中,增长tasks.py文件,用于编写celery任务json
修改proj/settings.py配置文件,增长celery相关配置。浏览器
修改settings.py中INSTALLED_APPS,增长djcelery及app服务器
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'djcelery', 'demo' ]
若是仅仅需求使用celery异步执行任务的话,如下最基础的配置就能够知足需求
# 导入tasks文件,由于咱们使用autodiscover_tasks # 会自动导入每一个app下的tasks.py,因此这个配置不是很必要 # 若是须要导入其余非tasks.py的模块,则须要再此配置须要导入的模块 # CELERY_IMPORTS = ('demo.tasks', ) # 配置 celery broker CELERY_BROKER_URL = 'amqp://user:password@127.0.0.1:5672//' # 配置 celery backend 用Redis会比较好 # 由于手上没有redis服务器,因此演示时用RabbitMQ替代 CELERY_RESULT_BACKEND = 'amqp://user:password@127.0.0.1:5672//'
在proj目录下,编辑celery.py文件,用于建立celery实例
from celery import Celery from django.conf import settings # 建立celery应用 celery_app = Celery('proj', broker=settings.CELERY_BROKER_URL) # 从配置文件中加载除celery外的其余配置 celery_app.config_from_object('django.conf:settings') # 自动检索每一个app下的tasks.py celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
在以前建立的demo/tasks.py中,编写一个用于演示的异步任务。
注意每一个异步任务以前都须要使用@celery_app.task装饰器。
celery_app实际是以前在proj/celery.py中建立的celery的实例,若是你的实例名称不同,作对应的修改便可。
import logging from proj.celery import celery_app @celery_app.task def async_task(): logging.info('run async_task')
在demo/views.py中定义一个页面,只用来调用异步任务。
from django.http import HttpResponse from demo.tasks import async_demo_task # Create your views here. def demo_task(request): # delay表示将任务交给celery执行 async_demo_task.delay() return HttpResponse('任务已经运行')
在proj/urls.py中注册对应的url。
from django.contrib import admin from django.urls import path from demo.views import demo_task urlpatterns = [ path('admin/', admin.site.urls), path('async_demo_task', demo_task), ]
使用命令启动worker:
manage.py celery -A proj worker -l info
对参数作个简单的说明:
-A proj是指项目目录下的celery实例。演示项目名为proj,因此-A的值是proj。若是项目名是其余名字,将proj换成项目对应的名字。
-l info 是指日志记录的级别,这里记录的是info级别的日志。
若是配置没有问题,能成功链接broker,则会有相似如下的日志:
-------------- celery@Matrix.local v3.1.26.post2 (Cipater) ---- **** ----- --- * *** * -- Darwin-17.5.0-x86_64-i386-64bit -- * - **** --- - ** ---------- [config] - ** ---------- .> app: proj:0x108ab1eb8 - ** ---------- .> transport: amqp://user:**@127.0.0.1:5672// - ** ---------- .> results: amqp:// - *** --- * --- .> concurrency: 4 (prefork) -- ******* ---- --- ***** ----- [queues] -------------- .> celery exchange=celery(direct) key=celery [tasks] . demo.tasks.async_demo_task [2018-04-24 08:24:47,656: INFO/MainProcess] Connected to amqp://user:**@127.0.0.1:5672//
须要注意的是日志中的tasks部分,能够看到已经自动识别到了demo.tasks.async_demo_task这个用于演示的任务。
若是没有识别到,检查下celery实例是否调用autodiscover_tasks方法,或配置文件的CELERY_IMPORTS是否配置正确。
在demo/views.py中定义一个页面,只用来调用异步任务。
from django.http import HttpResponse from demo.tasks import async_demo_task # Create your views here. def demo_task(request): # delay表示将任务交给celery执行 async_demo_task.delay() return HttpResponse('任务已经运行')
在proj/urls.py中注册对应的url。
from django.contrib import admin from django.urls import path from demo.views import demo_task urlpatterns = [ path('admin/', admin.site.urls), path('async_demo_task', demo_task), ]
最后,启动django,访问url http://127.0.0.1:8000/async_demo_task 调用异步任务。
在worker的日志中,能够看到相似的执行结果,即说明任务已经由celery异步执行。
若是出现"Using settings.DEBUG leads to a memory leak, never "的警告信息,则在生产环境中关闭掉django的debug模式便可。
[2018-04-24 09:25:52,677: INFO/MainProcess] Received task: demo.tasks.async_demo_task[1105c262-9371-4791-abd2-6f78d654b391] [2018-04-24 09:25:52,681: INFO/Worker-4] run async_task [2018-04-24 09:25:52,899: INFO/MainProcess] Task demo.tasks.async_demo_task[1105c262-9371-4791-abd2-6f78d654b391] succeeded in 0.21868160199665s: None
请参考这里celery-demo
一样请参这里celery-demo
使用djcelery,而不直接使用celery的好处就在于能够经过Django Admin对Celery的计划任务进行管理。
使用计划任务时,除了保证原先的worker正常运行外(worker的启动方式见上),还须要启动beats:
python manage.py celery beat
也能够beat和worker一块儿启动
python manage.py celery -A project worker -l info --beat
python manage.py migrate
建立Django Admin和djcelery对应的表,这里的数据库使用默认的sqlite。
python manage.py createsuperuser
,依次输入超级管理员账号、邮箱、密码。
演示项目中设置账号:admin 密码: superplayer123
在settings.py中,增长两项配置:
# 设定时区,配置计划任务时须要 CELERY_TIMEZONE = 'Asia/Shanghai' CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
访问 http://127.0.0.1:8000/admin/djcelery/periodictask/add/,用于建立定时任务。
简单的解释下建立定时任务的选项:
字段 | 说明 |
---|---|
名称 | 便于理解的计划任务名称 |
Task (registered) | 选择一个已注册的任务 |
Task (custom) | |
Enabled | 任务是否启用 |
Interval | 按某个时间间隔执行 |
Crontab | 定时任务, 和Interval二选一 |
Arguments | 以list的形式传入参数,json格式 |
Keyword arguments: | 以dict的形式传入参数,json格式 |
Expires | 任务到期时间 |
Queue | 指定队列,队列名须要在配置文件的 CELERY_QUEUES定义好 |
Exchange | Exchange |
Routing key | Routing key |
本质上来讲,就是对PeriodicTask这个model的操做。
下面模拟一个简单的增长计划任务的接口:
def add_task(request): interval = IntervalSchedule.objects.filter(every=30, period='seconds').first() periodic_task = PeriodicTask(name='test', task='demo.tasks.async_demo_task', interval=interval) periodic_task.save() return HttpResponse('任务已经添加')
在proj/urls.py中增长url地址进行访问:
urlpatterns = [ path('admin/', admin.site.urls), path('async_demo_task', demo_task), path('add_task', add_task), path('get_periodic_task_list', get_periodic_task_list), ]
经过浏览器访问http://127.0.0.1:8000/add_task 就能够直接添加一个间隔30秒的计划任务了。
而后在beat中能够看到相似日志,检测到了Schedule改变,而且自动运行刚刚添加的任务。
[2018-05-03 17:18:10,012: INFO/MainProcess] DatabaseScheduler: Schedule changed. [2018-05-03 17:18:10,013: INFO/MainProcess] Writing entries (0)... [2018-05-03 17:18:40,020: INFO/MainProcess] Scheduler: Sending due task test (demo.tasks.async_demo_task) [2018-05-03 17:19:10,021: INFO/MainProcess] Scheduler: Sending due task test (demo.tasks.async_demo_task)
一样的,经过获取PeriodicTask的数据,也能够获得正在运行的任务。
def get_periodic_task_list(request): """ 获取周期性任务列表 :return: """ periodic_task_list = PeriodicTask.objects.all() data = [model_to_dict(periodic_task) for periodic_task in periodic_task_list] resp = json.dumps(data, cls=CustomJSONEncoder, ensure_ascii=False) return HttpResponse(resp, content_type='application/json', status=200)
更多的功能均可以经过操做djcelery的model进行实现。