转载:异步处理 celery

1.什么是Celery?

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统前端

专一于实时处理的异步任务队列,同时也支持任务调度linux

2.Celery架构

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。redis

 

消息中间件

Celery自己不提供消息服务,可是能够方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等数据库

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。windows

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不一样方式存储任务的结果,包括AMQP, redis等cookie

执行流程:架构

user至关于提交任务的人,提交给broker也就是消息中间件,worker至关于工人,broker里面有了用户也就是程序提交的任务,worker就去取出来执行相似于生产者消费者模型,store说简单点就是worker执行结束后的返回结果并发

 

版本支持状况

复制代码
Celery version 4.0 runs on
        Python ❨2.7, 3.4, 3.5❩
        PyPy ❨5.4, 5.5❩
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
复制代码

 

 注:Celery不支持windows但并非不可使用能够用第三方模块来完成,可是windows上使用出了问题官方不会提供帮助app

3.Celery的安装配置

pip install celery异步

消息中间件:RabbitMQ/Redis

app=Celery('任务名',backend='xxx',broker='xxx')

4.测试案例

 

复制代码
# 先导入
from celery import Celery

# 下面是配置信息,这里使用redis为列
# broker='redis://127.0.0.1:6379/2' 不加密码
# 消息中间件
backend = 'redis://:lmdxxx@139.196.**.**:6380/7'
# 处理结果
broker = 'redis://:lmdxxx@139.196.**.**:6380/8'
# 注:redis有密码的状况下前面加@输入密码便可,最后面的是指定储存在redis的那个库中
# 实列话产生一个对celery象,一个项目中可能会用到多个Celery # 第一个参数是当前任务的名字,必定要写 APP = Celery('test', broker=broker, backend=backend)
复制代码

 

1.首先没得说了确定要先导入celery,上文说了selery有消息中间件,处理者,结果存储,这里使用redis来做为测试

2.配置消息中间件,配置结果储存位置

APP = Celery('test', broker=broker, backend=backend)
# 实列话产生一个对celery象,一个项目中可能会用到多个Celery,因此在第一个参数中传入指定的名称,不能够重复
# 对象名称无所谓

3.建立一个selery任务

复制代码
# 任务其实就是一个函数
# 须要用一个装饰器去装饰才能说明这是一个被celery管理的任务,且能够用celery执行
# 装饰器实际就是实列化出来的那个对象,里面的一个固定方法
@APP.task
def add(x, y):
    import time
    time.sleep(2)
    return x + y
复制代码

 4.建立一个用于提交任务的

正常同步提交任务

提交任务不执行,有worker才会去执行

注:获得的这个ID其实咱们工做中能够把它set到cookie中,用户就能够经过轮询去查询redis数据库中去寻找结果

也能够直接return返回给前端,给前端处理

 

返回的对应的是存放redis中间件里面的提交任务属性的ID用于查询返回结果,看不懂不要紧

5.任务提交任务后须要建立工人执行任务

建立py文件:run.py,执行任务,或者使用命令执行(win不可使用):celery worker -A celery_task_cs -l info (celery_test_cs是建立任务的那个名字 -l info是打印的日志级别)

windows下:celery worker -A celery_test_cs -l info -P eventlet

win安装:pip install eventlet

代码执行一般不用:

from celery_app_task import cel
if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info')

提交任务的时候注意导入方式,有时候导入方式问题会产生报错

启动后以下:

收到任务后分配任务切返回执行信息 7 就是咱们的执行结果 前面的就是执行的时间 这个是info级别的日志打印的

 

redis 存放结果数据以下,执行状态 结果 之类都在里面

6.查看结果

 

流程梳理:

celery的使用
1.先安装 pip install celery
2.写一个py文件:celery_task
3.指定broker(消息中间件),指定backend(结果存储)
4.实例化产生一个Celery对象 app=Celery('名字',broker,backend)
5.加装饰器绑定任务,在函数(add)上加装饰器app.task
6.其余程序提交任务,先导入add,add.delay(参数,参数),会将该函数提交到消息中间件,可是并不会执行,有个返回值,直接print会打印出任务的id,之后用id去查询任务是否执行完成
7.启动worker去执行任务:
linux:celery worker -A 建立的任务的那个py文件 -l info
windows下:celery worker 建立任务的那个py文件 -A -l info -P eventlet
8.查看结果:根据id去查询

 

应用场景

异步任务:将耗时操做任务提交给Celery去异步执行,好比生成图表,发送短信/邮件、消息推送、音视频处理等等

定时任务:定时执行某件事情,好比天天数据统计