spring boot / cloud (十五) 分布式调度中心进阶

spring boot / cloud (十五) 分布式调度中心进阶

<spring boot / cloud (十) 使用quartz搭建调度中心>这篇文章中介绍了如何在spring boot项目中集成quartz.java

今天这篇文章则会进一步跟你们讨论一下设计和搭建分布式调度中心所须要关注的事情.mysql

下面先看一下,整体的逻辑架构图:git

分布式调度-逻辑架构示意

分布式调度-逻辑架构示意

架构设计

整体思路是,将调度执行两个概念分离开来,造成调度中心执行节点两个模块:web

调度中心

是一个公共的平台,负责全部任务的调度,以及任务的管理,不涉及任何业务逻辑,从上图能够看到,它主要包括以下模块:spring

  • 核心调度器quartz : 调度中心的核心,按照jobDetail和trigger的设定发起做业调度,而且提供底层的管理apisql

  • 管理功能 : 可经过restful和web页面的方式动态的管理做业,触发器的CURD操做,而且实时生效,并且还能够记录调度日志,以及能够以图表,表格,等各类可视化的方式展示调度中心的各个维度的指标信息数据库

  • RmsJob和RmsJobDisallowConcurrent : 基于http远程调用(RMS)的做业和禁止并发执行的做业api

  • Callback : 用于接收"执行节点"异步执行完成后的信息restful

执行节点

是嵌入在各个微服务中的一个执行模块,负责接收调度中心的调度,专一于执行业务逻辑,无需关系调度细节,而且理论上来讲,它主要包括以下模块:架构

  • 同步执行器 : 同步执行而且返回调度中心触发的任务

  • 异步执行器 : 异步执行调度中心触发的任务,而且经过callback将执行结果反馈给调度中心

  • 做业链 : 可任意组合不一样任务的执行顺序和依赖关系,知足更复杂的业务需求

  • 业务bean : 业务逻辑的载体

架构优势

这样一来,调度中心只负责调度,执行节点只负责业务,相互经过http协议进行沟通,两部分能够彻底解耦合,加强系统总体的扩展性

而且引入了异步执行器的概念,这同样一来,调度中心就能以非阻塞的形式触发执行器,能够不受任务业务逻辑带来的性能影响,进一步提升了系统的性能

而后理论上来讲执行节点是不局限于任何的语言或者平台的,而且与调度中心采用的是通用的http协议,真正的能够作到跨平台

特色

集群,高可用,故障转移

总体的解决方案是创建在spring cloud基础上的,依赖于服务发现eureka,可以使全部的服务去中心化,来实现集群和高可用

调度中心的核心依赖于quartz,而quartz是原生支持集群的,它经过将做业和触发器的细节持久化到数据库中,而后在经过db锁的方式,与集群中的各个节点通信,从而实现了去中心化

执行节点调度中心都是注册在eureka上的,经过ribbon的客户端负载均衡的特性,自动屏蔽坏掉的节点,自动发现新增长的节点,可以使双方的http通讯都作到高可用.

以下是quartz集群配置的片断:

#Configure scheduler
org.quartz.scheduler.instanceName=clusterQuartzScheduler #实例名称
org.quartz.scheduler.instanceId=AUTO #自动设定实例ID
org.quartz.scheduler.skipUpdateCheck=true

#Configure JobStore and Cluster
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX #使用jdbc持久化到数据中
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate #sql代理,mysql
org.quartz.jobStore.useProperties=true
org.quartz.jobStore.tablePrefix=QRTZ_ #表前缀
org.quartz.jobStore.isClustered=true #开启集群模式
org.quartz.jobStore.clusterCheckinInterval=20000
org.quartz.jobStore.misfireThreshold=60000

线程池调优

quartz的默认配置,可根据实际状况进行调整.

#Configure ThreadPool
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool #线程池类型
org.quartz.threadPool.threadCount=5 #线程池数量
org.quartz.threadPool.threadPriority=5 #优先级

这里就体现出了分离调度的业务逻辑的好处,在传统的作法中,调度器承载着业务逻辑,必然会占用执行线程更长时间,并发能力受业务逻辑限制.

将业务逻辑分离出去后,而且采用异步任务的方式,调度器触发某个任务后,将当即返回,这时占用执行线程的时间会大幅缩短.

因此在相同的线程池数量下,采用这种架构,是能够大幅度的提升调度中心的并发能力的.

集中化配置管理

一样,整个解决方案也依赖于spring cloud config server.

咱们在系统中抽象出了一系列的元数据用于作系统配置,这些元数据在org.itkk.udf.scheduler.meta包下,你们能够查看,这些元数据基本囊括了全部做业和触发器的属性,经过@ConfigurationProperties特性,可轻松的将这些元数据类转化为配置文件.

而且设计上简化了后续管理api的复杂度,咱们某个做业或者某个触发器的一套属性概括到一个CODE中,而后后续经过这个CODE就能操做所对应的做业或者触发器.

配置片断以下:

#jobGroup
org.itkk.scheduler.properties.jobGroup.general=通用
#triggerGroup
org.itkk.scheduler.properties.triggerGroup.general=通用
#rmsJob
org.itkk.scheduler.properties.jobDetail.rmsJob.name=generalJob
org.itkk.scheduler.properties.jobDetail.rmsJob.group=general
org.itkk.scheduler.properties.jobDetail.rmsJob.className=org.itkk.udf.scheduler.job.RmsJob
org.itkk.scheduler.properties.jobDetail.rmsJob.description=通用做业
org.itkk.scheduler.properties.jobDetail.rmsJob.recovery=false
org.itkk.scheduler.properties.jobDetail.rmsJob.durability=true
org.itkk.scheduler.properties.jobDetail.rmsJob.autoInit=true
#rmsJobDisallowConcurrent
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.name=generalJobDisallowConcurrent
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.group=general
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.className=org.itkk.udf.scheduler.job.RmsJobDisallowConcurrent
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.description=通用做业(禁止并发)
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.recovery=false
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.durability=true
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.autoInit=true
#simpleTrigger
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.jobCode=rmsJob
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.name=testSimpleTrigger
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.group=general
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.intervalInMilliseconds=10000
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.autoInit=true
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.description=测试简单触发器
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.serviceCode=SCH_CLIENT_UDF_SERVICE_A_DEMO
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.beanName=testBean
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.async=true
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.param1=a
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.param2=b
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.param3=123

以上能够看,咱们能够经过properties配置文件设定做业和触发器的任何属性,而且经过如:simpleTrigger这个code,就能随意的经过管理api进行curd操做.

基于rms的JobDetail

从上面的配置能够看到,解决方案中内置了两个默认的jobDetail,一个是rmsJob另外一个是rmsJobDisallowConcurrent.

想要使用它们很简单,为它们配置一个触发器便可,rmsjob经过如下属性来肯定本身将要调用那个任务:

#配置simple或者corn触发器的dataMap属性,而且添加以下值:

#指定要调用那个rms,这里设定的是rmscode,不太清楚的话能够回看第八篇文章
省略.serviceCode=SCH_CLIENT_UDF_SERVICE_A_DEMO 
#指定要调用哪个bean
省略.beanName=testBean 
#是否采用异步方式
省略.async=true 
#业务参数
省略.param1=a 
省略.param2=b
省略.param3=123

以下方式能够在执行节点中定义一个执行器

@Component("testBean")
public class TestSch extends AbstractExecutor {
    @Override
    public void handle(String id, Map<String, Object> jobDataMap) {
        try {
            LOGGER.info("任务执行了------id:{}, jobDataMap:{}", id, jobDataMap);
        } catch (JsonProcessingException e) {
            throw new SchException(e);
        }
    }
}

这样就能为某一个执行器设定触发器,从而作到调度的功能.

而rmsJob是能够并发的触发执行器的.

禁止并发的基于rms的JobDetail

在这个解决方案中禁止并发有两个层次

第一个层次就是默认实现的rmsJobDisallowConcurrent,你们看源码就知道,这个类上标注了@DisallowConcurrentExecution,这个注解的含义是禁止做业并发执行.

在传统的作法中jobdetail中包含了业务逻辑,没有异步的远程操做,因此说在类上标注这个注解能作到禁止并发.

可是如今有了异步任务的概念,触发器触发执行器后当即就返回结束了,若是这个时候,触发器的触发间隔小于执行器的执行时间,那么依然仍是会有任务并发执行的.

这显然是不但愿发生的,既然禁止并发,那么就必定要彻底的作到禁止并发,以下设定保证了这一点:

protected void disallowConcurrentExecute(RmsJobParam rmsJobParam) throws JobExecutionException {
    if (!this.hasRunning(rmsJobParam)) { //没有正在运行的任务才能运行
        this.execute(rmsJobParam);
    } else { //跳过执行,而且记录
        RmsJobResult result = new RmsJobResult();
        result.setId(rmsJobParam.getId());
        result.setStats(RmsJobStats.SKIP.value());
        save(rmsJobParam, result);
    }
}

在禁止并发的异步任务触发前,会校验当前这个任务是否正在执行,若是正在执行的话,跳过而且记录.

异步任务,异步回调

执行节点中的任务便可同步执行也可异步执行,经过配置触发器的async属性来控制的,

同步执行 : 的任务适合执行时间短,执行时间稳定,而且有必要当即知道返回结果的任务

异步执行 : 高并发,高性能的执行方式,没有特别的限制,推荐使用

以下实现片断:

//SchClientController中
public RestResponse<RmsJobResult> execute(@RequestBody RmsJobParam param) {
    //记录来接收时间
    Date receiveTime = new Date();
    //定义返回值
    RmsJobResult result = new RmsJobResult();
    result.setClientReceiveTime(receiveTime);
    result.setId(param.getId());
    result.setClientStartExecuteTime(new Date());
    //执行(区分同步跟异步)
    if (param.getAsync()) {
        schClientHandle.asyncHandle(param, result);
        result.setStats(RmsJobStats.EXECUTING.value());
    } else {
        schClientHandle.handle(param);
        result.setClientEndExecuteTime(new Date());
        result.setStats(RmsJobStats.COMPLETE.value());
    }
    //返回
    return new RestResponse<>(result);
}
//SchClientHandle中
//异步执行
@Async
public void asyncHandle(RmsJobParam param, RmsJobResult result) {
    try {
        //执行
        this.handle(param);
        result.setClientEndExecuteTime(new Date());
        result.setStats(RmsJobStats.COMPLETE.value());
        //回调
        this.callback(result);
    } catch (Exception e) {
        result.setClientEndExecuteTime(new Date());
        result.setStats(RmsJobStats.ERROR.value());
        result.setErrorMsg(ExceptionUtils.getStackTrace(e));
        //回调
        this.callback(result);
        //抛出异常
        log.error("asyncHandle error:", e);
        throw new SchException(e);
    }

}
//同步执行
public void handle(RmsJobParam param) {
    //判断bean是否存在
    if (!applicationContext.containsBean(param.getBeanName())) {
        throw new SchException(param.getBeanName() + " not definition");
    }
    //得到bean
    AbstractExecutor bean = applicationContext.getBean(param.getBeanName(), AbstractExecutor.class);
    //执行
    bean.handle(param);
}
//异步回调(重处理)
@Retryable(maxAttempts = 3, value = Exception.class)
private void callback(RmsJobResult result) {
    log.info("try to callback");
    final String serviceCode = "SCH_CLIENT_CALLBACK_1";
    rms.call(serviceCode, result, null, new ParameterizedTypeReference<RestResponse<String>>() {
    }, null);
}
//回调失败后的处理
@Recover
public void recover(Exception e) {
    log.error("try to callback failed:", e);
}

任务链

在执行器父类中提供以下方法,可在执行节点触发其余执行器:

//调用链 (容许并发,异步调用)
protected String chain(boolean isConcurrent, String parentId, String serviceCode, 
                String beanName, boolean async, Map<String, String> param)

而在执行器中的使用样例:

@Component("testBean")
public class TestSch extends AbstractExecutor {
    @Override
    public void handle(String id, Map<String, Object> jobDataMap) {
        try {
            LOGGER.info("任务执行了------id:{}, jobDataMap:{}", id, xssObjectMapper.writeValueAsString(jobDataMap)); //NOSONAR
            if (!jobDataMap.containsKey(TriggerDataMapKey.PARENT_TRIGGER_ID.value())) {
                LOGGER.info("job链---->"); //NOSONAR
                Map<String, String> param = new HashMap<>();
                param.put("chain1", "1");
                param.put("chain2", "2");
                this.chain(id, "SCH_CLIENT_UDF_SERVICE_A_DEMO", "testBean", param);
            }
        } catch (JsonProcessingException e) {
            throw new SchException(e);
        }
    }
}

这样可使得执行器更加灵活,能够随意组合

管理api

依赖于quartz的底层管理api,咱们能够抽象出一系列restFul的api,目前实现的功能以下:

做业管理 : 保存做业 , 保存做业(覆盖) , 移除做业 , 当即触发做业

触发器管理 : 保存简单触发器 , 保存简单触发器(覆盖) , 保存CRON触发器 , 保存CRON触发器(覆盖) , 删除触发器

计划任务管理 : 清理数据

misfire设定

quartz原生的设定,表示那些错过了触发时间的触发器,后续处理的规则,多是由于 : 服务不可用 , 线程阻塞,线程池耗尽 , 等..

simple触发器

MISFIRE_INSTRUCTION_FIRE_NOW

以当前时间为触发频率当即触发执行

MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT

不触发当即执行
等待下次触发频率周期时刻执行
以总次数-已执行次数做为剩余周期次数,从新计算FinalTime
调整后的FinalTime会略大于根据starttime计算的到的FinalTime值

MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT

不触发当即执行
等待下次触发频率周期时刻,执行至FinalTime的剩余周期次数
保持FinalTime不变,从新计算剩余周期次数(至关于错过的当作已执行)

MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT

以当前时间为触发频率当即触发执行
以总次数-已执行次数做为剩余周期次数,从新计算FinalTime
调整后的FinalTime会略大于根据starttime计算的到的FinalTime值

MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT

以当前时间为触发频率当即触发执行
保持FinalTime不变,从新计算剩余周期次数(至关于错过的当作已执行)

MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY

以错过的第一个频率时间马上开始执行

MISFIRE_INSTRUCTION_SMART_POLICY(默认)

智能根据trigger属性选择策略:
repeatCount为0,则策略同MISFIRE_INSTRUCTION_FIRE_NOW
repeatCount为REPEAT_INDEFINITELY,则策略同MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT
不然策略同MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT

cron触发器

MISFIRE_INSTRUCTION_DO_NOTHING

是什么都不作,继续等下一次预约时间再触发

MISFIRE_INSTRUCTION_FIRE_ONCE_NOW

是当即触发一次,触发后恢复正常的频率

MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY

以错过的第一个频率时间马上开始执行

MISFIRE_INSTRUCTION_SMART_POLICY(默认)

根据建立CronTrigger时选择的MISFIRE_INSTRUCTION_XXX更新CronTrigger的状态。
若是misfire指令设置为MISFIRE_INSTRUCTION_SMART_POLICY,则将使用如下方案:
指令将解释为MISFIRE_INSTRUCTION_FIRE_ONCE_NOW

你们可根据自身状况进行设定

结束

今天跟你们分享了分布式调度的设计思路和想法,因为我的时间问题,这个设计的核心部分虽然已经完成,可是好比web界面,restful api,都尚未完成,后续有空就会把这些东西都弄上去的.

不过整体来讲,把核心的思想讲出来了,也欢迎你们提出意见和建议

代码仓库 (博客配套代码)


想得到最快更新,请关注公众号

想得到最快更新,请关注公众号

相关文章
相关标签/搜索