Java中任务调度通常用Quartz,Python中的任务调度工具也有很多:Celery,RQ,APScheduler等。
Celery:很是强大的分布式任务调度框架
RQ:基于Redis的做业队列工具
APScheduler:一款强大的任务调度工具html
RQ参考Celery,听说要比Celery轻量级(Really?)
APScheduler感受更像Quartz。
本人小小的建议是通常项目用APScheduler,由于不用像Celery那样再单独启动worker、beat进程,并且API也很简洁。
对于大点分布式项目用Celerygit
官网:http://apscheduler.readthedoc...
API:http://apscheduler.readthedoc...
当前版本:3.3.0
安装:$ pip install apscheduler
例子:https://github.com/agronholm/...github
Advanced Python Scheduler (APScheduler) 一款强大的任务调度工具.
内置了三种调度模式:sql
Cron风格mongodb
间隔性(Interval-based)执行并发
仅在某个时间执行一次app
做业存储支持如下几种方式:
Memory
SQLAlchemy (any RDBMS supported by SQLAlchemy works)
MongoDB
Redis
RethinkDB
ZooKeeper
除了Memory方式不须要序列化以外(一个例外是使用ProcessPoolExecutor),其他都须要做业函数参数可序列化。框架
支持与如下框架集成:
asyncio (PEP 3156)
gevent
Tornado
Twisted
Qt (using either PyQt or PySide)async
四大组件:
triggers
job stores
executors
schedulers分布式
内置如下几种调度器实现:
BlockingScheduler:
BackgroundScheduler: 默认采用ThreadPoolExecutor池(默认10),能够配置ProcessPoolExecutor,或同时使用
AsyncIOScheduler: 使用asyncio模块
GeventScheduler: 使用gevent
TornadoScheduler: use if you’re building a Tornado application
TwistedScheduler: use if you’re building a Twisted application
QtScheduler: use if you’re building a Qt application
基类:BaseScheduler,能够经过此类查询相关配置选项。
方式1、
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstores = { 'mongo': MongoDBJobStore(), 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
方式2、三:略。请看官网
经过调用start()方法来启动
scheduler.start() scheduler.shutdown() scheduler.shutdown(wait=False)
内置如下三种触发器实现:
date: 对已仅执行一次的情绪,指定某个之间点。请参考这里
interval: 指定时间间隔(fixed intervals)周期性执行。请参考这里
cron: 使用cron风格表达式周期性执行。请参考这里
添加jobs:
调用scheduler.add_job()方法,会返回apscheduler.job.Job
实例(可用于job修改、移除等)
使用装饰器scheduled_job()
做业存储注意事项:
除了Memory方式不须要序列化以外(一个例外是使用ProcessPoolExecutor),其他都须要做业函数参数可序列化。
若是须要存储做业,并且每次启动时你的应用都会从新添加一遍做业,那么请在添加job时指定一个惟一的ID
,以及指定replace_existing=True
,不然每次启动应用都会添加一次job的副本。
若是须要当即启动该任务,请在添加job时指定trigger参数。
移除job:
调用scheduler.remove_job()放到,参数为 job’s ID and job store alias
调用job实例的remove()方法 on the Job instance you got from add_job()
注意:若是任务已经调度完毕,而且以后也不会再被执行的状况下,会被自动移除。
job = scheduler.add_job(myfunc, 'interval', minutes=2) job.remove() scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.remove_job('my_job_id')
暂停和恢复job:
apscheduler.job.Job.pause() apscheduler.schedulers.base.BaseScheduler.pause_job() apscheduler.job.Job.resume() apscheduler.schedulers.base.BaseScheduler.resume_job()
获取jobs列表
apscheduler.get_jobs()
修改job:
能够经过apscheduler.job.Job.modify() or apscheduler.modify_job()
修改除了id以外
的job属性。
job.modify(max_instances=6, name='Alternate name')
若是你想修改job的调度器,你可使用apscheduler.job.Job.reschedule() or reschedule_job()
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
默认状况下同一个job,只容许一个job实例运行。这在某个job在下次运行时间到达以后仍未执行完毕时,能达到控制的目的。你也能够该变这一行为,在你调用add_job时能够传递max_instances
=5来运行同时运行同一个job的5个job实例。
一个job可能因为某些状况错过执行时间,好比上一点提到的,或者是线程池或进程池用光了,或者是当要调度job时,忽然down机了等。
这时能够经过设置job的misfire_grace_time选项来指示以后尝试执行的次数。
固然若是这不符合你的指望,你能够合并全部错过期间的job到一个job来执行,经过设定job的coalesce=True
。
能够监听调度、任务执行状况相关的事件。
def my_listener(event): if event.exception: print('The job crashed :(') else: print('The job worked :)') scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
支持的事件列表:
http://apscheduler.readthedoc...
有木有很是强大,又很是易于理解的感受。Good Job!