分布式队列神器 Celery,你了解多少?

咱们在web开发中会常常遇到异步任务,对于一些消耗资源和时间的操做,若是不从应用中单独抽出来的话,体验是很是很差的,例如:一个手机验证码登陆的过程,当用户输入手机号点击发送后,若是若是直接扔给后端应用去执行的话,就会引发网络IO的阻塞,那整个应用就很是不友好了,那如何优雅的解决这个问题呢?php

咱们可使用异步任务,当接收到请求后,咱们能够在业务逻辑的处理时触发一个异步任务,前端当即返回读秒让用户接收验证码,同时因为是异步执行的任务,后端也能够处理其余的请求,这就很是的完美了。html

实现异步任务的工具备不少,其原理也都是去实现一个消息队列,这里咱们主要来了解一下Celery。前端

Celery 是什么?

Celery简介

Celery是一个由Python编写的简单,灵活且可靠的分布式系统,它能够处理大量消息,同时也提供了操做、维护该分布式系统所需的工具。node

说白点就是,Celery 是一个异步任务的调度工具,它专一于实时任务处理,支持任务调度。有了Celery,咱们能够快速创建一个分布式任务队列并可以简单的管理。Celery虽然是由python编写, 但协议能够用任何语言实现。迄今,已有 Ruby 实现的 RCelery 、node.js 实现的 node-celery 以及一个 PHP 客户端 。python

Celery架构

此处借鉴一张网图,这张图很是明了把Celery的组成以及工做方式描述出来了。git

这里写图片描述

Celery的架构由下面三个部分组成:github

Brokers

意为中间件/中间人,在这里指的是任务队列, 咱们要注意Celery自己不是任务队列,它是管理分布式任务队列的工具,换一句话说,用Celery能够快速进行任务队列的使用与管理, Celery能够方便的和第三方提供的任务队列集成,例如RabbitMQ, Redis等。web

Worker

任务执行单元,咱们能够理解为工人,Worker是Celery提供的任务执行的单元,简单来讲,它就是Celery的工人,相似于消费者,它shi'shi监控着任务队列,当有新的任务入队时,它会从任务队列中取出任务并执行。redis

backend/Task result store

任务结构存储,顾名思义,它就是用来存储Worker执行的任务的结果的地方,Celery支持以不一样方式存储任务的结果,有redis,Memcached等。后端

简单来讲,当用户、或者咱们的应用中的触发器将任务入Brokers队列以后,Celery的Worker就会取出任务并执行,而后将结构保存到Task result store中

使用Celery

简单实现

Celery及消息队列(redis/RabbitMQ)的安装过程在这里就再也不赘述了,出于方便,咱们这里使用redis,点击这里查看官网给出的更多的Brokers和backend支持。

首先,咱们新建一个tasks.py文件。

import time
from celery import Celery

brokers = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'


app = Celery('tasks', broker=brokers, backend=backend)

@app.task
def add(x, y):
    time.sleep(2)
    return x + y

上述代码,咱们导入了celery库,新建了一个celery实例,传入了broker和backend,而后建立了任务函数add,咱们用time.sleep(2)来模拟耗时操做。

接下来咱们要启动Celery服务,在当前命令行终端运行:

celery -A tasks worker  --loglevel=info

注意:若是在Windows中要运行以下命令:

celery -A celery_app worker --loglevel=info -P eventlet

否则会报错。。。。。。

咱们会看到下面的输出结果:

D:\use_Celery>celery -A tasks worker --loglevel=info -P eventlet

 -------------- celery@DESKTOP-8E96VUV v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Windows-10-10.0.18362-SP0 2020-03-18 15:49:22
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x3ed95f0
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     redis://127.0.0.1:6379/1
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2020-03-18 15:49:22,264: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2020-03-18 15:49:22,294: INFO/MainProcess] mingle: searching for neighbors
[2020-03-18 15:49:23,338: INFO/MainProcess] mingle: all alone
[2020-03-18 15:49:23,364: INFO/MainProcess] celery@DESKTOP-8E96VUV ready.
[2020-03-18 15:49:23,371: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/0.

这些输出包括指定的启动Celer应用的一些信息,还有注册的任务等等。 

此时worker已经处于待命状态,而 broker中尚未任务 ,咱们须要触发任务进入broker中,worker才能去取出任务执行。

咱们新建一个add_task.py文件:

from tasks import add

result = add.delay(5, 6)  # 使用celery提供的接口delay进行调用任务函数

while not result.ready():
    pass
print("完成:", result.get())

咱们能够看到命令窗口的输出的celery执行的日志

[2020-03-18 15:53:15,967: INFO/MainProcess] Received task: tasks.add[8da270cb-7f07-4202-ad6a-51cc7f559107]
[2020-03-18 15:53:17,981: INFO/MainProcess] Task tasks.add[8da270cb-7f07-4202-ad6a-51cc7f559107] succeeded in 2.015999999974156s: 11

固然咱们在backend的redis中也能够看到执行任务的相关信息。

至此,一个简单的 celery 应用就完成啦。

周期/定时任务

Celery 也能够实现定时或者周期性任务,实现也很简单,只须要配置好周期任务,而后再启动要启动一个 beat 服务便可。

新建Celery配置文件celery_conf.py:

from datetime import timedelta
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'add': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=3),
        'args': (16, 16)
    }
}

而后在 tasks.py 中经过app.config_from_object('celery_config') 读取Celery配置:

# tasks.py
app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
app.config_from_object('celery_config')

而后从新运行 worker,接着再运行 beat:

celery -A tasks beat

咱们能够看到如下信息:

D:\use_Celery>celery -A tasks beat
celery beat v4.4.2 (cliffs) is starting.
__    -    ... __   -        _
LocalTime -> 2020-03-18 17:07:54
Configuration ->
    . broker -> redis://127.0.0.1:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)

而后咱们就能够看到启动worker的命令行在周期性的执行任务:

[2020-03-18 17:07:57,998: INFO/MainProcess] Received task: tasks.add[f5dab8ac-0809-415f-84e7-cba488ea2495]
[2020-03-18 17:07:59,995: INFO/MainProcess] Task tasks.add[f5dab8ac-0809-415f-84e7-cba488ea2495] succeeded in 2.0s: 32
[2020-03-18 17:08:00,933: INFO/MainProcess] Received task: tasks.add[b49a4c92-e007-46ef-9b5d-f93f451a6c1b]
[2020-03-18 17:08:02,946: INFO/MainProcess] Task tasks.add[b49a4c92-e007-46ef-9b5d-f93f451a6c1b] succeeded in 2.0160000000032596s: 32
[2020-03-18 17:08:03,934: INFO/MainProcess] Received task: tasks.add[1bdfe4d8-76c1-44cc-b1fa-dbbe242692ae]
[2020-03-18 17:08:05,940: INFO/MainProcess] Task tasks.add[1bdfe4d8-76c1-44cc-b1fa-dbbe242692ae] succeeded in 2.0s: 32

能够看出每3秒就有一个任务被加入队列中去执行。

那定时任务又怎样去实现呢?

也很简单,咱们只须要更改一下配置文件便可:

CELERYBEAT_SCHEDULE = {
    'add-crontab-func': {
        'task': 'tasks.add',
        'schedule': crontab(hour=8, minute=50, day_of_week=4),
        'args': (30, 20),
    },
}
CELERY_TIMEZONE = 'Asia/Shanghai'  # 配置时区信息

其中crontab(hour=8, minute=50, day_of_week=4)表明的是每周四的8点50执行一次,只要咱们的Celery服务一直开着,定时任务就会按时执行;在这里我也在配置里加入了时区信息。

我在这里是8点45启动的Celery服务、运行的beat,从下面的输出能够看出,50的时候咱们的定时任务就执行了。

[2020-03-19 08:45:19,934: INFO/MainProcess] celery@DESKTOP-8E96VUV ready.
[2020-03-19 08:50:00,086: INFO/MainProcess] Received task: tasks.add[45aa794d-a4ef-40e0-9480-80c7004318d5]
[2020-03-19 08:50:02,091: INFO/MainProcess] Task tasks.add[45aa794d-a4ef-40e0-9480-80c7004318d5] succeeded in 2.0s: 50

由此咱们能够看出,利用 Celery 进行分布式队列管理将会大大的提升咱们的开发效率,我这里也仅仅是关于Celery的简单介绍和使用,若是你们感兴趣,能够去官方文档 学习更高级更系统的用法。

最后,感谢女友在生活中,工做上的包容、理解与支持 !

相关文章
相关标签/搜索