APScheduler任务调度利器

Java中任务调度通常用Quartz,Python中的任务调度工具也有很多:Celery,RQ,APScheduler等。
Celery:很是强大的分布式任务调度框架
RQ:基于Redis的做业队列工具
APScheduler:一款强大的任务调度工具html

RQ参考Celery,听说要比Celery轻量级(Really?)
APScheduler感受更像Quartz。
本人小小的建议是通常项目用APScheduler,由于不用像Celery那样再单独启动worker、beat进程,并且API也很简洁。
对于大点分布式项目用Celerygit

官网:http://apscheduler.readthedoc...
API:http://apscheduler.readthedoc...
当前版本:3.3.0
安装:$ pip install apscheduler
例子:https://github.com/agronholm/...github

特性

Advanced Python Scheduler (APScheduler) 一款强大的任务调度工具.
内置了三种调度模式:sql

  • Cron风格mongodb

  • 间隔性(Interval-based)执行并发

  • 仅在某个时间执行一次app

做业存储支持如下几种方式:
Memory
SQLAlchemy (any RDBMS supported by SQLAlchemy works)
MongoDB
Redis
RethinkDB
ZooKeeper
除了Memory方式不须要序列化以外(一个例外是使用ProcessPoolExecutor),其他都须要做业函数参数可序列化。框架

支持与如下框架集成:
asyncio (PEP 3156)
gevent
Tornado
Twisted
Qt (using either PyQt or PySide)async

基本概念

四大组件:
triggers
job stores
executors
schedulers分布式

schedulers

内置如下几种调度器实现:
BlockingScheduler:
BackgroundScheduler: 默认采用ThreadPoolExecutor池(默认10),能够配置ProcessPoolExecutor,或同时使用
AsyncIOScheduler: 使用asyncio模块
GeventScheduler: 使用gevent
TornadoScheduler: use if you’re building a Tornado application
TwistedScheduler: use if you’re building a Twisted application
QtScheduler: use if you’re building a Qt application

基类:BaseScheduler,能够经过此类查询相关配置选项

调度器配置示例:

方式1、

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

方式2、三:略。请看官网

启动与关闭调度器:

经过调用start()方法来启动

scheduler.start()
scheduler.shutdown()
scheduler.shutdown(wait=False)

triggers

内置如下三种触发器实现:
date: 对已仅执行一次的情绪,指定某个之间点。请参考这里
interval: 指定时间间隔(fixed intervals)周期性执行。请参考这里
cron: 使用cron风格表达式周期性执行。请参考这里

job管理

添加jobs:

  • 调用scheduler.add_job()方法,会返回apscheduler.job.Job实例(可用于job修改、移除等)

  • 使用装饰器scheduled_job()

做业存储注意事项:

除了Memory方式不须要序列化以外(一个例外是使用ProcessPoolExecutor),其他都须要做业函数参数可序列化。
若是须要存储做业,并且每次启动时你的应用都会从新添加一遍做业,那么请在添加job时指定一个惟一的ID,以及指定replace_existing=True,不然每次启动应用都会添加一次job的副本。
若是须要当即启动该任务,请在添加job时指定trigger参数。

移除job:
调用scheduler.remove_job()放到,参数为 job’s ID and job store alias
调用job实例的remove()方法 on the Job instance you got from add_job()

注意:若是任务已经调度完毕,而且以后也不会再被执行的状况下,会被自动移除。

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

暂停和恢复job:

apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()

apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()

获取jobs列表

apscheduler.get_jobs()

修改job:

能够经过apscheduler.job.Job.modify() or apscheduler.modify_job()修改除了id以外的job属性。

job.modify(max_instances=6, name='Alternate name')

若是你想修改job的调度器,你可使用apscheduler.job.Job.reschedule() or reschedule_job()

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

限制同一个job实例的并发执行数

默认状况下同一个job,只容许一个job实例运行。这在某个job在下次运行时间到达以后仍未执行完毕时,能达到控制的目的。你也能够该变这一行为,在你调用add_job时能够传递max_instances=5来运行同时运行同一个job的5个job实例。

job错过执行时间与job合并(Missed job executions and coalescing):

一个job可能因为某些状况错过执行时间,好比上一点提到的,或者是线程池或进程池用光了,或者是当要调度job时,忽然down机了等。
这时能够经过设置job的misfire_grace_time选项来指示以后尝试执行的次数。
固然若是这不符合你的指望,你能够合并全部错过期间的job到一个job来执行,经过设定job的coalesce=True

Scheduler events

能够监听调度、任务执行状况相关的事件。

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

支持的事件列表:
http://apscheduler.readthedoc...

小结

有木有很是强大,又很是易于理解的感受。Good Job!

相关文章
相关标签/搜索