celery

 


Celery

1.什么是Celery

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统redis

专门处理实时处理的异步任务队列数据库

同时也支持任务调度django

2.Celery架构

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成windows

消息中间件

Celery自己是不提供消息服务,可是能够方便的和第三方提供的消息中间价集成,包括RabbitMQ,Redis等markdown

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。架构

任务结果存储

Task result store用来存储Worker执行的任务结果,Celery支持以不一样方式存储任务的结果,包括AMQP,Redis并发

版本支持状况

Celery version 4.0 runs on
        Python ❨2.7, 3.4, 3.5❩
        PyPy ❨5.4, 5.5❩
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

3.使用场景

异步任务:将耗时操做任务提交给Celery去异步执行,好比发送短信、邮件、消息推送、音视频处理等等app

定时任务:定时执行某件事情,好比天天数据统计异步

4.Celery的安装配置

pip install celery

消息中间件:RabbitMQ/Redis

app = Celery('任务名',backend='xxx',broker='xxx')

5.Celery执行异步任务

基本使用

建立项目celerytest

建立py文件:celery_app_task_.py

import celery
import time
broker = 'redis://127.0.0.1:6379/1' 
#消息中间件
#redis不加密,放到第1个库中
# broker = 'reids://:123@127.0.0.1:6379/1' redis加密,密码是123

#结果存储
backend = 'reids://127.0.0.1:6379/2'  # redis不加密,放到第二个库中

# 实例化产生一个Celery对象,必定要指定名字
cel = celery.Celery('test',backend=backend,broker=broker)

#任务其实就是一个函数
#须要用一个装饰器装饰,表示该任务是被celery管理的,而且能够用celery执行的
@cel.task
def add(x,y):
    time.sleep(2)
    return x+y

提交任务的py文件:add_task.py

import celery_task_s1
res = celery_task_s1.add.delay(3,4)
print(res)
# res就是任务的id号

右击运行提交任务,

启动worker,使用命令执行:celery worker -A celery_task_s1 -l info

注意:windows下:celery worker -A celery_task_s1 -l info -P eventlet

存储任务结果的py文件:celery_result.py

from celery.result import AsyncResult
from celery_task_s1 import cel
# 根据id去查询它的执行结果
async = AsyncResult(id="a5ea035f-0cc3-44ba-b334-f5d7c7ce681d", app=cel)
if async.successful():
    #取出它return的值
    result = async.get()
    print(result)
    # result.forget() # 将结果删除
elif async.failed():
    print('执行失败')
elif async.status == 'PENDING':
    print('任务等待中被执行')
elif async.status == 'RETRY':
    print('任务异常后正在重试')
elif async.status == 'STARTED':
    print('任务已经开始被执行')

执行add_task.py,添加任务,并获取任务id

执行celery_result.py,检查任务状态并获取结果

执行定时任务,3s钟之后执行add任务

第一种获取时间的方法

from datetime import datetime
v1 = datetime(2019,7,12,16,15,56)
print(v1)
#默认用UTC时间
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
result = celery_task_s1.add.apply_async(args=[1,3],eta=v2)
print(result,id)

第二种获取时间的方法

from datetime import timedelta
ctime = datetime.now()
#默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=3)
task_time = utc_ctime + time_delay
result = celery_task_s1.add.apply_async(args=[2,8],eta=task_time)

多任务结构

# 项目结构
pro_cel
    ├── celery_task# celery相关文件夹
    │   ├── celery.py   # celery链接和配置相关文件,必须叫这个名字
    │   └── tasks1.py    # 全部任务函数
    │   └── tasks2.py    # 全部任务函数
    ├── check_result.py # 检查结果
    └── add_task.py    # 触发任务

celery.py文件

#必须叫celery,生成celery对象

from celery import Celery
from datetime import timedelta

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
cel = Celery('test',broker=broker,backend=backend,
             include=[
                 'celery_task.order_task',
                 'celery_task.user_task'
             ])

#时区
cel.conf.timezone = 'Asia/shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

order_task.py

from celery_task.celery import cel

@cel.task
def order_add(x,y):
    import time
    time.sleep(2)
    return x+y

user_task.py

from celery_task.celery import cel
@cel.task
def user_add(x,y):
    import time
    time.sleep(2)
    return x+y

add_task.py

from celery_task.order_task import order_add
from celery_task.user_task import user_add


res=order_add.delay(5,6)
print(res.id)
res=user_add.delay(10,60)
print(res.id)

celery_result.py

from celery.result import AsyncResult
from celery_task.celery import cel

async = AsyncResult(id="c8815fd0-c126-4fed-b908-805974761381", app=cel)

if async.successful():
    #取出它return的值
    result = async.get()
    print(result)
    # result.forget() # 将结果删除
    # async.revoke(terminate=True) # 不管如今是何时,都要终止
    # async.revoke(terminate=False) # 若是任务尚未开始执行呢,那么就能够终止。 
elif async.failed():
    print('执行失败')
elif async.status == 'PENDING':
    print('任务等待中被执行')
elif async.status == 'RETRY':
    print('任务异常后正在重试')
elif async.status == 'STARTED':
    print('任务已经开始被执行')

添加任务(执行send_task.py),开启work:celery worker -A celery_task -l info -P eventlet,检查任务执行结果(执行check_result.py)

定时任务:多任务结构中celery.py修改以下

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
cel = Celery('test',broker=broker,backend=backend,
             include=[
                 'celery_task.order_task',
                 'celery_task.user_task'
             ])

#时区
cel.conf.timezone = 'Asia/shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字随意命名
    'add_every_10_seconds':{
        'test':'celery_task.order_task.order_add',
        # 每隔2秒执行一次
        # 'sehedule':1.0,
        # 'schedule':crontab(minute="*/1"),
        'schedule':timedelta(seconds=2),
        # 传递参数
        'args':(5,6)
    },
    # 'add-every-12-seconds': {
    # 'task': 'celery_task.tasks1.test_celery',
    # 每一年4月11号,8点42分执行
    # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    # 'args': (16, 16)
    # },
}

启动一个beat:celery beat -A celery_task -l info

启动work执行:celery worker -A celery_task -l info -P eventlet

6.django中使用celery

多任务结构的celery_task文件夹直接拷到根目录下

views.py中

from django.shortcuts import render,HttpResponse

from celery_task.user_task import user_add

def index(request):
    result = user_add.delay(8,9)
    return HttpResponse(result.id)

def check_result(request):
    res = request.GET.get('id')
    from celery.result import AsynResult
    from celery_task.celery import cel
    async = AsynResult(id=res,app=cel)
    if async.sucessful():
        result = async.get()
        print(result)
        return HttpResponse('ok')

注意:上述作完后别忘了启动worker,再配置路由就能够了

强调:在celery的任务函数中不能直接调用django的环境(也就是不能直接操做数据库),须要手动添加

os.environ.setdefault('DJANGO_SETTINGS_MODEL','untitled15.settings')

import django

django.setup()

补充

pipreq:生成项目依赖的第三方包

项目依赖:pip3 install pipreqs

生成依赖文件:pipreqs ./

安装依赖文件:pips install -r requirements.txt

方法和函数

他们是有区别的

方法:好比一个类的对象去调用类内部写的函数就是方法

函数:类直接调用内部的函数就是函数

偏函数:先给函数传一个值进去

from functions import partial
def test(x,y,z):
    return x+y+z

tsst = partial(test,1)
print(test(2,3))  # 先传一个值进去,以后再传只需传两个就能够了

轮询和长轮询

轮询:不停的发请求

长轮询:hang一下子再发请求

相关文章
相关标签/搜索
本站公众号
   欢迎关注本站公众号,获取更多信息