celery简介

Celery简介

celery userguidehtml

知乎大神解释celerypython

Celery(芹菜)是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。redis

Celery架构

架构图以下:
Celery架构数据库

Celery包括以下组件:json

  • Celery Beat

任务调度器,Beat进程会读取配置文件的内容,周期性的将配置中到期须要执行的任务发送给任务队列安全

  • celery Worker

执行任务的消费者,一般会在多台服务器运行多个消费者来提升执行效率bash

  • Broker

消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列在按序分发给任务消费方(一般是消息队列或者数据库)服务器

  • Producer

调用了Celery提供的API,函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者数据结构

  • Result Backend

任务处理完后保存状态信息和结果,以供查询。Celery默认已支持 Redis,RabbitMQ, MongoDB,Django ORM,SQLAIchemy等方式架构

中间件选择

Celery目前支持不少第三方软件做为消息代理,但适用于生产环境的只有RabbitMQ和Redis,至于其余的方式,一是支持有限,二是可能得不到更好的技术支持。Celery官方推荐的是RabbitMQ。

Celery序列化

在客户端和消费者之间传输数据须要序列化和反序列化,Celery支持以下表的序列化方案:

方案 说明
pickle pickle是Python标准库中的一个模块,支持Python内置的数据结构,可是它是Python的专有协议。
从Celery3.2开始,因为安全性等缘由Celery将拒绝pickle这个方案
json json支持多种语言,可用于跨语言方案
yaml yaml的表达能力更强,支持的数据类型比json多,可是python客户端的性能不如json
msgpack msgpack是一个二进制的类json的序列化方案,可是比json的数据结构更小、更快

简单项目

项目目录结构以下:

/root/test/proj/celery
├── celeryconfig.py
├── celery.py
├── __init__.py
└── tasks.py

先看一下主程序celery.py:

#!/usr/bin/env python
#coding:utf8

#拒绝隐式引入,由于celery.py的名字和celery的包名冲突,须要使用这条语句让程序正常运行,不然“from celery import Celery”这条语句将会报错,由于首先找到的celery.py文件中并无Celery这个类
from __future__ import absolute_import
from celery import Celery


# app是Celery类的实例,建立的时候添加了celery.tasks这个模块,也就是包含了celery/tasks.py这个文件
app = Celery('celery',include=['celery.tasks'])

# 把Celery配置存放进celery/celeryconfig.py文件,使用app.config_from_object加载配置
app.config_from_object('celery.celeryconfig')

if __name__ == "__main__":
    app.start()

存听任务函数的文件tasks.py:

#!/usr/bin/env python
#coding:utf8

from __future__ import absolute_import
from celery.celery import app



@app.task
def add(x, y):
    return x+y

tasks.py只有一个任务函数add,让它生效的最直接的方法就是添加app.task这个装饰器。

celery配置文件celeryconfig.py:

# 使用Redis做为消息代理
BROKER_URL = 'redis://192.168.189.100:6379/0'

# 把任务结果保存在Redis中
CELERY_RESULT_BACKEND = 'redis://192.168.189.100:6379/1'

# 任务序列化和反序列化使用msgpack方案
CELERY_TASK_SERIALIZER = 'msgpack'

# 读取任务结果通常性能要求不高,因此使用了可读性更好的JSON
CELERY_RESULT_SERIALIZER = 'json'

# 任务过时时间,这样写更加明显
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24

# 指定接受的内容类型
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']

这个例子中没有任务调度相关的内容,若是有的话就要使用Queue类了,因此只须要启动消费者:

celery -A proj worker -l info

-A参数默认会寻找proj.celery这个模块,其实使用celery做为模块文件名字不怎么合理。可使用其余名字。举个例子,假如是proj/app.py,可使用以下命令启动:

celery -A proj.app worker -l info
相关文章
相关标签/搜索