官网文档:https://apscheduler.readthedocs.io/en/latest/index.html
API:https://apscheduler.readthedocs.io/en/latest/py-modindex.html
APScheduler
是一个 Python 的第三方库,用来为 Python 程序提供后台任务调度。包含四个组件,分别是:
triggers: 任务触发器组件,提供任务触发方式
job stores: 任务存储组件,提供任务保存方式
executors: 任务执行组件,提供任务执行方式
schedulers: 任务调度组件,提供任务工作方式
pip 安装
$ pip install apscheduler
源码安装
$ python setup.py install
# Minimal example: run job1 every 3 seconds with a BlockingScheduler.
import time

from apscheduler.schedulers.blocking import BlockingScheduler

# Instantiate a scheduler
scheduler = BlockingScheduler()


def job1():
    """Print the current time followed by a fixed message."""
    # The print() call form is required on Python 3; the original
    # Python 2 print statement is a syntax error there.
    print("%s: 执行任务" % time.asctime())


# Add the job and set its trigger to fire once every 3 seconds
scheduler.add_job(job1, 'interval', seconds=3)

# Start running the scheduler
scheduler.start()
输出:
$ python first.py
Fri Sep 8 20:41:55 2017: 执行任务
Fri Sep 8 20:41:58 2017: 执行任务
...
trigger
提供任务的触发方式,共三种方式:
date:只在某个时间点执行一次,参数:run_date(datetime|str)
# Examples of the 'date' trigger: each job runs exactly once.
# `scheduler` and `my_job` are assumed to be defined beforehand.
from datetime import date, datetime

# run_date given as a date object
scheduler.add_job(my_job, 'date', run_date=date(2017, 9, 8), args=[])

# run_date given as a datetime object
scheduler.add_job(my_job, 'date', run_date=datetime(2017, 9, 8, 21, 30, 5), args=[])

# run_date given as a string
scheduler.add_job(my_job, 'date', run_date='2017-9-08 21:30:05', args=[])

# The 'date' trigger and datetime.now() as run_date are implicit
scheduler.add_job(my_job, args=[])
interval: 每隔一段时间执行一次,参数:weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None
# Examples of the 'interval' trigger.
# `scheduler` is assumed to be defined beforehand.

# Run my_job every two hours
scheduler.add_job(my_job, 'interval', hours=2)

# Same interval, bounded by start_date and end_date
# (the original end_date string was missing its closing quote)
scheduler.add_job(
    my_job,
    'interval',
    hours=2,
    start_date='2017-9-8 21:30:00',
    end_date='2018-06-15 21:30:00',
)


# Decorator form: registers the decorated function as a job
@scheduler.scheduled_job('interval', id='my_job_id', hours=2)
def my_job():
    print("Hello World")
cron: 使用同linux下crontab的方式(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)
# Examples of the 'cron' trigger.
# `sched` and `my_job` are assumed to be defined beforehand.

# hour/minute fields only
sched.add_job(my_job, 'cron', hour=3, minute=30)

# Restricted to Monday through Friday, with an end date
sched.add_job(
    my_job,
    'cron',
    day_of_week='mon-fri',
    hour=5,
    minute=30,
    end_date='2017-10-30',
)


# Decorator form using a day expression
@sched.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")
scheduler
组件提供执行的方式,在不同的应用环境中选择合适的方式
BlockingScheduler: 进程中只运行调度器时的方式
# BlockingScheduler example: the scheduler is the only thing the process runs.
import time

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()


def job1():
    """Print the current time followed by a fixed message."""
    # The print() call form is required on Python 3; the original
    # Python 2 print statement is a syntax error there.
    print("%s: 执行任务" % time.asctime())


# Fire job1 once every 3 seconds
scheduler.add_job(job1, 'interval', seconds=3)
scheduler.start()
BackgroundScheduler: 不想使用任何框架时的方式
# BackgroundScheduler example: the scheduler runs alongside the main program.
import time

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()


def job1():
    """Print the current time followed by a fixed message."""
    # The print() call form is required on Python 3.
    print("%s: 执行任务" % time.asctime())


# Fire job1 once every 3 seconds
scheduler.add_job(job1, 'interval', seconds=3)
scheduler.start()

# Keep the main thread alive. Sleeping avoids the 100% CPU spin of a bare
# `while True: pass`, and the scheduler is shut down cleanly on exit.
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()
AsyncIOScheduler: asyncio module的方式(Python3)
# AsyncIOScheduler fragment: keep the process alive via the asyncio event loop.
from apscheduler.schedulers.asyncio import AsyncIOScheduler

# Fall back to trollius when the asyncio module is not importable
try:
    import asyncio
except ImportError:
    import trollius as asyncio

# (scheduler creation and add_job calls elided in the original)
...
...

# Used in place of a `while True: pass` loop
try:
    asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
    pass
GeventScheduler: gevent方式
# GeventScheduler fragment: wait on the object returned by scheduler.start().
from apscheduler.schedulers.gevent import GeventScheduler

# (scheduler creation and add_job calls elided in the original)
...
...

g = scheduler.start()

# Used in place of a `while True: pass` loop
try:
    g.join()
except (KeyboardInterrupt, SystemExit):
    pass
TornadoScheduler: Tornado方式
# TornadoScheduler fragment: keep the process alive via the Tornado IOLoop.
from tornado.ioloop import IOLoop
from apscheduler.schedulers.tornado import TornadoScheduler

# (scheduler creation and add_job calls elided in the original)
...
...

# Used in place of a `while True: pass` loop
try:
    IOLoop.instance().start()
except (KeyboardInterrupt, SystemExit):
    pass
TwistedScheduler: Twisted方式
# TwistedScheduler fragment: keep the process alive via the Twisted reactor.
from twisted.internet import reactor
from apscheduler.schedulers.twisted import TwistedScheduler

# (scheduler creation and add_job calls elided in the original)
...
...

# Used in place of a `while True: pass` loop
try:
    reactor.run()
except (KeyboardInterrupt, SystemExit):
    pass
QtScheduler: Qt方式
executors
组件提供任务的调度方式
base
debug
gevent
pool(max_workers=10)
twisted
jobstore
提供任务的各类持久化方式
base
memory
mongodb scheduler.add_jobstore('mongodb', collection='example_jobs')
redis scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times')
rethinkdb scheduler.add_jobstore('rethinkdb', database='apscheduler_example')
sqlalchemy scheduler.add_jobstore('sqlalchemy', url=url)
zookeeper scheduler.add_jobstore('zookeeper', path='/example_jobs')
add_job
(如上)如果使用了任务存储,启动时最好添加
replace_existing=True
,否则每次启动都会创建任务的副本
开启后任务不会立刻启动,可修改trigger参数
remove_job
# `scheduler` and `myfunc` are assumed to be defined beforehand.

# Remove a job through the job instance returned by add_job
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

# Remove a job through its id
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
pause_job
和继续resume_job
# `scheduler` and `myfunc` are assumed to be defined beforehand.

# Pause and resume through the job instance
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.pause()
job.resume()

# Pause and resume through the job id
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')
modify
和重设reschedule_job
修改:job.modify(max_instances=6, name='Alternate name')
重设:scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
开启 scheduler.start()
关闭 scheduler.shutdown(wait=True | False)
暂停 scheduler.pause()
继续 scheduler.resume()
# `scheduler` is assumed to be defined beforehand.
# The event mask constants must be imported before they can be used below.
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED


def my_listener(event):
    """Report whether the job behind *event* raised an exception.

    Args:
        event: the event object passed in by the scheduler; its
            ``exception`` attribute is truthy when the job failed.
    """
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')


# Subscribe only to "job executed" and "job error" events
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# Configure a BackgroundScheduler with explicit job stores, executors,
# job defaults and timezone.
from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# Job stores, keyed by alias
jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
}

# Executors, keyed by alias
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5),
}

# Defaults applied to newly added jobs
job_defaults = {
    'coalesce': False,
    'max_instances': 3,
}

scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    timezone=utc,
)