Python中任务队列-芹菜celery的使用|Python 主题月

本文正在参加「Python主题月」,详情查看 活动链接html

1、关于celery

芹菜celery是一个python实现的异步任务队列,能够用于爬虫、web后台查询、计算等等。经过任务队列,当一个任务来临时再也不傻傻等待。python

他的架构以下:web

celery_architecture

  • Broker

咱们的生产者建立任务后会进入celery的任务调度队列中间件Broker,Broker经过调度规则将消息(任务)调度消息队列,Broker依赖第三方队列消息代理如rabbitmqredis等。redis

  • Worker

广大劳动者,盯着消息队列,当队列中有消息时把它拿过来给处理了。docker

  • Backend

用于结果存储经worker处理的结果,好比经常使用的数据库等。shell


#### 使用celery数据库

在本文中我们使用rabbitmq(celery推荐)做为消息代理中间件。django

咱们建立的celery目录以下后端

learn_celery/
...celery_env/
...celery.py
...my_task1.py
...my_task2.py
...task1_run.py
...task2_run.py
复制代码
1. 建立虚拟环境并安装celery、flower(web监控),这里不作赘述。
2.安装我们的消息队列中间件rabbitmq

这里以docker的方式运行并配置,指定主机名为rabbit(rabbitmq是以主机名来访问的,因此这是必须的),容器名称为celery_rabbitmqmarkdown

docker run -d -p 5672:5672 -h rabbit --name celery_rabbitmq rabbitmq
复制代码

添加用于celery访问的用户,以及配置configurewriteread权限,在下面咱们配置rabbit_user拥有全部配置、写入和读取权限。

docker exec -it celery_rabbitmq rabbitmqctl add_user rabbit_user rabbit_pass
docker exec -it celery_rabbitmq rabbitmqctl add_vhost rabbit_vhost
docker exec -it celery_rabbitmq rabbitmqctl set_user_tags rabbit_user celery
docker exec -it celery_rabbitmq rabbitmqctl  set_permissions -p rabbit_vhost rabbit_user ".*" ".*" ".*"
复制代码
3.建立celery应用
#celery.py
from celery import Celery

broker_rabbitmq="amqp://rabbit_user:rabbit_pass@i-k9pwet2d/rabbit_vhost"
app=Celery("learn_celery",broker=broker_rabbitmq,backend="rpc://",include=["learn_celery.my_task2","learn_celery.my_task2"])
复制代码

咱们经过建立app来实例化Celery,项目包的名称为learn_celery,经过broker_rabbitmq来链接rabbitmq,rabbitmq的amqp协议格式为

amqp://userid:password@hostname:port/virtual_host
复制代码

因为咱们是在docker中启动的rabbitmq,因此咱们的hostname应该为宿主机的hostname。

指定后端经过rpc回传数据,include加载带worker处理的任务learn_celery.my_task1learn_celery.my_task2

4.建立两个任务(消息)
#my_task1.py
from .celery import app
import time

@app.task
def args_add1(x,y):
    print("start task no.1 now!")
    time.sleep(10)
    print("task no.1 end!")
    return x+y

#my_task12.py
from .celery import app
import time

@app.task
def args_add2(x,y):
    print("start task no.2 now!")
    time.sleep(20)
    print("task no.2 end!")
    return x+y
复制代码

在这里咱们导入了celery中的app,并用它来装饰咱们的方法args_add,在args_add中模拟任务处理时间分别为10s、20s而后返回结果。

5.发送任务给celery
#tasks1_run.py
from .my_task1 import args_add1
import time

reslut=args_add1.delay(11,22)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))
time.sleep(15)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))

#tasks2_run.py
from .my_task2 import args_add2
import time

reslut=args_add2.delay(33,44)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))
time.sleep(25)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))

复制代码

关于任务的delay,官方文档(参考)是这样描述的,我把它理解为发送任务给celery或者celery调用待进来的任务。

image-20210707162220566

reslut.ready() 返回任务执行是否执行完成True or False

reslut.result 返回任务执行结果

咱们在任务进入celery和结束分别检查一次。


2、看看结果

1.启动worker

进入learn_celery的父目录。启动learn_celery的这个应用worker,并指定并发数为10个

celery -A learn_celery worker --loglevel=info --concurrency=10
复制代码

若celery链接rabbitmq正常,咱们能够看到以下的info

image-20210707112018241

2.执行任务

为了便于观察,咱们另外开启一个窗口2,到learn_celery父目录运行task1_run模块

python -m learn_celery.tasks1_run
复制代码

image-20210707164856051

开启窗口3,到learn_celery父目录运行task2_run模块

python -m learn_celery.tasks2_run
复制代码

image-20210707165012326

能够看到通过各自任务的等待时间后,两个任务都顺利执行结束,并获得结果,接下来咱们到worker上看一下info

因为celery的并发性,收到任务立刻被调入执行,任务1耗时10s结果为33,任务2耗时20s结果为77


3、使用Flower监控celery

1.启动flower
celery -A learn_celery flower
复制代码
2. 查看web监控 http://ip:5555

Tasks中能够查看到当前任务队列的状态、参数、接收和启动、执行时间。 image-20210707170905888Dashborad中查看当前worker节点的相关信息 image-20210707171023610


文章有不足的地方欢迎指出。

欢迎收藏、点赞、提问。关注顶级饮水机管理员,除了管烧热水,有时还作点别的。


NEXT

  • celery的深刻了解

  • celery在django中的使用

相关文章
相关标签/搜索