阅读目录
1、apSheduler
2、Flask-APScheduler
3、动态定时任务
4、uwsgi部署注意事项
1、apSheduler
第一部份内容限于apSheduler3.0如下版本,以上版本可移步至 FastAPI+apSheduler动态定时任务html
1. 引子(Introduction)
Advanced Python Scheduler (APScheduler) 是一个轻量级但功能强大的进程内任务调度器,容许您调度函数(或任何其余python可调用文件)在您选择的时间执行。前端
2. 特性(Features)
- 没有(硬)外部依赖性
- api线程安全
- 支持CPython、Jython、PyPy
- 可配置的调度机制(触发器):
- 相似cron调度
- 单次运行延迟调度(如UNIX“at”命令)
- 基于时间间隔(以指定的时间间隔运行)
- 支持多种存储空间
- RAM
- 基于文件的简单数据库
- SQLAlchem
- MongoDB
- Redis
3. 使用(Usage)
3.1 安装
- pip install apscheduler
3.2 启动调度程序
from apscheduler.scheduler import Scheduler sched = Scheduler() sched.start()
3.3 调度job
3.3.1 简单日期调度job
在指定时间执行一次job。这是至关于UNIX“at”命令的进程内命令python
from datetime import date from apscheduler.scheduler import Scheduler # Start the scheduler sched = Scheduler() sched.start() # Define the function that is to be executed def my_job(text): print text # The job will be executed on November 6th, 2009 exec_date = date(2009, 11, 6) # 添加一个job job = sched.add_date_job(my_job, exec_date, ['text'])
更具体地安排时间linux
from datetime import datetime # The job will be executed on November 6th, 2009 at 16:30:05 job = sched.add_date_job(my_job, datetime(2009, 11, 6, 16, 30, 5), ['text'])
甚至能够将日期指定为字符串文本sql
job = sched.add_date_job(my_job, '2009-11-06 16:30:05', ['text']) # 支持微秒级别 job = sched.add_date_job(my_job, '2009-11-06 16:30:05.720400', ['text'])
3.3.2 基于时间间隔的调度job
job的执行在给定延迟后开始,或者在start_date(若是指定)开始,start_date参数能够做为date/datetime对象或字符串文本给出。数据库
from datetime import datetime from apscheduler.scheduler import Scheduler # Start the scheduler sched = Scheduler() sched.start() def job_function(): print "Hello World" # Schedule job_function to be called every two hours sched.add_interval_job(job_function, hours=2) # The same as before, but start after a certain time point sched.add_interval_job(job_function, hours=2, start_date='2010-10-10 09:30')
装饰语法flask
from apscheduler.scheduler import Scheduler # Start the scheduler sched = Scheduler() sched.start() # Schedule job_function to be called every two hours @sched.interval_schedule(hours=2) def job_function(): print "Hello World"
若是须要取消对装饰功能的job,能够这样作后端
scheduler.unschedule_job(job_function.job)
3.3.3 cron调度job
与crontab表达式不一样,您能够省略不须要的字段。大于最低有效明肯定义字段的字段默认为,而较小的字段默认为其最小值,除了默认为。例如,若是仅指定day=1,minute=20,则做业将在每一年每个月的第一天以每小时20分钟的速度执行。下面的代码示例应该进一步说明这种行为。
省略字段默认为*
api
from apscheduler.scheduler import Scheduler # Start the scheduler sched = Scheduler() sched.start() def job_function(): print "Hello World" # Schedules job_function to be run on the third Friday # of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00 sched.add_cron_job(job_function, month='6-8,11-12', day='3rd fri', hour='0-3') # Schedule a backup to run once from Monday to Friday at 5:30 (am) sched.add_cron_job(job_function, day_of_week='mon-fri', hour=5, minute=30)
装饰语法安全
@sched.cron_schedule(day='last sun') def some_decorated_task(): print "I am printed at 00:00:00 on the last Sunday of every month!"
若是须要取消对装饰功能的job,能够这样作
scheduler.unschedule_job(job_function.job)
3.3.4 使用自定义触发器调度
以上事例基于内置触发器调度job,若是须要使用自定义触发器调度须要使用add_job()方法
from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5') scheduler.add_job(func=aps_test, args=('一次性任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12)) scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3) scheduler.start()
3.4 关闭调度器
sched.shutdown() # 默认状况下,调度程序关闭其线程池,并等待直到全部当前正在执行的job完成。为了更快地退出,能够: sched.shutdown(wait=False) # 这仍然会关闭线程池,但不会等待任何正在运行的任务完成。此外,若是您给调度程序一个要在其余地方管理的线程池,您可能但愿彻底跳过线程池关闭: sched.shutdown(shutdown_threadpool=False) # 自动关闭调度程序的一个巧妙方法是为此使用atexit挂钩: import atexit sched = Scheduler(daemon=True) atexit.register(lambda: sched.shutdown(wait=False)) # Proceed with starting the actual application
3.5 Job stores
若是没有指定stores存储位置,则将转到默认job存储 -> ramjobstore不提供持久化保存
其它存储stores:
ShelveJobStore SQLAlchemyJobStore MongoDBJobStore RedisJobStore
经过配置选项或add_jobstore()方法添加做业存储。所以,如下是相等的:
config = {'apscheduler.jobstores.file.class': 'apscheduler.jobstores.shelve_store:ShelveJobStore', 'apscheduler.jobstores.file.path': '/tmp/dbfile'} sched = Scheduler(config)
3.6 获取调度器列表
sched.print_jobs()
2、Flask-APScheduler
1. 引子(Introduction)
- Flask-APScheduler 是Flask框架的一个扩展库,增长了Flask对apScheduler的支持
2. 特性(Features)
- 根据Flask配置加载调度器配置
- 根据Flask配置加载调度器job
- 容许指定调度程序将运行的主机名
- 提供REST API来管理调度job
- 为REST API提供认证
3. 安装(Installation)
pip install Flask-APScheduler
4. 使用(Usage)
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from flask import Flask from flask_apscheduler import APScheduler class Config(object): # 配置执行job JOBS = [ { 'id': 'job1', 'func': 'advanced:job1', 'args': (1, 2), 'trigger': 'interval', 'seconds': 10 } ] # 存储位置 SCHEDULER_JOBSTORES = { 'default': SQLAlchemyJobStore(url='sqlite://') } # 线程池配置 SCHEDULER_EXECUTORS = { 'default': {'type': 'threadpool', 'max_workers': 20} } SCHEDULER_JOB_DEFAULTS = { 'coalesce': False, 'max_instances': 3 } # 调度器开关 SCHEDULER_API_ENABLED = True def job1(a, b): print(str(a) + ' ' + str(b)) if __name__ == '__main__': app = Flask(__name__) app.config.from_object(Config()) scheduler = APScheduler() # 注册app scheduler.init_app(app) scheduler.start() app.run()
3、动态定时任务
- Flask + flask_apscheduler实现一个相似Jenkins的定时任务的功能,前端设置crontab,后端能够建立,修改,暂停,移除,恢复一个执行任务
文件目录 |--app |----config.py 配置文件 |----run_tasks.py 开启任务 |----tasks.py 任务job |----apSheduler.py 提供接口函数 |----extensions.py flask扩展 |----__init__.py 初始化文件 |----views.py 业务代码 |--manage.py 项目启动文件
1. config.py配置flask_apscheduler
class Config(object): # 开关 SCHEDULER_API_ENABLED = True # 持久化配置 SCHEDULER_JOBSTORES = { 'default': SQLAlchemyJobStore(url='sqlite:///flask_context.db') } SCHEDULER_EXECUTORS = { 'default': {'type': 'threadpool', 'max_workers': 20} }
2. init.py建立app
from app.config import Config from app.extensions import scheduler # 建立app def create_app(config=None, app_name=None, blueprints=None): app = Flask(app_name, static_folder='thanos/static', template_folder='thanos/resource/report') # 导入flask配置 -> 这里根据本身的项目导入配置就好哇 # config = Config.get_config_from_host(app.name) app.config.from_object(config) # 初始化调度器配置 configure_scheduler(app) def configure_scheduler(app): """Configure Scheduler""" scheduler.init_app(app) scheduler.start() # 加载任务,选择了第一次请求flask后端时加载,能够选择别的方式... @app.before_first_request def load_tasks(): # 开启任务 from app import run_tasks
3. extensions.py实例化scheduler
from flask_apscheduler import APScheduler scheduler = APScheduler()
4. apSheduler.py提供调度器接口
"""此文件能够根据具体业务复杂化选择写或者直接调用原apscheduler接口""" from flask import current_app # from .extensions import scheduler 直接导入单例对象操做也行 class APScheduler(object): """调度器控制方法""" def add_job(self, jobid, func, args, **kwargs): """ 添加任务 :param args: 元祖 -> (1,2) :param jobstore: 存储位置 :param trigger: data -> run_date datetime表达式 cron -> second/minute/day_of_week interval -> seconds 延迟时间 next_run_time -> datetime.datetime.now() + datetime.timedelta(seconds=12)) :return: """ job_def = dict(kwargs) job_def['id'] = jobid job_def['func'] = func job_def['args'] = args job_def = self.fix_job_def(job_def) self.remove_job(jobid) # 删除原job current_app.apscheduler.scheduler.add_job(**job_def) def remove_job(self, jobid, jobstore=None): """删除任务""" current_app.apscheduler.remove_job(jobid, jobstore=jobstore) def resume_job(self, jobid, jobstore=None): """恢复任务""" current_app.apscheduler.resume_job(jobid, jobstore=jobstore) def pause_job(self, jobid, jobstore=None): """恢复任务""" current_app.apscheduler.pause_job(jobid, jobstore=jobstore) def fix_job_def(self, job_def): """维修job工程""" if job_def.get('trigger') == 'date': job_def['run_date'] = job_def.get('run_date') or None elif job_def.get('trigger') == 'cron': job_def['hour'] = job_def.get('hour') or "*" job_def['minute'] = job_def.get('minute') or "*" job_def['week'] = job_def.get('week') or "*" job_def['day'] = job_def.get('day') or "*" job_def['month'] = job_def.get('month') or "*" elif job_def.get('trigger') == 'interval': job_def['seconds'] = job_def.get('seconds') or "*" else: if job_def.get("andTri"): job_def['trigger'] = AndTrigger([job_def.pop("andTri", None), ]) # job_def['next_run_time'] = job_def.get('next_run_time') or None return job_def
5. views.py 实现调度器接口
from app.apSheduler import APScheduler # croniter库解析Linux cron格式的计划 # 以添加为例子 暂停 删除 恢复能够根据业务场景本身写接口 def add_crontab_task(self, params): """添加一个crontab任务""" try: self.crontab = params.get("crontab") self.id = params.get("id") self.task_id = params.get("task_id") except Exception as e: return False, str(e) # 记录数据库 res = addSql() # 更新任务信息 APScheduler().add_job(jobid=self.id, func=task_func, args=(self.task_id,), andTri=CronTrigger.from_crontab(self.crontab)) if res is False: return False, "数据库操做异常" return True, croniter(self.crontab, datetime.now()).get_next(datetime) def get_next_execute_time(self, params): """获取下一次执行时间""" try: self.crontab = params.get("crontab") except Exception as e: return False, str(e) return True, str(croniter(self.crontab, datetime.now()).get_next(datetime))
6. tasks.py 任务job
def task_func(task_id): """业务逻辑""" # 发邮件、写诗、画画 -> 爱干啥干啥
7. run_tasks.py 开启任务调度大门
from .task import task_func from apscheduler.triggers.cron import CronTrigger # 能够很友好的支持添加一个crontab表达式 def run_task(): # 查询数据库的crontab信息 -> 定时任务信息 res = fetall("select * from crontab_table") # 遍历添加任务 shche = APScheduler() for rs in res: shche.add_job(jobid=rs.get(id), func=task_func, args=(rs.get(task_id)), andTri=CronTrigger.from_crontab(rs.get(crontab))) # 最重要的 run_task() # 这样当__init__.py建立app时加载这个文件,就会执行添加历史任务啦!
8. manage.py 启动项目
from app import create_app app = create_app() app.run()
4、uwsgi部署注意事项
1. 常见问题及解决方案
1.1 线上部署uWSGI+APScheduler执行定时任务卡死
1.1.1问题分析:
APScheduler运行环境须要为多线程,uwsgi默认是one thread ,one process,须要在配置文件里面加上一条 enable-thread = true,也就是容许程序内部启动多线程。
1.1.2解决方案:
# uwsgi.ini文件追加如下配置 enable-threads = true preload=True #用--preload启动uWSGI,确保scheduler只在loader的时候建立一次 lazy-apps=true
1.2 定时任务屡次执行的问题
1.2.1问题分析:
1.本地缘由,错过了上次执行时间,下次会屡次执行
2.线上部署的,如uWSGI部署,配置了processes>1致使加载了多此apscheduler(apscheduler当前没有任何进程间同步和信令方案)
1.2.3解决方案:
1. 本地屡次执行能够在Flask启动方法中加use_reloader=False
app.run(host="0.0.0.0", port=8888, use_reloader=False)
2.线上linux能够借鉴下面的方法,网上借鉴的
在__init__.py文件中修改中configure_scheduler(),用全局锁确保scheduler只运行一次, 代码以下:
import atexit import fcntl # 只能用于linux from .extensions import scheduler def configure_scheduler(app): """Configure Scheduler""" f = open("scheduler.lock", "wb") try: fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB) scheduler.init_app(app) scheduler.start() # 加载任务 @app.before_first_request def load_tasks(): from thanos import run_tasks except: pass def unlock(): fcntl.flock(f, fcntl.LOCK_UN) f.close() atexit.register(unlock) init函数为flask项目初始化所调用,这里为scheduler模块的初始化部分。首先打开(或建立)一个scheduler.lock文件,并加上非阻塞互斥锁。成功后建立scheduler并启动。若是加文件锁失败,说明scheduler已经建立,就略过建立scheduler的部分。 最后注册一个退出事件,若是这个flask项目退出,则解锁并关闭scheduler.lock文件的锁。
3.官网推荐rpyc/grpc解决
能够查看 FastAPI+apSheduler动态定时任务