Celery

Celery是由Python开发的一个简单、灵活、可靠的处理大量任务的分发系统,它不只支持实时处理也支持任务调度。html

 

  • user:用户程序,用于告知celery去执行一个任务。
  • broker: 存听任务(依赖RabbitMQ或Redis,进行存储)
  • worker:执行任务

celery须要rabbitMQ、Redis、Amazon SQS、Zookeeper(测试中) 充当broker来进行消息的接收,而且也支持多个broker和worker来实现高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.htmlpython

    Celery version 4.0 runs on
        Python ❨2.7, 3.4, 3.5❩
        PyPy ❨5.4, 5.5❩
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
版本和要求

环境准备:redis

  • 安装rabbitMQ或Redis
        见:http://www.cnblogs.com/wupeiqi/articles/5132791.html
  • 安装celery
         pip3 install celery

快速上手

import time
from celery import Celery

app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')


@app.task
def xxxxxx(x, y):
    time.sleep(10)
    return x + y
s1.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from s1 import xxxxxx

# 当即告知celery去执行xxxxxx任务,并传入两个参数
result = xxxxxx.delay(4, 4)
print(result.id)
s2.py
from celery.result import AsyncResult
from s1 import app

async = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 将结果删除
elif async.failed():
    print('执行失败')
elif async.status == 'PENDING':
    print('任务等待中被执行')
elif async.status == 'RETRY':
    print('任务异常后正在重试')
elif async.status == 'STARTED':
    print('任务已经开始被执行')
s3.py

执行 s1.py 建立worker(终端执行命令):flask

celery worker -A s1 -l info

执行 s2.py ,建立一个任务并获取任务ID:app

python3 s2.py 

执行 s3.py ,检查任务状态并获取结果:async

python3 s3.py

多任务结构

pro_cel
    ├── celery_tasks# celery相关文件夹
    │   ├── celery.py   # celery链接和配置相关文件
    │   └── tasks.py    #  全部任务函数
    ├── check_result.py # 检查结果
    └── send_task.py    # 触发任务
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery

celery = Celery('xxxxxx',
                broker='redis://192.168.0.111:6379',
                backend='redis://192.168.0.111:6379',
                include=['celery_tasks.tasks'])

# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = False
pro_cel/celery_tasks/celery
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time
from .celery import celery


@celery.task
def xxxxx(*args, **kwargs):
    time.sleep(5)
    return "任务结果"


@celery.task
def hhhhhh(*args, **kwargs):
    time.sleep(5)
    return "任务结果"
pro_cel/celery_tasks/tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-

from celery.result import AsyncResult
from celery_tasks.celery import celery

async = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 将结果删除
elif async.failed():
    print('执行失败')
elif async.status == 'PENDING':
    print('任务等待中被执行')
elif async.status == 'RETRY':
    print('任务异常后正在重试')
elif async.status == 'STARTED':
    print('任务已经开始被执行')
pro_cel/check_result.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import celery_tasks.tasks

# 当即告知celery去执行xxxxxx任务,并传入两个参数
result = celery_tasks.tasks.xxxxx.delay(4, 4)

print(result.id)
pro_cel/send_task.py

更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html分布式

定时任务

1. 设定时间让celery执行一个任务ide

import datetime
from celery_tasks.tasks import xxxxx
"""
from datetime import datetime

v1 = datetime(2017, 4, 11, 3, 0, 0)
print(v1)

v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)

"""
ctime = datetime.datetime.now()
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())

s10 = datetime.timedelta(seconds=10)
ctime_x = utc_ctime + s10

# 使用apply_async并设定时间
result = xxxxx.apply_async(args=[1, 3], eta=ctime_x)
print(result.id)

2. 相似于contab的定时任务函数

"""
celery beat -A proj
celery worker -A proj -l info

"""
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['proj.s1', ])
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False

app.conf.beat_schedule = {
    # 'add-every-10-seconds': {
    #     'task': 'proj.s1.add1',
    #     'schedule': 10.0,
    #     'args': (16, 16)
    # },
    'add-every-12-seconds': {
        'task': 'proj.s1.add1',
        'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
        'args': (16, 16)
    },
}

注:若是想要定时执行相似于crontab的任务,须要定制 Scheduler来完成。post

Flask中应用Celery

pro_flask_celery/
├── app.py
├── celery_tasks
    ├── celery.py
    └── tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-

from flask import Flask
from celery.result import AsyncResult

from celery_tasks import tasks
from celery_tasks.celery import celery

app = Flask(__name__)

TASK_ID = None


@app.route('/')
def index():
    global TASK_ID
    result = tasks.xxxxx.delay()
    # result = tasks.task.apply_async(args=[1, 3], eta=datetime(2018, 5, 19, 1, 24, 0))
    TASK_ID = result.id

    return "任务已经提交"


@app.route('/result')
def result():
    global TASK_ID
    result = AsyncResult(id=TASK_ID, app=celery)
    if result.ready():
        return result.get()
    return "xxxx"


if __name__ == '__main__':
    app.run()
app.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
from celery.schedules import crontab

celery = Celery('xxxxxx',
                broker='redis://192.168.10.48:6379',
                backend='redis://192.168.10.48:6379',
                include=['celery_tasks.tasks'])

# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = False
celery_tasks/celery.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time
from .celery import celery


@celery.task
def hello(*args, **kwargs):
    print('执行hello')
    return "hello"


@celery.task
def xxxxx(*args, **kwargs):
    print('执行xxxxx')
    return "xxxxx"


@celery.task
def hhhhhh(*args, **kwargs):
    time.sleep(5)
    return "任务结果"
celery_task/tasks.py
相关文章
相关标签/搜索
本站公众号
   欢迎关注本站公众号,获取更多信息