用Quartz实现工做流

Quartz简介

做为一个优秀的开源调度框架,Quartz 具备如下特色:
强大的调度功能,支持当即调度、定时调度、周期调度、并发调度;
灵活的应用方式,支持job间经过listener实现依赖调度,能够方便的进行调度组合,支持调度数据的多种存储方式;
分布式和集群能力;
做为 Spring 默认的调度框架,Quartz 很容易与 Spring 集成实现灵活可配置的调度功能。算法

需求

对于工做流须要知足如下需求:
支持任务按照顺序进行调度(这是工做流的基本需求)
存在多任务并发调度的状况
存在某一个任务等待多个上游任务都结束才启动调度的状况
任务失败后,依赖该任务的下游结点要中止运行数据库

Quartz的主要组件

Quartz的主要组件以下图所示,任务调度三个主要的类是 Scheduler、Trigger、Job。
Scheduler 是执行调度的控制器。
Trigger 是用于定义调度时间的元素,咱们项目没有定时调度的需求,全部调度都选用理解触发就能够了。
Job 表示被调度的任务,Job和Trigger成对传递给Scheduler,当Trigger的条件知足时,它对应的Job就会被Scheduler触发。
clipboard.png
Trigger/Job的组合不能实现顺序调度,实现顺序调度须要用到JobListener,JobListener对指定Job进行监听,如上图所示,JobLisener能够捕捉到三个任务触发点.
咱们须要的是在Job已执行完成这个触发点,把下一个Job启动起来。
也有TriggerLisener/SchedulerLisener,触发点和Trigger、Scheduler相关,和咱们的需求关系不大,暂忽略。缓存

实现

为每一个算法任务建立一个Job,任务失败不能启动后续任务,因此在job运行失败的状况下,须要把启动Job的JobLisener删除掉。并发

public class HelloJob implements Job {
    private String JobName;
     
    public HelloJob(String name) {
        JobName = name;
    }
 
    public void execute(JobExecutionContext context)  throws JobExecutionException {
 
        /* 获取传递参数 */
        JobDataMap jobDataMap = context.getMergedJobDataMap();
 
       /* 从jobDataMap中获取下游JobLisener名称  */
 
       /* 执行spark mlib 做业 */
 
       if (/* 执行失败 */){
           /* 删除依赖本任务的JobLisener */
           context.getScheduler().getListenerManager().removeJobListener("next_job_lisener");
       }  
  
       /* 当前任务结果写入数据库 */
    }
}

基于全部的依赖关系,建立JobLisener,并将JobLisener与它依赖的Job绑定,在JobLisener中将下一步的Job启动起来。框架

public class HelloJobListener implements JobListener {
    private String lisenerName;
    private JobDetail nextJob;
    HelloJobListener(String name, JobDetail job){
        lisenerName = name;
        nextJob = job;
    }
 
    public String getName() {
        return lisenerName;
    }
 
    public void jobWasExecuted(JobExecutionContext inContext,JobExecutionException inException) {
 
        /* 建立Trigger */
        Trigger trigger =  newTrigger()
                .withIdentity(lisenerName)
                .startNow()
                .build();
 
        inContext.getScheduler().scheduleJob(nextJob, trigger);
 
        try {
            /* 拉起下一个Job */
            inContext.getScheduler().scheduleJob(nextJob, trigger);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }
}

当前任务依赖多个上游Job时,试验了AndMatcher,这个方法是对多个条件进行判断的接口,不能进行多上游依赖判断。
须要本身在JobLisener中实现多个依赖是否完成的检查。JobLisener须要知道其它依赖的完成状况,而且在本身完成后更新本身的状态。
全部Job、JobLisener的关系配置好之后,调用scheduler.start()就能够启动整个调度。
后续主线程的任务就是检查工做流是否已经完成。每一个任务结点在任务完成后,会将当前任务结点的的运行结果写入数据库或缓存。
主线程依据上下游依赖关系去数据库中定时检查数据的结果,当全部分支都运行完成或运行失败后,得出算法的整体结果。
为提升更新效率,上一轮检查事后,已经完成的任务记录已查阅标记,下一轮检查从未查阅结点开始检查。分布式

相关文章
相关标签/搜索