关于我
编程界的一名小小程序猿,目前在一个创业团队任team lead,技术栈涉及Android、Python、Java和Go,这个也是咱们团队的主要技术栈。 联系:hylinux1024@gmail.compython
上一篇介绍了一个简单的Python
调度器的使用,后来我翻阅了一下它的源码,惊奇的发现核心库才一个文件,代码量短短700行不到。这是绝佳的学习材料。
让我喜出望外的是这个库的做者居然就是我最近阅读的一本书《Python Tricks》的做者!如今就让咱们看看大神的实现思路。linux
项目地址git
github.com/dbader/sche…github
将代码checkout
到本地编程
环境json
PyCharm+venv+Python3
小程序
这个在上一篇也介绍过了,很是简单bash
import schedule
# 定义须要执行的方法
def job():
print("a simple scheduler in python.")
# 设置调度的参数,这里是每2秒执行一次
schedule.every(2).seconds.do(job)
if __name__ == '__main__':
while True:
schedule.run_pending()
# 执行结果
a simple scheduler in python.
a simple scheduler in python.
a simple scheduler in python.
...
复制代码
这个库的文档也很详细,能够浏览 schedule.readthedocs.io/ 了解库的大概用法app
(venv) ➜ schedule git:(master) tree -L 2
.
...
├── requirements-dev.txt
├── schedule
│ └── __init__.py
├── setup.py
├── test_schedule.py
├── tox.ini
└── venv
├── bin
├── include
├── lib
├── pip-selfcheck.json
└── pyvenv.cfg
8 directories, 18 files
复制代码
schedule
目录下就一个__init__.py
文件,这是咱们须要重点学习的地方。setup.py
文件是发布项目的配置文件test_schedule.py
是单元测试文件,一开始除了看文档外,也能够从单元测试中入手,了解这个库的使用requirements-dev.txt
开发环境的依赖库文件,若是核心的库是不须要第三方的依赖的,可是单元测试须要venv
是我checkout
后建立的,本来的项目是没有的咱们知道__init__.py
是定义Python
包必需的文件。在这个文件中定义方法、类均可以在使用import
命令时导入到工程项目中,而后使用。dom
如下是schedule
会用到的模块,都是Python
内部的模块。
import collections
import datetime
import functools
import logging
import random
import re
import time
logger = logging.getLogger('schedule')
复制代码
而后定义了一个日志打印工具实例
接着是定义了该模块的3个异常类的结构体系,是由Exception
派生出来的,分别是ScheduleError
、ScheduleValueError
和IntervalError
class ScheduleError(Exception):
"""Base schedule exception"""
pass
class ScheduleValueError(ScheduleError):
"""Base schedule value error"""
pass
class IntervalError(ScheduleValueError):
"""An improper interval was used"""
pass
复制代码
还定义了一个CancelJob
的类,用于取消调度器的继续执行
class CancelJob(object):
""" Can be returned from a job to unschedule itself. """
pass
复制代码
例如在自定义的须要被调度方法中返回这个CancelJob
类就能够实现一次性的任务
# 定义须要执行的方法
def job():
print("a simple scheduler in python.")
# 返回CancelJob能够中止调度器的后续执行
return schedule.CancelJob
复制代码
接着就是这个库的两个核心类Scheduler
和Job
。
class Scheduler(object):
""" Objects instantiated by the :class:`Scheduler <Scheduler>` are factories to create jobs, keep record of scheduled jobs and handle their execution. """
class Job(object):
""" A periodic job as used by :class:`Scheduler`. :param interval: A quantity of a certain time unit :param scheduler: The :class:`Scheduler <Scheduler>` instance that this job will register itself with once it has been fully configured in :meth:`Job.do()`. Every job runs at a given fixed time interval that is defined by: * a :meth:`time unit <Job.second>` * a quantity of `time units` defined by `interval` A job is usually created and returned by :meth:`Scheduler.every` method, which also defines its `interval`. """
复制代码
Scheduler
是调度器的实现类,它负责调度任务(job
)的建立和执行。
Job
则是对须要执行任务的抽象。
这两个类是这个库的核心,后面咱们还会看到详细的分析。
接下来就是默认调度器default_scheduler
和任务列表jobs
的建立。
# The following methods are shortcuts for not having to
# create a Scheduler instance:
#: Default :class:`Scheduler <Scheduler>` object
default_scheduler = Scheduler()
#: Default :class:`Jobs <Job>` list
jobs = default_scheduler.jobs # todo: should this be a copy, e.g. jobs()?
复制代码
在执行import schedule
后,就默认建立了default_scheduler
。而Scheduler
的构造方法为
def __init__(self):
self.jobs = []
复制代码
在执行初始化时,调度器就建立了一个空的任务列表。
在文件的最后定义了一些链式调用的方法,使用起来也是很是人性化的,值得学习。 这里的方法都定义在模块下,并且都是封装了default_scheduler
实例的调用。
def every(interval=1):
"""Calls :meth:`every <Scheduler.every>` on the :data:`default scheduler instance <default_scheduler>`. """
return default_scheduler.every(interval)
def run_pending():
"""Calls :meth:`run_pending <Scheduler.run_pending>` on the :data:`default scheduler instance <default_scheduler>`. """
default_scheduler.run_pending()
def run_all(delay_seconds=0):
"""Calls :meth:`run_all <Scheduler.run_all>` on the :data:`default scheduler instance <default_scheduler>`. """
default_scheduler.run_all(delay_seconds=delay_seconds)
def clear(tag=None):
"""Calls :meth:`clear <Scheduler.clear>` on the :data:`default scheduler instance <default_scheduler>`. """
default_scheduler.clear(tag)
def cancel_job(job):
"""Calls :meth:`cancel_job <Scheduler.cancel_job>` on the :data:`default scheduler instance <default_scheduler>`. """
default_scheduler.cancel_job(job)
def next_run():
"""Calls :meth:`next_run <Scheduler.next_run>` on the :data:`default scheduler instance <default_scheduler>`. """
return default_scheduler.next_run
def idle_seconds():
"""Calls :meth:`idle_seconds <Scheduler.idle_seconds>` on the :data:`default scheduler instance <default_scheduler>`. """
return default_scheduler.idle_seconds
复制代码
咱们看下入口方法run_pending()
,从本文一开头的Demo
能够知道这个是启动调度器的方法。这里它执行了default_scheduler
中的方法。
default_scheduler.run_pending()
复制代码
因此咱们就把目光定位到Scheduler
类的相应方法
def run_pending(self):
""" Run all jobs that are scheduled to run. Please note that it is *intended behavior that run_pending() does not run missed jobs*. For example, if you've registered a job that should run every minute and you only call run_pending() in one hour increments then your job won't be run 60 times in between but only once. """
runnable_jobs = (job for job in self.jobs if job.should_run)
for job in sorted(runnable_jobs):
self._run_job(job)
复制代码
这个方法中首先从jobs
列表将须要执行的任务过滤后放在runnable_jobs
列表,而后将其排序后顺序执行内部的_run_job(job)
方法
def _run_job(self, job):
ret = job.run()
if isinstance(ret, CancelJob) or ret is CancelJob:
self.cancel_job(job)
复制代码
在_run_job
方法中就调用了job
类中的run
方法,并根据返回值判断是否须要取消任务。
这时候咱们要看下Job
类的实现逻辑。
首先咱们要看下Job
是何时建立的。仍是从Demo
中的代码入手
schedule.every(2).seconds.do(job)
复制代码
这里先执行了schedule.every()
方法
def every(interval=1):
"""Calls :meth:`every <Scheduler.every>` on the :data:`default scheduler instance <default_scheduler>`. """
return default_scheduler.every(interval)
复制代码
这个方法就是scheduler
类中的every
方法
def every(self, interval=1):
""" Schedule a new periodic job. :param interval: A quantity of a certain time unit :return: An unconfigured :class:`Job <Job>` """
job = Job(interval, self)
return job
复制代码
在这里建立了一个任务job
,并将参数interval
和scheduler
实例传入到构造方法中,最后返回job
实例用于实现链式调用。
跳转到Job
的构造方法
def __init__(self, interval, scheduler=None):
self.interval = interval # pause interval * unit between runs
self.latest = None # upper limit to the interval
self.job_func = None # the job job_func to run
self.unit = None # time units, e.g. 'minutes', 'hours', ...
self.at_time = None # optional time at which this job runs
self.last_run = None # datetime of the last run
self.next_run = None # datetime of the next run
self.period = None # timedelta between runs, only valid for
self.start_day = None # Specific day of the week to start on
self.tags = set() # unique set of tags for the job
self.scheduler = scheduler # scheduler to register with
复制代码
主要初始化了间隔时间配置、须要执行的方法、调度器各类时间单位等。
执行every
方法以后又调用了seconds
这个属性方法
@property
def seconds(self):
self.unit = 'seconds'
return self
复制代码
设置了时间单位,这个设置秒,固然还有其它相似的属性方法minutes
、hours
、days
等等。
最后就是执行了do
方法
def do(self, job_func, *args, **kwargs):
""" Specifies the job_func that should be called every time the job runs. Any additional arguments are passed on to job_func when the job runs. :param job_func: The function to be scheduled :return: The invoked job instance """
self.job_func = functools.partial(job_func, *args, **kwargs)
try:
functools.update_wrapper(self.job_func, job_func)
except AttributeError:
# job_funcs already wrapped by functools.partial won't have
# __name__, __module__ or __doc__ and the update_wrapper()
# call will fail.
pass
self._schedule_next_run()
self.scheduler.jobs.append(self)
return self
复制代码
在这里使用functools
工具的中的偏函数partial
将咱们自定义的方法封装成可调用的对象
而后就调用_schedule_next_run
方法,它主要是对时间的解析,按照时间对job
排序,我以为这个方法是本项目中的技术点,逻辑也是稍微复杂一丢丢,仔细阅读就能够看懂,主要是对时间datetime
的使用。因为篇幅,这里就再也不贴出代码。
这里就完成了任务job
的添加。而后在调用run_pending
方法中就可让任务执行。
schedule
库定义两个核心类Scheduler
和Job
。在导入包时就默认建立一个Scheduler
对象,并初始化任务列表。
schedule
模块提供了链式调用的接口,在配置schedule
参数时,就会建立任务对象job
,并会将job
添加到任务列表中,最后在执行run_pending
方法时,就会调用咱们自定义的方法。 这个库的核心思想是使用面向对象方法,对事物可以准确地抽象,它整体的逻辑并不复杂,是学习源码很不错的范例。