Blog 项目源码:https://github.com/JmilkFan/JmilkFan-s-Bloghtml
用 Flask 来写个轻博客 (1) — 建立项目
用 Flask 来写个轻博客 (2) — Hello World!
用 Flask 来写个轻博客 (3) — (M)VC_链接 MySQL 和 SQLAlchemy
用 Flask 来写个轻博客 (4) — (M)VC_建立数据模型和表
用 Flask 来写个轻博客 (5) — (M)VC_SQLAlchemy 的 CRUD 详解
用 Flask 来写个轻博客 (6) — (M)VC_models 的关系(one to many)
用 Flask 来写个轻博客 (7) — (M)VC_models 的关系(many to many)
用 Flask 来写个轻博客 (8) — (M)VC_Alembic 管理数据库结构的升级和降级
用 Flask 来写个轻博客 (9) — M(V)C_Jinja 语法基础快速概览
用 Flask 来写个轻博客 (10) — M(V)C_Jinja 经常使用过滤器与 Flask 特殊变量及方法
用 Flask 来写个轻博客 (11) — M(V)C_建立视图函数
用 Flask 来写个轻博客 (12) — M(V)C_编写和继承 Jinja 模板
用 Flask 来写个轻博客 (13) — M(V)C_WTForms 服务端表单检验
用 Flask 来写个轻博客 (14) — M(V)C_实现项目首页的模板
用 Flask 来写个轻博客 (15) — M(V)C_实现博文页面评论表单
用 Flask 来写个轻博客 (16) — MV(C)_Flask Blueprint 蓝图
用 Flask 来写个轻博客 (17) — MV(C)_应用蓝图来重构项目
用 Flask 来写个轻博客 (18) — 使用工厂模式来生成应用对象
用 Flask 来写个轻博客 (19) — 以 Bcrypt 密文存储帐户信息与实现用户登录表单
用 Flask 来写个轻博客 (20) — 实现注册表单与应用 reCAPTCHA 来实现验证码
用 Flask 来写个轻博客 (21) — 结合 reCAPTCHA 验证码实现用户注册与登陆
用 Flask 来写个轻博客 (22) — 实现博客文章的添加和编辑页面
用 Flask 来写个轻博客 (23) — 应用 OAuth 来实现 Facebook 第三方登陆
用 Flask 来写个轻博客 (24) — 使用 Flask-Login 来保护应用安全
用 Flask 来写个轻博客 (25) — 使用 Flask-Principal 实现角色权限功能python
Celery-分布式任务队列
基于后台做业的 Celery
Flask-Celery-Helper 1.1.0git
Celery 是使用 Python 多任务库来编写的任务队列工具, 能够 并行 的执行任务. 咱们会将执行时间较长但又不那么追求实时的功能以异步任务的形式完成, EG. 上传文件, 发送邮件…, Python 和 Celery 之间须要一个中间人(消息队列)来进行任务队列的管理, Celery 官方推荐使用 RabbirMQ 或 Redis 来充当这个角色. 固然也能够同时使用二者, 其中 MQ 做为中间人, Redis 传递 Celery 执行的结果给应用端. 这样作的优点在于, 返回给应用的结果是持久化保存在数据库中的.github
消息队列: 是一种专门设计的系统, 用于在生产者(往队列发送消息的程序)和消费者(从队列中取出消息的队列)之间传递消息.web
pip install Celery
pip install Flask-Celery-Helper
pip freeze > requirments.txt
/etc/init.d/rabbitmq-server start
class DevConfig(Config):
"""Development config class."""
...
# Celery <--> RabbitMQ connection
CELERY_RESULT_BACKEND = "amqp://guest:guest@localhost:5672//"
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
NOTE: RabbitMQ 使用默认的 guest 用户, 端口为 5672sql
from flask.ext.celery import Celery
...
# Create the Flask-Celery-Helper's instance
flask_celery = Celery()
from jmilkfansblog.extensions import flask_celery
def create_app(object_name):
"""Create the app instance via `Factory Method`"""
...
# Init the Flask-Celery-Helper via app object
# Register the celery object into app object
flask_celery.init_app(app)
NOTE 1: Celery 的进程必须在 Flask app 的上下文中运行, 这样 Celery 才可以跟其余的 Flask 扩展协同工做。因此必须将 flask_celery 对象注册到 app 对象中, 而且每建立一个 Celery 进程都须要建立一个新的 Flask app 对象, 这里咱们使用工厂模式来建立 celery 对象。这一点是很是重要的,实际上 Flask application 和 Celery application 是两个不一样的进程,在 Celery 没有加入 Flask 上下文的状况下,Celery 的程序逻辑就不能轻易的访问 Flask 相关资源,好比不能加载 Flask 的环境配置信息,没法经过 Flask 来访问数据库,不能使用 Flask 的扩展功能等。若是想作到这些,Celery 都须要本身再实现一套相同的逻辑,这样作显然是没有必要的。因此 Flask application 原生支持将本身的 Context 嵌入到别的 application 中,固然有些状况也须要相应扩展的辅助,例如 Flask-Celery-Helper 在这里就充当着这个角色。数据库
NOTE 2: flask_celery 对象是 Flask-Celery-Helper 扩展的对象, 用于辅助处理 Celery 的初始化, 因此实际上咱们是能够不使用这个扩展, 而直接使用 Celery 的. celery 对象才是真正的 Celery 的对象.flask
mport os
from celery import Celery
from jmilkfansblog import create_app
def make_celery(app):
"""Create the celery process."""
# Init the celery object via app's configuration.
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
# Flask-Celery-Helpwe to auto-setup the config.
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
"""Will be execute when create the instance object of ContextTesk."""
# Will context(Flask's Extends) of app object(Producer Sit)
# be included in celery object(Consumer Site).
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
# Include the app_context into celery.Task.
# Let other Flask extensions can be normal calls.
celery.Task = ContextTask
return celery
env = os.environ.get('BLOG_ENV', 'dev')
flask_app = create_app('jmilkfansblog.config.%sConfig' % env.capitalize())
# 1. Each celery process needs to create an instance of the Flask application.
# 2. Register the celery object into the app object.
celery = make_celery(flask_app)
NOTE 1: 咱们之后会以 CLI 的形式来管理和控制 Celery 的 worker, 因此咱们将 celery 对象的实现模块放置在 ./celery_runner.py
中, 而不是 jmilkfansblog/celery_runner.py
. Flask app 内部的 tasks 任何的定义和实现就交由 Flask-Celery-Helper 来支持就行了, 这也是 Flask-Celery-Helper 存在的意义.vim
NOTE 2: make_celery()
最重要的做用就是让每一个 Celery 的进程中(celery对象)都包含有 app 对象的上下文, 至于为何这么作呢? 上述已经给出了答案.api
NOTE 3: 这里经过 create_app()
建立的对象不可以命名为 app, 而是命名为 flask_app, 这是由于 Celery 会默认将命名为 app 或 celery 的对象都做为一个 Celery 对象来处理.
NOTE 4: celery.conf.update(app.config)
会将 app 对象的 config 更新到 celery 对象中, 固然也包括了刚刚定义的 RabbitMQ 链接 URL 配置.
(env) jmilkfan@JmilkFan-Devstack:/opt/JmilkFan-s-Blog$ celery worker -A celery_runner --loglevel=info
...
-------------- celery@JmilkFan-Devstack v4.0.1 (latentcall) ---- **** ----- --- * *** * -- Linux-4.4.0-53-generic-x86_64-with-Ubuntu-16.04-xenial 2016-12-15 19:12:33 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: jmilkfansblog:0x7f5b24345990 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: amqp:// - *** --- * --- .> concurrency: 4 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery
[tasks]
. jmilkfansblog.tasks.digest
. jmilkfansblog.tasks.log
. jmilkfansblog.tasks.multiply
. jmilkfansblog.tasks.remind
NOTE: 在启动 Celery 服务的时候, 可以用 Output 看见其自身的配置信息和如今所处理的 tasks.
下面使用 Celery 实如今用户建立帐户以后, 在指定的时间内异步的向新用户发送欢迎邮件.
class Reminder(db.Model):
"""Represents Proected reminders."""
__tablename__ = 'reminders'
id = db.Column(db.String(45), primary_key=True)
date = db.Column(db.DateTime())
email = db.Column(db.String(255))
text = db.Column(db.Text())
def __init__(self, id, text):
self.id = id
self.email = text
def __repr__(self):
return '<Model Reminder `{}`>'.format(self.text[:20])
import smtplib
import datetime
from email.mime.text import MIMEText
from flask_mail import Message
from jmilkfansblog.extensions import flask_celery, mail
@flask_celery.task(
bind=True,
igonre_result=True,
default_retry_delay=300,
max_retries=5)
def remind(self, primary_key):
"""Send the remind email to user when registered. Using Flask-Mail. """
reminder = Reminder.query.get(primary_key)
msg = MIMEText(reminder.text)
msg['Subject'] = 'Welcome!'
msg['FROM'] = <your_email>
msg['To'] = reminder.email
try:
smtp_server = smtplib.SMTP('localhost')
smtp_server.starttls()
smtp_server.login(<user>, <password>)
smtp_server.sendmail(<your_email>,
[reminder.email],
msg.as_string())
smtp_server.close()
return
except Exception as err:
self.retry(exc=err)
def on_reminder_save(mapper, connect, self):
"""Callbask for task remind."""
remind.apply_async(args=(self.id), eta=self.date)
NOTE 1: Celery Task 本质上就是一个被 celery.task()
装饰过的函数,
NOTE 2: 使用主键 primary_key 来获取 reminder 对象是为了不数据竞态的发生, 由于从生成 reminder 对象到 task 被执行的过程并不能保证数据是最新的, 这也是处理异步调用时, 须要时刻注意的地方.
NOTE 3: on_reminder_save()
是一个回调函数, 当咱们在一个特定的情景下调用这个函数的时候就触发了一个 Celery task.
from sqlalchemy import event
def create_app(object_name):
"""Create the app instance via `Factory Method`"""
...
# Will be callback on_reminder_save when insert recond into table `reminder`.
event.listen(Reminder, 'after_insert', on_reminder_save)