为啥要学这个?在作测试的时候,对于一些特殊场景,好比凌晨3点执行一批测试集,或者在前端发送100个请求时,而每一个请求响应至少1s以上,用户不可能等着后端执行完成后,将结果返回给前端,这个时候须要一个异步任务队列。而python提供一个分布式异步消息任务队列------- Celery。php
任务队列通常用于线程或计算机之间分配工做的一种机制。html
任务队列的输入是一个称为任务的工做单元,有专门的职程(Worker)进行不断的监视任务队列,进行执行新的任务工做。前端
Celery 经过消息机制进行通讯,一般使用中间人(Broker)做为客户端和职程(Worker)调节。启动一个任务,客户端向消息队列发送一条消息,而后中间人(Broker)将消息传递给一个职程(Worker),最后由职程(Worker)进行执行中间人(Broker)分配的任务。node
Celery 能够有多个职程(Worker)和中间人(Broker),用来提升Celery的高可用性以及横向扩展能力。python
Celery 是用 Python 编写的,但协议能够用任何语言实现。除了 Python 语言实现以外,还有Node.js的node-celery和php的celery-php。git
能够经过暴露 HTTP 的方式进行,任务交互以及其它语言的集成开发。github
Celery 是一个异步任务队列,一个Celery有三个核心组件:web
一、Celery 客户端: 用于发布后台做业;当与 Flask 一块儿工做的时候,客户端与 Flask 应用一块儿运行。redis
二、Celery workers: 运行后台做业的进程。Celery 支持本地和远程的 workers,能够在本地服务器上启动一个单独的 worker,也能够在远程服务器上启动worker,须要拷贝代码;数据库
三、消息代理: 客户端经过消息队列和 workers 进行通讯,Celery 支持多种方式来实现这些队列。最经常使用的代理就是 RabbitMQ 和 Redis。
前面引言中已经说了两种,这里再列举一下:
一、Celery 是一个 基于python开发的分布式异步消息任务队列,经过它能够轻松的实现任务的异步处理,
若是你的业务场景中须要用到异步任务,就能够考虑使用celery
二、你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,
你过一段时间只须要拿着这个任务id就能够拿到任务执行结果, 在任务执行ing进行时,你能够继续作其它的事情
三、Celery 在执行任务时须要经过一个消息中间件来接收和发送任务消息,以及存储任务结果, 通常使用rabbitMQ or Redis
一、简单:一单熟悉了celery的工做流程后,配置和使用仍是比较简单的
二、高可用:当任务执行失败或执行过程当中发生链接中断,celery 会自动尝试从新执行任务
三、快速:一个单进程的celery每分钟可处理上百万个任务
四、灵活: 几乎celery的各个组件均可以被扩展及自定制
草图:
user:用户程序,用于告知celery去执行一个任务。就是produce消息提供者。
broker: 存听任务(依赖RabbitMQ或Redis,进行存储)
worker:执行任务
一、方便查看定时任务的执行状况, 如 是否成功, 当前状态, 执行任务花费的时间等.
二、使用功能齐备的管理后台或命令行添加,更新,删除任务.
三、方便把任务和配置管理相关联.
四、可选 多进程, Eventlet 和 Gevent 三种模型并发执行.
五、提供错误处理机制.
六、提供多种任务原语, 方便实现任务分组,拆分,和调用链.
七、支持多种消息代理和存储后端.
八、Celery 是语言无关的.它提供了python 等常见语言的接口支持.
画一个简单的架构图,帮助理解:
角色:
一、Celery Beat : 任务调度器. Beat 进程会读取配置文件的内容, 周期性的将配置中到期须要执行的任务发送给任务队列.
2、Celery Worker : 执行任务的消费者, 一般会在多台服务器运行多个消费者, 提升运行效率.
三、Broker : 消息代理, 队列自己. 也称为消息中间件. 接受任务生产者发送过来的任务消息, 存进队列再按序分发给任务消费方(一般是消息队列或者数据库).
四、Producer : 任务生产者. 调用 Celery API , 函数或者装饰器, 而产生任务并交给任务队列处理的都是任务生产者.
5、Result Backend : 任务处理完成以后保存状态信息和结果, 以供查询.
任务执行方式:
1.发布者发布任务(WEB 应用)
2.任务调度定期发布任务(定时任务)
依赖库:
billiard : 基于 Python2.7 的 multisuprocessing 而改进的库, 主要用来提升性能和稳定性. librabbitmp : C 语言实现的 Python 客户端 kombu : Celery 自带的用来收发消息的库, 提供了符合 Python 语言习惯的, 使用 AMQP 协议的高级借口. 这三个库, 都由 Celery 的开发者开发和维护.
使用于生产环境的消息代理有 RabbitMQ 和 Redis, 官方推荐 RabbitMQ.
这里我使用redis。
BROKER_URL = 'redis://localhost:6379' #代理人 CELERY_RESULT_BACKEND = 'redis://localhost:6379' #结果存储地址 CELERY_ACCEPT_CONTENT = ['application/json'] #指定任务接收的内容序列化类型 CELERY_TASK_SERIALIZER = 'json' #任务序列化方式 CELERY_RESULT_SERIALIZER = 'json' #任务结果序列化方式 CELERY_TASK_RESULT_EXPIRES = 12 * 30 #超过期间 CELERY_MESSAGE_COMPRESSION = 'zlib' #是否压缩 CELERYD_CONCURRENCY = 4 #并发数默认已CPU数量定 CELERYD_PREFETCH_MULTIPLIER = 4 #celery worker 每次去redis取任务的数量 CELERYD_MAX_TASKS_PER_CHILD = 3 #每一个worker最多执行3个任务就摧毁,避免内存泄漏 CELERYD_FORCE_EXECV = True #能够防止死锁 CELERY_ENABLE_UTC = False #关闭时区 CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 定时任务调度器
一、delay
task.delay(args1, args2, kwargs=value_1, kwargs2=value_2)
二、apply_async
delay 其实是 apply_async 的别名, 还可使用以下方法调用, 可是 apply_async 支持更多的参数:
task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})
支持参数:
一、countdown : 等待一段时间再执行
add.apply_async((2,3), countdown=5)
二、eta : 定义任务的开始时间.
add.apply_async((2,3), eta=now+tiedelta(second=10))
三、expires : 设置超时时间.
add.apply_async((2,3), expires=60)
四、retry : 定时若是任务失败后, 是否重试.
add.apply_async((2,3), retry=False)
五、retry_policy : 重试策略.
一、max_retries : 最大重试次数, 默认为 3 次.
二、interval_start : 重试等待的时间间隔秒数, 默认为 0 , 表示直接重试不等待.
三、interval_step : 每次重试让重试间隔增长的秒数, 能够是数字或浮点数, 默认为 0.2
四、interval_max : 重试间隔最大的秒数, 即 经过 interval_step 增大到多少秒以后, 就不在增长了, 能够是数字或者浮点数, 默认为 0.2 .
自定义发布者,交换机,路由键, 队列, 优先级,序列方案和压缩方法:
task.apply_async((2,2), compression='zlib', serialize='json', queue='priority.high', routing_key='web.add', priority=0, exchange='web_exchange')
在客户端和消费者之间传输数据须要 序列化和反序列化. Celery 支出的序列化方案以下所示:
CELERY_TASK_SERIALIZER=json
官方:https://docs.celeryproject.org/en/latest/index.html
中文手册:https://www.celerycn.io/
以上就是Celery的简介,有兴趣的朋友能够看看,或者进群讨论交流一下。