首先学习一个开源项目,必定要先学习该开源项目的使用方法。该项目的使用方法本文再也不详述。请参考博文: java
http://pinsir.iteye.com/blog/882275 mysql
http://pinsir.iteye.com/blog/882518 git
http://pinsir.iteye.com/blog/882525 sql
经过学习开源项目的使用方法,来掌握开源项目实现的功能特性,了解开源项目中引入的不少概念和术语的含义,这对于后面阅读分析源码有很是大的帮助。 数据库
该项目的源码网上已经很难找到了,给你们分享一下。http://git.oschina.net/ywbrj042/taobao-pamirs-schedule-2.0.3.6 安全
本项目相对很是简单,对于阅读源码来讲也很是简单,它只有一个工程构成,并且工程中也只有一个包构成。若是是相对复杂的项目通常每每由多个工程组成,并且每一个工程里也会有很是多的包,那么咱们阅读源码的时候就必定要先了解各工程的职责和实现的功能,了解各工程之间的关系。 服务器
再选择具体的工程进入阅读分析,进入到具体工程后,则同样要了解工程内各包的职责和实现的功能,还包括包之间的关系。 多线程
掌握了这些内容以后,后期咱们阅读源码的时候才不会迷茫,咱们脑中就掌握了很好的导航结构,什么功能和代码要去何处查找就很是容易了。 运维
一个好的开源项目必定会有一个设计合理的类结构,咱们能够找到相关的类结构图辅助分析源代码。由于没有找到本项目的类设计图,我根据源码画出了本项目的核心类设计图。图中只画出了关键的类属性和方法,去除了不少细节。 分布式
这是淘宝调度的核心组件,从图中能够看出,它聚合了全部的其它类协同对外提供分布式定时调度服务。它的职责有这些:
1.启动及中止定时调度器。
2.暂停及恢复定时调度器。
3.定时向集中的数据配置中心更新当前调度服务器的心跳状态。
4.向数据配置中心获取全部服务器的状态来从新计算任务的分配。这么作的目标是避免集中任务调度中心的单点问题。
5.在每一个批次数据处理完毕后,检查是否有其它处理服务器申请本身把持的任务队列,若是有,则释放给相关处理服务器。
6.若是当前服务器在处理当前任务的时候超时,须要清除当前队列,并释放已经把持的任务。并向控制主动中心报警。
7.定时调度处理器处理任务。启动定时器,根据定时表达式配置,当达到可执行实现,则建立IScheduleProcessor对象并执行任务。
调度管理器工厂。该工厂负责建立TBScheduleManager对象,并负责组装其聚合的其它一些对象。该类的实现是依赖Spring容器环境的,所以使用该类建立
TBScheduleManager必须是在Spring容器环境下的应用。它从Spring容器中得到任务执行器对象。
调度处理器。该处理器的职责是负责执行任务处理,它会调用任务执行器获取任务,启动多线程来处理这些任务。该接口包含的职责有。
1.判断组件是否睡眠。
2.判断分配的数据是否处理完。
3.清除已经分配的数据。
4.中止任务调度。
我的聚的该接口设计的很是不合理,它的核心职责没有体如今它的接口定义上,而是在具体的实现中执行了这些操做,它在对象构造的时候就启动了一个定时任务及多个线程来处理任务。
1.接收一个unix格式的cron表达式。
2.计算下一个符合要求的时间。
任务执行器。该接口的职责有这些:
1.获取任务列表。
2.处理任务。处理任务有两种方式,单个任务处理及多个任务批量处理。
这个接口是提供给业务开发方根据本身的业务实现的。业务查询本身的业务任务队列,将查询到的任务队列中的任务调用处理任务方法处理。
IScheduleConfigCenterClient
这是调度器配置中心接口,提供了对于各类调度配置信息的存取操做。默认提供了基于数据库的配置中心实现,目前支持mysql和oralce数据库的实现。
该组件的职责有这些:
1.基本任务信息的获取、建立、删除等操做。
2.任务运行期信息的清除、查询等操做。
3.服务的任务队列信息查询、释放。
4.任务队列的查询、清除等操做,任务队列数量查询操做。
5.服务器信息的注册、查询、清除过时和注销等操做。
6.从新分配任务队列操做。
7.刷新服务器心跳信息。
其它的类都是该接口的实现类,项目自带的是基于数据库的实现类及辅助类。
IScheduleAlert
调度器报警组件。读运行期间的异常状况进行监控报警,目前提供的功能有:
1.超过10个心跳周期尚未获取到调度队列报警。
2.超过10个心跳周期,尚未进行从新装载操做报警。
/** * 任务类型 */ private String baseTaskType; /** * 向配置中心更新心跳信息的频率 */ private long heartBeatRate = 5*1000;//1分钟 /** * 判断一个服务器死亡的周期。为了安全,至少是心跳周期的两倍以上 */ private long judgeDeadInterval = 1*60*1000;//2分钟 /** * 当没有数据的时候,休眠的时间 * */ private int sleepTimeNoData = 500; /** * 在每次数据处理晚后休眠的时间 */ private int sleepTimeInterval = 0; /** * 任务队列数量 */ private int taskQueueNumber = -1; /** * 每次获取数据的数量 */ private int fetchDataNumber = 500; /** * 在批处理的时候,每次处理的数据量 */ private int executeNumber =1; private int threadNumber = 5; /** * 调度器类型 */ private String processorType="NOTSLEEP" ; /** * 容许执行的开始时间 */ private String permitRunStartTime; /** * 容许执行的开始时间 */ private String permitRunEndTime; /** * 清除过时环境信息的时间间隔,以天为单位 */ private double expireOwnSignInterval = 1; /** * 处理任务的BeanName */ private String dealBeanName; /** * 版本号 */ private long version;
private long id; /** * 任务类型:原始任务类型+"-"+ownSign */ private String taskType; /** * 原始任务类型 */ private String baseTaskType; /** * 环境 */ private String ownSign; /** * 最后一次任务分配的时间 */ private Timestamp lastAssignTime; /** * 最后一次执行任务分配的服务器 */ private String lastAssignUUID; private Timestamp gmtCreate; private Timestamp gmtModified;
/* * 全局惟一编号 */ private String uuid; private long id; /** * 任务类型 */ private String taskType; /** * 原始任务类型 */ private String baseTaskType; private String ownSign; /** * 机器IP地址 */ private String ip; /** * 机器名称 */ private String hostName; /** * 调度服务器远程控制端口 */ int managerPort; String jmxUrl; /** * 数据处理线程数量 */ private int threadNum; /** * 服务开始时间 */ private Timestamp registerTime; /** * 最后一次心跳通知时间 */ private Timestamp heartBeatTime; /** * 处理描述信息,例如读取的任务数量,处理成功的任务数量,处理失败的数量,处理耗时 * FetchDataCount=4430,FetcheDataNum=438570,DealDataSucess=438570,DealDataFail=0,DealSpendTime=651066 */ private String dealInfoDesc; private String nextRunStartTime; private String nextRunEndTime; /** * 配置中心的当前时间 */ private Timestamp centerServerTime; /** * 数据版本号 */ private long version;
任务队列信息。调度管理器启动后会申请分配对应的任务队列,之后该服务器就会只处理它负责的这些队列的任务的处理。
/** * 处理任务类型 */ private String taskType; /** * 原始任务类型 */ private String baseTaskType; /** * 队列的环境标识 */ private String ownSign; /** * 任务队列ID */ private String taskQueueId; /** * 持有当前任务队列的任务处理器 */ private String currentScheduleServer; /** * 正在申请此任务队列的任务处理器 */ private String requestScheduleServer; /** * 数据版本号 */ private long version;