python3+celery+redis实现异步任务

1、原理javascript

Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。它是Python写的库,可是它实现的通信协议也能够使用ruby,php,javascript等调用。异步任务除了消息队列的后台执行的方式,仍是一种则是定时计划任务。php

Celery 是一个强大的分布式任务队列,它可让任务的执行彻底脱离主程序,甚至能够被分配到其余主机上运行。咱们一般使用它来实现异步任务(async task)和定时任务(crontab)。它的架构组成以下图 java

 

 

 

组件:python

一、任务(tasks)--用户定义的函数,用于实现用户的功能,好比执行一个耗时很长的任务web

二、中间介(Broker)--用于存放tasks的地方,可是这个中间介须要解决一个问题,就是可能须要存放很是很是多的tasks,并且要保证Worker可以从这里拿取redis

三、执行者(Worker)--用于执行tasks,也就是真正调用咱们在tasks中定义的函数数据库

四、存储(Backend)--把执行tasks返回的结果进行存储,以供用户查看或调用django

2、实现过程json

1.环境安装(RabbitMQ/Redis、Celery、django-celery、flower)ruby

个人python版本:3.7,celery版本:3.1.26.post2

2.建立工程

 

 

 红圈为本工程所需:

web_order下面须要修改的文件:celery.py、__init__.py、settings文件

web_test下面须要修改的文件:tasks.py文件、longTask.py文件

 

3.修改文件过程

1)修改settings.py。在settings的最后加上以下代码:

1 # CELERY SETTING
2 BROKER_URL = 'redis://localhost:6379'    #指定消息中间件
3 CELERY_RESULT_BACKEND = 'redis://localhost:6379'   ##指定结果存储位置为本地数据库
4 CELERY_ACCEPT_CONTENT = ['application/json']  # 5 CELERY_TASK_SERIALIZER = 'json'
6 CELERY_RESULT_SERIALIZER = 'json'
7 CELERY_TIMEZONE = 'Asia/Shanghai'
8 CELERY_IMPORTS = ("web_test.tasks")   #注册任务,

###在
INSTALLED_APPS中注册 djcelery
 
 

2)__init__ 文件。

#绝对导入,以避免celery和标准库中的celery模块冲突
from __future__ import absolute_import # This will make sure the app is always imported when # Django starts so that shared_task will use this app.

#如下导入时为了确保在Django启动时加载app,shared_task在app中会使用到
from .celery import app as celery_app __all__ = ['celery_app']

3)celery文件

from __future__ import absolute_import,unicode_literals import os from celery import Celery from django.conf import settings os.environ.setdefault("DJANGO_SETTINGS_MODULE", "web_order.settings")  # 设置celery能够在命令行中使用
app = Celery('web_order')  # 建立app实例 # app = Celery('tcelery', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
app.conf.CELERY_IGNORE_RESULT = False  # 结果不忽略 # app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' #结果保存在redis中
 app.config_from_object('django.conf:settings')  # 从文件中加载实例
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  # 自动加载tasks,注意:他会去app下面查找tasks.py文件,因此咱们必须将task放在tasks.py文件中

4)longTasks.py

from django.http import JsonResponse import json from .tasks import test   #导入异步任务的方法


def sendlongtask(request): #由此处调用test方法,执行异步命令
    run_res = test(sh, userIp, username) return JsonResponse("执行成功", safe=False)

5)Tasks.py

from celery import shared_task @shared_task def test(x, y): return (x+y)

4.启动Django和celery

  在项目根目录执行:

  python manage.py runserver 0.0.0.0:8000

  python manage.py celery worker -c 4 --loglevel=info

 

5.另外,Celery提供了一个工具flower,将各个任务的执行状况、各个worker的健康状态进行监控并以可视化的方式展示,以下图所示:

 

 

 

 

    Django下实现的方式以下: 

      1.) 安装flower:

    pip install flower

      2.) 启动flower(默认会启动一个webserver,端口为5555):

python manage.py celery flower

     3.) 进入http://localhost:5555便可查看。

6.python3踩过的坑:python3.7与celery不兼容

 

 1.出现这个错误时,须要将报错文件中全部的async改成asynchronous或者其余变量名便可。

运行后,你会发现celery能够正常使用了

2.将python版本降到3.6及如下,celery也可正常使用。