quartz集群模式任务触发分析(三)

源码回顾java

quartz线程模型sql

quartz集群模式任务触发分析(二)数据库


JobStoreSupport

任务的存储类,这里面包含了上面提到的两个比较核心的方法缓存

acquireNextTriggers

public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
    throws JobPersistenceException 
{

    String lockName;
    // 从这个地方能够看到maxCount大于1 的时候才会使用悲观锁, isAcquireTriggersWithinLock默认为false
    if(isAcquireTriggersWithinLock() || maxCount > 1) {
        lockName = LOCK_TRIGGER_ACCESS;
    } else {
        lockName = null;
    }
    return executeInNonManagedTXLock(lockName,
            new TransactionCallback<List<OperableTrigger>>() {
                public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                    // 重点看这个方法
                    // executeInNonManagedTXLock 里面最终主要的就是执行这个方法。
                    return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                }
            },
            new TransactionValidator<List<OperableTrigger>>() {
                 // 省略代码。。
            });
}


protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
    throws JobPersistenceException 
{
    if (timeWindow < 0) {
      throw new IllegalArgumentException();
    }

    List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
    Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
    // 最多重试三次
    final int MAX_DO_LOOP_RETRY = 3;
    int currentLoopCount = 0;
    do {
        // 进入do while循环
        currentLoopCount ++;
        try {
            //经过时间,获取nextFireTime<noLaterThan的trigger
            List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);

            // No trigger is ready to fire yet.
            if (keys == null || keys.size() == 0)
                return acquiredTriggers;
            // 设置截止时间
            long batchEnd = noLaterThan;

            for(TriggerKey triggerKey: keys) {
                // If our trigger is no longer available, try a new one.
                // 判断 trigger是否存在
                OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
                if(nextTrigger == null) {
                    continue// next trigger
                }

                JobKey jobKey = nextTrigger.getJobKey();
                // 判断trigger对应的jobDetail是否存在
                JobDetail job;
                try {
                    job = retrieveJob(conn, jobKey);
                } catch (JobPersistenceException jpe) {
                    try {
                        getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                        getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR);
                    } catch (SQLException sqle) {
                        getLog().error("Unable to set trigger state to ERROR.", sqle);
                    }
                    continue;
                }
                // 是否容许并发执行, JobBean上面含@DisallowConcurrentExecution这个注解的,表示不容许并发执行
                if (job.isConcurrentExectionDisallowed()) {
                    // 进入这里,表示不容许并发执行
                    if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
                        continue// next trigger
                    } else {
                        acquiredJobKeysForNoConcurrentExec.add(jobKey);
                    }
                }
                // 若是该任务的下次执行时间大于截止时间,那么跳过
                if (nextTrigger.getNextFireTime().getTime() > batchEnd) {
                  break;
                }

                // 更新这个trigger的状态为ACQUIRED ,表示正在准备出发。
                int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
                if (rowsUpdated <= 0) {
                    continue// next trigger
                }
                nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
                // 插入出发记录
                getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);

                if(acquiredTriggers.isEmpty()) {
                    batchEnd = Math.max(nextTrigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;
                }
                // 加入返回trigger
                acquiredTriggers.add(nextTrigger);
            }
            if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
                continue;
            }
            break;
        } catch (Exception e) {
            throw new JobPersistenceException(
                      "Couldn't acquire next trigger: " + e.getMessage(), e);
        }
    } while (true);

    // Return the acquired trigger list
    return acquiredTriggers;
}

上面看到的是触发器的获取详细实现,若是每次获取的maxCount大于1 ,那么就会使用悲观锁,防止任务在集群状态下被重复获取,默认maxCount=1 , 这也就致使了,在默认的集群模式下,若是不作这个配置,在并发状态下,就会有出现任务被重复获取,会产生任务被重复触发的状况。微信

triggersFired

在主线程里面调用以下:并发

List<TriggerFiredResult> res =qsRsrcs.getJobStore().triggersFired(triggers);
public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
    // 直接传入锁名,使用悲观锁
    return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
            new TransactionCallback<List<TriggerFiredResult>>() {
                public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
                    List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();

                    TriggerFiredResult result;
                    for (OperableTrigger trigger : triggers) {
                        try {
                          // 单个任务来慢慢搞
                          TriggerFiredBundle bundle = triggerFired(conn, trigger);
                          result = new TriggerFiredResult(bundle);
                        } catch (JobPersistenceException jpe) {
                            result = new TriggerFiredResult(jpe);
                        } catch(RuntimeException re) {
                            result = new TriggerFiredResult(re);
                        }
                        results.add(result);
                    }

                    return results;
                }
            },
            new TransactionValidator<List<TriggerFiredResult>>() {
                // 省略代码。。
            });
}


protected TriggerFiredBundle triggerFired(Connection conn,
        OperableTrigger trigger)

    throws JobPersistenceException 
{
    JobDetail job;
    Calendar cal = null;

    try { // if trigger was deleted, state will be STATE_DELETED
        // 验证trigger的状态,若是不是等于ACQUIRED的,则直接return null
        String state = getDelegate().selectTriggerState(conn,
                trigger.getKey());
        if (!state.equals(STATE_ACQUIRED)) {
            return null;
        }
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't select trigger state: "
                + e.getMessage(), e);
    }

    try {
        // 获取这个trigger的任务详情。
        job = retrieveJob(conn, trigger.getJobKey());
        if (job == null) { return null; }
    } catch (JobPersistenceException jpe) {
        try {
            getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
            getDelegate().updateTriggerState(conn, trigger.getKey(),
                    STATE_ERROR);
        } catch (SQLException sqle) {
            getLog().error("Unable to set trigger state to ERROR.", sqle);
        }
        throw jpe;
    }

    if (trigger.getCalendarName() != null) {
        // 这里主要是对非集群模式下作一些缓存处理
        cal = retrieveCalendar(conn, trigger.getCalendarName());
        if (cal == null) { return null; }
    }

    try {
        // 更新触发记录的状态为EXECUTING
        getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't insert fired trigger: "
                + e.getMessage(), e);
    }

    Date prevFireTime = trigger.getPreviousFireTime();
    // 计算下一次的trigger的执行时间
    trigger.triggered(cal);

    String state = STATE_WAITING;
    boolean force = true;
    //若是任务是不容许并发执行的,那么须要将任务的状态修改成BLOCK,阻塞
    if (job.isConcurrentExectionDisallowed()) {
        state = STATE_BLOCKED;
        force = false;
        try {
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                    STATE_BLOCKED, STATE_WAITING);
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                    STATE_BLOCKED, STATE_ACQUIRED);
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                    STATE_PAUSED_BLOCKED, STATE_PAUSED);
        } catch (SQLException e) {
            throw new JobPersistenceException(
                    "Couldn't update states of blocked triggers: "
                            + e.getMessage(), e);
        }
    }

    if (trigger.getNextFireTime() == null) {
        // 下次执行时间为空,也就是说没有下次了,直接修改trigger的状态为完成
        state = STATE_COMPLETE;
        force = true;
    }
    // 修改trigger的撞他信息
    storeTrigger(conn, trigger, job, true, state, force, false);

    job.getJobDataMap().clearDirtyFlag();
    // 返回任务的执行信息
    return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
            .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
            .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());

该方法作了如下工做:app

1.获取trigger当前状态异步

2.经过trigger中的JobKey读取trigger包含的Job信息分布式

3.将trigger更新至触发状态oop

4.更新数据库中trigger的信息,包括更改状态至STATE_COMPLETE,及计算下一次触发时间.

5.返回trigger触发结果的数据传输类TriggerFiredBundle

从该方法返回后,trigger的执行过程已基本完毕.回到执行quratz操做规范的executeInNonManagedTXLock方法,将数据库锁释放.

trigger触发操做完成

总结:
简单地说,quartz的分布式调度策略是以数据库为边界资源的一种异步策略.各个调度器都遵照一个基于数据库锁的操做规则保证了操做的惟一性.

同时多个节点的异步运行保证了服务的可靠.但这种策略有本身的局限性,集群特性对于高cpu使用率的任务效果很好,可是对于大量的短任务,

各个节点都会抢占数据库锁,这样就出现大量的线程等待资源.这种状况随着节点的增长会愈来愈严重.


本文分享自微信公众号 - sharedCode(sharedCode)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索