Celery
1.什么是Celery
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统redis
专门处理实时处理的异步任务队列数据库
同时也支持任务调度django
2.Celery架构
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成windows
消息中间件
Celery自己是不提供消息服务,可是能够方便的和第三方提供的消息中间价集成,包括RabbitMQ,Redis等markdown
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。架构
任务结果存储
Task result store用来存储Worker执行的任务结果,Celery支持以不一样方式存储任务的结果,包括AMQP,Redis并发
版本支持状况
Celery version 4.0 runs on
Python ❨2.7, 3.4, 3.5❩
PyPy ❨5.4, 5.5❩
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
If you’re running an older version of Python, you need to be running an older version of Celery:
Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4 was Celery series 2.2 or earlier.
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
3.使用场景
异步任务:将耗时操做任务提交给Celery去异步执行,好比发送短信、邮件、消息推送、音视频处理等等app
定时任务:定时执行某件事情,好比天天数据统计异步
4.Celery的安装配置
pip install celery
消息中间件:RabbitMQ/Redis
app = Celery('任务名',backend='xxx',broker='xxx')
5.Celery执行异步任务
基本使用
建立项目celerytest
建立py文件:celery_app_task_.py
import celery
import time
broker = 'redis://127.0.0.1:6379/1'
#消息中间件
#redis不加密,放到第1个库中
# broker = 'reids://:123@127.0.0.1:6379/1' redis加密,密码是123
#结果存储
backend = 'reids://127.0.0.1:6379/2' # redis不加密,放到第二个库中
# 实例化产生一个Celery对象,必定要指定名字
cel = celery.Celery('test',backend=backend,broker=broker)
#任务其实就是一个函数
#须要用一个装饰器装饰,表示该任务是被celery管理的,而且能够用celery执行的
@cel.task
def add(x,y):
time.sleep(2)
return x+y
提交任务的py文件:add_task.py
import celery_task_s1
res = celery_task_s1.add.delay(3,4)
print(res)
# res就是任务的id号
右击运行提交任务,
启动worker,使用命令执行:celery worker -A celery_task_s1 -l info
注意:windows下:celery worker -A celery_task_s1 -l info -P eventlet
存储任务结果的py文件:celery_result.py
from celery.result import AsyncResult
from celery_task_s1 import cel
# 根据id去查询它的执行结果
async = AsyncResult(id="a5ea035f-0cc3-44ba-b334-f5d7c7ce681d", app=cel)
if async.successful():
#取出它return的值
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
执行add_task.py,添加任务,并获取任务id
执行celery_result.py,检查任务状态并获取结果
执行定时任务,3s钟之后执行add任务
第一种获取时间的方法
from datetime import datetime
v1 = datetime(2019,7,12,16,15,56)
print(v1)
#默认用UTC时间
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
result = celery_task_s1.add.apply_async(args=[1,3],eta=v2)
print(result,id)
第二种获取时间的方法
from datetime import timedelta
ctime = datetime.now()
#默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=3)
task_time = utc_ctime + time_delay
result = celery_task_s1.add.apply_async(args=[2,8],eta=task_time)
多任务结构
# 项目结构
pro_cel
├── celery_task# celery相关文件夹
│ ├── celery.py # celery链接和配置相关文件,必须叫这个名字
│ └── tasks1.py # 全部任务函数
│ └── tasks2.py # 全部任务函数
├── check_result.py # 检查结果
└── add_task.py # 触发任务
celery.py文件
#必须叫celery,生成celery对象
from celery import Celery
from datetime import timedelta
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
cel = Celery('test',broker=broker,backend=backend,
include=[
'celery_task.order_task',
'celery_task.user_task'
])
#时区
cel.conf.timezone = 'Asia/shanghai'
# 是否使用UTC
cel.conf.enable_utc = False
order_task.py
from celery_task.celery import cel
@cel.task
def order_add(x,y):
import time
time.sleep(2)
return x+y
user_task.py
from celery_task.celery import cel
@cel.task
def user_add(x,y):
import time
time.sleep(2)
return x+y
add_task.py
from celery_task.order_task import order_add
from celery_task.user_task import user_add
res=order_add.delay(5,6)
print(res.id)
res=user_add.delay(10,60)
print(res.id)
celery_result.py
from celery.result import AsyncResult
from celery_task.celery import cel
async = AsyncResult(id="c8815fd0-c126-4fed-b908-805974761381", app=cel)
if async.successful():
#取出它return的值
result = async.get()
print(result)
# result.forget() # 将结果删除
# async.revoke(terminate=True) # 不管如今是何时,都要终止
# async.revoke(terminate=False) # 若是任务尚未开始执行呢,那么就能够终止。
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
添加任务(执行send_task.py),开启work:celery worker -A celery_task -l info -P eventlet,检查任务执行结果(执行check_result.py)
定时任务:多任务结构中celery.py修改以下
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
cel = Celery('test',broker=broker,backend=backend,
include=[
'celery_task.order_task',
'celery_task.user_task'
])
#时区
cel.conf.timezone = 'Asia/shanghai'
# 是否使用UTC
cel.conf.enable_utc = False
cel.conf.beat_schedule = {
# 名字随意命名
'add_every_10_seconds':{
'test':'celery_task.order_task.order_add',
# 每隔2秒执行一次
# 'sehedule':1.0,
# 'schedule':crontab(minute="*/1"),
'schedule':timedelta(seconds=2),
# 传递参数
'args':(5,6)
},
# 'add-every-12-seconds': {
# 'task': 'celery_task.tasks1.test_celery',
# 每一年4月11号,8点42分执行
# 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
# 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
# 'args': (16, 16)
# },
}
启动一个beat:celery beat -A celery_task -l info
启动work执行:celery worker -A celery_task -l info -P eventlet
6.django中使用celery
多任务结构的celery_task文件夹直接拷到根目录下
views.py中
from django.shortcuts import render,HttpResponse
from celery_task.user_task import user_add
def index(request):
result = user_add.delay(8,9)
return HttpResponse(result.id)
def check_result(request):
res = request.GET.get('id')
from celery.result import AsynResult
from celery_task.celery import cel
async = AsynResult(id=res,app=cel)
if async.sucessful():
result = async.get()
print(result)
return HttpResponse('ok')
注意:上述作完后别忘了启动worker,再配置路由就能够了
强调:在celery的任务函数中不能直接调用django的环境(也就是不能直接操做数据库),须要手动添加
os.environ.setdefault('DJANGO_SETTINGS_MODEL','untitled15.settings')
import django
django.setup()
补充
pipreq:生成项目依赖的第三方包
项目依赖:pip3 install pipreqs
生成依赖文件:pipreqs ./
安装依赖文件:pips install -r requirements.txt
方法和函数
他们是有区别的
方法:好比一个类的对象去调用类内部写的函数就是方法
函数:类直接调用内部的函数就是函数
偏函数:先给函数传一个值进去
from functions import partial
def test(x,y,z):
return x+y+z
tsst = partial(test,1)
print(test(2,3)) # 先传一个值进去,以后再传只需传两个就能够了
轮询和长轮询
轮询:不停的发请求
长轮询:hang一下子再发请求