Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操做提供维护此类系统所需的工具。它是一个任务队列,专一于实时处理,同时还支持任务调度。python
中间人boker:mysql
broker是一个消息传输的中间件。每当应用程序调用celery的异步任务的时候,会向broker传递消息,然后celery的worker将会取到消息,进行对于的程序执行。其中Broker的中文意思是 经纪人 ,其实就是一开始说的 消息队列 ,用来发送和接受消息。这个Broker有几个方案可供选择:RabbitMQ (消息队列),Redis(缓存数据库),数据库(不推荐),等等。redis
backend:sql
一般程序发送的消息,发完就完了,可能都不知道对方时候接受了。为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果。Backend是在Celery的配置中的一个配置项 CELERY_RESULT_BACKEND ,做用是保存结果和状态,若是你须要跟踪任务的状态,那么须要设置这一项。可使用数据库做为backend。 mongodb
♦高可用: 假若链接丢失或失败,职程和客户端会自动重试,而且一些中间人经过 主/主 或 主/从 方式复制来提升可用性。数据库
♦快速: 单个 Celery 进程每分钟可处理数以百万计的任务,而保持往返延迟在亚毫秒级(使用 RabbitMQ、py-librabbitmq 和优化过的设置)。windows
♦灵活: Celery 几乎全部部分均可以扩展或单独使用。能够自制链接池、 序列化、压缩模式、日志、调度器、消费者、生产者、自动扩展、 中间人传输或更多。缓存
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。(百度上的图片)架构
消息中间件并发
Celery自己不提供消息服务,可是能够方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不一样方式存储任务的结果,包括AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,Apache Cassandra, IronCache 等。
安装Celery,(这里使用redis做为中间件,windows注意安装对应支持的版本)
pip3 install celery['redis']
使用celery包含三个方面:1. 定义任务函数。2. 运行celery服务。3. 客户应用程序的调用。
1.目录结构:
CeleryTest
¦--tasks.py
¦--user.py
#tasks.py import time from celery import Celery app = Celery('tasks', broker='redis://localhost:6379/0') @app.task def send(msg): print(f'send {msg}') time.sleep(3) return
#user.py from tasks import send import time def register(): start = time.time() send.delay('666') print('耗时:', time.time() - start) if __name__ == '__main__': register()
2.这里使用redis做borker,启动redis,进入redis目录启动
$ redis-server
3.启动worker,在CeleryTest的同级目录终端输入,—A为Celery实例所在位置
$ celery -A tasks worker -l info
启动成功会看到以下画面:
4.运行user.py文件,输入以下:
耗时: 0.15261435508728027
调用 delay 函数便可启动 add 这个任务。这个函数的效果是发送一条消息到broker中去,这个消息包括要执行的函数、函数的参数以及其余信息,具体的能够看 Celery官方文档。这个时候 worker 会等待 broker 中的消息,一旦收到消息就会马上执行消息。能够看到调用send.delay()后,耗时并无受到time.sleep()的影响,成功的完成异步调用。咱们能够在项目中执行耗时的任务时来使用Celery。这里简单演示并无使用backend储存任务的结果。
将Celery封装成一个项目进行使用,这里简单的配置一下,方便演示,更多配置参数能够参考官方文档。
celery_demo
¦--celery_app
¦--__init__.py
¦--celeryconfig.py
¦--task1.py
¦--task2.py
¦--client.py
__init__.py
from celery import Celery app = Celery('demo') # 生成实例 app.config_from_object('celery_app.celeryconfig') # 加载配置
celeryconfig.py
BROKER_URL = 'redis://127.0.0.1:6379' # 指定 Broker CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定 Backend CELERY_TIMEZONE = 'Asia/Shanghai' # 指定时区,默认是 UTC # CELERY_TIMEZONE='UTC' CELERY_IMPORTS = ( # 指定导入的任务模块 'celery_app.task1', 'celery_app.task2' )
task1.py
import time from celery_app import app @app.task def add(x, y): time.sleep(2) return x + y
task2.py
import time from celery_app import app @app.task def multiply(x, y): time.sleep(2) return x * y
client.py
from celery_app import task1 from celery_app import task2 res1 = task1.add.delay(2, 8) # 或者 task1.add.apply_async(args=[2, 8]) res2 = task2.multiply.delay(3, 7) # 或者 task2.multiply.apply_async(args=[3, 7]) print('hello world')
启动worker,在celery_demo目录执行下列命令
celery_demo $ celery -A celery_app worker -l info
接着,运行$ python client.py
,它会发送两个异步任务到 Broker,在 Worker 的窗口咱们能够看到以下输出:
在前面的例子中,咱们使用 delay()
或 apply_async()
方法来调用任务。事实上,delay 方法封装了 apply_async
,以下:
def delay(self, *partial_args, **partial_kwargs): """Shortcut to :meth:`apply_async` using star arguments.""" return self.apply_async(partial_args, partial_kwargs)
Celery 除了能够执行异步任务
,也支持执行周期性任务(Periodic Tasks)
,或者说定时任务。Celery Beat 进程经过读取配置文件的内容,周期性地将定时任务发往任务队列。在上面的例子中,修改配置文件便可实现。
celeryconfig.py
from celery.schedules import crontab from datetime import timedelta BROKER_URL = 'redis://127.0.0.1:6379' # 指定 Broker CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定 Backend CELERY_TIMEZONE = 'Asia/Shanghai' # 指定时区,默认是 UTC # CELERY_TIMEZONE='UTC' CELERY_IMPORTS = ( # 指定导入的任务模块 'celery_app.task1', 'celery_app.task2' ) # schedules CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'celery_app.task1.add', 'schedule': timedelta(seconds=30), # 每 30 秒执行一次 'args': (5, 8) # 任务函数参数 }, 'multiply-at-some-time': { 'task': 'celery_app.task2.multiply', 'schedule': crontab(hour=10, minute=50), # 天天早上 10 点 50 分执行一次 'args': (3, 7) # 任务函数参数 } }
启动worker,而后定时将任务发送到 Broker,在celery_demo目录下执行下面两条命令:
celery -A celery_app worker -l info
celery beat -A celery_app
上面两条命令也能够合并为一条:
celery -B -A celery_app worker -l info