今天准备实现一个功能须要用到定时执行任务,因此就看到了Python的一个定时任务框架APScheduler,试了一下感受还不错。html
1.APScheduler简介: python
APScheduler是Python的一个定时任务框架,能够很方便的知足用户定时执行或者周期执行任务的需求,它提供了基于日期date、固定时间间隔interval 、以及相似于Linux上的定时任务crontab类型的定时任务。而且该框架不只能够添加、删除定时任务,还能够将任务存储到数据库中,实现任务的持久化,因此使用起来很是方便。sql
2.APScheduler安装:mongodb
APScheduler的安装相对来讲也很是简单,能够直接利用pip安装,若是没有pip能够下载源码,利用源码安装。数据库
1).利用pip安装:(推荐)bash
# pip install apscheduler
2).基于源码安装:https://pypi.python.org/pypi/APScheduler/ 框架
# python setup.py install
3.基本概念async
APScheduler有四种组件及相关说明:ide
1) triggers(触发器):触发器包含调度逻辑,每个做业有它本身的触发器,用于决定接下来哪个做业会运行,除了他们本身初始化配置外,触发器彻底是无状态的。ui
2)job stores(做业存储):用来存储被调度的做业,默认的做业存储器是简单地把做业任务保存在内存中,其它做业存储器能够将任务做业保存到各类数据库中,支持MongoDB、Redis、SQLAlchemy存储方式。当对做业任务进行持久化存储的时候,做业的数据将被序列化,从新读取做业时在反序列化。
3) executors(执行器):执行器用来执行定时任务,只是将须要执行的任务放在新的线程或者线程池中运行。看成业任务完成时,执行器将会通知调度器。对于执行器,默认状况下选择ThreadPoolExecutor就能够了,可是若是涉及到一下特殊任务如比较消耗CPU的任务则能够选择ProcessPoolExecutor,固然根据根据实际需求能够同时使用两种执行器。
4) schedulers(调度器):调度器是将其它部分联系在一块儿,通常在应用程序中只有一个调度器,应用开发者不会直接操做触发器、任务存储以及执行器,相反调度器提供了处理的接口。经过调度器完成任务的存储以及执行器的配置操做,如能够添加。修改、移除任务做业。
APScheduler提供了多种调度器,能够根据具体需求来选择合适的调度器,经常使用的调度器有:
BlockingScheduler:适合于只在进程中运行单个任务的状况,一般在调度器是你惟一要运行的东西时使用。
BackgroundScheduler: 适合于要求任何在程序后台运行的状况,当但愿调度器在应用后台执行时使用。
AsyncIOScheduler:适合于使用asyncio框架的状况
GeventScheduler: 适合于使用gevent框架的状况
TornadoScheduler: 适合于使用Tornado框架的应用
TwistedScheduler: 适合使用Twisted框架的应用
QtScheduler: 适合使用QT的状况
4.配置调度器
APScheduler提供了许多不一样的方式来配置调度器,你可使用一个配置字典或者做为参数关键字的方式传入。你也能够先建立调度器,再配置和添加做业,这样你能够在不一样的环境中获得更大的灵活性。
1)下面一个简单的示例:
import time from apscheduler.schedulers.blocking import BlockingScheduler def test_job(): print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) scheduler = BlockingScheduler() ''' #该示例代码生成了一个BlockingScheduler调度器,使用了默认的默认的任务存储MemoryJobStore,以及默认的执行器ThreadPoolExecutor,而且最大线程数为10。 ''' scheduler.add_job(test_job, 'interval', seconds=5, id='test_job') ''' #该示例中的定时任务采用固定时间间隔(interval)的方式,每隔5秒钟执行一次。 #而且还为该任务设置了一个任务id ''' scheduler.start()
2)若是想执行一些复杂任务,如上边所说的同时使用两种执行器,或者使用多种任务存储方式,而且须要根据具体状况对任务的一些默认参数进行调整。能够参考下面的方式。(http://apscheduler.readthedocs.io/en/latest/userguide.html)
第一种方式:
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstores = { 'mongo': MongoDBJobStore(), 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
第二种方式:
from apscheduler.schedulers.background import BackgroundScheduler # The "apscheduler." prefix is hard coded scheduler = BackgroundScheduler({ 'apscheduler.jobstores.mongo': { 'type': 'mongodb' }, 'apscheduler.jobstores.default': { 'type': 'sqlalchemy', 'url': 'sqlite:///jobs.sqlite' }, 'apscheduler.executors.default': { 'class': 'apscheduler.executors.pool:ThreadPoolExecutor', 'max_workers': '20' }, 'apscheduler.executors.processpool': { 'type': 'processpool', 'max_workers': '5' }, 'apscheduler.job_defaults.coalesce': 'false', 'apscheduler.job_defaults.max_instances': '3', 'apscheduler.timezone': 'UTC', })
第三种方式:
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ProcessPoolExecutor jobstores = { 'mongo': {'type': 'mongodb'}, 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': {'type': 'threadpool', 'max_workers': 20}, 'processpool': ProcessPoolExecutor(max_workers=5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler() # .. do something else here, maybe add jobs etc. scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
5.对任务做业的基本操做:
1).添加做业有两种方式:第一种能够直接调用add_job(),第二种使用scheduled_job()修饰器。
而add_job()是使用最多的,它能够返回一个apscheduler.job.Job实例,于是能够对它进行修改或者删除,而使用修饰器添加的任务添加以后就不能进行修改。
例如使用add_job()添加做业:
#!/usr/bin/env python #-*- coding:UTF-8 import time import datetime from apscheduler.schedulers.blocking import BlockingScheduler def job1(f): print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), f def job2(arg1, args2, f): print f, args1, args2 def job3(**args): print args ''' APScheduler支持如下三种定时任务: cron: crontab类型任务 interval: 固定时间间隔任务 date: 基于日期时间的一次性任务 ''' scheduler = BlockingScheduler() #循环任务示例 scheduler.add_job(job1, 'interval', seconds=5, args=('循环',), id='test_job1') #定时任务示例 scheduler.add_job(job1, 'cron', second='*/5', args=('定时',), id='test_job2') #一次性任务示例 scheduler.add_job(job1, next_run_time=(datetime.datetime.now() + datetime.timedelta(seconds=10)), args=('一次',), id='test_job3') ''' 传递参数的方式有元组(tuple)、列表(list)、字典(dict) 注意:不过须要注意采用元组传递参数时后边须要多加一个逗号 ''' #基于list scheduler.add_job(job2, 'interval', seconds=5, args=['a','b','list'], id='test_job4') #基于tuple scheduler.add_job(job2, 'interval', seconds=5, args=('a','b','tuple',), id='test_job5') #基于dict scheduler.add_job(job3, 'interval', seconds=5, kwargs={'f':'dict', 'a':1,'b':2}, id='test_job7') print scheduler.get_jobs() scheduler.start() #带有参数的示例 scheduler.add_job(job2, 'interval', seconds=5, args=['a','b'], id='test_job4') scheduler.add_job(job2, 'interval', seconds=5, args=('a','b',), id='test_job5') scheduler.add_job(job3, 'interval', seconds=5, kwargs={'a':1,'b':2}, id='test_job6') print scheduler.get_jobs() scheduler.start()
或者使用scheduled_job()修饰器来添加做业:
@sched.scheduled_job('cron', second='*/5' ,id='my_job_id',) def test_task(): print("Hello world!")
2).得到任务列表:
能够经过get_jobs方法来获取当前的任务列表,也能够经过get_job()来根据job_id来得到某个任务的信息。而且apscheduler还提供了一个print_jobs()方法来打印格式化的任务列表。
例如:
scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job') print scheduler.get_job('my_job_id') print scheduler.get_jobs()
3).修改任务:
修改任务任务的属性可使用apscheduler.job.Job.modify()或者modify_job()方法,能够修改除了id的其它任何属性。
例如:
job = scheduler.add_job(my_job, 'interval', seconds=5, id='my_job' name='test_job') job.modify(max_instances=5, name='my_job')
4).删除任务:
删除调度器中的任务有能够用remove_job()根据job ID来删除指定任务或者使用remove(),若是使用remove()须要事先保存在添加任务时返回的实例对象,任务删除后就不会在执行。
注意:经过scheduled_job()添加的任务只能使用remove_job()进行删除。
例如:
job = scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job') job.remove()
或者
scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job') scheduler.remove_job('my_job')
5).暂停与恢复任务:
暂停与恢复任务能够直接操做任务实例或者调度器来实现。当任务暂停时,它的运行时间会被重置,暂停期间不会计算时间。
暂停任务:
apscheduler.job.Job.pause() apscheduler.schedulers.base.BaseScheduler.pause_job()
恢复任务
apscheduler.job.Job.resume() apscheduler.schedulers.BaseScheduler.resume_job()
6).启动调度器
可使用start()方法启动调度器,BlockingScheduler须要在初始化以后才能执行start(),对于其余的Scheduler,调用start()方法都会直接返回,而后能够继续执行后面的初始化操做。
例如:
from apscheduler.schedulers.blocking import BlockingScheduler def my_job(): print "Hello world!" scheduler = BlockingScheduler() scheduler.add_job(my_job, 'interval', seconds=5) scheduler.start()
7).关闭调度器:
使用下边方法关闭调度器:
scheduler.shutdown()
默认状况下调度器会关闭它的任务存储和执行器,并等待全部正在执行的任务完成,若是不想等待,能够进行以下操做:
scheduler.shutdown(wait=False)
注意:
当出现No handlers could be found for logger “apscheduler.scheduler”次错误信息时,说明没有
logging模块的logger存在,因此须要添加上,对应新增内容以下所示(仅供参考):
import logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datafmt='%a, %d %b %Y %H:%M:%S', filename='/var/log/aaa.txt', filemode='a')