Celery 是一个 基于python开发的分布式异步消息任务队列,经过它能够轻松的实现任务的异步处理, 若是你的业务场景中须要用到异步任务,就能够考虑使用celery, 举几个实例场景中可用的例子: html
1. 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只须要拿着这个任务id就能够拿到任务执行结果, 在任务执行ing进行时,你能够继续作其它的事情。 python
2. 你想作一个定时任务,好比天天检测一下大家全部客户的资料,若是发现今天 是客户的生日,就给他发个短信祝福 linux
【简单讲:异步 + 定时】 redis
Celery有如下优势sql
1. 简单:一单熟悉了celery的工做流程后,配置和使用仍是比较简单的 shell
2. 高可用:当任务执行失败或执行过程当中发生链接中断,celery 会自动尝试从新执行任务 服务器
3. 快速:一个单进程的celery每分钟可处理上百万个任务 app
4. 灵活: 几乎celery的各个组件均可以被扩展及自定制异步
Celery 在执行任务时须要经过一个消息中间件来接收和发送任务消息,以及存储任务结果, 通常使用rabbitMQ or Redis async
Celery是一个上层任务,当有用户请求到来的时候,发送一个任务给celery,这个任务会被中间件Redis/RabbitMQ接收发送给Celery的节点去执行任务,有一个好处就是能够横向的扩展机器去执行任务。
Celery安装使用
Celery的默认broker[可理解为中间件Redis/RabbitMQ]是RabbitMQ, 仅需配置一行就能够
broker_url = 'amqp://guest:guest@localhost:5672//'
rabbitMQ 没装的话请装一下,安装看这里 http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id3
使用Redis作broker也能够
安装redis组件
pip install -U "celery[redis]"
配置
Configuration is easy, just configure the location of your Redis database: app.conf.broker_url = 'redis://localhost:6379/0' Where the URL is in the format of:【若是中间件有认证操做】 redis://:password@hostname:port/db_number all fields after the scheme are optional, and will default to localhost on port 6379, using database 0. 若是想获取每一个任务的执行结果,还须要配置一下把任务结果存在哪 If you also want to store the state and return values of tasks in Redis, you should configure these settings: app.conf.result_backend = 'redis://localhost:6379/0'
Win7下安装celery模块
pip3 install celery
[测试发现celery安装完成后Win7的cmd能够用celery,不须要添加] 若是须要手动添加,则找到celery的安装路径,写入Win7的path里便可。 E:\PyCharm 2017.2.4\Python3.2.5\Lib\site-packages\celery\bin\
Ubuntu下安装
首先安装pip3
sudo apt-get install python3-pip
安装Celery
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple celery
Ubuntu下安装成功
Ubuntu下运行报错,缺乏Redis
运行Celery任务后报错
ImportError: Missing redis library (pip install redis)
安装Redis链接模块:
pip3 install redis
启动Celery Worker来开始监听并执行任务
celery -A Celery的Py文件名 worker --loglevel=info
操做前期条件: 安装并启动Redis
myCelery.py
from celery import Celery # 定义了一个Celery的App app = Celery('tasks', broker='redis://192.168.2.105', # Celery和用户请求的中间代理 backend='redis://192.168.2.105') # 接收Celery返回结果的 @app.task # 函数变成一个Celery的任务,调用celery实现异步任务 def add(x, y): print("running...", x, y) return x + y
运行:
celery -A myCelery worker --loglevel=info 【写文件名称便可】
运行结果[Win]
运行结果[Linux]
注: celery文件运行起来后只能接收和执行任务[等待任务状态...],还须要用户发送任务
Linux下发布任务
进入文件所在的路径下
omc@omc-virtual-machine:~/Celery$ cd /home/omc/Celery
进入Python环境
omc@omc-virtual-machine:~/Celery$ python
在Python环境下导入文件的函数
Python 3.5.2 (default, Nov 23 2017, 16:37:01) [GCC 5.4.0 20160609] on linux Type "help", "copyright", "credits" or "license" for more information. >>> from myCelery import add >>> add.delay(100,100) <AsyncResult: 22631adf-e54f-4718-99a1-1e76b5c1575f> >>> add(100,300) running... 100 300 400
注:中间件Redis只负责存储了中间的消息
打开Redis查看存储的消息内容
omc@omc-virtual-machine:~$ redis-cli
127.0.0.1:6379> get celery-task-meta-ebad12b8-d4ff-461b-b6e5-0dab4eaba855
>>> r = add.delay(20,20000) >>> r.
>>> r.get() 20020
返回对象的其余方法的使用:
>>> r = cmd_Celery.my_cmd.delay("df -h") The ready() method returns whether the task has finished processing or not: >>> r.ready() False You can wait for the r to complete, but this is rarely used since it turns the asynchronous call into a synchronous one: >>> r.get(timeout=1) # 设置超时时间 8 In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument: >>> r.get(propagate=False) # propagate 扩展,添加propagate后会对报错进行格式化输出 If the task raised an exception you can also gain access to the original traceback: >>> r.traceback # 报错调试用 …
前台发布:
后台有2个Celery的worker
work1:
Work2:
操做前期条件: 安装并启动Redis
cmd_Celery.py
from celery import Celery # 定义了一个Celery的App app = Celery('tasks', broker='redis://192.168.2.105', # redis://:password@hostname:port/db_number 有密码认证的链接 # broker='redis://:密码@192.168.2.105:6379/0', backend='redis://192.168.2.105') # 接收Celery返回结果的 # 函数变成一个Celery的任务,调用celery实现异步任务 import subprocess import time @app.task def my_cmd(cmd): print('Celery of CMD:', cmd) time.sleep(5) # 判断是不是异步的标志,会卡5秒后执行cmd的命令 cmd_obj = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return cmd_obj.stdout.read().decode("utf-8") # 返回结果必须是可JSON的,须要编码
后台启动Celery任务:
omc@omc-virtual-machine:~/Celery$ celery -A cmd_Celery worker --loglevel=debug
前台客户端发送命令:
>>> python3 >>> import cmd_Celery >>> r = cmd_Celery.my_cmd.delay("df -h") >>> r.get()
后台Celery服务器端执行任务:
若是返回结果不是可JSON会报错:
默认返回的是byte类型的,须要进行解码
2次Ctrl+C
前台启动命令: celery -A 项目名worker -loglevel=info 后台启动命令: celery multi start w1 -A 项目名 -l info 后台重启命令: celery multi start w1 -A 项目名 -l info 后台中止命令: celery multi stop w1 -A 项目名 -l info 先后台的区别: 后台是mult启动