APscheduler是执行定时任务的python库,其做用能够代替Linux系统下的crontab,github上有该库的例子。html
该模块由4个基本组件组成:python
其中triggers定义了定时任务的类别、触发条件以及具体要执行的任务名。
job stores能够将任务储存起来,有些须要重复执行的任务,或有数据生成的任务,有将数据储存的要求,能够经过数据库或文件的形式将数据储存起来以便下次使用。
executors负责具体的任务执行。
schedulers是最高级的组件,负责其它的管理和调度工做,该模块有几种可选的调度器可供选择,使用方法都是同样的:linux
BlockingScheduler: use when the scheduler is the only thing running in your process BackgroundScheduler: use then you’re not using any of the frameworks below, and want the scheduler to run in the background inside your application AsyncIOScheduler: use if your application uses the asyncio module GeventScheduler: use if your application uses gevent TornadoScheduler: use if you’re building a Tornado application TwistedScheduler: use if you’re building a Twisted application QtScheduler: use if you’re building a Qt application
定时任务的执行内容放在一个函数中使用,而调度的类型和时间等设置则在建立这个job的时候进行定义,在这个模块中,一个具体的任务是一个JOB实例,所以能够经过修改这个实例的属性对任务进行从新设置,而每个job实例都是依赖于一个scheduler,因此也能够经过sheduler对sheduler进行从新设定。git
参考源码中scheduler.add_job()的参数说明
'cron'跟linux下crontab同样的定时类型写法,
'interval'使用时间间隔的写法
...github
from apscheduler.schedulers.background import BackgroundScheduler # 建立scheduler scheduler = BackgroundScheduler() # 建立job # 方法1 job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') # 方法2 @scheduler.scheduled_job def myfunc(): # do something pass # 启动 scheduler.start() # 添加job scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') # 删除job scheduler.remove_job('my_job_id') # 暂停job scheduler.pause_job() # 恢复job scheduler.resume_job() # 查看jobs scheduler.get_jobs() scheduler.print_jobs() # 修改job scheduler.modify_job(max_instances=6, name='Alternate name') # 从新设置定时任务类型 scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5') # 关闭 scheduler.shutdown(wait=False)
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstores = { 'mongo': MongoDBJobStore(), 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
from apscheduler.schedulers.background import BackgroundScheduler # The "apscheduler." prefix is hard coded scheduler = BackgroundScheduler({ 'apscheduler.jobstores.mongo': { 'type': 'mongodb' }, 'apscheduler.jobstores.default': { 'type': 'sqlalchemy', 'url': 'sqlite:///jobs.sqlite' }, 'apscheduler.executors.default': { 'class': 'apscheduler.executors.pool:ThreadPoolExecutor', 'max_workers': '20' }, 'apscheduler.executors.processpool': { 'type': 'processpool', 'max_workers': '5' }, 'apscheduler.job_defaults.coalesce': 'false', 'apscheduler.job_defaults.max_instances': '3', 'apscheduler.timezone': 'UTC', })
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ProcessPoolExecutor jobstores = { 'mongo': {'type': 'mongodb'}, 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': {'type': 'threadpool', 'max_workers': 20}, 'processpool': ProcessPoolExecutor(max_workers=5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler() # .. do something else here, maybe add jobs etc. scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
flask下使用该模块能够使用flask扩展模块flask-apshedulerweb
from flask import Flask from flask_apscheduler import APScheduler class Config(object): JOBS = [ { 'id': 'job1', 'func': 'jobs:job1', 'args': (1, 2), 'trigger': 'interval', 'seconds': 10 } ] SCHEDULER_API_ENABLED = True def job1(a, b): print(str(a) + ' ' + str(b)) if __name__ == '__main__': app = Flask(__name__) app.config.from_object(Config()) scheduler = APScheduler() # it is also possible to enable the API directly # scheduler.api_enabled = True scheduler.init_app(app) scheduler.start() app.run()
问题在使用gunicorn对flask应用进行控制的时候若是设置了gunicorn的--worker参数大于1时,会出现一个定时任务执行屡次的问题,此时要给gunicorn提供一个额外的--preload参数,这样,flask的app在run以前的全部操做只会执行一次,所以定时任务就只会执行一次。
env/bin/gunicorn module_containing_app:app -b 0.0.0.0:8080 --workers 3 --preloadsql
问题与上面的问题相似,调试模式下只需给app.run()添加一个参数use_reloader便可。
app.run(debug=True, use_reloader=False)mongodb