分布式定时任务框架Quartz

前言

项目中总要写那么几个定时任务来处理一些事情。一些简单的定时任务使用Spring自带的定时任务就能完成。可是若是须要大量的定时任务的话要怎么才能统一管理呢?java

本文介绍Quartz分布式调度框架。mysql

介绍

Quartz介绍

Quartz是OpenSymphony开源组织在Job scheduling领域又一个开源项目,是彻底由java开发的一个开源的任务日程管理系统。 目前是 Terracotta 旗下的一个项目。官网地址 http://www.quartz-scheduler.org/ 能够 下载 Quartz 的发布版本及其源代码。git

特色

  • 集成方便(彻底使用Java编写)
  • 无需依赖可集群部署也可单机运行
  • 能够经过JVM独立运行

Job

建立一个任务只须要实现Job接口便可spring

触发器

  • 能够经过 Calendar 执行(排除节假日)
  • 指定某个时间无线循环执行 好比每五分钟执行一次
  • 固定时间执行 例如每周周一上午十点执行

通常状况使用SimpleTrigger,和CronTrigger,这些触发器实现了Trigger接口。或者 ScheduleBuilder 子类 SimpleScheduleBuilder和CronScheduleBuilder。sql

对于简单的时间来讲,好比天天执行几回,使用SimpleTrigger。对于复杂的时间表达式来讲,好比每月15日上午几点几分,使用CronTrigger以及CromExpression 类。数据库

注意 :一个job能够被多个Trigger 绑定,可是一个Trigger只能绑定一个job服务器

存储

有两种存储方式 RAMJobStore和 JDBCJobStore 。并发

RAMJobStore不须要外部数据库调度信息存储在JVM内存中 因此,当应用程序中止运行时,全部调度信息将被丢失存储多少个Job和Trigger也会受到限制。框架

JDBCJobStore 支持集群全部触发器和job都存储在数据库中不管服务器中止和重启均可以恢复任务同时支持事务处理。分布式

实战

准备

上面简单的介绍了一下Quartz,而后如今开始实战,本文使用SpringBoot整合。

项目地址: https://gitee.com/lqlm/toolsL...

首先建立数据库表由于太多了就不房子文章中了能够去官方网站下载,也能够用个人下载地址下载

地址: https://lqcoder.com/quartz.sql

建立完成以后:

Table Name Description
QRTZ_CALENDARS 存储Quartz的Calendar信息
QRTZ_CRON_TRIGGERS 存储CronTrigger,包括Cron表达式和时区信息
QRTZ_FIRED_TRIGGERS 存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息
QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的Trigger组的信息
QRTZ_SCHEDULER_STATE 存储少许的有关Scheduler的状态信息,和别的Scheduler实例
QRTZ_LOCKS 存储程序的悲观锁的信息
QRTZ_JOB_DETAILS 存储每个已配置的Job的详细信息
QRTZ_JOB_LISTENERS 存储有关已配置的JobListener的信息
QRTZ_SIMPLE_TRIGGERS 存储简单的Trigger,包括重复次数、间隔、以及已触的次数
QRTZ_BLOG_TRIGGERS Trigger做为Blob类型存储
QRTZ_TRIGGER_LISTENERS 存储已配置的TriggerListener的信息
QRTZ_TRIGGERS 存储已配置的Trigger的信息

本文统一使用Cron方式来建立。

注意:cron方式须要用到的4张数据表:
qrtz_triggers,qrtz_cron_triggers,qrtz_fired_triggers,qrtz_job_details

整合项目

建立一个SpringBoot项目而后加入quartz依赖,同时也要加入c3p0的依赖由于quartz使用的数据库是和项目分开的。

<!--spring boot集成quartz-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
        <dependency>
            <groupId>c3p0</groupId>
            <artifactId>c3p0</artifactId>
            <version>0.9.0.2</version>
        </dependency>

同时在resources下建立quartz.properties文件内容

org.quartz.scheduler.instanceName = MyScheduler
org.quartz.threadPool.threadCount = 10
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.dataSource = myDS
org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL = jdbc:mysql:数据库地址
org.quartz.dataSource.myDS.user = 数据库帐号
org.quartz.dataSource.myDS.password = 数据库密码
org.quartz.dataSource.myDS.maxConnections = 链接数

而后建立一个Job类

/**
 * @author snluomeng
 * @date 2019/12/19 16:27
 */
@Slf4j
public class MyJob implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("==================开始执行任务==================");
    }
}

建立一个工具类而后进行定时任务的增删改

首先建立一个调度工厂

private static SchedulerFactory schedulerFactory = new StdSchedulerFactory();

添加定时任务

public static void addJob(String jobName, String jobGroupName,
                              String triggerName, String triggerGroupName, Class jobClass, String cron) {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            // 任务名,任务组,任务执行类
//            Trigger.TriggerState state = sched.getTriggerState();

            JobDetail jobDetail=  JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();
            // 触发器
            TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
            // 触发器名,触发器组
            triggerBuilder.withIdentity(triggerName, triggerGroupName);
            triggerBuilder.startNow();
            // 触发器时间设定
            triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
            // 建立Trigger对象
            CronTrigger trigger = (CronTrigger) triggerBuilder.build();

            // 调度容器设置JobDetail和Trigger
            sched.scheduleJob(jobDetail, trigger);

            // 启动
            if (!sched.isShutdown()) {
                sched.start();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

建立流程

经过工厂获取 Scheduler对象

Scheduler sched = schedulerFactory.getScheduler();

设置Job的实现类和一些静态信息

//jobClass 设置Job的实现类
  //jobName Job名称
  //jobGroupName Job组名称
JobDetail jobDetail=  JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();

构建触发器

// 触发器
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
 // 触发器名,触发器组
triggerBuilder.withIdentity(triggerName, triggerGroupName);
triggerBuilder.startNow();
// 触发器时间设定
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
// 建立Trigger对象
CronTrigger trigger = (CronTrigger) triggerBuilder.build();

​ 而后把Job和触发器都设置到Scheduler对象中

// 调度容器设置JobDetail和Trigger
sched.scheduleJob(jobDetail, trigger);

启动

// 启动
sched.start();

运行

由于使用的是SpringBoot项目因此就直接在启动类加入添加定时任务

参数分别为:JobName JsobgropName 中间省略 实现类,任务执行时间

QuartUtil.addJob("测试定时任务","test","测试定时任务","testTrigger",MyJob.class,"0/5 * * * * ?");

而后查看输出日志:

能够看到已经在执行了,如今咱们去看一下数据库中的数据要查看的表有qrtz_triggers,qrtz_cron_triggers,qrtz_fired_triggers,qrtz_job_details

qrtz_job_details:

已经存在

如今把项目中止而后在从新启动会发生什么?

发现抛出了异常,由于咱们已经添加过这个定时任务了因此重复添加是行不通的。

这时候咱们直接启动便可。

一样封装启动方法

public static void startJobs() {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            sched.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

启动全部定时任务很是简单直接获取Scheduler对象而后start便可。

修改定时任务

修改定时任务一样须要获取Scheduler对象,和添加流程基本一致,只不过最后不是调用的scheduleJob()而是调用的rescheduleJob()方法.有两种方式都须要指定定时器名称

  • 第一种是调用rescheduleJob()直接修改
  • 第二种是先删除而后在新增
/**
     * @Description: 修改一个任务的触发时间
     *
     * @param jobName
     * @param jobGroupName
     * @param triggerName 触发器名
     * @param triggerGroupName 触发器组名
     * @param cron   时间设置,参考quartz说明文档
     */
    public static void modifyJobTime(String jobName,
                                     String jobGroupName, String triggerName, String triggerGroupName, String cron) {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
            CronTrigger trigger = (CronTrigger) sched.getTrigger(triggerKey);
            if (trigger == null) {
                return;
            }

            String oldTime = trigger.getCronExpression();
            if (!oldTime.equalsIgnoreCase(cron)) {
                /** 方式一 :调用 rescheduleJob 开始 */
                // 触发器
                TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
                // 触发器名,触发器组
                triggerBuilder.withIdentity(triggerName, triggerGroupName);
                triggerBuilder.startNow();
                // 触发器时间设定
                triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
                // 建立Trigger对象
                trigger = (CronTrigger) triggerBuilder.build();
                // 方式一 :修改一个任务的触发时间
                sched.rescheduleJob(triggerKey, trigger);
                /** 方式一 :调用 rescheduleJob 结束 */

                /** 方式二:先删除,而后在建立一个新的Job  */
                //JobDetail jobDetail = sched.getJobDetail(JobKey.jobKey(jobName, jobGroupName));
                //Class<? extends Job> jobClass = jobDetail.getJobClass();
                //removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
                //addJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass, cron);
                /** 方式二 :先删除,而后在建立一个新的Job */
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

删除任务

删除定时任务在修改的时候已经有实例.注意都须要指定任务名称 任务分组和触发器名称触发器分组

/**
     * @Description: 移除一个任务
     *
     * @param jobName
     * @param jobGroupName
     * @param triggerName
     * @param triggerGroupName
     */
    public static void removeJob(String jobName, String jobGroupName,
                                 String triggerName, String triggerGroupName) {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
            sched.pauseTrigger(triggerKey);// 中止触发器
            sched.unscheduleJob(triggerKey);// 移除触发器
            sched.deleteJob(JobKey.jobKey(jobName, jobGroupName));// 删除任务
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

传递参数

如今还有一个问题就是我想把参数传递到Job实现类里面咋整?

在添加定时任务时,建立JobDetail的时候有一个setJobData()方法参数为JobDataMap,看下JobBuilder源码

能够看到JobBuilder提供了setJobData方法传递的参数为JobDataMap是Map类型.

在建立定时任务的时候能够:

JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("testKey","测试传递参数");
JobDetail jobDetail=  JobBuilder.newJob(jobClass).setJobData(jobDataMap).withIdentity(jobName, jobGroupName).build();

而后在Job实现类方法中直接取

@Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("==================开始执行任务==================");
        log.info("执行任务线程ID{}",Thread.currentThread().getId());
        JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        log.info("参数为{}",jobDataMap.get("testKey"));
    }

JobDataMap能够直接看成Map进行操做.

单个参数可使用usingJobData()来添加,参数为K V 取值方法一致,同时参数也是持久化到数据库的

若是须要查询管理的话能够直接查询数据库

原理解析

上面简单的介绍了一下怎么使用,那么你必定对它是怎么运行的感兴趣.接下来就分析一下Quartz究竟是怎么实现的

注意到上面增删改都要先经过schedulerFactory工厂(工厂模式)来先获取Scheduler实例,如今就从第一步开始分析

本文就简单分析一下Scheduler工厂和添加定时任务这两步骤.

Scheduler工厂

public Scheduler getScheduler() throws SchedulerException {
        //读取quartz配置文件,未指定则顺序遍历各个path下的quartz.properties文件
        if (this.cfg == null) {
            //若是为空就初始化
            this.initialize();
        }
        // 获取调度器池,采用了单例模式
        // 为了不并发getInstance是synchronized加锁的
        SchedulerRepository schedRep = SchedulerRepository.getInstance();
        // 从调度器池中取出当前配置所用的调度器
        Scheduler sched = schedRep.lookup(this.getSchedulerName());
        if (sched != null) {
            if (!sched.isShutdown()) {
                return sched;
            }

            schedRep.remove(this.getSchedulerName());
        }
        // 若是调度器池中没有当前配置的调度器,则实例化一个调度器,主要动做包括:
        // 1)初始化threadPool(线程池):开发者能够经过org.quartz.threadPool.class配置指定使用哪一个线程池类,好比SimpleThreadPool。先class load线程池类,接着动态生成线程池实例bean,而后经过反射,使用setXXX()方法将以org.quartz.threadPool开头的配置内容赋值给bean成员变量;
        // 2)初始化jobStore(任务存储方式):开发者能够经过org.quartz.jobStore.class配置指定使用哪一个任务存储类,好比RAMJobStore。先class load任务存储类,接着动态生成实例bean,而后经过反射,使用setXXX()方法将以org.quartz.jobStore开头的配置内容赋值给bean成员变量;
        // 3)初始化dataSource(数据源):开发者能够经过org.quartz.dataSource配置指定数据源详情,好比哪一个数据库、帐号、密码等。jobStore要指定为JDBCJobStore,dataSource才会有效;
        // 4)初始化其余配置:包括SchedulerPlugins、JobListeners、TriggerListeners等;
        // 5)初始化threadExecutor(线程执行器):默认为DefaultThreadExecutor;
        // 6)建立工做线程:根据配置建立N个工做thread,执行start()启动thread,并将N个thread顺序add进threadPool实例的空闲线程列表availWorkers中;
        // 7)建立调度器线程:建立QuartzSchedulerThread实例,并经过threadExecutor.execute(实例)启动调度器线程;
        // 8)建立调度器:建立StdScheduler实例,将上面全部配置和引用组合进实例中,并将实例存入调度器池中
        sched = instantiate();
        return sched;
    }

添加定时任务

public Date scheduleJob(JobDetail jobDetail,
                            Trigger trigger) throws SchedulerException {
        // 检查调度器是否开启
        validateState();
        //参数校验省略
        if (jobDetail == null) {
            throw new SchedulerException("JobDetail cannot be null");
        }.....
        OperableTrigger trig = (OperableTrigger)trigger;
        //校验触发器参数
        if (trigger.getJobKey() == null) {
            trig.setJobKey(jobDetail.getKey());
        } else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
            throw new SchedulerException(
                    "Trigger does not reference given job!");
        }
        trig.validate();
        Calendar cal = null;
        if (trigger.getCalendarName() != null) {
            cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
        }
        //获取时间
        Date ft = trig.computeFirstFireTime(cal);
        // 把job和trigger注册进调度器的jobStore
        resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
        // 通知job监听者
        notifySchedulerListenersJobAdded(jobDetail);
        // 通知调度器线程
        notifySchedulerThread(trigger.getNextFireTime().getTime());
        // 通知trigger监听者
        notifySchedulerListenersSchduled(trigger);
        return ft;
    }
    public void validateState() throws SchedulerException {
        //若是关闭则抛出异常
        if (isShutdown()) {
            throw new SchedulerException("The Scheduler has been shutdown.");
        }
        // other conditions to check (?)
    }