APScheduler 3.0.1浅析

简介

APScheduler是一个小巧而强大的Python类库,经过它你能够实现相似Unix系统cronjob相似的定时任务系统。使用之余,阅读一下源码,一方面有助于更好的使用它,另外一方面,我的认为aps的架构设计质量很高,阅读它对于提高软件开发的sense颇有帮助。python

组成

APScheduler整个系统能够说由这五个概念组成:react

  • scheduler:控制器,能够看作整个系统的driver,外部世界经过它来实现任务(Job)的增删改查管理。根据IO模式的不一样,aps提供了多种scheduler实现。
  • job:描述一个任务自己。
  • jobstore:任务持久化仓库。aps提供了内存、redis、mongodb、sqlalchemy几种store
  • executor:执行任务的模块。根据不一样的IO模型有多种executor选择。
  • trigger:描述一个任务什么时候被触发,有按日期、按时间间隔、按cronjob描述式三种触发方式

这样的划分充分发挥了软件设计中抽象的威力,咱们下面对每一个模块进行描述redis

scheduler

BaseScheduler类是全部scheduler的抽象基类,它的初始化代码是这样的:sql

     def __init__(self, gconfig={}, **options):
         super(BaseScheduler, self).__init__()
         self._executors = {}
         self._executors_lock = self._create_lock()
         self._jobstores = {}
         self._jobstores_lock = self._create_lock()
         self._listeners = []
         self._listeners_lock = self._create_lock()
         self._pending_jobs = []
         self.configure(gconfig, **options)

能够看到一个scheduler维护了本身的executor和jobstore表,经过configure方法进行初始化。在configure中,scheduler读取传入的配置,对executors和jobstores进行初始化,一个典型的配置是这样的:mongodb

 APS_SCHEDULER_CONFIG = {
     'jobstores': {
         'default': {'type': 'sqlalchemy', 'url': 'postgres://127.0.0.1:5432/optimus'},
     },
     'executors': {
         'default': {'type': 'processpool', 'max_workers': 10}
     },
     'job_defaults': {
         'coalesce': True,
         'max_instances': 5,
         'misfire_grace_time': 30
     },
     'timezone': 'Asia/Shanghai'
 }

若是咱们把APS_SCHEDULER_CONFIG做为options传入给一个scheduler,会产生什么结果呢?首先,咱们添加了一个默认(名叫default)的jobstore,它的具体实现类型是sqlalchemy,数据库链接url是指向一个本地postgresql数据库,也就是说添加到这个scheduler的job会默认使用这个jobstore进行存储。其次,咱们添加了一个默认的executor,他是一个多进程实现,也就是说每一个job在运行时,是经过一个进程池来做为worker实际执行的,这个进程池最大size是10。job_defaults参数定义了一些特殊行为:数据库

  • coalesce:当因为某种缘由致使某个job积攒了好几回没有实际运行(好比说系统挂了5分钟后恢复,有一个任务是每分钟跑一次的,按道理说这5分钟内原本是“计划”运行5次的,但实际没有执行),若是coalesce为True,下次这个job被submit给executor时,只会执行1次,也就是最后此次,若是为False,那么会执行5次(不必定,由于还有其余条件,看后面misfire_grace_time的解释)
  • max_instance: 就是说同一个job同一时间最多有几个实例再跑,好比一个耗时10分钟的job,被指定每分钟运行1次,若是咱们max_instance值为5,那么在第6~10分钟上,新的运行实例不会被执行,由于已经有5个实例在跑了
  • misfire_grace_time:设想和上述coalesce相似的场景,若是一个job原本14:00有一次执行,可是因为某种缘由没有被调度上,如今14:01了,这个14:00的运行实例被提交时,会检查它预订运行的时间和当下时间的差值(这里是1分钟),大于咱们设置的30秒限制,那么这个运行实例不会被执行。

这里还须要指出的一点是,为何scheduler的配置能够写成这种json形式,而scheduler会正确地找到对应的实现类进行初始化?这里运用了两个技巧:json

entry point

用python egg的机制把各个组件注册了成了entry point,以下所示架构

 [apscheduler.executors]
 asyncio = apscheduler.executors.asyncio:AsyncIOExecutor
 debug = apscheduler.executors.debug:DebugExecutor
 gevent = apscheduler.executors.gevent:GeventExecutor
 processpool = apscheduler.executors.pool:ProcessPoolExecutor
 threadpool = apscheduler.executors.pool:ThreadPoolExecutor
 twisted = apscheduler.executors.twisted:TwistedExecutor

 [apscheduler.jobstores]
 memory = apscheduler.jobstores.memory:MemoryJobStore
 mongodb = apscheduler.jobstores.mongodb:MongoDBJobStore
 redis = apscheduler.jobstores.redis:RedisJobStore
 sqlalchemy = apscheduler.jobstores.sqlalchemy:SQLAlchemyJobStore

 [apscheduler.triggers]
 cron = apscheduler.triggers.cron:CronTrigger
 date = apscheduler.triggers.date:DateTrigger
 interval = apscheduler.triggers.interval:IntervalTrigger

这样,在scheduler模块中就能够用entry point的名称反查出对应组件async

     _trigger_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.triggers'))
     _trigger_classes = {}
     _executor_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.executors'))
     _executor_classes = {}
     _jobstore_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.jobstores'))
     _jobstore_classes = {}
     _stopped = True

从而实现了一个便利的插件机制函数

ref_to_obj

另外经过一个加载函数完成"apscheduler.executors.pool:ThreadPoolExecutor"字符串到ThreadPoolExecutor类对象的查询

 def ref_to_obj(ref):
     """
     Returns the object pointed to by ``ref``.

     :type ref: str
     """

     if not isinstance(ref, six.string_types):
         raise TypeError('References must be strings')
     if ':' not in ref:
         raise ValueError('Invalid reference')

     modulename, rest = ref.split(':', 1)
     try:
         obj = __import__(modulename)
     except ImportError:
         raise LookupError('Error resolving reference %s: could not import module' % ref)

     try:
         for name in modulename.split('.')[1:] + rest.split('.'):
             obj = getattr(obj, name)
         return obj
     except Exception:
         raise LookupError('Error resolving reference %s: error looking up object' % ref)

 

scheduler的主循环(main_loop),其实就是反复检查是否是有到时须要执行的任务,完成一次检查的函数是_process_jobs, 这个函数作这么几件事:

  1. 询问本身的每个jobstore,有没有到期须要执行的任务(jobstore.get_due_jobs())
  2. 若是有,计算这些job中每一个job须要运行的时间点(run_times = job._get_run_times(now))若是run_times有多个,这种状况咱们上面讨论过,有coalesce检查
  3. 提交给executor排期运行(executor.submit_job(job, run_times))

那么在这个_process_jobs的逻辑,何时调用合适呢?若是不间断地调用,而实际上没有要执行的job,是一种浪费。每次掉用_process_jobs后,其实能够预先判断一下,下一次要执行的job(离如今最近的)还要多长时间,做为返回值告诉main_loop, 这时主循环就能够去睡一觉,等大约这么长时间后再唤醒,执行下一次_process_jobs。这里唤醒的机制就会有IO模型的区别了

scheduler因为IO模型的不一样,能够有多种实现,如

  • BlockingScheduler:main_loop就在当前进程的主线程内运行,因此调用start函数后会阻塞当前线程。经过一个threading.Event条件变量对象完成scheduler的定时唤醒。
  • BackgroundScheduler:和BlockingScheduler基本同样,除了main_loop放在了单独线程里,因此调用start后主线程不会阻塞
  • AsyncIOScheduler:使用asyncio做为IO模型的scheduler,和AsyncIOExecutor配合使用,用asynio中event_loop的call_later完成定时唤醒
  • GeventScheduler:和BlockingScheduler基本同样,使用gevent做为IO模型,和GeventExecutor配合使用
  • QtScheduler:使用QTimer完成定时唤醒
  • TornadoScheduler:使用tornado的IO模型,用ioloop.add_timeout完成定时唤醒
  • TwistedScheduler:配合TwistedExecutor,用reactor.callLater完成定时唤醒

JobStore

jobstore提供给scheduler一个序列化jobs的统一抽象,提供对scheduler中job的增删改查接口,根据存储backend的不一样,分如下几种

  • MemoryJobStore:没有序列化,jobs就存在内存里,增删改查也都是在内存中操做
  • SQLAlchemyJobStore:全部sqlalchemy支持的数据库均可以作为backend,增删改查操做转化为对应backend的sql语句
  • MongoDBJobStore:用mongodb做backend
  • RedisJobStore: 用redis做backend

除了MemoryJobStore外,其余几种都使用pickle作序列化工具,因此这里要指出一点,若是你不是在用内存作jobstore,那么必须确保你提供给job的可执行函数必须是能够被全局访问的,也就是能够经过ref_to_obj反查出来的,不然没法序列化。

使用数据库作jobstore,就会发现,其实建立了一张有三个域的的jobs表,分别是id, next_run_time, job_state,其中job_state是job对象pickle序列化后的二进制,而id和next_run_time则是支持job的两类查询(按id和按最近运行时间)

 

Executor

aps把任务最终的执行机制也抽象了出来,能够根据IO模型选配,不须要讲太多,最经常使用的是threadpool和processpoll两种(来自concurrent.futures的线程/进程池)。

不一样类型的executor实现本身的_do_submit_job,完成一次实际的任务实例执行。以线程/进程池实现为例

     def _do_submit_job(self, job, run_times):
         def callback(f):
             exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else
                        (f.exception(), getattr(f.exception(), '__traceback__', None)))
             if exc:
                 self._run_job_error(job.id, exc, tb)
             else:
                 self._run_job_success(job.id, f.result())

         f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
         f.add_done_callback(callback)

Trigger

trigger是抽象出了“一个job是什么时候被触发”这个策略,每种trigger实现本身的get_next_fire_time函数

     @abstractmethod
     def get_next_fire_time(self, previous_fire_time, now):
         """
         Returns the next datetime to fire on, If no such datetime can be calculated, returns ``None``.

         :param datetime.datetime previous_fire_time: the previous time the trigger was fired
         :param datetime.datetime now: current datetime
         """

aps提供的trigger包括:

  • date:一次性指定日期
  • interval:在某个时间范围内间隔多长时间执行一次
  • cron:和unix crontab格式兼容,最为强大

总结

简要介绍了apscheduler类库的组成,强调抽象概念的理解

相关文章
相关标签/搜索