celery (芹菜) 异步任务

第1章 celery
python

1.1什么是celery

Celery是一个简单灵活,且可靠,处理大量消息的分布式系统redis

python写的,用来执行定时任务和异步任务的框架docker

1.2celery的组成

消息中间àbrokerdjango

任务执行单元àworkersession

任务执行结果存储àstore并发

1.3使用场景

异步任务:将耗时操做任务交给celery去异步执行,好比发送短信/邮件,消息推送,音视频处理等app

定时任务:定时执行某件事情,好比天天的数据统计框架

第2章 celery的安装配置

 

pip install celeryless

消息中间件:rabbitmq/redis,这里使用redis异步

docker run -p 6379:6379 --name=redis -d redis:latest redis-server

第3章 celery执行异步任务

3.1基本使用

定义任务:

from celery import Celery
import time

任务产生存储
broker = 'redis://10.211.55.8:16379/1'
任务执行结果存储
backend = 'redis://10.211.55.8:16379/2'
第一个参数是别名,随便写便可
app = Celery('first_celery'backend=backendbroker=broker)

@
app.task
def test_celery(xy):
    time.sleep(
2)
    
return x + y

添加任务:

import task

if __name__ == '__main__':
    
# result不是函数的执行结果,是一个对象
    
result = tasks.add.delay(2,3)
    
print(result.id)

使用celery命令开启work进程

celery –A tasks worker –l info

 

3.2多任务结构:

当有多个任务须要执行时,会有多个任务函数,位了方便解藕,咱们能够使用这种方式,方便管理

celery.py

from celeryimport Celery

broker = 
'redis://10.211.55.8:16379/2'
backend = 'redis://10.211.55.8:16379/3'
# include包含两个任务的函数,去相应文件中找对应的任务函数
cel = Celery('test'broker=brokerbackend=backendinclude=['celery_task.task''celery_task.task2'])

时区,有时候咱们想在固定的时间来执行相关任务,就须要设置时区相关配置
cel.conf.timezone = 'Asia/Shanghai'
是否UTC
cel.conf.enable_utc = False

task.py

from .celeryimport app

@
app.task
def sum(xy):
    
returnx+y

task2.py

from .celeryimport app

@
app.task
def less(xy):
    
returnx-y

celery_task目录下建立添加任务的py文件

from celery_taskimport task
from celery_task import task2

task.sum.delay(
12)
task2.less.delay(
31)

第4章 celery执行定时任务

4.1设定时间让celery执行一个任务

from celery_taskimport task
from celery_task import task2
from datetime import datetimetimedelta

# # 方式一:
# #
获取时间对象
# time = datetime(2019, 8, 3, 15, 0, 0)
# print(time)
# # 
将时间转换成UTC时间
# utc_time = datetime.utcfromtimestamp(time.timestamp())
# print(utc_time)
# # 
apply_asyncargs是函数参数,eta是指定时间
# result = task.add.apply_async(args=[1, 2], eta=utc_time)

方式二:
ctime = datetime.now()
把当前时间转换成UTC时间
utc_time = datetime.utcfromtimestamp(ctime.utcfromtimestamp())
当前时间延迟十秒
time_delta = timedelta(seconds=10)
task_time = utc_time + time_delta
result = task.add.apply_async(
args=[23]eta=task_time)


# task.sum.delay(1, 2)
# task2.less.delay(3, 1)

4.2相似crontab的定时任务

 多任务结构中celery.py文件以下

from celeryimport Celery
from datetime import timedelta
from celery.schedules import crontab

broker = 
'redis://10.211.55.8:16379/2'
backend = 'redis://10.211.55.8:16379/3'

# include包含两个任务的函数,去相应文件中找对应的任务函数
app = Celery('test'broker=brokerbackend=backendinclude=['celery_task.task''celery_task.task2'])
时区
app.conf.timezone = 'Asia/Shanghai'
是否UTC
app.conf.enable_utc = False

app.conf.beat_schedules = {
    
#别名,可随意编写
    
'add_crontab': {
        
#执行task下的add函数
        
'task''celery_task.task.add',
        
#每隔两秒执行
        
'schedules': timedelta(seconds=2),

        
#固定时间执行,每一年411号,842分执行
        #'schedules': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),

        #
传递参数
        
'args': [12],
    
}
}

第5章 Django中使用celery

在项目的目录下建立celeryconfig.py

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    
'app01.tasks',
)
#有些状况能够防止死锁
CELERYD_FORCE_EXECV=True
设置并发worker数量
CELERYD_CONCURRENCY=4
#容许重试
CELERY_ACKS_LATE=True
每一个worker最多执行100个任务被销毁,能够防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
超时时间
CELERYD_TASK_TIME_LIMIT=12*30

app目录下建立tasks.py

from celeryimport task

@
task
def add(xy):
    
returnx+y

试图函数views中使用

from django.shortcutsimport render,HttpResponse
from app01.tasks import add
from datetime import datetimetimedelta
def test(request):
    
# result=add.delay(2,3)
    
ctime = datetime.now()
    
默认用utc时间
    
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay= timedelta(
seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(
args=[43]eta=task_time)
    
print(result.id)
    
returnHttpResponse('ok')

settings中注册

INSTALLED_APPS= [
    
'django.contrib.admin',
    
'django.contrib.auth',
    
'django.contrib.contenttypes',
    
'django.contrib.sessions',
    
'django.contrib.messages',
    
'django.contrib.staticfiles',
    
'djcelery',
    
'app01',
]

from djceleryimport celeryconfig

默认使用rabbitmqredis的话须要指定下
BROKER_BACKEND=
'redis'
BROKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'