将任务分配给其余的进程去运行,django的主进程只负责发起任务,而执行任务的不在使用django的主进程。Python有一个很棒的异步任务框架,叫作celery。python
Django为了让开发者开发更加方便,集成了celery,造成了django-celery插件web
1.安装django-celeryredis
#Redis模块的兼容不稳定,必须安装2.10.6 pip install django-celery pip install django-redis pip install redis==2.10.6
2.安装redis ,解压到指定目录数据库
Redis-x64-3.2.100.zip 提取码: jmbf django
#启动redis redis-server.exe redis.windows.conf #关闭redis redis-cli.exe shutdown
django-celery只是将任务发布出去,让长时间的任务:爬取一个网站,发送一个验证码这样的工做,再也不阻塞主线程,web服务器只负责发起任务和接受任务的结果,中间执行的部分交给其余线程、进程、服务器去作。json
一、在settings当中配置django-celerywindows
下面也是settings.py 配置服务器
# celery 配置 import djcelery djcelery.setup_loader()# 模块加载 BROKER_URL = 'redis://127.0.0.1:6379/1' # 任务容器地址,redis数据库地址 CELERY_IMPORTS = ('CeleryTask.tasks') # 具体任务文件 CELERY_TIMEZONE = 'Asia/Shanghai' # celery 时区 CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # celey处理器,固定
2.建立使用celery的appapp
python manage.py startapp CeleryTask
在项目的主目录下,编写celery的控制文件,(控制文件的名字最好是celery)框架
celery.py
import os from celery import Celery from django.conf import settings # 设置celery的环境变量和django-celery的工做目录 os.environ.setdefault("DJANGO_SETTINGS_MODULE","CeleryTask.settings") # 实例化celery应用,传入服务器名称 app = Celery("art_project") # 加载celery配置 app.config_from_object("django.conf:settings") # 若是在项目中,建立了task.py,那么celery就会沿着app去查找task.py来生成任务 app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)
3.新建tasks.py在celeryTask app下
4.编写tasks.py文件
from __future__ import absolute_import from Qshop.celery import app @app.task def add(x,y): return x+y
5.而后为了djcelery进行数据库同步
python manage.py check
python manage.py makemigrations
python manage.py migrate
6.编写视图触发异步任务
ursl.py
urlpatterns = [ path('mtv/', middle_test_view), ]
views.py
from CeleryTask.tasks import add def get_task(request): num1 = request.GET.get("num1",1) num2 = request.GET.get("num2",2) add.delay(int(num1),int(num2)) return JsonResponse({"data":"success"})
7.启动celery worker
--loglevel=info 指定日志级别
python manage.py celery worker --loglevel=info
访问路由查看效果。
在settings .py配置
from celery.schedules import crontab from celery.schedules import timedelta CELERYBEAT_SCHEDULE = { u"测试任务":{ "task":"CeleryTask.tasks.sendDing", "schedule":timedelta(seconds=10) } }
tasks.py
@app.task def sendDing(content="定时任务执行",to="15037609692"): headers = { "Content-Type": "application/json", "Charset": "utf-8" } requests_data = { "msgtype": "text", "text": { "content": content }, "at": { "atMobiles": [ ], "isAtAll": True } } if to: requests_data["at"]["atMobiles"].append(to) requests_data["at"]["isAtAll"] = False else: requests_data["at"]["atMobiles"].clear() requests_data["at"]["isAtAll"] = True sendData = json.dumps(requests_data) response = requests.post(url=DING_URL, headers=headers, data=sendData) content = response.json() return content
启动worker
python manage.py celery worker --loglevel=info
启动定时任务
python manage.py celerybeat --loglevel=info
注意:须要启动的有,django项目,redis数据库,worker,定时任务