Flask 扩展: 使用 Flask-Apscheduler 集成定时任务

Flask-Apscheduler 是集成了 Apscheduler 的扩展, 给 Flask 程序提供了 定时任务 的支持, 使用此扩展能够很方便的添加定时任务, 下面咱们看一下具体的使用方法.html

安装

pip install Flask-APScheduler
复制代码

示例

from flask import Flask
from flask_apscheduler import APScheduler

app = Flask(__name__)
scheduler = APScheduler(app=app)

@scheduler.task(trigger='interval', id='test_job', seconds=1)
def test_job():
    print('hello world')

if __name__ == '__main__':
    scheduler.start()
    app.run()
复制代码

以上代码将实现 每隔一秒 执行一次 test_job 任务, id 表示当前任务的 惟一标识. 全部被 task 装饰器装饰的函数将被视为 定时任务, 定时调度逻辑将经过装饰器参数设置. 经过以上代码能够看到, 集成一个定时任务是十分方便的.python

触发器

APScheduler 提供了一个 trigger 的概念, 也就是任务调度的触发方式, 可选的值为:git

  • interval: 间隔任务, 即在固定的时间间隔循环执行
  • cron: 指定在某一天的固定时间点执行
  • date: 在指定时间点执行, 仅执行一次

选择了 trigger 后, 将根据 关键字参数 设置具体的调度时间, 每个 trigger 对应的参数都不同, 下面将一一介绍.github

interval

interval 对应 apscheduler.triggers.interval.IntervalTrigger 类, 下面是它的初始化方法:redis

IntervalTrigger(weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None, end_date=None, timezone=None, jitter=None)
复制代码

其中的 初始化参数 是须要在 task 装饰器中传入的 关键字参数. 其中 run_date 表示任务开始执行时间, 若是为 None 则从任务生成时开始算起, end_date 是结束时间, 到达这个时间点后, 任务将中止. 如下是每个参数的含义:sql

  • weeks(int): 须要等待的周数
  • days(int): 须要等待的天数
  • hours(int): 须要等待的小时数
  • minutes(int): 须要等待的分钟数
  • seconds(int): 须要等待的秒数
  • start_data(datetime|str): 任务开始时间
  • end_date(datetime|str): 任务结束时间
  • timezone(datetime.tzinfo|str): 时区
  • jitter(int|None): 延迟窗口, 设置此值后, 真正的间隔时间将在 ±jitter 间浮动

cron

cron 对应 apscheduler.triggers.cron.CronTrigger 类, 下面是它的初始化方法:mongodb

CronTrigger(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None, jitter=None)
复制代码

最后四个参数与 interval 一致, 再也不解释, 如下为每一个参数的含义:数据库

  • year(int|str): YYYY 格式的年
  • month(int|str): 1-12
  • day(int|str): 1-31
  • week(int|str): 1-53
  • day_of_week(int|str): 一周的某一天, 能够为 数字 或者 名字
    • 数字为 0-6,
    • 名字为 mon,tue,wed,thu,fri,sat,sun
  • hour(int|str): 0-23 小时
  • minute(int|str): 0-59 分钟
  • second(int|str): 0-59

cronApscheduler 中最强大的 trigger, 它的使用方式相似于 *nix 下的 crontab, 因此以上参数的值可使用 cron 表达式 设置, 如下为可用的表达式, 表格中 任意字段含义 将以 进行解释, 使用时表示具体使用的 参数字段:flask

表达式 参数字段 含义
* 任意 任意秒, 即每秒执行
*/a 任意 当前秒数为 a 的倍数时执行
a-b 任意 当前秒数在 ab 之间时执行
a-b/c 任意 当前秒数在 ab 之间且是 c 的倍数时执行
xth y day 这个月第 x, 例如 3rd mon 表示 第三个周一
last x day 这个月最后一个
last day 这个月最后的一天
x,y,z 任意 以上表达式能够写多个, 逗号分隔

在设置参数时,没有设置的参数将设置默认值. 默认参数规则以一个例子解释. 若是定义 day=1, minute=20 两个字段, 则这两个字段中 含义 最小的为 minute, 而后比 minute 小的字段设为 最小值, 比它大的字段设置为 *, 可是 weekday_of_week 一直设置为 *. 因此比 minute 小的字段 second 设置为 0, 其余设置为 *. 以上例子对应的结果为:api

year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0
复制代码

date

date 比较简单, 只有一个 run_date 参数, 用于设置执行时间, 只执行一次.

使用 API 管理

flask-apscheduler 提供了使用 api 管理任务的功能, 可是默认是关闭的, 能够在 flask 配置中设置 app.config['SCHEDULER_API_ENABLED'] = True 打开, 打开后在 app 实例所在目录执行 flask routes | grep scheduler 便可查看. flask routesflask 提供的命令行交互程序, 只在 1.* 版本有效, 若是使用的是旧版本的, 可使用 app.url_map.iter_rules() 查看路由信息, 全部路由信息以下:

scheduler.add_job                POST     /scheduler/jobs
scheduler.delete_job             DELETE   /scheduler/jobs/<job_id>
scheduler.get_job                GET      /scheduler/jobs/<job_id>
scheduler.get_jobs               GET      /scheduler/jobs
scheduler.get_scheduler_info     GET      /scheduler
scheduler.pause_job              POST     /scheduler/jobs/<job_id>/pause
scheduler.resume_job             POST     /scheduler/jobs/<job_id>/resume
scheduler.run_job                POST     /scheduler/jobs/<job_id>/run
scheduler.update_job             PATCH    /scheduler/jobs/<job_id>
复制代码

这些 API 分别调用了 apscheduler 提供的方法, 有了这些 API 咱们就能够很方便的对任务进行管理.

持久化存储

上节提到咱们能够 API 对任务进行管理, 即 增删改查暂停重启. 可是默认状况下, 全部 任务 都存储在内存中, 一旦项目重启, 全部已修改的任务都将失效, 不会被保存下来, 因此就须要能够保存数据的地方可让任务持久化保存, 也就是 数据库, apscheduler 提供了如下数据库做为存储介质:

  • mongodb
  • redis
  • 全部被 sqlalchemy 支持的数据库
  • rethinkdb
  • zookeeper

flask-apscheduler 将直接从 flask 配置读取 SCHEDULER_JOBSTORES 的值, 值的结构以下:

SCHEDULER_JOBSTORES = {
    'default': SQLAlchemyJobStore(url='sqlite:///flask_context.db'),
    'redis': RedisJobStore()
}
复制代码

存储介质能够配置多个, 在使用 task 装饰器设置任务时传入 jobstore='default' 或者 jobstore='redis' 便可. 当使用了 存储 后, 定时任务一启动就从 数据库 获取任务列表, 而后根据每一个任务的描述进行任务调度. 使用 task 装饰器添加 任务 时, 默认会覆盖 数据库 中已存在的任务, 若是不想被覆盖则须要手动调用 add_job 方法并设置 replace_existing 参数为 False, 以后再添加已存在的任务会抛异常.

我不知道使用持久化存储的正确姿式是什么样的, 根据个人理解, 在添加一系列任务而且进行了修改后, 下次再次启动项目, 应该仍是保持当时的任务状态, 而使用 task 装饰器添加的任务将默认覆盖已有的任务, 因此只能使用 add_job 手动添加任务, 而且不容许覆盖已存在任务, 若是遇到重复抛出异常, 捕捉便可. 这样就可让任务状态一直保持.

执行器

apscheduler 里有一个 executor 的概念, 用来执行全部的任务, 他们能够是 线程池 或者 进程池. flask-apscheduler 经过 SCHEDULER_EXECUTORS 配置来设置执行器, 如下为示例:

from flask import Flask
from flask_apscheduler import APScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

app = Flask(__name__)
app.config['SCHEDULER_EXECUTORS'] = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}

scheduler = APScheduler()
scheduler.init_app(app)
复制代码

在添加任务的时候使用 executor 参数选择便可.

调度器

调度器对任务中涉及到的 jobstore, trigger, exector 以及 增删改查 进行调度, 使全部任务平稳运行. 通常一个应用只存在一个调度器. 下面是提供的调度器:

  • BlockingScheduler: 阻塞运行, 仅在应用为独立的定时任务程序时使用
  • BackgroundScheduler: 后台运行, flask-apscheduler 默认为此项
  • AsyncIOScheduler: 在应用使用了 asyncio 模块时使用
  • GeventScheduler: 在应用使用了 gevent 模块时使用
  • TornadoScheduler: 在应用使用了 tornado 模块时使用
  • TwistedScheduler: 在应用使用了 twisted 模块时使用
  • QtScheduler: 在构建 Qt 应用时使用

若是选择了以上的任何一个 Scheduler, 将会默认选择合适的 executor, 不然默认的 executorThreadPoolExecutor.

日志及事件监听

日志

apscheduler 提供了名为 apscheduler日志器, 能够对其配置定制日志输出, 如下代码仅供参考:

logger = logging.getLogger('apscheduler')
logger.setLevel(logging.DEBUG)

logger_handler = logging.FileHandler('scheduler.log', mode='a', encoding='utf-8')
logger_handler.setLevel(logging.DEBUG)
logger_handler.setFormatter(logging.Formatter(fmt='%(asctime)s %(levelname)s: %(message)s',
                                              datefmt='%Y-%m-%d %H:%M:%S'))
logger.addHandler(logger_handler)
复制代码

以上代码设置日志等级为 DEBUG, 将日志输出到 scheduler.log 文件.

事件监听

若是任务在执行过程当中抛出了异常, 会将异常堆栈写入日志. 可是但愿在出错时能够作一些其余的逻辑, 不如出错后进行邮件提醒等功能就须要添加事件监听. 如下代码为示例:

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')
        
scheduler.add_listener(my_listener, EVENT_JOB_ERROR)
复制代码

以上代码将添加 错误事件 监听, 当任务执行出错时, 将会执行 my_listener 函数. 具体事件从 apscheduler.events 导入.

常见问题

经过以上这些, 对于简单的 定时任务 已经能够很方便的集成了. 其实 flask-apscheduler 我也是初次使用, 大多数都是从过阅读 文档 后根据本身理解翻译的, 下面是我使用过程当中遇到过的问题.

任务运行屡次

我在使用装饰器注册一个任务后, 使用 gunicorn 服务器部署应用后, 每一个任务会同时运行屡次, 排查才知道使用 gunicorn 时采用了多个 worker, 每一个 worker 都会在内存中维护一个独立的 应用实例, 而后每一个任务在每一个应用实例中都存在一份. 因此定时任务应该在应用初始化后初始化, 这时 钩子函数 就起做用了. 使用 app.before_first_request 钩子函数, 就能够在第一个请求开始以前将全部任务激活了.

须要使用应用上下文

咱们通常须要使用 flask 实例的相关信息时会使用 current_app 代理进行获取. 因为 flask 使用了 本地线程 技术来封装对象, 因此 current_app 仅指向当前线程下的 flask 实例, 可是 定时任务 是另开一个全新的 线程, 这个线程是脱离了应用上下文的, 若是想要使用, 就须要使用 _get_current_object() 获取真实实例后做为参数传递到新的线程内, 而后使用 app.app_context() 获取应用上下文.

flask-apscheduler 中初始化 scheduler 时会将 flask 实例挂载到 scheduler 上, 须要使用到应用上下文时, 只须要使用 scheduler.app.app_context() 便可.

参考

flask-apscheduler

apscheduler

相关文章
相关标签/搜索