异步任务分发模块Celery

Celery简介

Celery是一个功能完备即插即用的任务队列。它使得咱们不须要考虑复杂的问题,使用很是简单。 celery适用异步处理问题,当遇到发送邮件、或者文件上传, 图像处理等等一些比较耗时的操做,咱们可将其异步执行,这样用户不须要等待好久,提升用户体验。html

celery的特色是:python

  • 简单,易于使用和维护,有丰富的文档。
  • 高效,单个celery进程每分钟能够处理数百万个任务。
  • 灵活,celery中几乎每一个部分均可以自定义扩展。

celery很是易于集成到一些web开发框架中。web

使用场景简介

咱们在作网站后端程序开发时,会碰到这样的需求:用户须要在咱们的网站填写注册信息,咱们发给用户一封注册激活邮件到用户邮箱,若是因为各类缘由,这封邮件发送所需时间较长,那么客户端将会等待好久,形成很差的用户体验。redis

咱们可使用异步任务分发系统Celery解决上面的问题。数据库

咱们将耗时任务放到后台异步执行。不会影响用户其余操做。除了注册功能,例如上传,图形处理等等耗时的任务,均可以按照这种思路来解决。 如何实现异步执行任务呢?咱们可以使用celery.。django

celery除了刚才所涉及到的异步执行任务以外,还能够实现定时处理某些任务。 后端

Celery的概念、配置及使用

任务队列是一种跨线程、跨机器工做的一种机制。服务器

任务队列中包含称做任务的工做单元。有专门的工做进程持续不断的监视任务队列,并从中得到新的任务并处理。app

celery经过消息进行通讯,一般使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。框架

一个celery系统能够包含不少的worker和broker,可加强横向扩展性和高可用性能。

安装celery

直接使用pip包管理工具能够安装celery:

pip install celery

也可从官方直接下载安装包:https://pypi.python.org/pypi/celery/

tar xvfz celery-0.0.0.tar.gz
cd celery-0.0.0
python3 setup.py build
python3 setup.py install

Broker的选择

Celery须要一种解决消息的发送和接受的方式,咱们把这种用来存储消息的的中间装置叫作message broker, 也可叫作消息中间人。

做为中间人,咱们有几种方案可选择:

RabbitMQ

RabbitMQ是一个功能完备,稳定的而且易于安装的broker. 它是生产环境中最优的选择。使用RabbitMQ的细节参照如下连接: http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#broker-rabbitmq

若是咱们使用的是Ubuntu或者Debian发行版的Linux,能够直接经过下面的命令安装RabbitMQ: sudo apt-get install rabbitmq-server 安装完毕以后,RabbitMQ-server服务器就已经在后台运行。若是您用的并非Ubuntu或Debian, 能够在如下网址: http://www.rabbitmq.com/download.html 去查找本身所须要的版本软件。

Redis

Redis也是一款功能完备的broker可选项,可是其可能因意外中断或者电源故障致使数据丢失的状况。

关因而有那个Redis做为Broker,可访下面网址: http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis

存储结果

若是咱们想跟踪任务的状态,Celery须要将结果保存到某个地方。有几种保存的方案可选:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

Celery使用的一个简单示例

项目的目录结构以下:

咱们在tasks包中加入须要异步调用的任务,在task_demo.py文件中写具体的任务:

# -*- coding:utf-8 -*-
from celery import Celery

# 定义celery对象
# 123是我redis的密码;使用redis的1号库做为broker,2号库做为存取结果的地方
celery_app = Celery("demo1",
                    broker="redis://:123@127.0.0.1:6800/1",
                    backend="redis://:123@127.0.0.1:6800/2"
                    )


@celery_app.task
def print_now(now):
    """异步任务的demo"""
    print(now)
    # 定义一个返回值,若是不写的话会默认返回None
    return "当前时间:{}".format(now)

而后在外部的transfer.py文件中加入调用这个任务的代码:

# -*- coding:utf-8 -*-
import time

import redis
from tasks.task_demo import print_now


# 建立redis链接,选择2号库
conn = redis.Redis(host="127.0.0.1",port=6800,password=123,db=2)

# 使用celery异步任务
# delay函数调用后当即返回
now = time.strftime("%Y-%m-%d %X")
# 把参数放在delay中!
ret = print_now.delay(now)
print(ret,type(ret)) # 7af04c3c-8070-4bf1-9563-b220fd5aac9c <class 'celery.result.AsyncResult'>

而后在项目的目录下启动celery:

 

启动的结果及说明以下:

Celery启动时若是上报AttributeError: async 这个错误,建议把Celery的版本升级到4.1.1

启动成功后,终端会夯住;而后运行transfer.py文件,在终端会出现这样的提示:

而后咱们再利用redis可视化工具看看结果:

咱们能够看到,在执行完一次任务后,redis数据库1存放着celery的broker的信息,在库2中存放着执行的结果。

Django中使用Celery生成首页静态页面并将结果存在本地数据库中 *****

在实际中,咱们网站的首页内容是不怎么变化的,若是每次请求首页都对数据库中的内容进行查询的话,这样对数据库来讲负担会很大。

咱们可使用celery生成一下静态页面,当没有数据变化的时候用户每次请求主页咱们都让他们去访问这个生成好的“首页”,有当数据变化的时候再从新生成一下这个“静态页面”就能够了。

项目的目录以下

 

路由与视图以下 

路由:

from django.contrib import admin
from django.urls import path

from demo import views

urlpatterns = [
    path('admin/', admin.site.urls),
 path('index/',views.Index.as_view()),
]

视图:

from django.views import View
from django.shortcuts import render

from tasks.static_index import index_static

class Index(View):

    def get(self,request):

        # 这里只是模拟一下后台是否有数据改动的状况
        # 当数据有修改的时候须要进行判断,把修改后的数据传过去再从新进行页面的生成!
        # 若是a=1表示后台没有数据修改
        a = 12
        if a == 1:
            return render(request, "index.html")
        # 有数据修改了,就从新生成一下静态的页面
        else:
            index_static.delay()
            return render(request,"index.html")

将结果存储在本地数据库中的配置

此处须要用到额外包django_celery_results, 先安装包:

pip3 install django-celery-results

在celery_demo/settings.py中安装此应用:

INSTALLED_APPS = [
   xxx

    # 自定义app或第三方app
    'demo.apps.DemoConfig',
    # celery用于存储结果的应用
    'django_celery_results',
    # celery用于定时任务的应用
    'django_celery_beat'
]

此时,tasks/static_index.py文件中的配置以及生成静态页面的代码以下:

import os

from celery import Celery
from django.conf import settings
from django.template import loader,RequestContext

### 注意!必须引入Django环境!为celery设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoCeleryDemo.settings') ## 建立应用 # 定义celery对象 # 123是我redis的密码;使用redis的1号库做为broker
celery_app = Celery("demo2") ## 配置应用
celery_app.conf.update( # 使用redis的库1做为消息队列
    BROKER_URL="redis://:123@127.0.0.1:6800/1", # 使用项目数据库存储任务执行结果
    CELERY_RESULT_BACKEND='django-db', # 如过想把把结果存在redis的库2中这样配置:
    # CELERY_RESULT_BACKEND = "redis://:123@127.0.0.1:6800/2", )

@celery_app.task
def index_static():

    # 这里可使用ORM查询的结果等等
    context = { "msg":"这是一个基于Celery实现的首页静态页面" } # 使用模板
    # 加载模板文件,返回模板对象
    temp = loader.get_template("index_static.html") # 模板渲染
    index_static_html = temp.render(context) # 生成首页静态页面 —— 存放在static目录下
    index_static_html_path = os.path.join(settings.BASE_DIR,'templates','index.html') with open(index_static_html_path,'w')as f: f.write(index_static_html) # 做为测试,这里返回一个字符串OK
    return "OK"

建立django_celery_results应用所需数据库表, 执行迁移文件:

这里须要注意,因为后续咱们会用到admin页面,所以须要先迁移一下“默认注册应用auth”相关的数据库:

python3 manage.py migrate

而后迁移django_celery_results的表:

python3 manage.py migrate django_celery_results

启动celery

进入项目根目录,启动celery:

celery -A tasks.static_index worker -l info

访问index页面查看结果

此时咱们再访问index页面,能够从结果的表中看到result:

Django中使用celery作定时任务 *****

若是咱们想某日某时执行某个任务,或者每隔一段时间执行某个任务,也可使用celery来完成。

使用定时任务,须要安装额外包:

pip3 install django_celery_beat

而后在settings.py中安装此应用: 

INSTALLED_APPS = [
    xxx

    # 自定义app或第三方app
    'demo.apps.DemoConfig',
    # celery用于存储结果的应用
    'django_celery_results',
    # celery用于定时任务的应用
    'django_celery_beat'
]

而后,tasks/static_index.py文件中的配置以及定时任务的代码以下:

# -*- coding:utf-8 -*-
import os

from celery import Celery
from django.conf import settings
from django.template import loader,RequestContext

### 注意!必须引入Django环境!为celery设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoCeleryDemo.settings')


## 建立应用
# 定义celery对象
# 123是我redis的密码;使用redis的1号库做为broker
celery_app = Celery("demo2")

## 配置应用
celery_app.conf.update(
    # 使用redis的库1做为消息队列
    BROKER_URL="redis://:123@127.0.0.1:6800/1",
    # 使用项目数据库存储任务执行结果
    CELERY_RESULT_BACKEND='django-db',
    # 把结果存在redis的库2中
    # CELERY_RESULT_BACKEND = "redis://:123@127.0.0.1:6800/2",
    # 配置定时器模块,定时器信息存储在数据库中
    CELERYBEAT_SCHEDULER='django_celery_beat.schedulers.DatabaseScheduler',
)


### 定时任务
@celery_app.task def interval_task(): print("我每隔5秒执行一次...")
# 做为测试,这里返回666
return 666

因为定时器信息存储在数据库中,咱们须要先生成对应表, 对diango_celery_beat执行迁移操做,建立对应表:

python3 manage.py migrate django_celery_beat

其余的表是以前迁移的时候生成的:

因为咱们须要在数据库中添加数据才能使用定时任务,所以这里须要建立一下后台管理员帐号:

python3 manage.py createsuperuser

而后登录后台管理员admin界面:

其中Crontabs用于定时某个具体时间执行某个任务的时间;

Intervals用于每隔多久执行任务的事件;

具体任务的执行在Periodic tasks表中建立。 

咱们要建立每隔5秒执行某个任务,因此在Intervals表名后面点击Add按钮:

而后在Periodic tasks表名后面,点击Add按钮,添加任务:

 

启动定时任务须要在后面加上--beat参数!

celery -A tasks.static_index worker -l info --beat

结果以下:

咱们能够在admin的Task results表中查看结果:

其中上面3条是定时任务的结果,下面那一条是生成静态页面的结果。