Celery是一个简单,灵活且可靠的处理大量消息的分布式系统,煮煮鱼实时处理的一步任务队列,同事也支持任务调度。python
Celery由三部分组成:redis
消息中间件(message broker),django
任务执行单元(worker,架构
任务执行结果存储(task result store)并发
Celery自己不提供消息服务,可是能够方便和第三方提供的戏哦阿西中间件集成。包括rabbitMQ、redis等等。app
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。异步
Task result store用来存储Worker执行的任务的结果,Celery支持以不一样方式存储任务的结果,包括redis等。async
异步任务:将好事操做任务提交给Celery去异步执行,好比发送短信/邮件,消息推送,音频视频处理等等。分布式
定时任务:定时执行某件事情,好比天天数据统计。函数
# 安装celery组件 pip install celery # 消息中间件 RabbitMQ/Redis
建立项目celerytest
建立py文件:celery_app_task.py
import celery import time backend = 'redis://127.0.0.1:6379/2' # 有密码 # backend = 'redis://:password@127.0.0.1:6379/2' broker = 'redis://127.0.0.1:6379/1' # 生成一个Celery对象 celery_obj = celery.Celery('test',backend=backend,broker=broker) # 调用Celery对象下的方法来作装饰器使用 @celery.task def add_task(x,y): time.sleep(5) return x+y
建立py文件:add_task.py
from celery_app_task import add_task # 异步提交任务 result = add_task.delay(3,4) # print(result.id),这里的result是一个对象
启动workder
# 方式一:命令启动 celery workder -A celery_app_task -l info # 方式二:文件启动,建立run.py文件 from celery_app_task import cel if __name__ == '__main__': cel.worker_main() # cel.worker_main(argv=['--loglevel=info')
建立py文件:result.py,查看任务结果
from celery.result import AsyncResult from celery_app_task import cel async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel) if async.successful(): result = async.get() print(result) # result.forget() # 将结果删除 elif async.failed(): print('执行失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行')
执行add_task.py,添加任务,并获取任务id
执行run.py,或者命令celery worker -A celery_app_task -l info来启动worker
执行result.py,检查任务状态并获取结果
pro_cel ├── celery_task# celery相关文件夹 │ ├── celery.py # celery链接和配置相关文件,必须叫这个名字 │ └── tasks1.py # 全部任务函数 │ └── tasks2.py # 全部任务函数 ├── check_result.py # 检查结果 └── send_task.py # 触发任务
celery.py
from celery import Celery cel = Celery('cel_name', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=['celery_task.tasks1', 'celery_task.tasks2' ]) # 时区 cel.conf.timezone = 'Asia/Shanghai' # 是否使用UTC cel.conf.enable_utc = False
task1.py
import time from celery_task.celery import cel @cel.task def test_celery_1(res): time.sleep(5) return "test_celery任务结果:%s"%res
task2.py
import time from celery_task.celery import cel @cel.task def test_celery_2(res): time.sleep(5) return "test_celery2任务结果:%s"%res
check_result.py
from celery.result import AsyncResult from celery_task.celery import cel async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel) if async.successful(): result = async.get() print(result) # result.forget() # 将结果删除,执行完成,结果不会自动删除 # async.revoke(terminate=True) # 不管如今是何时,都要终止 # async.revoke(terminate=False) # 若是任务尚未开始执行呢,那么就能够终止。 elif async.failed(): print('执行失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行')
send_task.py
from celery_task.tasks1 import test_celery from celery_task.tasks2 import test_celery2 # 当即告知celery去执行test_celery任务,并传入一个参数 result = test_celery_1.delay('第一个的执行') print(result.id) result = test_celery_2.delay('第二个的执行') print(result.id)
添加任务(执行send_task.py),开启work:celery worker -A celery_task -l info -P eventlet,检查任务执行结果(执行check_result.py)
add_task.py
from celery_app_task import add from datetime import datetime # 方式一 v1 = datetime(2019, 2, 13, 18, 19, 56) print(v1) v2 = datetime.utcfromtimestamp(v1.timestamp()) print(v2) result = add.apply_async(args=[1, 3], eta=v2) print(result.id) # 方式二 ctime = datetime.now() # 默认用utc时间 utc_time = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=10) task_time = utc_ctime + time_delay # 使用apply_async并设定时间 result = add.apply_async(args=[4, 3], eta=task_time) print(result.id)
多任务结构中celery.py修改以下
from datetime import timedelta from celery import Celery from celery.schedules import crontab cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[ 'celery_task.tasks1', 'celery_task.tasks2', ]) cel.conf.timezone = 'Asia/Shanghai' cel.conf.enable_utc = False cel.conf.beat_schedule = { # 名字随意命名 'add-every-10-seconds': { # 执行tasks1下的test_celery函数 'task': 'celery_task.tasks1.test_celery', # 每隔2秒执行一次 # 'schedule': 1.0, # 'schedule': crontab(minute="*/1"), 'schedule': timedelta(seconds=2), # 传递参数 'args': ('test',) }, # 'add-every-12-seconds': { # 'task': 'celery_task.tasks1.test_celery', # 每一年4月11号,8点42分执行 # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # 'args': (16, 16) # }, }
启动一个beat:celery beat -A celery_task -l info
启动work执行:celery worker -A celery_task -l info -P eventlet
celery == 3.1.25 django-celery = 3.1.20
在项目目录下建立celeryconfig.py
import djcelery djcelery.setup_loader() CELERY_IMPORTS=( 'app01.tasks', ) #有些状况能够防止死锁 CELERYD_FORCE_EXECV=True # 设置并发worker数量 CELERYD_CONCURRENCY=4 #容许重试 CELERY_ACKS_LATE=True # 每一个worker最多执行100个任务被销毁,能够防止内存泄漏 CELERYD_MAX_TASKS_PER_CHILD=100 # 超时时间 CELERYD_TASK_TIME_LIMIT=12*30
在app01目录下建立task.py
from celery import task @task def add(a,b): with open('a.text', 'a', encoding='utf-8') as f: f.write('a') print(a+b)
视图函数views.py
from django.shortcuts import render,HttpResponse from app01.tasks import add from datetime import datetime def test(request): # result=add.delay(2,3) ctime = datetime.now() # 默认用utc时间 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=5) task_time = utc_ctime + time_delay result = add.apply_async(args=[4, 3], eta=task_time) print(result.id) return HttpResponse('ok')
settings.py
INSTALLED_APPS = [ ... 'djcelery', 'app01' ] ... from djagocele import celeryconfig BROKER_BACKEND='redis' BOOKER_URL='redis://127.0.0.1:6379/1' CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'