原文连接:https://blog.csdn.net/freeking101/article/details/74707619css
Celery 官网:http://www.celeryproject.org/html
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.htmlpython
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/mysql
celery配置:http://docs.jinkan.org/docs/celery/configuration.html#configurationgit
参考:http://www.cnblogs.com/landpack/p/5564768.html http://blog.csdn.net/happyAnger6/article/details/51408266github
http://www.cnblogs.com/forward-wang/p/5970806.htmlweb
分布式队列神器 Celery:http://www.javashuo.com/article/p-gwnunhqd-hp.htmlredis
celery最佳实践:https://my.oschina.net/siddontang/blog/284107sql
Celery 分布式任务队列快速入门:http://www.cnblogs.com/alex3714/p/6351797.htmlmongodb
异步任务神器 Celery 快速入门教程:http://www.javashuo.com/article/p-exnuhozr-bt.html
定时任务管理之python篇celery使用:http://student-lp.iteye.com/blog/2093397
异步任务神器 Celery:http://python.jobbole.com/87086/
celery任务调度框架实践:http://www.javashuo.com/article/p-gsudbdhf-bd.html
Celery-4.1 用户指南: Monitoring and Management Guide:https://blog.csdn.net/libing_thinking/article/details/78592801
Celery安装及使用:https://blog.csdn.net/u012325060/article/details/79292243
Celery学习笔记(一):https://blog.csdn.net/sdulsj/article/details/73741350
除了redis,还可使用另一个神器---Celery。Celery是一个异步任务的调度工具。
Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了能够有多个 worker 的存在,队列表示其是异步操做,即存在一个产生任务提出需求的工头,和一群等着被分配工做的码农。
在 Python 中定义 Celery 的时候,咱们要引入 Broker,中文翻译过来就是“中间人”的意思,在这里 Broker 起到一个中间人的角色。在工头提出任务的时候,把全部的任务放到 Broker 里面,在 Broker 的另一头,一群码农等着取出一个个任务准备着手作。
这种模式注定了整个系统会是个开环系统,工头对于码农们把任务作的怎样是不知情的。因此咱们要引入 Backend 来保存每次任务的结果。这个 Backend 有点像咱们的 Broker,也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。咱们能够选择只让错误执行的任务返回结果到 Backend,这样咱们取回结果,即可以知道有多少任务执行失败了。
Celery(芹菜)是一个异步任务队列/基于分布式消息传递的做业队列。它侧重于实时操做,但对调度支持也很好。Celery用于生产系统天天处理数以百万计的任务。Celery是用Python编写的,但该协议能够在任何语言实现。它也能够与其余语言经过webhooks实现。Celery建议的消息队列是RabbitMQ,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, 和数据库(使用SQLAlchemy的或Django的 ORM) 。Celery是易于集成Django, Pylons and Flask,使用 django-celery, celery-pylons and Flask-Celery 附加包便可。
在Celery中几个基本的概念,须要先了解下,否则不知道为何要安装下面的东西。概念:Broker、Backend。
什么是broker?
broker是一个消息传输的中间件,能够理解为一个邮箱。每当应用程序调用celery的异步任务的时候,会向broker传递消息,然后celery的worker将会取到消息,进行对于的程序执行。好吧,这个邮箱能够当作是一个消息队列。其中Broker的中文意思是 经纪人 ,其实就是一开始说的 消息队列 ,用来发送和接受消息。这个Broker有几个方案可供选择:RabbitMQ (消息队列),Redis(缓存数据库),数据库(不推荐),等等
什么是backend?
一般程序发送的消息,发完就完了,可能都不知道对方时候接受了。为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果。Backend是在Celery的配置中的一个配置项 CELERY_RESULT_BACKEND ,做用是保存结果和状态,若是你须要跟踪任务的状态,那么须要设置这一项,能够是Database backend,也能够是Cache backend,具体能够参考这里: CELERY_RESULT_BACKEND 。
对于 brokers,官方推荐是 rabbitmq 和 redis,至于 backend,就是数据库。为了简单能够都使用 redis。
我本身演示使用RabbitMQ做为Broker,用MySQL做为backend。
来一张图,这是在网上最多的一张Celery的图了,确实描述的很是好
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件
Celery自己不提供消息服务,可是能够方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不一样方式存储任务的结果,包括AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,Apache Cassandra, IronCache 等。
这里我先不去看它是如何存储的,就先选用redis来存储任务执行结果。
由于涉及到消息中间件(在Celery帮助文档中称呼为中间人<broker>),为了更好的去理解文档中的例子,能够安装两个中间件,一个是RabbitMQ,一个redis。
根据 Celery的帮助文档 安装和设置RabbitMQ, 要使用 Celery,须要建立一个 RabbitMQ 用户、一个虚拟主机,而且容许这个用户访问这个虚拟主机。
结果以下,成功运行:
安装Redis,它的安装比较简单
$ sudo pip install redis
而后进行简单的配置,只须要设置 Redis 数据库的位置:
BROKER_URL = 'redis://localhost:6379/0'
URL的格式为:
redis://:password@hostname:port/db_number
URL Scheme 后的全部字段都是可选的,而且默认为 localhost 的 6479 端口,使用数据库 0。个人配置是:
redis://:password@ubuntu:6379/5
安装Celery,我是用标准的Python工具pip安装的,以下:
$ sudo pip install celery
Celery 是一个强大的 分布式任务队列 的 异步处理框架,它可让任务的执行彻底脱离主程序,甚至能够被分配到其余主机上运行。咱们一般使用它来实现异步任务(async task)和定时任务(crontab)。咱们须要一个消息队列来下发咱们的任务。首先要有一个消息中间件,此处选择rabbitmq (也可选择 redis 或 Amazon Simple Queue Service(SQS)消息队列服务)。推荐 选择 rabbitmq 。使用RabbitMQ是官方特别推荐的方式,所以我也使用它做为咱们的broker。它的架构组成以下图:
能够看到,Celery 主要包含如下几个模块:
任务模块 Task
包含异步任务和定时任务。其中,异步任务一般在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。
消息中间件 Broker
Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 自己不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。
任务执行单元 Worker
Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。
任务结果存储 Backend
Backend 用于存储任务的执行结果,以供查询。同消息中间件同样,存储也可以使用 RabbitMQ, redis 和 MongoDB 等。
有了上面的概念,须要安装这么几个东西:RabbitMQ、SQLAlchemy、Celery
安装rabbitmq
官网安装方法:http://www.rabbitmq.com/install-windows.html
启动管理插件:sbin/rabbitmq-plugins enable rabbitmq_management
启动rabbitmq:sbin/rabbitmq-server -detached
rabbitmq已经启动,能够打开页面来看看
地址:http://localhost:15672/#/
用户名密码都是guest 。如今能够进来了,能够看到具体页面。 关于rabbitmq的配置,网上不少 本身去搜如下就ok了。
消息中间件有了,如今该来代码了,使用 celeby官网代码。
剩下两个都是Python的东西了,直接pip安装就行了,对于历来没有安装过mysql驱动的同窗可能须要安装MySQL-python。安装完成以后,启动服务: $ rabbitmq-server[回车]。启动后不要关闭窗口, 下面操做新建窗口(Tab)。
安装celery
Celery能够经过pip自动安装,若是你喜欢使用虚拟环境安装能够先使用virtualenv建立一个本身的虚拟环境。反正我喜欢使用virtualenv创建本身的环境。
pip install celery
http://www.open-open.com/lib/view/open1441161168878.html
使用celery包含三个方面:1. 定义任务函数。2. 运行celery服务。3. 客户应用程序的调用。
建立一个文件 tasks.py
输入下列代码:
上述代码导入了celery,而后建立了celery 实例 app,实例化的过程当中指定了任务名tasks
(和文件名一致),传入了broker和backend。而后建立了一个任务函数add
。下面启动celery服务。在当前命令行终端运行(分别在 env1 和 env2 下执行):
celery -A tasks worker --loglevel=info
目录结构 (celery -A tasks worker --loglevel=info 这条命令当前工做目录必须和 tasks.py 所在的目录相同。即 进入tasks.py所在目录执行这条命令。)
使用 python 虚拟环境 模拟两个不一样的 主机。
此时会看见一对输出。包括注册的任务啦。
交互式客户端程序调用方法
打开一个命令行,进入Python环境。
调用 delay 函数便可启动 add 这个任务。这个函数的效果是发送一条消息到broker中去,这个消息包括要执行的函数、函数的参数以及其余信息,具体的能够看 Celery官方文档。这个时候 worker 会等待 broker 中的消息,一旦收到消息就会马上执行消息。
启动了一个任务以后,能够看到以前启动的worker已经开始执行任务了。
如今是在python环境中调用的add函数,实际上一般在应用程序中调用这个方法。
注意:若是把返回值赋值给一个变量,那么原来的应用程序也会被阻塞,须要等待异步任务返回的结果。所以,实际使用中,不须要把结果赋值。
应用程序中调用方法
新建一个 main.py 文件 代码以下:
在celery命令行能够看见celery执行的日志。打开 backend的redis,也能够看见celery执行的信息。
使用 Redis Desktop Manager 查看 Redis 数据库内容如图:
Celery 的配置比较多,能够在 官方配置文档:http://docs.celeryproject.org/en/latest/userguide/configuration.html 查询每一个配置项的含义。
上述的使用是简单的配置,下面介绍一个更健壮的方式来使用celery。首先建立一个python包,celery服务,姑且命名为proj。目录文件以下:
☁ proj tree
.
├── __init__.py
├── celery.py # 建立 celery 实例
├── config.py # 配置文件
└── tasks.py # 任务函数
首先是 celery.py
这一次建立 app,并无直接指定 broker 和 backend。而是在配置文件中。
config.py
剩下的就是tasks.py
使用方法也很简单,在 proj 的同一级目录执行 celery:
celery -A proj worker -l info
如今使用任务也很简单,直接在客户端代码调用 proj.tasks 里的函数便可。
指定 路由 到的 队列
Celery的官方文档 。先看代码(tasks.py):
上面的tasks.py中,首先定义了一个Celery对象,而后用celeryconfig.py对celery对象进行设置,以后再分别定义了三个task,分别是taskA,taskB和add。接下来看一下celeryconfig.py 文件
在 celeryconfig.py 文件中,首先设置了brokel以及result_backend,接下来定义了三个Message Queue,而且指明了Queue对应的Exchange(当使用Redis做为broker时,Exchange的名字必须和Queue的名字同样)以及routing_key的值。
如今在一台主机上面启动一个worker,这个worker只执行for_task_A队列中的消息,这是经过在启动worker是使用-Q Queue_Name参数指定的。
celery -A tasks worker -l info -n worker.%h -Q for_task_A
而后到另外一台主机上面执行taskA任务。首先 切换当前目录到代码所在的工程下,启动python,执行下面代码启动taskA:
执行完上面的代码以后,task_A消息会被当即发送到for_task_A队列中去。此时已经启动的worker.atsgxxx 会当即执行taskA任务。
重复上面的过程,在另一台机器上启动一个worker专门执行for_task_B中的任务。修改上一步骤的代码,把 taskA 改为 taskB 并执行。
在上面的 tasks.py 文件中还定义了add任务,可是在celeryconfig.py文件中没有指定这个任务route到那个Queue中去执行,此时执行add任务的时候,add会route到Celery默认的名字叫作celery的队列中去。
由于这个消息没有在celeryconfig.py文件中指定应该route到哪个Queue中,因此会被发送到默认的名字为celery的Queue中,可是咱们尚未启动worker执行celery中的任务。接下来咱们在启动一个worker执行celery队列中的任务。
celery -A tasks worker -l info -n worker.%h -Q celery
而后再查看add的状态,会发现状态由PENDING变成了SUCCESS。
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
一种常见的需求是每隔一段时间执行一个任务。
在celery中执行定时任务很是简单,只须要设置celery对象的CELERYBEAT_SCHEDULE属性便可。
配置以下
config.py
注意配置文件须要指定时区。这段代码表示每隔30秒执行 add 函数。一旦使用了 scheduler, 启动 celery须要加上-B 参数。
celery -A proj worker -B -l info
设置多个定时任务
定义3个定时任务,即每隔20s执行taskA任务,参数为(5,6),每隔200s执行taskB任务,参数为(10,20,30),每隔10s执行add任务,参数为(1,2).经过下列命令启动一个定时任务: celery -A tasks beat。使用 beat 参数便可启动定时任务。
计划任务固然也能够用crontab实现,celery也有crontab模式。修改 config.py
scheduler的切分度很细,能够精确到秒。crontab模式就不用说了。
固然celery还有更高级的用法,好比 多个机器 使用,启用多个 worker并发处理 等。
发送任务到队列中
apply_async(args[, kwargs[, …]])、delay(*args, **kwargs) :http://docs.celeryproject.org/en/master/userguide/calling.html
send_task :http://docs.celeryproject.org/en/master/reference/celery.html#celery.Celery.send_task
输入 celery -h 能够看到 celery 的命令和帮助
更详细的帮助能够看官方文档:http://docs.celeryproject.org/en/master/userguide/monitoring.html
官网示例:http://docs.celeryproject.org/en/master/getting-started/first-steps-with-celery.html#first-steps
第一步
编写简单的纯python函数
第二步
若是这个函数不是简单的输出两个字符串相加,而是须要查询数据库或者进行复杂的处理。这种处理须要耗费大量的时间,仍是这种方式执行会是多么糟糕的事情。为了演示这种现象,可使用sleep函数来模拟高耗时任务。
第三步
这时候咱们可能会思考怎么使用多进程或者多线程去实现这种任务。对于多进程与多线程的不足这里不作讨论。如今咱们能够想一想celery到底能不能解决这种问题。
如今来解释一下新加入的几行代码,首先说明一下加入的新代码彻底不须要改变原来的代码。导入celery模块就不用解释了,声明一个celery实例app的参数须要解释一下。
第四步
如今咱们已经使用了celery框架了,咱们须要让它找几个工人帮咱们干活。好如今就让他们干活。
celery -A sample worker --loglevel=info
这条命令有些长,我来解释一下吧。
第五步
如今咱们的任务已经被加载到了内存中,咱们不能再像以前那样执行python sample.py来运行程序了。咱们能够经过终端进入python而后经过下面的方式加载任务。输入python语句。
咱们的函数会当即返回,不须要等待。就那么简单celery解决了咱们的问题。能够发现咱们的say函数不是直接调用了,它被celery 的 task 装饰器 修饰过了。因此多了一些属性。目前咱们只须要知道使用delay就好了。
确保你以前的RabbitMQ已经启动。仍是官网的那个例子,在任意目录新建一个tasks.py的文件,内容以下:
使用redis做为消息队列
在同级目录执行:
$ celery -A tasks.app worker --loglevel=info
该命令的意思是启动一个worker ( tasks文件中的app实例,默认实例名为app,-A 参数后也可直接加文件名,不须要 .app),把tasks中的任务(add(x,y))把任务放到队列中。保持窗口打开,新开一个窗口进入交互模式,python或者ipython:
到此为止,你已经可使用celery执行任务了,上面的python交互模式下简单的调用了add任务,并传递 4,4 参数。
但此时有一个问题,你忽然想知道这个任务的执行结果和状态,到底完了没有。所以就须要设置backend了
修改以前的tasks.py中的代码为:
除了添加backend以外,上面还添加了一个who的方法用来测试多服务器操做。修改完成以后,还按以前的方式启动。
一样进入python的交互模型:
作完上面的测试以后,产生了一个疑惑,Celery叫作分布式任务管理,那它的分布式体如今哪?它的任务都是怎么执行的?在哪一个机器上执行的?在当前服务器上的celery服务不关闭的状况下,按照一样的方式在另一台服务器上安装Celery,并启动:
$ celery -A tasks worker --loglevel=info
发现前一个服务器的Celery服务中输出你刚启动的服务器的hostname,前提是那台服务器连上了你的rabbitmq。而后再进入python交互模式:
看你输入的内容已经观察两台服务器上你启动celery服务的输出。
在实际的项目中咱们须要明确先后台的分界线,所以咱们的celery编写的时候就应该是分红先后台两个部分编写。在celery简单入门中的总结部分咱们也提出了另一个问题,就是须要分离celery的配置文件。
第一步
编写后台任务tasks.py脚本文件。在这个文件中咱们不须要再声明celery的实例,咱们只须要导入其task装饰器来注册咱们的任务便可。后台处理业务逻辑彻底独立于前台,这里只是简单的hello world程序须要多少个参数只须要告诉前台就能够了,在实际项目中可能你须要的是后台执行发送一封邮件的任务或者进行复杂的数据库查询任务等。
第二步
有了那么完美的后台,咱们的前台编写确定也轻松很多。到底简单到什么地步呢,来看看前台的代码吧!为了形象的代表其职能,咱们将其命名为client.py脚本文件。
能够看到只须要简单的几步:1. 声明一个celery实例。2. 加载配置文件。3. 发送任务。
第三步
继续完成celery的配置。官方的介绍使用celeryconfig.py做为配置文件名,这样能够防止与你如今的应用的配置同名。
能够看到咱们指定了CELERY_RESULT_BACKEND为amqp默认的队列!这样咱们就能够查看处理后的运行状态了,后面将会介绍处理结果的查看。
第四步
启动celery后台服务,这里是测试与学习celery的教程。在实际生产环境中,若是是经过这种方式启动的后台进程是不行的。所谓后台进程一般是须要做为守护进程运行在后台的,在python的世界里老是有一些工具可以知足你的须要。这里可使用supervisor做为进程管理工具。在后面的文章中将会介绍如何使用supervisor工具。
celery worker -l info --beat
注意如今运行worker的方式也与前面介绍的不同了,下面简单介绍各个参数。
-l info 与--loglevel=info的做用是同样的。
--beat 周期性的运行。即设置 心跳。
第五步
前台的运行就比较简单了,与平时运行的python脚本同样。python client.py。
如今前台的任务是运行了,但是任务是被写死了。咱们的任务大多数时候是动态的,为演示动态工做的状况咱们可使用终端发送任务。
在python终端导入celery模块声明实例而后加载配置文件,完成了这些步骤后就能够动态的发送任务而且查看任务状态了。注意在配置文件celeryconfig.py中咱们已经开启了处理的结果回应模式了CELERY_IGNORE_RESULT = False而且在回应方式配置中咱们设置了CELERY_RESULT_BACKEND = 'amqp'这样咱们就能够查看处处理的状态了。
能够看到任务发送给celery后立刻查看任务状态会处于PENDING状态。稍等片刻就能够查看到SUCCESS状态了。这种效果然棒不是吗?在图像处理中或者其余的一些搞耗时的任务中,咱们只须要把任务发送给后台就不用去管它了。当咱们须要结果的时候只须要查看一些是否成功完成了,若是返回成功咱们就能够去后台数据库去找处理后生成的数据了。
第一步
安装好mongodb了!就可使用它了,首先让咱们修改celeryconfig.py文件,使celery知道咱们有一个新成员要加入咱们的项目,它就是mongodb配置的方式以下。
把#CELERY_RESULT_BACKEND = 'amp'注释掉了,可是没有删除目的是对比先后的改变。为了使用mongodb咱们有简单了配置一下主机端口以及数据库名字等。显然你能够按照你喜欢的名字来配置它。
第二步
启动 mongodb 数据库:mongod。修改客户端client.py让他可以动态的传人咱们的数据,很是简单代码以下。
任务tasks.py不须要修改!
第三步
测试代码,先启动celery任务。
celery worker -l info --beat
再来启动咱们的客户端,注意此次启动的时候须要给两个参数啦!
mongo
python client.py welcome landpack
等上5秒钟,咱们的后台处理完成后咱们就能够去查看数据库了。
第四步
查看mongodb,须要启动一个mongodb客户端,启动很是简单直接输入 mongo 。而后是输入一些简单的mongo查询语句。
最后查到的数据结果多是你不想看到的,由于mongo已经进行了处理。想了解更多能够查看官方的文档。
--------------------- 本文来自 擒贼先擒王 的CSDN 博客 ,全文地址请点击:https://blog.csdn.net/freeking101/article/details/74707619?utm_source=copy