Python异步处理——Celery

为何要使用异步

首先,咱们要知道计算机的处理分为两种,CPU处理和IO处理。通常来讲,处理一个任务,须要CPU和IO相互协调。python

CPU的处理速度远远快于IO的,因此当一个任务的CPU处理部分完成后,还须要等待IO的完成。这个等待的过程也就是进程阻塞。进程占着CPU空等没活干。web

而异步就是把进程在等待过程当中占用的CPU释放掉,让它去处理其余的东西。那正在处理的IO怎么办?系统提供了一种通知机制,使得在IO处理完成后通知系统,该进程进入就绪队列,等待CPU分配。redis

因此异步使得CPU资源得以充分使用。mongodb

1、Celery简介

Celery 是一个基于 Python 开发的分布式异步消息任务队列,经过它能够轻松的实现任务的异步处理,shell

Celery使用场景

▷ 异步任务:解放CPU,将耗时的IO任务交给Celery去异步执行,好比发送邮件、音频处理数据库

▷ 定时任务:相似于crontab,好比每日数据统计app

Celery构架

在这里插入图片描述
▶ Producer:任务委托方异步

▶ Broker:任务中心(中介),如RabbitMQ、Redis等1分布式

▶ Beat:任务调度器svg

▶ Worker:任务执行者,能够有多个(分布式)

▶ Result:任务中心的数据库,储存任务执行结果2

▶ Backend:由于任务经由中介,而非直接委派到Worker手上,因此Producer并不知道任务被委派给了谁,以及任务的完成结果,因此这时候须要一个Backend(理解成手机,经过手机查看任务完成状况)

Celery安装

安装消息中间件Redis

sudo apt-get install redis-server

sudo pip3 install redis

安装Celery

sudo pip3 install celery

2、Celery的使用

第一个Celery程序

新建一个task.py

from celery import Celery
 
app = Celery('add_task',
             broker='redis://localhos:6379/1',
             backend='redis://localhost:6379/2') 
            
@app.task
def add(x, y):
    print("running...",x,y)
    return x+y

肯定Redis服务已经开启

redis-server

启动 Celery Worker 来开始监听并执行任务

celery -A tasks worker -l info

# -A 参数表示app名称,-l 参数表示日志类型

好了,如今任务给定了,Workers也已经就绪,再新建一个test.py文件

from task import add

result = add.delay(2, 3)    # 提交任务,返回任务id
print(result.ready())    # 返回True的话,表示提交成功
print(result.get())    # 获取任务执行结果
Celery配置

从上面的例子能够看出,每次建立app都须要指定broker和backend。
怎样简化这个过程呢?

咱们能够将相关的文件模块化(模块名celery_app,再添加__init__.py文件)

模块下新建一个celeryconfig.py文件

BROKER_URL = 'redis://localhos:6379/1'

CELERY_RESULT_BACKEND = 'redis://localhost:6379/2')

# 导入指定的任务模块
CELERY_TASKS = {
	'celery_app.task1',    # task1.py是模块下的任务文件
	'celery_app.task2',
}

__init__.py

from celery import Celery

app = Celery('demo')
app.config_from_object('celery_app.celeryconfig')

以上配置完成后,则在task1.py等文件(存在@app.task)中添加一行,便可

from celery_app import app

  1. Celery 自己不提供消息服务,可是能够方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ ↩︎

  2. Task result store 用来存储 Worker 执行的任务的结果,Celery 支持以不一样方式存储任务的结果,包括 AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,Apache Cassandra, IronCache 等。 ↩︎