Flask-Apscheduler
是集成了 Apscheduler
的扩展, 给 Flask
程序提供了 定时任务
的支持, 使用此扩展能够很方便的添加定时任务, 下面咱们看一下具体的使用方法.html
pip install Flask-APScheduler
复制代码
from flask import Flask
from flask_apscheduler import APScheduler
app = Flask(__name__)
scheduler = APScheduler(app=app)
@scheduler.task(trigger='interval', id='test_job', seconds=1)
def test_job():
print('hello world')
if __name__ == '__main__':
scheduler.start()
app.run()
复制代码
以上代码将实现 每隔一秒
执行一次 test_job
任务, id
表示当前任务的 惟一标识
. 全部被 task
装饰器装饰的函数将被视为 定时任务
, 定时调度逻辑将经过装饰器参数设置. 经过以上代码能够看到, 集成一个定时任务是十分方便的.python
APScheduler
提供了一个 trigger
的概念, 也就是任务调度的触发方式, 可选的值为:git
interval
: 间隔任务, 即在固定的时间间隔循环执行cron
: 指定在某一天的固定时间点执行date
: 在指定时间点执行, 仅执行一次选择了 trigger
后, 将根据 关键字参数
设置具体的调度时间, 每个 trigger
对应的参数都不同, 下面将一一介绍.github
interval
对应 apscheduler.triggers.interval.IntervalTrigger
类, 下面是它的初始化方法:redis
IntervalTrigger(weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None, end_date=None, timezone=None, jitter=None)
复制代码
其中的 初始化参数
是须要在 task
装饰器中传入的 关键字参数
. 其中 run_date
表示任务开始执行时间, 若是为 None
则从任务生成时开始算起, end_date
是结束时间, 到达这个时间点后, 任务将中止. 如下是每个参数的含义:sql
weeks(int)
: 须要等待的周数days(int)
: 须要等待的天数hours(int)
: 须要等待的小时数minutes(int)
: 须要等待的分钟数seconds(int)
: 须要等待的秒数start_data(datetime|str)
: 任务开始时间end_date(datetime|str)
: 任务结束时间timezone(datetime.tzinfo|str)
: 时区jitter(int|None)
: 延迟窗口, 设置此值后, 真正的间隔时间将在 ±jitter
间浮动cron
对应 apscheduler.triggers.cron.CronTrigger
类, 下面是它的初始化方法:mongodb
CronTrigger(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None, jitter=None)
复制代码
最后四个参数与 interval
一致, 再也不解释, 如下为每一个参数的含义:数据库
year(int|str)
: YYYY
格式的年month(int|str)
: 1-12
月day(int|str)
: 1-31
天week(int|str)
: 1-53
周day_of_week(int|str)
: 一周的某一天, 能够为 数字
或者 名字
0-6
,mon,tue,wed,thu,fri,sat,sun
hour(int|str)
: 0-23
小时minute(int|str)
: 0-59
分钟second(int|str)
: 0-59
秒cron
是 Apscheduler
中最强大的 trigger
, 它的使用方式相似于 *nix
下的 crontab
, 因此以上参数的值可使用 cron 表达式
设置, 如下为可用的表达式, 表格中 任意字段
的 含义
将以 秒
进行解释, 使用时表示具体使用的 参数字段
:flask
表达式 | 参数字段 | 含义 |
---|---|---|
* |
任意 | 任意秒, 即每秒执行 |
*/a |
任意 | 当前秒数为 a 的倍数时执行 |
a-b |
任意 | 当前秒数在 a 和 b 之间时执行 |
a-b/c |
任意 | 当前秒数在 a 和 b 之间且是 c 的倍数时执行 |
xth y |
day | 这个月第 x 个 周 , 例如 3rd mon 表示 第三个周一 |
last x |
day | 这个月最后一个 周 |
last |
day | 这个月最后的一天 |
x,y,z |
任意 | 以上表达式能够写多个, 逗号分隔 |
在设置参数时,没有设置的参数将设置默认值. 默认参数规则以一个例子解释. 若是定义 day=1, minute=20
两个字段, 则这两个字段中 含义
最小的为 minute
, 而后比 minute
小的字段设为 最小值
, 比它大的字段设置为 *
, 可是 week
和 day_of_week
一直设置为 *
. 因此比 minute
小的字段 second
设置为 0
, 其余设置为 *
. 以上例子对应的结果为:api
year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0
复制代码
date
比较简单, 只有一个 run_date
参数, 用于设置执行时间, 只执行一次.
API
管理flask-apscheduler
提供了使用 api
管理任务的功能, 可是默认是关闭的, 能够在 flask
配置中设置 app.config['SCHEDULER_API_ENABLED'] = True
打开, 打开后在 app
实例所在目录执行 flask routes | grep scheduler
便可查看. flask routes
是 flask
提供的命令行交互程序, 只在 1.*
版本有效, 若是使用的是旧版本的, 可使用 app.url_map.iter_rules()
查看路由信息, 全部路由信息以下:
scheduler.add_job POST /scheduler/jobs
scheduler.delete_job DELETE /scheduler/jobs/<job_id>
scheduler.get_job GET /scheduler/jobs/<job_id>
scheduler.get_jobs GET /scheduler/jobs
scheduler.get_scheduler_info GET /scheduler
scheduler.pause_job POST /scheduler/jobs/<job_id>/pause
scheduler.resume_job POST /scheduler/jobs/<job_id>/resume
scheduler.run_job POST /scheduler/jobs/<job_id>/run
scheduler.update_job PATCH /scheduler/jobs/<job_id>
复制代码
这些 API
分别调用了 apscheduler
提供的方法, 有了这些 API
咱们就能够很方便的对任务进行管理.
上节提到咱们能够 API
对任务进行管理, 即 增删改查
和 暂停重启
. 可是默认状况下, 全部 任务
都存储在内存中, 一旦项目重启, 全部已修改的任务都将失效, 不会被保存下来, 因此就须要能够保存数据的地方可让任务持久化保存, 也就是 数据库
, apscheduler
提供了如下数据库做为存储介质:
mongodb
redis
sqlalchemy
支持的数据库rethinkdb
zookeeper
flask-apscheduler
将直接从 flask
配置读取 SCHEDULER_JOBSTORES
的值, 值的结构以下:
SCHEDULER_JOBSTORES = {
'default': SQLAlchemyJobStore(url='sqlite:///flask_context.db'),
'redis': RedisJobStore()
}
复制代码
存储介质能够配置多个, 在使用 task
装饰器设置任务时传入 jobstore='default'
或者 jobstore='redis'
便可. 当使用了 存储
后, 定时任务一启动就从 数据库
获取任务列表, 而后根据每一个任务的描述进行任务调度. 使用 task
装饰器添加 任务
时, 默认会覆盖 数据库
中已存在的任务, 若是不想被覆盖则须要手动调用 add_job
方法并设置 replace_existing
参数为 False
, 以后再添加已存在的任务会抛异常.
我不知道使用持久化存储的正确姿式是什么样的, 根据个人理解, 在添加一系列任务而且进行了修改后, 下次再次启动项目, 应该仍是保持当时的任务状态, 而使用
task
装饰器添加的任务将默认覆盖已有的任务, 因此只能使用add_job
手动添加任务, 而且不容许覆盖已存在任务, 若是遇到重复抛出异常, 捕捉便可. 这样就可让任务状态一直保持.
apscheduler
里有一个 executor
的概念, 用来执行全部的任务, 他们能够是 线程池
或者 进程池
. flask-apscheduler
经过 SCHEDULER_EXECUTORS
配置来设置执行器, 如下为示例:
from flask import Flask
from flask_apscheduler import APScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
app = Flask(__name__)
app.config['SCHEDULER_EXECUTORS'] = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
scheduler = APScheduler()
scheduler.init_app(app)
复制代码
在添加任务的时候使用 executor
参数选择便可.
调度器对任务中涉及到的 jobstore
, trigger
, exector
以及 增删改查
进行调度, 使全部任务平稳运行. 通常一个应用只存在一个调度器. 下面是提供的调度器:
BlockingScheduler
: 阻塞运行, 仅在应用为独立的定时任务程序时使用BackgroundScheduler
: 后台运行, flask-apscheduler
默认为此项AsyncIOScheduler
: 在应用使用了 asyncio
模块时使用GeventScheduler
: 在应用使用了 gevent
模块时使用TornadoScheduler
: 在应用使用了 tornado
模块时使用TwistedScheduler
: 在应用使用了 twisted
模块时使用QtScheduler
: 在构建 Qt
应用时使用若是选择了以上的任何一个 Scheduler
, 将会默认选择合适的 executor
, 不然默认的 executor
为 ThreadPoolExecutor
.
apscheduler
提供了名为 apscheduler
的 日志器
, 能够对其配置定制日志输出, 如下代码仅供参考:
logger = logging.getLogger('apscheduler')
logger.setLevel(logging.DEBUG)
logger_handler = logging.FileHandler('scheduler.log', mode='a', encoding='utf-8')
logger_handler.setLevel(logging.DEBUG)
logger_handler.setFormatter(logging.Formatter(fmt='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'))
logger.addHandler(logger_handler)
复制代码
以上代码设置日志等级为 DEBUG
, 将日志输出到 scheduler.log
文件.
若是任务在执行过程当中抛出了异常, 会将异常堆栈写入日志. 可是但愿在出错时能够作一些其余的逻辑, 不如出错后进行邮件提醒等功能就须要添加事件监听. 如下代码为示例:
def my_listener(event):
if event.exception:
print('The job crashed :(')
else:
print('The job worked :)')
scheduler.add_listener(my_listener, EVENT_JOB_ERROR)
复制代码
以上代码将添加 错误事件
监听, 当任务执行出错时, 将会执行 my_listener
函数. 具体事件从 apscheduler.events
导入.
经过以上这些, 对于简单的 定时任务
已经能够很方便的集成了. 其实 flask-apscheduler
我也是初次使用, 大多数都是从过阅读 文档 后根据本身理解翻译的, 下面是我使用过程当中遇到过的问题.
我在使用装饰器注册一个任务后, 使用 gunicorn
服务器部署应用后, 每一个任务会同时运行屡次, 排查才知道使用 gunicorn
时采用了多个 worker
, 每一个 worker
都会在内存中维护一个独立的 应用实例
, 而后每一个任务在每一个应用实例中都存在一份. 因此定时任务应该在应用初始化后初始化, 这时 钩子函数
就起做用了. 使用 app.before_first_request
钩子函数, 就能够在第一个请求开始以前将全部任务激活了.
咱们通常须要使用 flask
实例的相关信息时会使用 current_app
代理进行获取. 因为 flask
使用了 本地线程
技术来封装对象, 因此 current_app
仅指向当前线程下的 flask
实例, 可是 定时任务
是另开一个全新的 线程
, 这个线程是脱离了应用上下文的, 若是想要使用, 就须要使用 _get_current_object()
获取真实实例后做为参数传递到新的线程内, 而后使用 app.app_context()
获取应用上下文.
flask-apscheduler
中初始化 scheduler
时会将 flask
实例挂载到 scheduler
上, 须要使用到应用上下文时, 只须要使用 scheduler.app.app_context()
便可.