其实我只是想把邮件发送这个动做移到Celery中执行。
既然用到了Celery,那么每次发邮件都单独开一个线程彷佛有点多余,异步任务仍是交给Celery吧。html
Celery和Flask一块儿使用并无什么不和谐的地方,均可以不用定制的Flask扩展,按照网上随处可见的示例也很简单:python
from flask import Flask from celery import Celery app = Flask(__name__) app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0' app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0' celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) celery.conf.update(app.config) @celery.task def send_email(): ....
然而,稍微上点规模的Flask应用都会使用Factory模式(中文叫工厂函数,我听着特别扭),即只有在建立Flask实例时,才会初始化各类扩展,这样能够动态的修改扩展程序的配置。好比你有一套线上部署的配置和一套本地开发测试的配置,但愿经过不一样的启动入口,就使用不一样的配置。
使用Factory模式的话,上面的代码大概要修改为这个样:git
from flask import Flask from celery import Celery app = Flask(__name__) celery = Celery() def create_app(config_name): app.config.from_object(config[config_name]) celery.conf.update(app.config)
经过config_name
,来动态调整celery的配置。然而,这样子是不行的!
Celery的__init__()
函数会调用celery._state._register_app()
直接就经过传入的配置生成了Celery实例,上面的代码中,celery = Celery()
直接使用默认的amqp做为了broker,随后经过celery.conf.update(app.config)
是更改不了broker的。这也就是为何网上的示例代码中,在定义Celery实例时,就传入了broker=app.config['CELERY_BROKER_URL']
,而不是以后经过celery.conf.update(app.config)
传入。当你的多套配置文件中,broker设置的不一样时,就悲剧了。github
固然不用本身造轮子,Flask-Celery-Helper就是解决以上问题的FLask扩展。
看看它的__init__()
函数:redis
def __init__(self, app=None): """If app argument provided then initialize celery using application config values. If no app argument provided you should do initialization later with init_app method. :param app: Flask application instance. """ self.original_register_app = _state._register_app # Backup Celery app registration function. _state._register_app = lambda _: None # Upon Celery app registration attempt, do nothing. super(Celery, self).__init__() if app is not None: self.init_app(app)
将_state._register_app
函数备份,再置为空。这样__init__()
就不会建立Celery实例了。但若是指定了app
,那么进入init_app
,嗯,大多数Flask扩展都有这个函数,用来动态生成扩展实例。flask
def init_app(self, app): """Actual method to read celery settings from app configuration and initialize the celery instance. :param app: Flask application instance. """ _state._register_app = self.original_register_app # Restore Celery app registration function. if not hasattr(app, 'extensions'): app.extensions = dict() if 'celery' in app.extensions: raise ValueError('Already registered extension CELERY.') app.extensions['celery'] = _CeleryState(self, app) # Instantiate celery and read config. super(Celery, self).__init__(app.import_name, broker=app.config['CELERY_BROKER_URL']) ...
将_state._register_app
函数还原,再执行Celery本来的__init__
。这样就达到动态生成实例的目的了。接着往下看:app
task_base = self.Task # Add Flask app context to celery instance. class ContextTask(task_base): """Celery instance wrapped within the Flask app context.""" def __call__(self, *_args, **_kwargs): with app.app_context(): return task_base.__call__(self, *_args, **_kwargs) setattr(ContextTask, 'abstract', True) setattr(self, 'Task', ContextTask)
这里重载了celery.Task
类,经过with app.app_context():
,在app.app_context()
的上下文环境下执行Task。对于一个已生成的Flask实例,应用上下文不会随便改变。因此这就现实了在Celery中使用Flask的应用上下文。
下面是官方的示例代码:异步
# extensions.py from flask_celery import Celery celery = Celery() # application.py from flask import Flask from extensions import celery def create_app(): app = Flask(__name__) app.config['CELERY_IMPORTS'] = ('tasks.add_together', ) app.config['CELERY_BROKER_URL'] = 'redis://localhost' app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost' celery.init_app(app) return app # tasks.py from extensions import celery @celery.task() def add_together(a, b): return a + b # manage.py from application import create_app app = create_app() app.run()
跟普通的Flask扩展同样了。async
在Flask的view函数中调用task.delay()
时,这个task至关于一个离线的异步任务,它对Flask的应用上下文和请求上下文一无所知。可是这均可能是异步任务须要用到的。好比发送邮件要用到的render_template
和url_for
就分别要用到应用上下文和请求上下文。不在celery中引入它们的话,就是Running code outside of a request
。
引入应用上下文的工做Flask-Celery-Helper已经帮咱们作好了,在Flask的文档中也有相关介绍。实现方法和上面Flask-Celery-Helper的同样。然而,无论是Flask-Celery-Helper仍是Flask文档,都没有说起如何在Celery中使用请求上下文。ide
要引入请求上下文,须要考虑这两个问题:
如何在Celery中产生请求上下文。Flask中有request_context
和test_request_context
能够产生请求上下文。区别是request_context
须要WSGI环境变量environ
,而test_request_context
根据传入的参数生成请求上下文。我没有找到如何在Celery中获取到WSGI环境变量的方法,因此只能本身传入相关参数生成请求上下文了。
请求上下文是随HTTP请求产生的,要获取请求上下文,就必须在view函数中处理,view函数经过task.delay()
发送Celery任务。因此须要重载task.delay()
,以获取请求上下文。
具体的思路仍是在init_app
中重载celery.Task
类,经过with app.test_request_context():
,在app.test_request_context()
的上下文环境下执行Task。
首先获取request,从中整理出test_request_context()须要的参数。根据test_request_context
的函数注释,它须要的参数和werkzeug.test.EnvironBuilder类的参数同样。
CONTEXT_ARG_NAME = '_flask_request_context' def _include_request_context(self, kwargs): """Includes all the information about current Flask request context as an additional argument to the task. """ if not has_request_context(): return # keys correspond to arguments of :meth:`Flask.test_request_context` context = { 'path': request.path, 'base_url': request.url_root, 'method': request.method, 'headers': dict(request.headers), 'data': request.form } if '?' in request.url: context['query_string'] = request.url[(request.url.find('?') + 1):] kwargs[self.CONTEXT_ARG_NAME] = context
_include_request_context
函数从request中提取path
,base_url
,method
,headers
,data
,query_string
。将他们传入test_request_context
,生成伪造的请求上下文能够覆盖大多数的使用状况。
Celery经过apply_async
,apply
,retry
调用异步任务(delay
是apply_async
的简化方法)。这里须要重载它们,让这些函数获取request:
def apply_async(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).apply_async(args, kwargs, **rest) def apply(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).apply(args, kwargs, **rest) def retry(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).retry(args, kwargs, **rest)
最后重载celery.Task
的__call__
方法:
def __call__(self, *args, **kwargs): """Execute task code with given arguments.""" call = lambda: super(ContextTask, self).__call__(*args, **kwargs) context = kwargs.pop(self.CONTEXT_ARG_NAME, None) if context is None or has_request_context(): return call() with app.test_request_context(**context): result = call() # process a fake "Response" so that # ``@after_request`` hooks are executed app.process_response(make_response(result or '')) return result
context是咱们从request中获取的参数,将它传给test_request_context,伪造请求上下文,并在这个上下文环境中执行task。既然伪造了请求,那也得为这个假请求生成响应,万一你定义了after_request
这个在响应后执行的钩子呢?经过process_response
就能够激活after_request
。
注意这里并无传入应用上下文,由于Flask在建立请求上下文时,会判断应用上下文是否为空,为空就先建立应用上下文,再建立请求上下文。
完整代码在这里。celery = CeleryWithContext()
建立的Celery实例就能够给各类task使用了。
另外建立一个celery_worker.py文件,生成一个Flask实例,供Celery的worker使用。
# celery_worker.py #!/usr/bin/env python from app import create_app from app.extensions import celery app = create_app()
启动worker:celery -A celery_worker.celery worker -l info
这下就可使用Celery发邮件了。唉,还真是麻烦。
http://xion.io/post/code/celery-include-flask-request-context.html