Quartz

2. 目录:

      1、Quartz 基本介绍html

           1.1 Quartz 概述前端

           1.2 Quartz特色java

           1.3 Quartz 集群配置mysql

    2、Quartz 原理及流程redis

           2.1 quartz基本原理spring

           2.2 quartz启动流程sql

   3、Spring + Quartz 实现企业级调度的实现示例shell

          3.1 环境信息数据库

          3.2 相关代码及配置express

    4、问题及解决方案

    5、相关知识

    6、参考资料

     总结

1、Quartz 基本介绍

    1.1 Quartz 概述

           Quartz 是 OpenSymphony 开源组织在任务调度领域的一个开源项目,彻底基于 Java 实现。该项目于 2009 年被 Terracotta 收购,目前是 Terracotta 旗下的一个项目。读者能够到 http://www.quartz-scheduler.org/站点下载 Quartz 的发布版本及其源代码。

   1.2 Quartz特色

       做为一个优秀的开源调度框架,Quartz 具备如下特色:

  1. 强大的调度功能,例如支持丰富多样的调度方法,能够知足各类常规及特殊需求;
  2. 灵活的应用方式,例如支持任务和调度的多种组合方式,支持调度数据的多种存储方式;
  3. 分布式和集群能力,Terracotta 收购后在原来功能基础上做了进一步提高。

      另外,做为 Spring 默认的调度框架,Quartz 很容易与 Spring 集成实现灵活可配置的调度功能。

    quartz调度核心元素

  1. Scheduler:任务调度器,是实际执行任务调度的控制器。在spring中经过SchedulerFactoryBean封装起来。
  2. Trigger:触发器,用于定义任务调度的时间规则,有SimpleTrigger,CronTrigger,DateIntervalTrigger和NthIncludedDayTrigger,其中CronTrigger用的比较多,本文主要介绍这种方式。CronTrigger在spring中封装在CronTriggerFactoryBean中。
  3. Calendar:它是一些日历特定时间点的集合。一个trigger能够包含多个Calendar,以便排除或包含某些时间点。
  4. JobDetail:用来描述Job实现类及其它相关的静态信息,如Job名字、关联监听器等信息。在spring中有JobDetailFactoryBean和 MethodInvokingJobDetailFactoryBean两种实现,若是任务调度只须要执行某个类的某个方法,就能够经过MethodInvokingJobDetailFactoryBean来调用。
  5. Job:是一个接口,只有一个方法void execute(JobExecutionContext context),开发者实现该接口定义运行任务,JobExecutionContext类提供了调度上下文的各类信息。Job运行时的信息保存在JobDataMap实例中。实现Job接口的任务,默认是无状态的,若要将Job设置成有状态的,在quartz中是给实现的Job添加@DisallowConcurrentExecution注解(之前是实现StatefulJob接口,如今已被Deprecated),在与spring结合中能够在spring配置文件的job detail中配置concurrent参数。

  1.3 Quartz 集群配置

    quartz集群是经过数据库表来感知其余的应用的,各个节点之间并无直接的通讯。只有使用持久的JobStore才能完成Quartz集群。
数据库表:之前有12张表,如今只有11张表,如今没有存储listener相关的表,多了QRTZ_SIMPROP_TRIGGERS表:

Table name Description
QRTZ_CALENDARS 存储Quartz的Calendar信息
QRTZ_CRON_TRIGGERS 存储CronTrigger,包括Cron表达式和时区信息
QRTZ_FIRED_TRIGGERS 存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息
QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的Trigger组的信息
QRTZ_SCHEDULER_STATE 存储少许的有关Scheduler的状态信息,和别的Scheduler实例
QRTZ_LOCKS 存储程序的悲观锁的信息
QRTZ_JOB_DETAILS 存储每个已配置的Job的详细信息
QRTZ_SIMPLE_TRIGGERS 存储简单的Trigger,包括重复次数、间隔、以及已触的次数
QRTZ_BLOG_TRIGGERS Trigger做为Blob类型存储
QRTZ_TRIGGERS 存储已配置的Trigger的信息
QRTZ_SIMPROP_TRIGGERS  

QRTZ_LOCKS就是Quartz集群实现同步机制的行锁表,包括如下几个锁:CALENDAR_ACCESS 、JOB_ACCESS、MISFIRE_ACCESS 、STATE_ACCESS 、TRIGGER_ACCESS。

  2、Quartz 原理及流程

      2.1 quartz基本原理    

核心元素

    Quartz 任务调度的核心元素是 scheduler, trigger 和 job,其中 trigger 和 job 是任务调度的元数据, scheduler 是实际执行调度的控制器。

在 Quartz 中,trigger 是用于定义调度时间的元素,即按照什么时间规则去执行任务。Quartz 中主要提供了四种类型的 trigger:SimpleTrigger,CronTirgger,DateIntervalTrigger,和 NthIncludedDayTrigger。这四种 trigger 能够知足企业应用中的绝大部分需求。咱们将在企业应用一节中进一步讨论四种 trigger 的功能。

    在 Quartz 中,job 用于表示被调度的任务。主要有两种类型的 job:无状态的(stateless)和有状态的(stateful)。对于同一个 trigger 来讲,有状态的 job 不能被并行执行,只有上一次触发的任务被执行完以后,才能触发下一次执行。Job 主要有两种属性:volatility 和 durability,其中 volatility 表示任务是否被持久化到数据库存储,而 durability 表示在没有 trigger 关联的时候任务是否被保留。二者都是在值为 true 的时候任务被持久化或保留。一个 job 能够被多个 trigger 关联,可是一个 trigger 只能关联一个 job。

    在 Quartz 中, scheduler 由 scheduler 工厂建立:DirectSchedulerFactory 或者 StdSchedulerFactory。 第二种工厂 StdSchedulerFactory 使用较多,由于 DirectSchedulerFactory 使用起来不够方便,须要做许多详细的手工编码设置。 Scheduler 主要有三种:RemoteMBeanScheduler, RemoteScheduler 和 StdScheduler。本文以最经常使用的 StdScheduler 为例讲解。这也是笔者在项目中所使用的 scheduler 类。

Quartz 核心元素之间的关系以下图所示:

        图 1. Quartz 核心元素关系图

        图 1. Quartz 核心元素关系图

线程视图

在 Quartz 中,有两类线程,Scheduler 调度线程和任务执行线程,其中任务执行线程一般使用一个线程池维护一组线程。

        图 2. Quartz 线程视图

        图 2. Quartz 线程视图

Scheduler 调度线程主要有两个: 执行常规调度的线程,和执行 misfired trigger 的线程。常规调度线程轮询存储的全部 trigger,若是有须要触发的 trigger,即到达了下一次触发的时间,则从任务执行线程池获取一个空闲线程,执行与该 trigger 关联的任务。Misfire 线程是扫描全部的 trigger,查看是否有 misfired trigger,若是有的话根据 misfire 的策略分别处理。下图描述了这两个线程的基本流程:

        图 3. Quartz 调度线程流程图

        图 3. Quartz 调度线程流程图

关于 misfired trigger,咱们在企业应用一节中将进一步描述。

数据存储

Quartz 中的 trigger 和 job 须要存储下来才能被使用。Quartz 中有两种存储方式:RAMJobStore, JobStoreSupport,其中 RAMJobStore 是将 trigger 和 job 存储在内存中,而 JobStoreSupport 是基于 jdbc 将 trigger 和 job 存储到数据库中。RAMJobStore 的存取速度很是快,可是因为其在系统被中止后全部的数据都会丢失,因此在一般应用中,都是使用 JobStoreSupport。

在 Quartz 中,JobStoreSupport 使用一个驱动代理来操做 trigger 和 job 的数据存储:StdJDBCDelegate。StdJDBCDelegate 实现了大部分基于标准 JDBC 的功能接口,可是对于各类数据库来讲,须要根据其具体实现的特色作某些特殊处理,所以各类数据库须要扩展 StdJDBCDelegate 以实现这些特殊处理。Quartz 已经自带了一些数据库的扩展实现,能够直接使用,以下图所示:

图 4. Quartz 数据库驱动代理

图 4. Quartz 数据库驱动代理

做为嵌入式数据库的表明,Derby 近来很是流行。若是使用 Derby 数据库,可使用上图中的 CloudscapeDelegate 做为 trigger 和 job 数据存储的代理类。

   2.2 quartz启动流程

若quartz是配置在spring中,当服务器启动时,就会装载相关的bean。SchedulerFactoryBean实现了InitializingBean接口,所以在初始化bean的时候,会执行afterPropertiesSet方法,该方法将会调用SchedulerFactory(DirectSchedulerFactory 或者 StdSchedulerFactory,一般用StdSchedulerFactory)建立Scheduler。SchedulerFactory在建立quartzScheduler的过程当中,将会读取配置参数,初始化各个组件,关键组件以下:

  1. ThreadPool:通常是使用SimpleThreadPool,SimpleThreadPool建立了必定数量的WorkerThread实例来使得Job可以在线程中进行处理。WorkerThread是定义在SimpleThreadPool类中的内部类,它实质上就是一个线程。在SimpleThreadPool中有三个list:workers-存放池中全部的线程引用,availWorkers-存放全部空闲的线程,busyWorkers-存放全部工做中的线程;
    线程池的配置参数以下所示:

    1
    2
    3
    org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
    org.quartz.threadPool.threadCount=3
    org.quartz.threadPool.threadPriority=5
  2. JobStore:分为存储在内存的RAMJobStore和存储在数据库的JobStoreSupport(包括JobStoreTX和JobStoreCMT两种实现,JobStoreCMT是依赖于容器来进行事务的管理,而JobStoreTX是本身管理事务),若要使用集群要使用JobStoreSupport的方式;
  3. QuartzSchedulerThread:用来进行任务调度的线程,在初始化的时候paused=true,halted=false,虽然线程开始运行了,可是paused=true,线程会一直等待,直到start方法将paused置为false;

另外,SchedulerFactoryBean还实现了SmartLifeCycle接口,所以初始化完成后,会执行start()方法,该方法将主要会执行如下的几个动做:

  1. 建立ClusterManager线程并启动线程:该线程用来进行集群故障检测和处理,将在下文详细讨论;
  2. 建立MisfireHandler线程并启动线程:该线程用来进行misfire任务的处理,将在下文详细讨论;
  3. 置QuartzSchedulerThread的paused=false,调度线程才真正开始调度;

整个启动流程以下图:
quartz启动时序图

 3、Spring + Quartz 实现企业级调度的实现示例

3.1 环境信息

   此示例中的环境: Spring 4.1.6.RELEASE   + quartz 2.2.1 + Mysql 5.6

3.2 相关代码及配置

   3.2.1 Maven 引入

       

 3.2.2 数据库脚本准备

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for task_schedule_job
-- ----------------------------
DROP TABLE IF EXISTS `task_schedule_job`;
CREATE TABLE `task_schedule_job` (
  `job_id` bigint(20) NOT NULL AUTO_INCREMENT,
  `create_time` timestamp NULL DEFAULT NULL,
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `job_name` varchar(255) DEFAULT NULL,
  `job_group` varchar(255) DEFAULT NULL,
  `job_status` varchar(255) DEFAULT NULL,
  `cron_expression` varchar(255) NOT NULL,
  `description` varchar(255) DEFAULT NULL,
  `bean_class` varchar(255) DEFAULT NULL,
  `is_concurrent` varchar(255) DEFAULT NULL COMMENT '1',
  `spring_id` varchar(255) DEFAULT NULL,
  `method_name` varchar(255) NOT NULL
  PRIMARY KEY (`job_id`),
  UNIQUE KEY `name_group` (`job_name`,`job_group`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;


在Quartz包下docs/dbTables,选择对应的数据库脚本,建立相应的数据库表便可,我用的是mysql5.6,这里有一个须要注意的地方,mysql5.5以前用的表存储引擎是MyISAM,使用的是表级锁,锁发生冲突的几率比较高,并发度低;5.6以后默认的存储引擎为InnoDB,InnoDB采用的锁机制是行级锁,并发度也较高。而quartz集群使用数据库锁的

机制来来实现同一个任务在同一个时刻只被实例执行,因此为了防止冲突,咱们建表的时候要选取InnoDB做为表的存

储引擎。以下:

      

 3.2.3 关键代码及配置

   <1>spring-quartz.xml 配置 在application.xml 文件中引入

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
   http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

    <!-- 注册本地调度任务
    <bean id="localQuartzScheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"></bean>-->

    <!-- 注册集群调度任务 -->
    <bean id="schedulerFactoryBean" lazy-init="false" autowire="no"
          class="org.springframework.scheduling.quartz.SchedulerFactoryBean" destroy-method="destroy">
        <!--可选,QuartzScheduler 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了 -->
        <property name="overwriteExistingJobs" value="true" />
        <!--必须的,QuartzScheduler 延时启动,应用启动完后 QuartzScheduler 再启动 -->
        <property name="startupDelay" value="3" />
        <!-- 设置自动启动 -->
        <property name="autoStartup" value="true" />
        <property name="applicationContextSchedulerContextKey" value="applicationContext" />
        <property name="configLocation" value="classpath:quartz.properties" />
    </bean>

</beans>

<2>quartz.properties 文件配置

#==============================================================
#Configure Main Scheduler Properties
#==============================================================
org.quartz.scheduler.instanceName = KuanrfGSQuartzScheduler
org.quartz.scheduler.instanceId = AUTO

#==============================================================
#Configure JobStore
#==============================================================
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.clusterCheckinInterval = 20000
org.quartz.jobStore.dataSource = myDS
org.quartz.jobStore.maxMisfiresToHandleAtATime = 1
org.quartz.jobStore.misfireThreshold = 120000
org.quartz.jobStore.txIsolationLevelSerializable = false

#==============================================================
#Configure DataSource
#==============================================================
org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL = 你的数据连接
org.quartz.dataSource.myDS.user = 用户名
org.quartz.dataSource.myDS.password = 密码
org.quartz.dataSource.myDS.maxConnections = 30
org.quartz.jobStore.selectWithLockSQL = SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE

#==============================================================
#Configure ThreadPool
#==============================================================
org.quartz.threadPool.class= org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount= 10
org.quartz.threadPool.threadPriority= 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread= true

#==============================================================
#Skip Check Update
#update:true
#not update:false
#==============================================================
org.quartz.scheduler.skipUpdateCheck = true

#============================================================================
# Configure Plugins
#============================================================================
org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin
org.quartz.plugin.shutdownhook.class = org.quartz.plugins.management.ShutdownHookPlugin
org.quartz.plugin.shutdownhook.cleanShutdown = true

<3>关键代码

package com.netease.ad.omp.service.sys;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

import com.netease.ad.omp.common.utils.SpringUtils;
import com.netease.ad.omp.dao.sys.mapper.ScheduleJobMapper;
import com.netease.ad.omp.entity.sys.ScheduleJob;
import com.netease.ad.omp.quartz.job.JobUtils;
import com.netease.ad.omp.quartz.job.MyDetailQuartzJobBean;
import com.netease.ad.omp.quartz.job.QuartzJobFactory;
import com.netease.ad.omp.quartz.job.QuartzJobFactoryDisallowConcurrentExecution;
import org.apache.log4j.Logger;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;


/**
 * 计划任务管理
 */
@Service
public class JobTaskService {
   public final Logger log = Logger.getLogger(this.getClass());
   @Autowired
   private SchedulerFactoryBean schedulerFactoryBean;

   @Autowired
   private ScheduleJobMapper scheduleJobMapper;


   /**
    * 从数据库中取 区别于getAllJob
    * 
    * @return
    */
   public List<ScheduleJob> getAllTask() {
      return scheduleJobMapper.select(null);
   }

   /**
    * 添加到数据库中 区别于addJob
    */
   public void addTask(ScheduleJob job) {
      job.setCreateTime(new Date());
      scheduleJobMapper.insertSelective(job);
   }

   /**
    * 从数据库中查询job
    */
   public ScheduleJob getTaskById(Long jobId) {
      return scheduleJobMapper.selectByPrimaryKey(jobId);
   }

   /**
    * 更改任务状态
    * 
    * @throws SchedulerException
    */
   public void changeStatus(Long jobId, String cmd) throws SchedulerException {
      ScheduleJob job = getTaskById(jobId);
      if (job == null) {
         return;
      }
      if ("stop".equals(cmd)) {
         deleteJob(job);
         job.setJobStatus(JobUtils.STATUS_NOT_RUNNING);
      } else if ("start".equals(cmd)) {
         job.setJobStatus(JobUtils.STATUS_RUNNING);
         addJob(job);
      }
      scheduleJobMapper.updateByPrimaryKeySelective(job);
   }

   /**
    * 更改任务 cron表达式
    * 
    * @throws SchedulerException
    */
   public void updateCron(Long jobId, String cron) throws SchedulerException {
      ScheduleJob job = getTaskById(jobId);
      if (job == null) {
         return;
      }
      job.setCronExpression(cron);
      if (JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) {
         updateJobCron(job);
      }
      scheduleJobMapper.updateByPrimaryKeySelective(job);
   }

   /**
    * 添加任务
    * 
    * @throws SchedulerException
    */
   public void addJob(ScheduleJob job) throws SchedulerException {
      if (job == null || !JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) {
         return;
      }
      Scheduler scheduler = schedulerFactoryBean.getScheduler();
      log.debug(scheduler + ".......................................................................................add");
      TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());

      CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);

      // 不存在,建立一个
      if (null == trigger) {
         Class clazz = JobUtils.CONCURRENT_IS.equals(job.getIsConcurrent()) ? QuartzJobFactory.class : QuartzJobFactoryDisallowConcurrentExecution.class;
         JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(job.getJobName(), job.getJobGroup()).build();

         jobDetail.getJobDataMap().put("scheduleJob", job);

         CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());

         trigger = TriggerBuilder.newTrigger().withIdentity(job.getJobName(), job.getJobGroup()).withSchedule(scheduleBuilder).build();

         scheduler.scheduleJob(jobDetail, trigger);
      } else {
         // Trigger已存在,那么更新相应的定时设置
         CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());

         // 按新的cronExpression表达式从新构建trigger
         trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();

         // 按新的trigger从新设置job执行
         scheduler.rescheduleJob(triggerKey, trigger);
      }
   }

   @PostConstruct
   public void init() throws Exception {

      // 这里获取任务信息数据
      List<ScheduleJob> jobList = scheduleJobMapper.select(null);
   
      for (ScheduleJob job : jobList) {
         addJob(job);
      }
   }

   /**
    * 获取全部计划中的任务列表
    * 
    * @return
    * @throws SchedulerException
    */
   public List<ScheduleJob> getAllJob() throws SchedulerException {
      Scheduler scheduler = schedulerFactoryBean.getScheduler();
      GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
      Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
      List<ScheduleJob> jobList = new ArrayList<ScheduleJob>();
      for (JobKey jobKey : jobKeys) {
         List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
         for (Trigger trigger : triggers) {
            ScheduleJob job = new ScheduleJob();
            job.setJobName(jobKey.getName());
            job.setJobGroup(jobKey.getGroup());
            job.setDescription("触发器:" + trigger.getKey());
            Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
            job.setJobStatus(triggerState.name());
            if (trigger instanceof CronTrigger) {
               CronTrigger cronTrigger = (CronTrigger) trigger;
               String cronExpression = cronTrigger.getCronExpression();
               job.setCronExpression(cronExpression);
            }
            jobList.add(job);
         }
      }
      return jobList;
   }

   /**
    * 全部正在运行的job
    * 
    * @return
    * @throws SchedulerException
    */
   public List<ScheduleJob> getRunningJob() throws SchedulerException {
      Scheduler scheduler = schedulerFactoryBean.getScheduler();
      List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
      List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(executingJobs.size());
      for (JobExecutionContext executingJob : executingJobs) {
         ScheduleJob job = new ScheduleJob();
         JobDetail jobDetail = executingJob.getJobDetail();
         JobKey jobKey = jobDetail.getKey();
         Trigger trigger = executingJob.getTrigger();
         job.setJobName(jobKey.getName());
         job.setJobGroup(jobKey.getGroup());
         job.setDescription("触发器:" + trigger.getKey());
         Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
         job.setJobStatus(triggerState.name());
         if (trigger instanceof CronTrigger) {
            CronTrigger cronTrigger = (CronTrigger) trigger;
            String cronExpression = cronTrigger.getCronExpression();
            job.setCronExpression(cronExpression);
         }
         jobList.add(job);
      }
      return jobList;
   }

   /**
    * 暂停一个job
    * 
    * @param scheduleJob
    * @throws SchedulerException
    */
   public void pauseJob(ScheduleJob scheduleJob) throws SchedulerException {
      Scheduler scheduler = schedulerFactoryBean.getScheduler();
      JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
      scheduler.pauseJob(jobKey);
   }

   /**
    * 恢复一个job
    * 
    * @param scheduleJob
    * @throws SchedulerException
    */
   public void resumeJob(ScheduleJob scheduleJob) throws SchedulerException {
      Scheduler scheduler = schedulerFactoryBean.getScheduler();
      JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
      scheduler.resumeJob(jobKey);
   }

   /**
    * 删除一个job
    * 
    * @param scheduleJob
    * @throws SchedulerException
    */
   public void deleteJob(ScheduleJob scheduleJob) throws SchedulerException {
      Scheduler scheduler = schedulerFactoryBean.getScheduler();
      JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
      scheduler.deleteJob(jobKey);

   }

   /**
    * 当即执行job
    * 
    * @param scheduleJob
    * @throws SchedulerException
    */
   public void runAJobNow(ScheduleJob scheduleJob) throws SchedulerException {
      Scheduler scheduler = schedulerFactoryBean.getScheduler();
      JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
      scheduler.triggerJob(jobKey);
   }

   /**
    * 更新job时间表达式
    * 
    * @param scheduleJob
    * @throws SchedulerException
    */
   public void updateJobCron(ScheduleJob scheduleJob) throws SchedulerException {
      Scheduler scheduler = schedulerFactoryBean.getScheduler();

      TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());

      CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);

      CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression());

      trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();

      scheduler.rescheduleJob(triggerKey, trigger);
   }

   public static void main(String[] args) {
      CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("xxxxx");
   }
}
package com.netease.ad.omp.quartz.job;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import com.netease.ad.omp.common.utils.SpringUtils;
import com.netease.ad.omp.entity.sys.ScheduleJob;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.quartz.JobExecutionContext;
import org.springframework.context.ApplicationContext;

/**
 * Created with IntelliJ IDEA
 * ProjectName: omp
 * Author:  bjsonghongxu
 * CreateTime : 15:58
 * Email: bjsonghongxu@crop.netease.com
 * Class Description:
 *   定时任务工具类
 */
public class JobUtils {
    public final static Logger log = Logger.getLogger(JobUtils.class);
    public static final String STATUS_RUNNING = "1"; //启动状态
    public static final String STATUS_NOT_RUNNING = "0"; //未启动状态
    public static final String CONCURRENT_IS = "1";
    public static final String CONCURRENT_NOT = "0";

    private ApplicationContext ctx;

    /**
     * 经过反射调用scheduleJob中定义的方法
     *
     * @param scheduleJob
     */
    public static void invokMethod(ScheduleJob scheduleJob,JobExecutionContext context) {
        Object object = null;
        Class clazz = null;
        if (StringUtils.isNotBlank(scheduleJob.getSpringId())) {
            object = SpringUtils.getBean(scheduleJob.getSpringId());
        } else if (StringUtils.isNotBlank(scheduleJob.getBeanClass())) {
            try {
                clazz = Class.forName(scheduleJob.getBeanClass());
                object = clazz.newInstance();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
        if (object == null) {
            log.error("任务名称 = [" + scheduleJob.getJobName() + "]---------------未启动成功,请检查是否配置正确!!!");
            return;
        }
        clazz = object.getClass();
        Method method = null;
        try {
            method = clazz.getMethod(scheduleJob.getMethodName(), new Class[] {JobExecutionContext.class});
        } catch (NoSuchMethodException e) {
            log.error("任务名称 = [" + scheduleJob.getJobName() + "]---------------未启动成功,方法名设置错误!!!");
        } catch (SecurityException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        if (method != null) {
            try {
                method.invoke(object, new Object[] {context});
            } catch (IllegalAccessException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IllegalArgumentException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        log.info("任务名称 = [" + scheduleJob.getJobName() + "]----------启动成功");
    }
}
package com.netease.ad.omp.quartz.job;

import com.netease.ad.omp.entity.sys.ScheduleJob;
import org.apache.log4j.Logger;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;


/**
 * 
 * @Description: 计划任务执行处 无状态
 *  Spring调度任务 (重写 quartz 的 QuartzJobBean 类缘由是在使用 quartz+spring 把 quartz 的 task 实例化进入数据库时,会产生: serializable 的错误)
 */
public class QuartzJobFactory implements Job {
   public final Logger log = Logger.getLogger(this.getClass());

   @Override
   public void execute(JobExecutionContext context) throws JobExecutionException {
      ScheduleJob scheduleJob = (ScheduleJob) context.getMergedJobDataMap().get("scheduleJob");
      JobUtils.invokMethod(scheduleJob,context);
   }
}
package com.netease.ad.omp.quartz.job;

import com.netease.ad.omp.entity.sys.ScheduleJob;
import org.apache.log4j.Logger;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;


/**
 * 
 * @Description: 若一个方法一次执行不完下次轮转时则等待该方法执行完后才执行下一次操做
 *  Spring调度任务 (重写 quartz 的 QuartzJobBean 类缘由是在使用 quartz+spring 把 quartz 的 task 实例化进入数据库时,会产生: serializable 的错误)
 */
@DisallowConcurrentExecution
public class QuartzJobFactoryDisallowConcurrentExecution implements Job {
   public final Logger log = Logger.getLogger(this.getClass());

   @Override
   public void execute(JobExecutionContext context) throws JobExecutionException {
      ScheduleJob scheduleJob = (ScheduleJob) context.getMergedJobDataMap().get("scheduleJob");
      JobUtils.invokMethod(scheduleJob,context);

   }
}

 

package com.netease.ad.omp.entity.sys;

import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
import java.util.Date;

/**
 * Created with IntelliJ IDEA
 * ProjectName: omp
 * Author:  bjsonghongxu
 * CreateTime : 15:48
 * Email: bjsonghongxu@crop.netease.com
 * Class Description:
 *   计划任务信息
 */
@Table(name = "task_schedule_job")
public class ScheduleJob implements Serializable {
    @Id
    private Long jobId;

    private Date createTime;

    private Date updateTime;
    /**
     * 任务名称
     */
    private String jobName;
    /**
     * 任务分组
     */
    private String jobGroup;
    /**
     * 任务状态 是否启动任务
     */
    private String jobStatus;
    /**
     * cron表达式
     */
    private String cronExpression;
    /**
     * 描述
     */
    private String description;
    /**
     * 任务执行时调用哪一个类的方法 包名+类名
     */
    private String beanClass;
    /**
     * 任务是否有状态
     */
    private String isConcurrent;
    /**
     * spring bean
     */
    private String springId;
    /**
     * 任务调用的方法名
     */
    private String methodName;

    public Long getJobId() {
        return jobId;
    }

    public void setJobId(Long jobId) {
        this.jobId = jobId;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }

    public String getJobName() {
        return jobName;
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    public String getJobGroup() {
        return jobGroup;
    }

    public void setJobGroup(String jobGroup) {
        this.jobGroup = jobGroup;
    }

    public String getJobStatus() {
        return jobStatus;
    }

    public void setJobStatus(String jobStatus) {
        this.jobStatus = jobStatus;
    }

    public String getCronExpression() {
        return cronExpression;
    }

    public void setCronExpression(String cronExpression) {
        this.cronExpression = cronExpression;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public String getBeanClass() {
        return beanClass;
    }

    public void setBeanClass(String beanClass) {
        this.beanClass = beanClass;
    }

    public String getIsConcurrent() {
        return isConcurrent;
    }

    public void setIsConcurrent(String isConcurrent) {
        this.isConcurrent = isConcurrent;
    }

    public String getSpringId() {
        return springId;
    }

    public void setSpringId(String springId) {
        this.springId = springId;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }
}
package com.netease.ad.omp.common.utils;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;

public final class SpringUtils implements BeanFactoryPostProcessor {

   private static ConfigurableListableBeanFactory beanFactory; // Spring应用上下文环境

   @Override
   public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
      SpringUtils.beanFactory = beanFactory;
   }

   /**
    * 获取对象
    * 
    * @param name
    * @return Object 一个以所给名字注册的bean的实例
    * @throws BeansException
    * 
    */
   @SuppressWarnings("unchecked")
   public static <T> T getBean(String name) throws BeansException {
      return (T) beanFactory.getBean(name);
   }

   /**
    * 获取类型为requiredType的对象
    * 
    * @param clz
    * @return
    * @throws BeansException
    * 
    */
   public static <T> T getBean(Class<T> clz) throws BeansException {
      @SuppressWarnings("unchecked")
      T result = (T) beanFactory.getBean(clz);
      return result;
   }

   /**
    * 若是BeanFactory包含一个与所给名称匹配的bean定义,则返回true
    * 
    * @param name
    * @return boolean
    */
   public static boolean containsBean(String name) {
      return beanFactory.containsBean(name);
   }

   /**
    * 判断以给定名字注册的bean定义是一个singleton仍是一个prototype。
    * 若是与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
    * 
    * @param name
    * @return boolean
    * @throws NoSuchBeanDefinitionException
    * 
    */
   public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
      return beanFactory.isSingleton(name);
   }

   /**
    * @param name
    * @return Class 注册对象的类型
    * @throws NoSuchBeanDefinitionException
    * 
    */
   public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
      return beanFactory.getType(name);
   }

   /**
    * 若是给定的bean名字在bean定义中有别名,则返回这些别名
    * 
    * @param name
    * @return
    * @throws NoSuchBeanDefinitionException
    * 
    */
   public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
      return beanFactory.getAliases(name);
   }

}

 

至于前端本身画个简单的界面便可使用了。

4、问题及解决方案

    4.1quartz mysql 死锁问题

      quartz文档提到,若是在集群环境下,最好将配置项org.quartz.jobStore.txIsolationLevelSerializable设置为true

问题:

这个选项在mysql下会很是容易出现死锁问题。

2014-12-29 09:55:28.006 [QuartzScheduler_clusterQuartzSchedular-BJ-YQ-64.2491419487774923_ClusterManager] ERROR o.q.impl.jdbcjobstore.JobStoreTX [U][] - ClusterManager: Error managing cluster: Failure updating scheduler state when checking-in: Deadlock found when trying to get lock; try restarting transaction

这个选项存在乎义:

quartz须要提高隔离级别来保障本身的运做,不过,因为各数据库实现的隔离级别定义都不同,因此quartz提供一个设置序列化这样的隔离级别存在,由于例如oracle中是没有未提交读和可重复读这样的隔离级别存在。可是因为mysql默认的是可重复读,比提交读高了一个级别,因此已经能够知足quartz集群的正常运行。

5、相关知识

 5.一、QuartzSchedulerThread线程

线程的主要逻辑代码以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
while (!halted.get()) {
  int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
  triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());
  long triggerTime = triggers.get(0).getNextFireTime().getTime();
  long timeUntilTrigger = triggerTime - now;
  while(timeUntilTrigger > 2) {
    now = System.currentTimeMillis();
    timeUntilTrigger = triggerTime - now;
  }
  List<TriggerFiredResult> bndle = qsRsrcs.getJobStore().triggersFired(triggers);
  for(int i = 0;i < res.size();i++){
    JobRunShell shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
    shell.initialize(qs);
    qsRsrcs.getThreadPool().runInThread(shell);
  }
}
  1. 先获取线程池中的可用线程数量(若没有可用的会阻塞,直到有可用的);
  2. 获取30m内要执行的trigger(即acquireNextTriggers):
    获取trigger的锁,经过select …for update方式实现;获取30m内(可配置)要执行的triggers(须要保证集群节点的时间一致),若@ConcurrentExectionDisallowed且列表存在该条trigger则跳过,不然更新trigger状态为ACQUIRED(刚开始为WAITING);插入firedTrigger表,状态为ACQUIRED;(注意:在RAMJobStore中,有个timeTriggers,排序方式是按触发时间nextFireTime排的;JobStoreSupport从数据库取出triggers时是按照nextFireTime排序);
  3. 等待直到获取的trigger中最早执行的trigger在2ms内;
  4. triggersFired:
    1)更新firedTrigger的status=EXECUTING;
    2)更新trigger下一次触发的时间;
    3)更新trigger的状态:无状态的trigger->WAITING,有状态的trigger->BLOCKED,若nextFireTime==null ->COMPLETE;
    4) commit connection,释放锁;
  5. 针对每一个要执行的trigger,建立JobRunShell,并放入线程池执行:
    1)execute:执行job
    2)获取TRIGGER_ACCESS锁
    3)如果有状态的job:更新trigger状态:BLOCKED->WAITING,PAUSED_BLOCKED->BLOCKED
    4)若@PersistJobDataAfterExecution,则updateJobData
    5)删除firedTrigger
    6)commit connection,释放锁

线程执行流程以下图所示:
QuartzSchedulerThread时序图QuartzSchedulerThread时序图

任务调度执行过程当中,trigger的状态变化以下图所示:
该图来自参考文献5该图来自参考文献5

5.2.misfireHandler线程

下面这些缘由可能形成 misfired job:

  1. 系统由于某些缘由被重启。在系统关闭到从新启动之间的一段时间里,可能有些任务会被 misfire;
  2. Trigger 被暂停(suspend)的一段时间里,有些任务可能会被 misfire;
  3. 线程池中全部线程都被占用,致使任务没法被触发执行,形成 misfire;
  4. 有状态任务在下次触发时间到达时,上次执行尚未结束;为了处理 misfired job,Quartz 中为 trigger 定义了处理策略,主要有下面两种:MISFIRE_INSTRUCTION_FIRE_ONCE_NOW:针对 misfired job 立刻执行一次;MISFIRE_INSTRUCTION_DO_NOTHING:忽略 misfired job,等待下次触发;默认是MISFIRE_INSTRUCTION_SMART_POLICY,该策略在CronTrigger中=MISFIRE_INSTRUCTION_FIRE_ONCE_NOW线程默认1分钟执行一次;在一个事务中,默认一次最多recovery 20个;

执行流程:

  1. 若配置(默认为true,可配置)成获取锁前先检查是否有须要recovery的trigger,先获取misfireCount;
  2. 获取TRIGGER_ACCESS锁;
  3. hasMisfiredTriggersInState:获取misfired的trigger,默认一个事务里只能最大20个misfired trigger(可配置),misfired判断依据:status=waiting,next_fire_time < current_time-misfirethreshold(可配置,默认1min)
  4. notifyTriggerListenersMisfired
  5. updateAfterMisfire:获取misfire策略(默认是MISFIRE_INSTRUCTION_SMART_POLICY,该策略在CronTrigger中=MISFIRE_INSTRUCTION_FIRE_ONCE_NOW),根据策略更新nextFireTime;
  6. 将nextFireTime等更新到trigger表;
  7. commit connection,释放锁8.若是还有更多的misfired,sleep短暂时间(为了集群负载均衡),不然sleep misfirethreshold时间,后继续轮询;

misfireHandler线程执行流程以下图所示:
misfireHandler线程时序图misfireHandler线程时序图

5.3.clusterManager线程

初始化:
failedInstance=failed+self+firedTrigger表中的schedulerName在scheduler_state表中找不到的(孤儿)

线程执行:
每一个服务器会定时(org.quartz.jobStore.clusterCheckinInterval这个时间)更新SCHEDULER_STATE表的LAST_CHECKIN_TIME,若这个字段远远超出了该更新的时间,则认为该服务器实例挂了;
注意:每一个服务器实例有惟一的id,若配置为AUTO,则为hostname+current_time

线程执行的具体流程:

  1. 检查是否有超时的实例failedInstances;
  2. 更新该服务器实例的LAST_CHECKIN_TIME;
    如有超时的实例:
  3. 获取STATE_ACCESS锁;
  4. 获取超时的实例failedInstances;
  5. 获取TRIGGER_ACCESS锁;
  6. clusterRecover:
  • 针对每一个failedInstances,经过instanceId获取每一个实例的firedTriggers;
  • 针对每一个firedTrigger:
    1) 更新trigger状态:
    BLOCKED->WAITING
    PAUSED_BLOCKED->PAUSED
    ACQUIRED->WAITING
    2) 若firedTrigger不是ACQUIRED状态(在执行状态),且jobRequestRecovery=true:
    建立一个SimpleTrigger,存储到trigger表,status=waiting,MISFIRE_INSTR=MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY.
    3) 删除firedTrigger

clusterManager线程执行时序图以下图所示:
clusterRecover线程时序图

5.4源码分析锁

目前代码中行锁只用到了STATE_ACCESS 和TRIGGER_ACCESS 这两种。 

一、TRIGGER_ACCESS
先了解一篇文章,经过源码来分析quartz是如何经过加锁来实现集群环境,触发器状态的一致性。 
http://www.360doc.com/content/14/0926/08/15077656_412418636.shtml
能够看到触发器的操做主要用主线程StdScheduleThread来完成,无论是获取须要触发的30S内的触发器,仍是触发过程。select和update触发器表时
都会先加锁,后解锁。若是数据库资源竞争比较大的话,锁会影响整个性能。能够考虑将任务信息放在分布式内存,如redis上进行处理。数据库只是定时从redis上load数据下来作统计。

实现都在JobStoreSupport类 

加锁类型 加锁方法 底层数据库操做 备注
executeInNonManagedTXLock acquireNextTrigger selectTriggerToAcquire
selectTrigger
selectJobDetail
insertFiredTrigger
查询须要点火的trigger
选择须要执行的trigger加入到fired_trigger表
for执行 triggerFired selectJobDetail
selectCalendar
updateFiredTrigger
triggerExists updateTrigger
点火trigger
修改trigger状态为可执行状态。
recoverJobs updateTriggerStatesFromOtherStates
hasMisfiredTriggersInState doUpdateOfMisfiredTrigger
selectTriggersForRecoveringJobs
selectTriggersInState
deleteFiredTriggers
非集群环境下从新执行
failed与misfired的trigger
retryExecuteInNonManagedTXLock releaseAcquiredTrigger updateTriggerStateFromOtherState
deleteFiredTrigger
异常状况下从新释放trigger到初使状态。
triggeredJobComplete selectTriggerStatus
removeTrigger   updateTriggerState
deleteFiredTrigger
触发JOB任务完成后的处理。
obtainLock recoverMisfiredJobs hasMisfiredTriggersInState doUpdateOfMisfiredTrigger 从新执行misfired的trigger
能够在启动时执行,也能够由misfired线程按期执行。
clusterRecover selectInstancesFiredTriggerRecords
updateTriggerStatesForJobFromOtherState
storeTrigger
deleteFiredTriggers
selectFiredTriggerRecords
removeTrigger
deleteSchedulerState
集群有结点faied,让JOB能从新执行。
executeInLock
数据库集群里等同于
executeInNonManagedTXLock
storeJobAndTrigger updateJobDetail insertJobDetail
triggerExists
selectJobDetail
updateTrigger insertTrigger
保存JOB和TRIGGER配置
storeJob   保存JOB
removeJob   删除JOB
removeJobs   批量删除JOB
removeTriggers   批量删除triggers
storeJobsAndTriggers   保存JOB和多个trigger配置
removeTrigger   删除trigger
replaceTrigger   替换trigger
storeCalendar   保存定时日期
removeCalendar   删除定时日期
clearAllSchedulingData   清除全部定时数据
pauseTrigger   中止触发器
pauseJob   中止任务
pauseJobs   批量中止任务
resumeTrigger   恢复触发器
resumeJob   恢复任务
resumeJobs   批量恢复任务
pauseTriggers   批量中止触发器
resumeTriggers   批量恢复触发器
pauseAll   中止全部
resumeAll   恢复全部

二、STATE_TRIGGER
实现都在JobStoreSupport类 

加锁类型 加锁方法 底层数据库操做 备注
obtainLock doCheckin clusterCheckIn 判断集群状态
先用LOCK_STATE_ACCESS锁集群状态
再用LOCK_TRIGGER_ACCESS恢复集群运行
     

    6、参考资料

    

  1. Quartz Documentation http://quartz-scheduler.org/documentation
  2. spring javadoc-api http://docs.spring.io/spring/docs/4.3.0.BUILD-SNAPSHOT/javadoc-api/
  3. 基于Quartz开发企业级任务调度应用 https://www.ibm.com/developerworks/cn/opensource/os-cn-quartz/
  4. quartz应用与集群原理分析 http://tech.meituan.com/mt-crm-quartz.html
  5. quartz详解2:quartz由浅入深 http://ecmcug.itpub.net/11627468/viewspace-1763498/
  6. quartz详解4:quartz线程管理 http://blog.itpub.net/11627468/viewspace-1766967/
  7. quartz学习笔记 http://www.cnblogs.com/yunxuange/archive/2012/08/28/2660141.html
  8. quartz集群调度机制调研及源码分析 http://demo.netfoucs.com/gklifg/article/details/27090179

 

     总结

       经过这段时间对quartz资料的整理和结合工做中的运用,深刻理解了quartz这一优秀的调度框架。在技术这条路上,作技术千万不要浅尝辄止,必定要深刻的去理解所用的东西,才会使本身的能力提高,此外,一些系统的知识分析对本身和他人都是十分有益的。




相关文章
相关标签/搜索