1、简介java
Quartz是一个优秀的调度框架,彻底基于Java实现。具备如下几大特色:mysql
(1)强大的调度功能,例如支持丰富多样的调度方法,能够知足各类常规及特殊需求;git
(2)灵活的应用方式,例如支持任务和调度的多种组合方式,支持调度数据的多种存储方式;github
(3)分布式和集群能力,Terracotta收购后在原来功能基础上做了进一步提高。算法
核心概念sql
Job 表示一个工做,要执行的具体内容。此接口中只有一个方法,以下:数据库
void execute(JobExecutionContext context)
JobDetail 表示一个具体的可执行的调度程序,Job 是这个可执行程调度程序所要执行的内容,另外 JobDetail 还包含了这个任务调度的方案和策略。
服务器
Trigger 触发器,指定什么时候触发任务。
架构
Scheduler 表明一个调度容器,一个调度容器中能够注册多个 JobDetail 和 Trigger。当 Trigger 与 JobDetail 组合,就能够被 Scheduler 容器调度了。 并发
Quartz线程视图
在Quartz中,有两类线程,Scheduler调度线程和任务执行线程,其中任务执行线程一般使用一个线程池维护一组线程。
Scheduler调度线程主要有两个:执行常规调度的线程,和执行misfiredtrigger的线程。常规调度线程轮询存储的全部trigger,若是有须要触发的trigger,即到达了下一次触发的时间,则从任务执行线程池获取一个空闲线程,执行与该trigger关联的任务。Misfire线程是扫描全部的trigger,查看是否有misfiredtrigger,若是有的话根据misfire的策略分别处理(fire now OR wait for the next fire)。
Quartz Job数据存储
Quartz中的trigger和job须要存储下来才能被使用。Quartz中有两种存储方式:RAMJobStore,JobStoreSupport,其中RAMJobStore是将trigger和job存储在内存中,而JobStoreSupport是基于jdbc将trigger和job存储到数据库中。RAMJobStore的存取速度很是快,可是因为其在系统被中止后全部的数据都会丢失,因此在集群应用中,必须使用JobStoreSupport
2、Quartz集群架构
一个Quartz集群中的每一个节点是一个独立的Quartz应用,它又管理着其余的节点。这就意味着你必须对每一个节点分别启动或中止。Quartz集群中,独立的Quartz节点并不与另外一其的节点或是管理节点通讯,而是经过相同的数据库表来感知到另外一Quartz应用的。
数据库准备
由于Quzrtz集群依赖于数据库,因此必须先建立数据库表,数据表示官方提供的,我用的是quartz2.3.0版本,有11张表,以下:
表信息介绍
qrtz_blob_triggers : 以Blob 类型存储的触发器。
qrtz_calendars存储Quartz的Calendar信息
qrtz_cron_triggers存储CronTrigger,包括Cron表达式和时区信息
qrtz_fired_triggers存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息
qrtz_job_details存储每个已配置的Job的详细信息
qrtz_locks存储程序的悲观锁的信息
qrtz_paused_trigger_grps存储已暂停的Trigger组的信息
qrtz_scheduler_state存储少许的有关Scheduler的状态信息,和别的Scheduler实例
qrtz_simple_triggers存储简单的Trigger,包括重复次数、间隔、以及已触的次数
qrtz_simprop_triggers 存储CalendarIntervalTrigger和DailyTimeIntervalTrigger两种类型的触发器
qrtz_triggers存储已配置的Trigger的信息
qrtz_locks就是Quartz集群实现同步机制的行锁表,包括如下几个锁:CALENDAR_ACCESS 、JOB_ACCESS、MISFIRE_ACCESS 、STATE_ACCESS 、TRIGGER_ACCESS
从故障实例中恢复Job
当一个Sheduler实例在执行某个Job时失败了,有可能由另外一正常工做的Scheduler实例接过这个Job从新运行。要实现这种行为,配置给JobDetail对象的Job可恢复属性必须设置为true(job.setRequestsRecovery(true))。若是可恢复属性被设置为false(默认为false),当某个Scheduler在运行该job失败时,它将不会从新运行;而是由另外一个Scheduler实例在下一次触发时间触发。Scheduler实例出现故障后多快能被侦测到取决于每一个Scheduler的检入间隔(即2.3中提到的org.quartz.jobStore.clusterCheckinInterval)。
测试项目及配置
项目代码下载:https://github.com/feixiameiruhua/my-quartz-cluster.git
quartz.properties文件
# 固定前缀org.quartz # 主要分为scheduler、threadPool、jobStore、plugin等部分 # # #调度器实例编号自动生成 org.quartz.scheduler.instanceId = AUTO #调度器实例名称 org.quartz.scheduler.instanceName = DefaultQuartzScheduler org.quartz.scheduler.rmi.export = false org.quartz.scheduler.rmi.proxy = false org.quartz.scheduler.wrapJobExecutionInUserTransaction = false # 实例化ThreadPool时,使用的线程类为SimpleThreadPool org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool # threadCount和threadPriority将以setter的形式注入ThreadPool实例 # 并发个数 org.quartz.threadPool.threadCount = 5 # 优先级 org.quartz.threadPool.threadPriority = 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true org.quartz.jobStore.misfireThreshold = 5000 # 默认存储在内存中 #org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore #持久化 org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX #开启分布式部署 org.quartz.jobStore.isClustered = true org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.dataSource = qzDS org.quartz.dataSource.qzDS.driver = com.mysql.jdbc.Driver org.quartz.dataSource.qzDS.URL = jdbc:mysql://localhost:3306/quartz_test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true org.quartz.dataSource.qzDS.user = root org.quartz.dataSource.qzDS.password = 123456 org.quartz.dataSource.qzDS.maxConnections = 10
配置文件说明
org.quartz.jobStore.isClustered = true
在集群中的每个实例都必须有一个惟一的"instance id" ("org.quartz.scheduler.instanceId" 属性), 默认为AUTO就能够。还要有相同的"scheduler instance name" ("org.quartz.scheduler.instanceName"),也就是说集群中的每个实例都必须使用相同的quartz.properties 配置文件。
调度任务代码
package com.fwmagic.quartz.schedule; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; import java.util.Date; import java.util.ResourceBundle; public class SchedulerExecJob implements Job { private static Logger logger = LoggerFactory.getLogger(SchedulerExecJob.class); @Override public void execute(JobExecutionContext context) throws JobExecutionException { String jobName = context.getJobDetail().getKey().getName(); switch (jobName) { /*每5s执行一次*/ case "quartz_test1": System.err.println(getAddress()+" "+getDate()+"====>quartz_test1<===="); break; /*每5s执行一次*/ case "quartz_test2": System.err.println(getAddress()+" "+getDate()+"====>quartz_test2<===="); break; /*每5s执行一次*/ case "quartz_test3": System.err.println(getAddress()+" "+getDate()+"====>quartz_test3<===="); break; default: System.err.println(getAddress()+" "+getDate()+"====>other task<===="); break; } } public static String getDate(){ return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); } public static String getAddress(){ return "http://localhost:"+ResourceBundle.getBundle("application").getString("server.port"); } }
2、集群任务测试
项目中有三个任务,我用不一样端口(三个服务同时启动,端口不同,分别为:9099(ServerA),,9098(ServerB),9097(ServerC))在本机启动同一个项目,效果图以下:
当开启ServerA时,三个任务都在9099端口执行,控制台信息为:
开启ServerB后,有部分任务分配过来:
开启ServerC,集群所有启动的状况下,会有任务交错执行或者任务在同一台机器上执行的效果:
注意事项
Quartz实际并不关心你是在相同仍是不一样的机器上运行节点。当集群放置在不一样的机器上时,称之为水平集群。节点跑在同一台机器上时,称之为垂直集群。对于垂直集群,存在着单点故障的问题。这对高可用性的应用来讲是没法接受的,由于一旦机器崩溃了,全部的节点也就被终止了。对于水平集群,存在着时间同步问题。
节点用时间戳来通知其余实例它本身的最后检入时间。假如节点的时钟被设置为未来的时间,那么运行中的Scheduler将再也意识不到那个结点已经宕掉了。另外一方面,若是某个节点的时钟被设置为过去的时间,也许另外一节点就会认定那个节点已宕掉并试图接过它的Job重运行。最简单的同步计算机时钟的方式是使用某一个Internet时间服务器(Internet Time Server ITS)。
节点争抢Job问题
由于Quartz使用了一个随机的负载均衡算法, Job以随机的方式由不一样的实例执行。Quartz官网上提到当前,还不存在一个方法来指派(钉住) 一个 Job 到集群中特定的节点。
3、集群源码分析
Quartz如何保证多个节点的应用只进行一次调度(即某一时刻的调度任务只由其中一台服务器执行)?,正如Quartz集群架构上的那副图,
Quartz的集群是在同一个数据库下, 由数据库的数据来肯定调度任务是否正在执行, 正在执行则其余服务器就不能去执行该行调度数据。 这个跟不少项目是用Zookeeper作集群不同, 这些项目是靠Zookeeper选举出来的的服务器去执行, 能够理解为Quartz靠数据库选举一个服务器来执行。
Quartz最主要的一个类QuartzSchedulerThread职责是触发任务, 是一个不断运行的Quartz主线程, 仍是从这里入手了解集群原理。
QuartzSchedulerThread继承自Thread,实现了run方法,在run方法中调用了以下几个重要的方法,都进行了加锁的操做:
一、qsRsrcs.getJobStore().acquireNextTriggers【查找即将触发的Trigger】
二、sigLock.wait(timeUntilTrigger)【等待执行】
三、qsRsrcs.getJobStore().triggersFired(triggers)【执行】
四、qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)) 【释放Trigger】
以acquireNextTriggers为例,能够看到:
protected static final String LOCK_TRIGGER_ACCESS = "TRIGGER_ACCESS";//数据库锁名字
将锁名传入核心的加锁方法(executeInNonManagedTXLock)中:
protected <T> T executeInNonManagedTXLock( String lockName, TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException { boolean transOwner = false; Connection conn = null; try { if (lockName != null) { // If we aren't using db locks, then delay getting DB connection // until after acquiring the lock since it isn't needed. if (getLockHandler().requiresConnection()) { conn = getNonManagedTXConnection(); } //获取锁 transOwner = getLockHandler().obtainLock(conn, lockName); } if (conn == null) { conn = getNonManagedTXConnection(); } //回调执行 final T result = txCallback.execute(conn); try { commitConnection(conn); } catch (JobPersistenceException e) { rollbackConnection(conn); if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() { @Override public Boolean execute(Connection conn) throws JobPersistenceException { return txValidator.validate(conn, result); } })) { throw e; } } Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion(); if(sigTime != null && sigTime >= 0) { signalSchedulingChangeImmediately(sigTime); } return result; } catch (JobPersistenceException e) { rollbackConnection(conn); throw e; } catch (RuntimeException e) { rollbackConnection(conn); throw new JobPersistenceException("Unexpected runtime exception: " + e.getMessage(), e); } finally { try { //释放锁 releaseLock(lockName, transOwner); } finally { cleanupConnection(conn); } } }
经过这行代码查找锁是怎么来的
transOwner = getLockHandler().obtainLock(conn, lockName);
在JobStoreSupport的initialize方法中:
public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler) throws SchedulerConfigException { if (dsName == null) { throw new SchedulerConfigException("DataSource name not set."); } // If the user hasn't specified an explicit lock handler, then // choose one based on CMT/Clustered/UseDBLocks. if (getLockHandler() == null) { // If the user hasn't specified an explicit lock handler, // then we *must* use DB locks with clustering if (isClustered()) { setUseDBLocks(true); } //…… // 在初始化方法里面赋值了 setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL())); } else { getLog().info( "Using thread monitor-based data access locking (synchronization)."); setLockHandler(new SimpleSemaphore()); } } }
在new StdRowLockSemaphore构造方法中
public StdRowLockSemaphore(String tablePrefix, String schedName, String selectWithLockSQL) { super(tablePrefix, schedName, selectWithLockSQL != null ? selectWithLockSQL : SELECT_FOR_LOCK, INSERT_LOCK); }
能够发现有两个锁名称:
public static final String SELECT_FOR_LOCK = "SELECT * FROM " + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE"; public static final String INSERT_LOCK = "INSERT INTO " + TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES (" + SCHED_NAME_SUBST + ", ?)";
数据库的qrtz_locks中存放两个锁的记录
能够看出采用了Quartz集群采用了悲观锁的方式对triggers表进行行加锁, 以保证任务同步的正确性。
当线程使用上述的SQL对表中的数据执行操做时,数据库对该行进行行加锁; 于此同时, 另外一个线程对该行数据执行操做前须要获取锁, 而此时已被占用, 那么这个线程就只能等待, 直到该行锁被释放。