【理论】python使用celery异步处理请求

Flask中使用celery队列处理执行时间较长的请求。html

一. 安装celerypython

pip install celery flask redis

二. celery简介redis

Celery是个异步分布式任务队列 经过Celery在后台跑任务并不像线程那么简单,可是用Celery的话,可以是应用有较好的扩展性,由于Celery是个分布式架构,下面介绍Celery的三个核心组件: 1. 生产者(Celery client): 生产者发送消息,在Flask上工做时,生产者在Flask应用内运行 2. 消费者(Celert worker): 消费者用于处理后台任务。消费者能够是本地的也能够是远程的。咱们能够在运行Flask的server上运行一个单一的消费者,当业务量上涨以后再去添加更多的消费者 3. 消息传递着(Celery broker): 生产者和消费者的信息交互使用的是消息队列,Celery支持若干方式的消息队列,其中最长用的是RabbitMQ和Redis, 咱们在使用过程当中使用的Redis

三. redis配置与使用json

redis配置文件/etc/redis.conf 1.设置为后台启动 daemonize yes 2.redis端口设置 port 6379 # default prot 3.日志文件 logfile /home/liuyadong/work/log/redis.log 4.数据保存文件 dir /home/liuyadong/data/redisData 经过下面命令指定配置文件启动redis: redis-server /etc/redis.conf 经过下面命令测试是否启动成功: redis-cli -p 6379 下面这样表示成功(进入了命令行模式): redis 127.0.0.1:6379> 查看启动端口: sudo netstat -ntlp | grep 6379 tcp 0 0 127.0.0.1:6379 0.0.0.0:* LISTEN 49380/redis-server 

四. celery使用简介flask

1.Choosing a broker 最经常使用的broker包括: RabbitMQ 和 Redis, 咱们使用Redis, Redis的安装及启动等查看第二部分 2.intall celery pip install celery 3.Application 使用celery的第一步是建立一个application, 一般叫作'app'。具体的建立一个app的代码以下: $ cat tasks.py #!/usr/bin/env python from celery import Celery app = Celery('tasks', broker='redis://localhost') @app.tasks def add(x, y): return x + y Note: Celery第一个参数必须是当前module的模拟购置,本次实例中为:tasks 4.Running the celery worker server $ celery -A tasks worker --loglevel=info 5.Calling the tasks 能够经过delay()或者apply_sync()方法来调用一个task >>> from tasks import add >>> add.delay(4, 4) 6. Keeping Results 咱们能够将task的执行状态保存起来,能够保存到broker中, 能够经过CELERY_RESULT_BACKEND字段来设置保存结果。 也能够经过Celery的backend参数来设置 app.Celery('tasks', broker='redis://localhost', backend='redis://localhost') >>> result = add.delay(4, 4) 能够经过ready()方法来判断程序执行是否完成,执行完成返回True. >>> result.ready() False 下面是AsyncResult对象的其余调用方法介绍: 1) AsyncResult.get(timeout=None, propagate=True, interval=0.5, no_ack=True, follow_parents=True) timeout : 设置一个等待的预操做时间,单位是s, 方法返回执行结果 propagate : 若是task执行失败,则Re-taise Exception interval : 等待必定时间从新执行操做,若是使用amqp来存储backend则此参数无效 no_ack : Enable amqp no ack (automatically acknowledge message) If this is False then the message will not be acked follow_parents : Reraise any exception raised by parent task 2) AsyncResult.state 或 status属性 方法返回当前task的执行状态,返回值包括下面多种状况: PENDING: task正在等待执行 STARTED: task已经开始执行了 RETRY : task从新执行了,这多是因为第一次执行失败引发的 FAILURE: task执行引起了异常,而且结果的属性当中包括了异常是由哪一个task引发的 SUCCESS: task执行成功,结果的属性当中包括执行结果 3) AsyncResult.success() 若是返回True,则表示task执行成功 4) AsyncResult.traceback() 获得一个执行失败的task的traceback 7.Configuration celert 默认的配置对于大多数用户来讲已经足够好了,可是咱们仍有许多想让celery按照咱们的想法去work,经过configuration实现是一个好的方式。 configutation能够经过app设置,也能够经过一个单独的模块进行设置。 好比,经过app设置CELERY_TASK_SERIALIZER属性:app.conf.CELERY_TASK_SERIALIZER = 'json' 若是你一次性有许多须要配置,则能够经过update()方法实现: app.conf.update( CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['json'], # Ignore other content CELERY_RESULT_SERIALIZER='json', CELERY_TIMEZONE='Europe/Oslo', CELERY_ENABLE_UTC=True, ) 你也能够经过app.config_from_object() method告诉Celery经过一个模块来生成configuration: app.config_from_object('celeryconfig') 这个模块一般叫作 celeryconfig,但实际上你能够叫任何名字。 $ cat celeryconfig.py CELERY_ROUTES = {'tasks.add': 'low-priority', 'tasks.add': {'rate_limit': '10/m'} 8.Where to go from here 若是你想了解更多请阅读: http://docs.celeryproject.org/en/latest/getting-started/next-steps.html#next-steps 以后阅读: http://docs.celeryproject.org/en/latest/userguide/index.html#guide