Django的celery配置(包括定时任务、队列)

1、安装celery

Django项目不须要安装celery这个包,能够直接使用django-celery这个包,,先来安装它,在终端中输入:python

pip install django-celery

2、安装rabbitmq,创建celery队列

我作的项目用的就是rabbitmq,按道理来讲,也是能够用redis做为消息队列的,可是rabbitmq更好,此处不作详细解释,有兴趣的同窗的能够去研究下。 ubuntu环境下,在终端中输入:redis

sudo apt-get install rabbitmq-server

3、配置settings.py

首先要在INSTALLED_APPS中添加对djcelery的引用shell

INSTALLED_APPS = [
    'decelery',
]

再在settings.py中添加如下代码django

import djcelery
djcelery.setup_loader()

BROKER_URL = 'amqp://guest:guest@localhost:5672/'
CELERY_RESULT_BACKEND = 'amqp'
CELERY_IMPORTS = (
    "common.tasks.task1",
    "common.tasks.task2",
)

4、添加celery异步任务

在common/tasks/task1.py中添加如下代码,就定义了celery异步任务。 由于celery任务可能会不少,为了便于管理,咱们就在项目下的common/tasks文件夹中建立了许多task.py文件,如task1.py、task2.py等。 在task1.py中添加如下代码:ubuntu

from celery.task import task

@task
def function1(a, b):
    print a + b

注意,想让这个celery任务能执行,还必需要在settings.py中加一段配置,这个在上面已经添加了,这里再特别提醒下,以下:服务器

# common、tasks是文件夹,task一、task2是tasks文件夹下面的py文件
# CELERY_IMPORTS:是导入目标任务文件
CELERY_IMPORTS = (
    "common.tasks.task1",
    "common.tasks.task2",
)

要调用这个异步任务的话,就用app

from common.tasks.task1 import function

function.delay(a, b)

是否是很方便?工具的好处就是把事情简单化,可是太依赖工具,不懂的底层原理的话容易把人搞傻。 好比其实还能够用threading来建立新线程来执行异步任务,此处再也不赘述,不然又是一套长篇大论,有兴趣的童鞋能够去学习一下如何使用threading库。frontend

5、启动celery

由于咱们这里用的是django-celery,而不是直接使用celery这个库,因此启动celery的命令与celery官网里面介绍的是不同的,新人极可能由于这个掉进坑里,因此这里我特别提醒一下。 终端命令以下:异步

# 先启动服务器
python manage.py runserver
# 再启动worker(--concurrency=2是开4个worker进程,不加也能够启动,只不过在生产环境仍是应该多开几个进程的)
python manage.py celery worker --concurrency=2 -l info

6、celery定时任务的配置

在项目中有时不只仅使用以上的异步任务,有时候须要建立不少定时任务,这样celery又能够大显身手了。在settings.py中添加如下配置,就能够添加定时任务函数

from celery.schedules import crontab

# 下方的common和tasks依然是文件夹
# function二、function3分别是tasks文件夹中的task1.py、task2.py文件的函数的函数名
CELERYBEAT_SCHEDULE = {
    'function2': {
        'task': 'common.tasks.task1',
        'schedule': crontab(minute='*/50'), # 每50分钟执行一次
    },
    'function3': {
        'task': 'common.tasks.task2',
        'schedule': crontab(minute=0, hour='8,13'), # 天天的8点0分和13点0分各执行一次
    },
}

下面是common/tasks/task1.py中的函数

from celery.task import task

@task
def function2():
    print '='*40
    print 'This is function2, celery is great!'
    print '='*40

下面是common/tasks/task2.py中的函数,跟task1.py中是同样的使用方法。

from celery.task import task

@task
def function3():
    print '='*40
    print 'This is function3, celery is great!'
    print 'Fuck celery start failure!'
    print '='*40

7、启动celery定时任务

得先启动第六节中介绍的命令后,再执行这个命令,定时任务才能执行。由于beat只是分配定时任务给celery的worker,因此只有worker启动后,定时任务才能异步执行。 顾名思义,worker就是干苦力的民工,累活脏或都给它干。beat能够形象的理解为工地作计划的人,到了要干活的时候就分配任务给民工,多是包工头,也多是通常的管理工程进度的小弟。反正都是苦命的人,屌丝何须难为屌丝。 (╥╯^╰╥) 终端命令以下:

python manage.py celery beat -l info

8、celery队列的配置

在项目中celery的异步任务不少的时候,这个时候咱们就须要将不一样的任务分配到不一样的队列(queue)中去执行,若是只有一个默认队列的话,全部异步任务都会在这个队列中执行(是须要排队的,先来的先执行),任务不少的时候,就无法同时执行不少任务了,甚至形成任务的拥堵。将不一样的任务分配到不一样的队列就能够保证同一时刻能够同时运行不一样队列中的任务,互补干扰,而且每一个队列能够单独开好几个进程。 进程数最好不要超过CPU的核数,由于CPU只有4个核的话,你开5个进程,同一时间仍是只能执行4个进程。

咱们能够在项目中设置三个队列(default, frontend, backend),队列的名字能够本身任意取。如下是在settings.py中添加队列的配置:

from kombu import Exchange, Queue

# 默认队列是default
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'

# x-priority是任务的优先级
# 优先级就是哪一个队列优先执行,比较紧迫的须要立刻执行的任务优先级能够设置为最高
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default', consumer_arguments={'x-priority': 5}),
    Queue('frontend', Exchange('frontend'), routing_key='frontend', consumer_arguments={'x-priority': 10}),
    Queue('backend', Exchange('backend'), routing_key='backend', consumer_arguments={'x-priority': 8}),
)

# 特别须要注意的是,异步任务的路径必须精确到函数名(好比下方的common、tasks是文件夹,task一、task2是py文件,function1就是task1.py中的定义的异步任务的函数名),否则的话异步任务就无法执行
CELERY_ROUTES = {
    "common.tasks.task1.function1": {'queue': "frontend", 'routing_key': 'frontend'},
    "common.tasks.task1.function2": {'queue': "backend", 'routing_key': 'backend'},
    "common.tasks.task2.function3": {'queue': "default", 'routing_key': 'default'},
}

9、启动celery队列

配置了队列的话,若是执行python manage.py celery worker --concurrency=2 -l info的话就只会建立一个默认的队列,而咱们须要建立多个队列,这样咱们就不须要运行这个命令了,咱们须要在终端中分别运行如下命令:

# -Q 后面加的是配置的队列名,concurrency(进程数)设置为几就由本身定了,只要不超过CPU核数就好了
python manage.py celery worker -l info -Q default --concurrency=1
python manage.py celery worker -l info -Q frontend --concurrency=2
python manage.py celery worker -l info -Q backend --concurrency=4

10、补充

Django下要查看其余celery的命令,包括参数配置、启动多worker进程的方式均可以经过python manage.py celery --help来查看,一下是终端输入命令后出来的提示信息:

Usage: manage.py celery <command> [options] 

Show help screen and exit.

Options:
  -A APP, --app=APP     app instance to use (e.g. module.attr_name)
  -b BROKER, --broker=BROKER
                        url to broker.  default is 'amqp://guest@localhost//'
  --loader=LOADER       name of custom loader class to use.
  --config=CONFIG       Name of the configuration module
  --workdir=WORKING_DIRECTORY
                        Optional directory to change to after detaching.
  -C, --no-color        
  -q, --quiet           
  --version             show program's version number and exit
  -h, --help            show this help message and exit

---- -- - - ---- Commands- -------------- --- ------------

+ Main: 
|    celery worker
|    celery events
|    celery beat
|    celery shell
|    celery multi
|    celery amqp

+ Remote Control: 
|    celery status
 
|    celery inspect --help
|    celery inspect active
|    celery inspect active_queues
|    celery inspect clock
|    celery inspect conf None
|    celery inspect memdump
|    celery inspect memsample
|    celery inspect objgraph None
|    celery inspect ping
|    celery inspect registered
|    celery inspect report
|    celery inspect reserved
|    celery inspect revoked
|    celery inspect scheduled
|    celery inspect stats
 
|    celery control --help
|    celery control add_consumer <queue> [exchange [type [routing_key]]]
|    celery control autoscale [max] [min]
|    celery control cancel_consumer <queue>
|    celery control disable_events
|    celery control enable_events
|    celery control pool_grow [N=1]
|    celery control pool_shrink [N=1]
|    celery control rate_limit <task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>
|    celery control time_limit <task_name> <soft_secs> [hard_secs]

+ Utils: 
|    celery purge
|    celery list
|    celery migrate
|    celery call
|    celery result
|    celery report
---- -- - - --------- -- - -------------- --- ------------

Type 'celery <command> --help' for help using a specific command.
相关文章
相关标签/搜索