第四十章:基于SpringBoot & Quartz完成定时任务分布式多节点负载持久化

在上一章【第三十九章:基于SpringBoot & Quartz完成定时任务分布式单节点持久化】中咱们已经完成了任务的持久化,当咱们建立一个任务时任务会被quartz定时任务框架自动持久化到数据库,咱们采用的是SpringBoot项目托管的dataSource来完成的数据源提供,固然也可使用quartz内部配置数据源方式,咱们的标题既然是提到了定时任务的分布式多节点,那么怎么才算是多节点呢?当有节点故障或者手动中止运行后是否能够自动漂移任务到可用的分布式节点呢?node

本章目标

  1. 完成定时任务分布式多节点配置,当单个节点关闭时其余节点自动接管定时任务。
  2. 建立任务时传递自定义参数,方便任务处理后续业务逻辑。

构建项目

注意:咱们本章项目须要结合上一章共同完成,有一点要注意的是任务在持久化到数据库内时会保存任务的全路径,如:com.hengyu.chapter39.timers.GoodStockCheckTimerquartz在运行任务时会根据任务全路径去执行,若是不一致则会提示找不到指定类,咱们本章在建立项目时package须要跟上一章彻底一致。git

咱们这里就不去直接建立新项目了,直接复制上一章项目的源码为新的项目命名为Chapter40spring

配置分布式

在上一章配置文件quartz.properties中咱们其实已经为分布式作好了相关配置,下面咱们就来看一下分布式相关的配置。
分布式相关配置:数据库

1. org.quartz.scheduler.instanceId : 定时任务的实例编号,若是手动指定须要保证每一个节点的惟一性,由于quartz不容许出现两个相同instanceId的节点,咱们这里指定为Auto就能够了,咱们把生成编号的任务交给quartzbash

2. org.quartz.jobStore.isClustered: 这个属性才是真正的开启了定时任务的分布式配置,当咱们配置为truequartz框架就会调用ClusterManager来初始化分布式节点。并发

3. org.quartz.jobStore.clusterCheckinInterval:配置了分布式节点的检查时间间隔,单位:毫秒。
下面是quartz.properties配置文件配置信息:app

#调度器实例名称
org.quartz.scheduler.instanceName = quartzScheduler

#调度器实例编号自动生成
org.quartz.scheduler.instanceId = AUTO

#持久化方式配置
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

#持久化方式配置数据驱动,MySQL数据库
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate

#quartz相关数据表前缀名
org.quartz.jobStore.tablePrefix = QRTZ_

#开启分布式部署
org.quartz.jobStore.isClustered = true
#配置是否使用
org.quartz.jobStore.useProperties = false

#分布式节点有效性检查时间间隔,单位:毫秒
org.quartz.jobStore.clusterCheckinInterval = 10000

#线程池实现类
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool

#执行最大并发线程数量
org.quartz.threadPool.threadCount = 10

#线程优先级
org.quartz.threadPool.threadPriority = 5

#配置为守护线程,设置后任务将不会执行
#org.quartz.threadPool.makeThreadsDaemons=true

#配置是否启动自动加载数据库内的定时任务,默认true
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true复制代码

当咱们启动任务节点时,会根据org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread属性配置进行是否自动加载任务,默认true自动加载数据库内的任务到节点。框架

测试分布式

上一章项目节点名称:quartz-cluster-node-first
本章项目节点名称:quartz-cluster-node-seconddom

因为咱们quartz-cluster-node-first的商品库存检查定时任务是每隔30秒执行一次,因此任务除非手动清除不然是不会被清空的,在运行项目测试以前须要将application.yml配置文件的端口号、项目名称修改下,保证quartz-cluster-node-secondquartz-cluster-node-first端口号不一致,能够同时运行,修改后为:分布式

spring:
    application:
        name: quzrtz-cluster-node-second
server:
  port: 8082复制代码

而后修改相应控制台输出,为了可以区分任务执行者是具体的节点。

Chapter40Application启动类修改日志输出:
logger.info("【【【【【【定时任务分布式节点 - quartz-cluster-node-second 已启动】】】】】】");

GoodAddTimer商品添加任务类修改日志输出:
logger.info("分布式节点quartz-cluster-node-second,商品添加完成后执行任务,任务时间:{}",new Date());

GoodStockCheckTimer商品库存检查任务类修改日志输出:
logger.info("分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:{}",new Date());复制代码

下面咱们启动本章项目,查看控制台输出内容,以下所示:

2017-11-12 10:28:39.969  INFO 11048 --- [           main] c.hengyu.chapter39.Chapter40Application  : 【【【【【【定时任务分布式节点 - quartz-cluster-node-second 已启动】】】】】】
2017-11-12 10:28:41.930  INFO 11048 --- [lerFactoryBean]] o.s.s.quartz.SchedulerFactoryBean        : Starting Quartz Scheduler now, after delay of 2 seconds
2017-11-12 10:28:41.959  INFO 11048 --- [lerFactoryBean]] org.quartz.core.QuartzScheduler          : Scheduler schedulerFactoryBean_$_yuqiyu1510453719308 started.
2017-11-12 10:28:51.963  INFO 11048 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : ClusterManager: detected 1 failed or restarted instances.
2017-11-12 10:28:51.963  INFO 11048 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : ClusterManager: Scanning for instance "yuqiyu1510450938654"'s failed in-progress jobs. 2017-11-12 10:28:51.967 INFO 11048 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore : ClusterManager: ......Freed 1 acquired trigger(s). 2017-11-12 10:29:00.024 INFO 11048 --- [ryBean_Worker-1] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 10:29:00 CST 2017复制代码

能够看到项目启动完成后自动分配的instanceIdyuqiyu1510450938654,生成的规则是当前用户的名称+时间戳。而后ClusterManager分布式管理者自动介入进行扫描是否存在匹配的触发器任务,若是存在则会自动执行任务逻辑,而商品库存检查定时任务确实由quartz-cluster-node-second进行输出的。

测试任务自动漂移

下面咱们也须要把quartz-cluster-node-first的输出进行修改,以下所示:

Chapter39Application启动类修改日志输出:
logger.info("【【【【【【定时任务分布式节点 - quartz-cluster-node-first 已启动】】】】】】");

GoodAddTimer商品添加任务类修改日志输出:
logger.info("分布式节点quartz-cluster-node-first,商品添加完成后执行任务,任务时间:{}",new Date());

GoodStockCheckTimer商品库存检查任务类修改日志输出:
logger.info("分布式节点quartz-cluster-node-first,执行库存检查定时任务,执行时间:{}",new Date());复制代码

接下来咱们启动quartz-cluster-node-first,并查看控制台的输出内容:

2017-11-12 10:34:09.750  INFO 192 --- [           main] c.hengyu.chapter39.Chapter39Application  : 【【【【【【定时任务分布式节点 - quartz-cluster-node-first 已启动】】】】】】
2017-11-12 10:34:11.690  INFO 192 --- [lerFactoryBean]] o.s.s.quartz.SchedulerFactoryBean        : Starting Quartz Scheduler now, after delay of 2 seconds
2017-11-12 10:34:11.714  INFO 192 --- [lerFactoryBean]] org.quartz.core.QuartzScheduler          : Scheduler schedulerFactoryBean_$_yuqiyu1510454049066 started.复制代码

项目启动完成后,定时节点并无实例化ClusterManager来完成分布式节点的初始化,由于quartz检测到有其余的节点正在处理任务,这样也是保证了任务执行的惟一性。

关闭quartz-cluster-node-second

咱们关闭quartz-cluster-node-second运行的项目,预计的目的能够达到quartz-cluster-node-first会自动接管数据库内的任务,完成任务执行的自动漂移,咱们来查看quartz-cluster-node-first的控制台输出内容:

2017-11-12 10:34:09.750  INFO 192 --- [           main] c.hengyu.chapter39.Chapter39Application  : 【【【【【【定时任务分布式节点 - quartz-cluster-node-first 已启动】】】】】】
2017-11-12 10:34:11.690  INFO 192 --- [lerFactoryBean]] o.s.s.quartz.SchedulerFactoryBean        : Starting Quartz Scheduler now, after delay of 2 seconds
2017-11-12 10:34:11.714  INFO 192 --- [lerFactoryBean]] org.quartz.core.QuartzScheduler          : Scheduler schedulerFactoryBean_$_yuqiyu1510454049066 started.
2017-11-12 10:41:11.793  INFO 192 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : ClusterManager: detected 1 failed or restarted instances.
2017-11-12 10:41:11.793  INFO 192 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : ClusterManager: Scanning for instance "yuqiyu1510453719308"'s failed in-progress jobs. 2017-11-12 10:41:11.797 INFO 192 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore : ClusterManager: ......Freed 1 acquired trigger(s). 2017-11-12 10:41:11.834 INFO 192 --- [ryBean_Worker-1] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-first,执行库存检查定时任务,执行时间:Sun Nov 12 10:41:11 CST 2017复制代码

控制台已经输出了持久的定时任务,输出节点是quartz-cluster-node-first,跟咱们预计的同样,节点quartz-cluster-node-first完成了自动接管quartz-cluster-node-second的工做,而这个过程确定有一段时间间隔,而这个间隔能够修改quartz.properties配置文件内的属性org.quartz.jobStore.clusterCheckinInterval进行调节。

关闭quartz-cluster-node-first

咱们一样能够测试启动任务节点quartz-cluster-node-second后,再关闭quartz-cluster-node-first任务节点,查看quartz-cluster-node-second控制台的输出内容:

2017-11-12 10:53:31.010  INFO 3268 --- [           main] c.hengyu.chapter39.Chapter40Application  : 【【【【【【定时任务分布式节点 - quartz-cluster-node-second 已启动】】】】】】
2017-11-12 10:53:32.967  INFO 3268 --- [lerFactoryBean]] o.s.s.quartz.SchedulerFactoryBean        : Starting Quartz Scheduler now, after delay of 2 seconds
2017-11-12 10:53:32.992  INFO 3268 --- [lerFactoryBean]] org.quartz.core.QuartzScheduler          : Scheduler schedulerFactoryBean_$_yuqiyu1510455210493 started.
2017-11-12 10:53:52.999  INFO 3268 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : ClusterManager: detected 1 failed or restarted instances.
2017-11-12 10:53:52.999  INFO 3268 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : ClusterManager: Scanning for instance "yuqiyu1510454049066"'s failed in-progress jobs. 2017-11-12 10:53:53.003 INFO 3268 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore : ClusterManager: ......Freed 1 acquired trigger(s). 2017-11-12 10:54:00.020 INFO 3268 --- [ryBean_Worker-1] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 10:54:00 CST 2017复制代码

获得的结果是一样能够完成任务的自动漂移。

若是两个节点同时启动,哪一个节点先把节点信息注册到数据库就得到了优先执行权。

传递参数到任务

咱们平时在使用任务时,若是是针对性比较强的业务逻辑,确定须要特定的参数来完成业务逻辑的实现。

下面咱们来模拟商品秒杀的场景,当咱们添加商品后自动添加一个商品提早五分钟的秒杀提醒,为关注该商品的用户发送提醒消息。
咱们在节点quartz-cluster-node-first中添加秒杀提醒任务,以下所示:

package com.hengyu.chapter39.timers;

import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.QuartzJobBean;

/**
 * 商品秒杀提醒定时器
 * 为关注该秒杀商品的用户进行推送提醒
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/12
 * Time:9:23
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
public class GoodSecKillRemindTimer
extends QuartzJobBean
{
    /**
     * logback
     */
    private Logger logger = LoggerFactory.getLogger(GoodSecKillRemindTimer.class);

    /**
     * 任务指定逻辑
     * @param jobExecutionContext
     * @throws JobExecutionException
     */
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        //获取任务详情内的数据集合
        JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        //获取商品编号
        Long goodId = dataMap.getLong("goodId");

        logger.info("分布式节点quartz-cluster-node-first,开始处理秒杀商品:{},关注用户推送消息.",goodId);

        //.../
    }
}复制代码

在秒杀提醒任务逻辑中,咱们经过获取JobDetailJobDataMap集合来获取在建立任务的时候传递的参数集合,咱们这里约定了goodId为商品的编号,在建立任务的时候传递到JobDataMap内,这样quartz在执行该任务的时候就会自动将参数传递到任务逻辑中,咱们也就能够经过JobDataMap获取到对应的参数值。

设置秒杀提醒任务

咱们找到节点项目quartz-cluster-node-first中的GoodInfoService,编写方法buildGoodSecKillRemindTimer设置秒杀提醒任务,以下所示:

/**
     * 构建商品秒杀提醒定时任务
     * 设置五分钟后执行
     * @throws Exception
     */
    public void buildGoodSecKillRemindTimer(Long goodId) throws Exception
    {
        //任务名称
        String name = UUID.randomUUID().toString();
        //任务所属分组
        String group = GoodSecKillRemindTimer.class.getName();
        //秒杀开始时间
        long startTime = System.currentTimeMillis() + 1000 * 5 * 60;
        JobDetail jobDetail = JobBuilder
                .newJob(GoodSecKillRemindTimer.class)
                .withIdentity(name,group)
                .build();

        //设置任务传递商品编号参数
        jobDetail.getJobDataMap().put("goodId",goodId);

        //建立任务触发器
        Trigger trigger = TriggerBuilder.newTrigger().withIdentity(name,group).startAt(new Date(startTime)).build();
        //将触发器与任务绑定到调度器内
        scheduler.scheduleJob(jobDetail,trigger);
    }复制代码

咱们模拟秒杀提醒时间是添加商品后的5分钟,咱们经过调用jobDetail实例的getJobDataMap方法就能够获取该任务数据集合,直接调用put方法就能够进行设置指定key的值,该集合继承了StringKeyDirtyFlagMap而且实现了Serializable序列化,由于须要将数据序列化到数据库的qrtz_job_details表内。
修改保存商品方法,添加调用秒杀提醒任务:

/**
     * 保存商品基本信息
     * @param good 商品实例
     * @return
     */
    public Long saveGood(GoodInfoEntity good) throws Exception
    {
        goodInfoRepository.save(good);
        //构建建立商品定时任务
        buildCreateGoodTimer();
        //构建商品库存定时任务
        buildGoodStockTimer();
        //构建商品描述提醒定时任务
        buildGoodSecKillRemindTimer(good.getId());
        return good.getId();
    }复制代码

添加测试商品

下面咱们调用节点quartz-cluster-node-first的测试Chapter39ApplicationTests.addGood方法完成商品的添加,因为咱们的quartz-cluster-node-second项目并无中止,因此咱们在quartz-cluster-node-second项目的控制台查看输出内容:

2017-11-12 11:45:00.008  INFO 11652 --- [ryBean_Worker-5] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:45:00 CST 2017
2017-11-12 11:45:30.013  INFO 11652 --- [ryBean_Worker-6] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:45:30 CST 2017
2017-11-12 11:45:48.230  INFO 11652 --- [ryBean_Worker-7] c.hengyu.chapter39.timers.GoodAddTimer   : 分布式节点quartz-cluster-node-second,商品添加完成后执行任务,任务时间:Sun Nov 12 11:45:48 CST 2017
2017-11-12 11:46:00.008  INFO 11652 --- [ryBean_Worker-8] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:46:00 CST 2017
2017-11-12 11:46:30.016  INFO 11652 --- [ryBean_Worker-9] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:46:30 CST 2017
2017-11-12 11:47:00.011  INFO 11652 --- [yBean_Worker-10] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:47:00 CST 2017
2017-11-12 11:47:30.028  INFO 11652 --- [ryBean_Worker-1] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:47:30 CST 2017
2017-11-12 11:48:00.014  INFO 11652 --- [ryBean_Worker-2] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:48:00 CST 2017
2017-11-12 11:48:30.013  INFO 11652 --- [ryBean_Worker-3] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:48:30 CST 2017
2017-11-12 11:49:00.010  INFO 11652 --- [ryBean_Worker-4] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:49:00 CST 2017
2017-11-12 11:49:30.028  INFO 11652 --- [ryBean_Worker-5] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:49:30 CST 2017
2017-11-12 11:49:48.290  INFO 11652 --- [ryBean_Worker-6] c.h.c.timers.GoodSecKillRemindTimer      : 分布式节点quartz-cluster-node-second,开始处理秒杀商品:15,关注用户推送消息.
2017-11-12 11:50:00.008  INFO 11652 --- [ryBean_Worker-7] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:50:00 CST 2017复制代码

秒杀任务在添加完成商品后的五分钟开始执行的,而咱们也正常的输出了传递过去的goodId商品编号的参数,而秒杀提醒任务执行一次后也被自动释放了。

总结

本章主要是结合上一章完成了分布式任务的讲解,完成了测试持久化的定时任务自动漂移,以及如何向定时任务传递参数。固然在实际的开发过程当中,任务建立是须要进行封装的,目的也是为了减小一些冗余代码以及方面后期统一维护定时任务。

本章源码已经上传到码云:
SpringBoot配套源码地址:gitee.com/hengboy/spr…
SpringCloud配套源码地址:gitee.com/hengboy/spr…
SpringBoot相关系列文章请访问:目录:SpringBoot学习目录
QueryDSL相关系列文章请访问:QueryDSL通用查询框架学习目录
SpringDataJPA相关系列文章请访问:目录:SpringDataJPA学习目录
SpringBoot相关文章请访问:目录:SpringBoot学习目录,感谢阅读!
欢迎加入QQ技术交流群,共同进步。

QQ技术交流群
相关文章
相关标签/搜索