做为一个优秀的开源调度框架,Quartz 具备如下特色:
强大的调度功能,支持当即调度、定时调度、周期调度、并发调度;
灵活的应用方式,支持job间经过listener实现依赖调度,能够方便的进行调度组合,支持调度数据的多种存储方式;
分布式和集群能力;
做为 Spring 默认的调度框架,Quartz 很容易与 Spring 集成实现灵活可配置的调度功能。算法
对于工做流须要知足如下需求:
支持任务按照顺序进行调度(这是工做流的基本需求)
存在多任务并发调度的状况
存在某一个任务等待多个上游任务都结束才启动调度的状况
任务失败后,依赖该任务的下游结点要中止运行数据库
Quartz的主要组件以下图所示,任务调度三个主要的类是 Scheduler、Trigger、Job。
Scheduler 是执行调度的控制器。
Trigger 是用于定义调度时间的元素,咱们项目没有定时调度的需求,全部调度都选用理解触发就能够了。
Job 表示被调度的任务,Job和Trigger成对传递给Scheduler,当Trigger的条件知足时,它对应的Job就会被Scheduler触发。
Trigger/Job的组合不能实现顺序调度,实现顺序调度须要用到JobListener,JobListener对指定Job进行监听,如上图所示,JobLisener能够捕捉到三个任务触发点.
咱们须要的是在Job已执行完成这个触发点,把下一个Job启动起来。
也有TriggerLisener/SchedulerLisener,触发点和Trigger、Scheduler相关,和咱们的需求关系不大,暂忽略。缓存
为每一个算法任务建立一个Job,任务失败不能启动后续任务,因此在job运行失败的状况下,须要把启动Job的JobLisener删除掉。并发
public class HelloJob implements Job { private String JobName; public HelloJob(String name) { JobName = name; } public void execute(JobExecutionContext context) throws JobExecutionException { /* 获取传递参数 */ JobDataMap jobDataMap = context.getMergedJobDataMap(); /* 从jobDataMap中获取下游JobLisener名称 */ /* 执行spark mlib 做业 */ if (/* 执行失败 */){ /* 删除依赖本任务的JobLisener */ context.getScheduler().getListenerManager().removeJobListener("next_job_lisener"); } /* 当前任务结果写入数据库 */ } }
基于全部的依赖关系,建立JobLisener,并将JobLisener与它依赖的Job绑定,在JobLisener中将下一步的Job启动起来。框架
public class HelloJobListener implements JobListener { private String lisenerName; private JobDetail nextJob; HelloJobListener(String name, JobDetail job){ lisenerName = name; nextJob = job; } public String getName() { return lisenerName; } public void jobWasExecuted(JobExecutionContext inContext,JobExecutionException inException) { /* 建立Trigger */ Trigger trigger = newTrigger() .withIdentity(lisenerName) .startNow() .build(); inContext.getScheduler().scheduleJob(nextJob, trigger); try { /* 拉起下一个Job */ inContext.getScheduler().scheduleJob(nextJob, trigger); } catch (SchedulerException e) { e.printStackTrace(); } } }
当前任务依赖多个上游Job时,试验了AndMatcher,这个方法是对多个条件进行判断的接口,不能进行多上游依赖判断。
须要本身在JobLisener中实现多个依赖是否完成的检查。JobLisener须要知道其它依赖的完成状况,而且在本身完成后更新本身的状态。
全部Job、JobLisener的关系配置好之后,调用scheduler.start()就能够启动整个调度。
后续主线程的任务就是检查工做流是否已经完成。每一个任务结点在任务完成后,会将当前任务结点的的运行结果写入数据库或缓存。
主线程依据上下游依赖关系去数据库中定时检查数据的结果,当全部分支都运行完成或运行失败后,得出算法的整体结果。
为提升更新效率,上一轮检查事后,已经完成的任务记录已查阅标记,下一轮检查从未查阅结点开始检查。分布式