公司前期改用quartz作任务调度,一日的调度量均在两百万次以上。随着调度量的增长,忽然开始出现job重复调度的状况,且没有规律可循。网上也没有说得较为清楚的解决办法,因而咱们开始调试Quartz源码,并最终找到了问题所在。 若是没有耐性看完源码解析,能够直接拉到文章最末,有直接简单的解决办法。
注:本文中使用的quartz版本为2.3.0,且使用JDBC模式存储Job。java
首先,由于本文是代码级别的分析文章,于是须要提早了解Quartz的用途和用法,网上仍是有不少不错的文章,能够提早自行了解。mysql
其次,在用法以外,咱们还须要了解一些Quartz框架的基础概念:sql
1) Quartz把触发job,叫作fire。TRIGGER_STATE是当前trigger的状态,PREV_FIRE_TIME是上一次触发时间,NEXT_FIRE_TIME是下一次触发时间,misfire是指这个job在某一时刻要触发,却由于某些缘由没有触发的状况。数据库
2) Quartz在运行时,会起两类线程(不止两类),一类用于调度job的调度线程(单线程),一类是用于执行job具体业务的工做池。安全
3) Quartz自带的表里面,本文主要涉及如下3张表:服务器
4) TRIGGER_STATE,也就是trigger的状态,主要有如下几类:并发
图2-1 trigger状态变化图框架
trigger的初始状态是WAITING,处于WAITING状态的trigger等待被触发。调度线程会不停地扫triggers表,根据NEXT_FIRE_TIME提早拉取即将触发的trigger,若是这个trigger被该调度线程拉取到,它的状态就会变为ACQUIRED。由于是提早拉取trigger,并未到达trigger真正的触发时刻,因此调度线程会等到真正触发的时刻,再将trigger状态由ACQUIRED改成EXECUTING。若是这个trigger再也不执行,就将状态改成COMPLETE,不然为WAITING,开始新的周期。若是这个周期中的任何环节抛出异常,trigger的状态会变成ERROR。若是手动暂停这个trigger,状态会变成PAUSED。异步
前文提到,trigger的状态储存在数据库,Quartz支持分布式,因此若是起了多个quartz服务,会有多个调度线程来抢夺触发同一个trigger。mysql在默认状况下执行select 语句,是不上锁的,那么若是同时有1个以上的调度线程抢到同一个trigger,是否会致使这个trigger重复调度呢?咱们来看看,Quartz是如何解决这个问题的。分布式
首先,咱们先来看下JobStoreSupport
类的executeInNonManagedTXLock()
方法:
图3-1 executeInNonManagedTXLock方法的具体实现
这个方法的官方介绍:
/** *Execute the given callback having acquired the given lock. *Depending on the JobStore,the surrounding transaction maybe *assumed to be already present(managed). * *@param lockName The name of the lock to acquire,for example *"TRIGGER_ACCESS".If null, then no lock is acquired ,but the *lockCallback is still executed in a transaction. */
也就是说,传入的callback方法在执行的过程当中是携带了指定的锁,并开启了事务,注释也提到,lockName就是指定的锁的名字,若是lockName是空的,那么callback方法的执行不在锁的保护下,但依然在事务中。
这意味着,咱们使用这个方法,不只能够保证事务,还能够选择保证,callback方法的线程安全。
接下来,咱们来看一下executeInNonManagedTXLock(…)
中的obtainLock(conn,lockName)
方法,即抢锁的过程。这个方法是在Semaphore
接口中定义的,Semaphore
接口经过锁住线程或者资源,来保护资源不被其余线程修改,因为咱们的调度信息是存在数据库的,因此如今查看DBSemaphore.java
中obtainLock
方法的具体实现:
图3-2 obtainLock方法具体实现
咱们经过调试查看expandedSQL
和expandedInsertSQL
这两个变量:
图3-3 expandedSQL和expandedInsertSQL的具体内容
图3-3能够看出,obtainLock
方法经过locks表的一个行锁(lockName肯定)来保证callback方法的事务和线程安全。拿到锁后,obtainLock
方法将lockName
写入threadlocal
。固然在releaseLock
的时候,会将lockName
从threadlocal
中删除。
总而言之,executeInNonManagedTXLock()
方法,保证了在分布式的状况,同一时刻,只有一个线程能够执行这个方法。
图3-4 Quartz的调度时序图
QuartzSchedulerThread
是调度线程的具体实现,图3-4 是这个线程run()
方法的主要内容,图中只提到了正常的状况下,也就是流程中没有出现异常的状况下的处理过程。由图能够看出,调度流程主要分为如下三步:
调度线程会一次性拉取距离如今,必定时间窗口内的,必定数量内的,即将触发的trigger信息。那么,时间窗口和数量信息如何肯定呢,咱们先来看一下,如下几个参数:
idleWaitTime
: 默认30s,可经过配置属性org.quartz.scheduler.idleWaitTime
设置。availThreadCount
:获取可用(空闲)的工做线程数量,总会大于1,由于该方法会一直阻塞,直到有工做线程空闲下来。maxBatchSize
:一次拉取trigger的最大数量,默认是1,可经过org.quartz.scheduler.batchTriggerAcquisitionMaxCount
改写batchTimeWindow
:时间窗口调节参数,默认是0,可经过org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow
改写misfireThreshold
: 超过这个时间还未触发的trigger,被认为发生了misfire,默认60s,可经过org.quartz.jobStore.misfireThreshold
设置。调度线程一次会拉取NEXT_FIRE_TIME小于(now + idleWaitTime +batchTimeWindow
),大于(now - misfireThreshold
)的,min(availThreadCount,maxBatchSize)
个triggers,默认状况下,会拉取将来30s,过去60s之间还未fire的1个trigger。随后将这些triggers的状态由WAITING改成ACQUIRED,并插入fired_triggers表。
首先,咱们会检查每一个trigger的状态是否是ACQUIRED,若是是,则将状态改成EXECUTING,而后更新trigger的NEXT_FIRE_TIME,若是这个trigger的NEXT_FIRE_TIME为空,也就是将来再也不触发,就将其状态改成COMPLETE。若是trigger不容许并发执行(即Job的实现类标注了@DisallowConcurrentExecution
),则将状态变为BLOCKED,不然就将状态改成WAITING。
遍历triggers,若是其中某个trigger在第二步出错,即返回值里面有exception或者为null,就会作一些triggers表,fired_triggers表的内容修正,跳过这个trigger,继续检查下一个。不然,则根据trigger信息实例化JobRunShell
(实现了Thread接口),同时依据JOB_CLASS_NAME
实例化Job
,随后咱们将JobRunShell
实例丢入工做线。
在JobRunShell
的run()
方法,Quartz会在执行job.execute()
的先后通知以前绑定的监听器,若是job.execute()
执行的过程当中有异常抛出,则执行结果jobExEx
会保存异常信息,反之若是没有异常抛出,则jobExEx
为null。而后根据jobExEx
的不一样,获得不一样的执行指令instCode
。
JobRunShell
将trigger信息,job信息和执行指令传给triggeredJobComplete()
方法来完成最后的数据表更新操做。例如若是job执行过程有异常抛出,就将这个trigger状态变为ERROR,若是是BLOCKED状态,就将其变为WAITING等等,最后从fired_triggers表中删除这个已经执行完成的trigger。注意,这些是在工做线程池异步完成。
在前文,咱们能够看到,Quartz的调度过程当中有3次(可选的)上锁行为,为何称为可选?由于这三个步骤虽然在executeInNonManagedTXLock
方法的保护下,但executeInNonManagedTXLock
方法能够经过设置传入参数lockName为空,取消上锁。在翻阅代码时,咱们看到第一步拉取待触发的trigger时:
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)throws JobPersistenceException { String lockName; //判断是否须要上锁 if (isAcquireTriggersWithinLock() || maxCount > 1) { lockName = LOCK_TRIGGER_ACCESS; } else { lockName = null; } return executeInNonManagedTXLock(lockName, new TransactionCallback<List<OperableTrigger>>(){ public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException { return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow); } }, new TransactionValidator<List<OperableTrigger>>() { //省略 }); }
在加锁以前对lockName作了一次判断,而非像其余加锁方法同样,默认传入的就是LOCK_TRIGGER_ACCESS:
public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException { //默认上锁 return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS, new TransactionCallback<List<TriggerFiredResult>>() { //省略 },new TransactionValidator<List<TriggerFiredResult>>() { //省略 }); }
经过调试发现isAcquireTriggersWithinLock()
的值是false
,于是致使传入的lockName是null。我在代码中加入日志,能够更清楚的看到这个过程。
图3-5 调度日志
由图3-5能够清楚看到,在拉取待触发的trigger时,默认是不上锁。若是这种默认配置有问题,岂不是会频繁发生重复调度的问题?而事实上并无,缘由在于Quartz默认采起乐观锁,也就是容许多个线程同时拉取同一个trigger。咱们看一下Quartz在调度流程的第二步fire trigger的时候作了什么,注意此时是上锁状态:
protected TriggerFiredBundle triggerFired(Connection conn, OperableTrigger trigger) throws JobPersistenceException { JobDetail job; Calendar cal = null; // Make sure trigger wasn't deleted, paused, or completed... try { // if trigger was deleted, state will be STATE_DELETED String state = getDelegate().selectTriggerState(conn,trigger.getKey()); if (!state.equals(STATE_ACQUIRED)) { return null; } } catch (SQLException e) { throw new JobPersistenceException("Couldn't select trigger state: " + e.getMessage(), e); }
调度线程若是发现当前trigger的状态不是ACQUIRED,也就是说,这个trigger被其余线程fire了,就会返回null。在3.2,咱们提到,在调度流程的第三步,若是发现某个trigger第二步的返回值是null,就会跳过第三步,取消fire。在一般的状况下,乐观锁能保证不发生重复调度,可是不免发生ABA问题,咱们看一下这是发生重复调度时的日志:
图3-5 重复调度的日志
在第一步时,也就是quartz在拉取到符合条件的triggers 到将他们的状态由WAITING改成ACQUIRED之间停顿了有超过9ms的时间,而另外一台服务器正是趁着这9ms的空档完成了WAITING-->ACQUIRED-->EXECUTING-->WAITING(也就是一个完整的状态变化周期)的所有过程,图示参见图3-6。
图3-6 重复调度缘由示意图
如何去解决这个问题呢?在配置文件加上org.quartz.jobStore.acquireTriggersWithinLock=true
,这样,在调度流程的第一步,也就是拉取待即将触发的triggers时,是上锁的状态,即不会同时存在多个线程拉取到相同的trigger的状况,也就避免的重复调度的危险。
这次排查过程并不是一路顺风,走过一些坑,也有一些非技术相关的体会:
1)学习是一个须要不断打磨,修正的能力。就我我的而言,为了学Quartz,刚开始去翻一个2.4MB大小的源码是毫无头绪,而且效率低下的,因此马上转换方向,先了解这个框架的运行模式,在作什么,有哪些模块,是怎么作的,再找主线,翻相关的源码。以后在一次次使用中,碰到问题再翻以前没看的源码,就愈来愈顺利。
以前也听过其余同事的学习方法,感受并不彻底适合本身,可能每一个人状态经验不一样,学习方法也稍有不一样。在平时的学习中,须要去感觉本身的学习效率,参考建议,尝试,感觉效果,改进,会愈来愈清晰本身适合什么。这里很感谢个人师父,用简短的话先帮我捋顺了调度流程,这样我再看源码就不那么吃力了。
2)要质疑“经验”和“理所应当”,惯性思惟会蒙住你的双眼。在大规模的代码中很容易被习惯迷惑,一开始,咱们看到上锁的那个方法的时候,认为这个上锁技巧很棒,这个方法就是为了解决并发的问题,“应该”都上锁了,上锁了就不会有并发的问题了,怎么可能几回与数据库的交互都上锁,忽然某一次不上锁呢?直到看到拉取待触发的trigger方法时,以为有丝丝不对劲,打下日志,才发现其实是没上锁的。
3)日志很重要。虽然咱们能够调试,可是没有日志,咱们是没法发现并证实,程序发生了ABA问题。
4)最重要的是,不要惧怕问题,即便是Quartz这样大型的框架,解决问题也不必定须要把2.4MB的源码统统读懂。只要有时间,问题都能解决,只是好的技巧能缩短这个时间,而咱们须要在一次次实战中磨练技巧。