org.quartz-scheduler 代码分析

Scheduler 

  1. 经过调度器工厂SchedulerFactory的实例对象StdSchedulerFactory构建Scheduler ;

     
    1. 从指定的文件初始化配置信息(默认文件名"quartz.properties", 系统变量为"org.quartz.properties")

      定义的默认执行QuartzSchedulerThread的线程池为:
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
    2.  通常返回StdScheduler,但核心是QuartzScheduler!

      StdSchedulerFactory.instantiate() 很重要!

      初始化RAMJobStoreSimpleThreadPoolQuartzSchedulerJobFactorySchedulerPluginJobRunShellFactoryQuartzSchedulerResources 等对象,返回StdScheduler实例。java

  2. QuartzScheduler核心: 线程对象QuartzSchedulerThread。随着Scheduler实例化而被建立并扔进线程池执行。
    该线程就是调度线程,主要任务就是不停的从JobStore中获取即将被触发的触发器来执行。
    缓存

scheduleJob

  1. 构建JobDetail<JobDetailImpl>
    JobDetail result = JobBuilder.newJob(Class <? extends Job> jobClass).withIdentity(String name).build();
  2.  构建Trigger (有各类类型,由ScheduleBuilder来指定。 eg. SimpleScheduleBuilder -> SimpleTrigger<SimpleTriggerImpl>;  CronScheduleBuilder-> CronTrigger<CronTriggerImpl>)
    TriggerBuilder.newTrigger().withIdentity(String name).withSchedule(ScheduleBuilder schedBuilder).build()
  3. scheduler<QuartzScheduler>.scheduleJob(JobDetail jobDetail, Trigger trigger)
    1. Trigger的JobKey必定与JobDetail的Key得相同, 不然异常!
    2. 计算出trigger在scheduler中可以第一次执行的时间,若无效则异常!(eg. CronCalendar
    3. 在JobStore中注册jobDetail和trigger;
    4. 唤醒QuartzSchedulerThread中的sigLock等待锁;并将trigger下一次要实行的时间NextFireTime经过SchedulerSignalerImpl传递到QuartzSchedulerThread
  4. scheduler<QuartzScheduler>.start(): 设置QuartzSchedulerThread中的paused为false,触发任务的执行。

SimpleThreadPool

根据count生成WorkerThread并保存在availWorkers 中;
当使用线程池时,经过synchronized 来控制并发。从availWorkers 移出第一个WorkerThread使用并保存到busyWorkers中 ;若是没有空闲线程则wait。并发

与JDK提供的线程池有很大的不一样, 没有缓存队列、最大线程数拒绝策略等。 经过阻塞wait直到有空闲workers再执行。若是Shutdow后,还有任务被提交执行,则直接新实例化WorkerThreadui

public class SimpleThreadPool implements ThreadPool {

    private int count = -1; // 线程个数
    private List<WorkerThread> workers; //只是维护初始化的workerThread集合
    private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>(); //可用线程
    private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>(); //繁忙线程
       ......
    public void initialize() throws SchedulerConfigException {
       ......
        // create the worker threads and start them
        Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
        while(workerThreads.hasNext()) {
            WorkerThread wt = workerThreads.next();
            wt.start();
            availWorkers.add(wt);
        }
    }

 public boolean runInThread(Runnable runnable) {
        if (runnable == null) {
            return false;
        }

        synchronized (nextRunnableLock) {
            handoffPending = true;
            // Wait until a worker thread is available
            while ((availWorkers.size() < 1) && !isShutdown) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                }
            }
            if (!isShutdown) {
                WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
                busyWorkers.add(wt);
                wt.run(runnable);
            } else {
                // If the thread pool is going down, execute the Runnable
                // within a new additional worker thread (no thread from the pool).
                WorkerThread wt = new WorkerThread(this, threadGroup,
                        "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
                busyWorkers.add(wt);
                workers.add(wt);
                wt.start();
            }
            nextRunnableLock.notifyAll();
            handoffPending = false;
        }

        return true;
    }

WorkerThread

控制单个Runnable对象的执行过程。
this

成员变量有个Object类型对象做为lock,WorkerThread实例化后run()过程当中若是尚未注入runnable且执行标记run==false时循环调用:lock.wait(500);spa

当SimpleThreadPool.runThread(Runnable) 调用WorkerThread.run(Runnable)时,注入执行Runnable并 lock.notifyAll()
线程

QuartzSchedulerThread

quartz核心处理过程, 经过synchronized(sigLock ) 来控制线程并发。code

 run()对象

  1. 经过Object对象sigLock & paused & 原子整型 halted 来控制当先线程是否运行任务。
    1. sigLock: 方法signalSchedulingChange、togglePause、halt 都触发notifyAll()。
    2. paused:  初始化为true,默认阻塞线程执行。
  2. 执行会优先断定线程池<SimpleThreadPool>中是否有空闲有效的WorkerThread,没有则阻塞。blog

    int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
  3. 有空闲有效工做线程时从JobStore中获取指定时间内<默认30s>要执行的的Trigger列表。

    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                    now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
  4. 若是triggers中的第一个trigger的NextFireTime距离当前时间大于2ms, 则等待直到<2ms。

  5. 从JobStore中获取TriggerFiredResult列表(绑定了JobDetail和OperableTrigger<CronTrigger>间的关系)。

    List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
  6. 依每个TriggerFiredResult初始化JobRunShellPropertySettingJobFactory经过TriggerFiredBundle获取到JobDetail, 初始化Job实例对象 ,反射设置好属性 最后封装到执行上下文中JobExecutionContextImpl。

JobRunShell

最终执行实际任务的对象。

JobExecutionContextImpl中获取必要的JobDetail、trigger、Job等信息,并执行:job.execute(jec)  ---->本身的业务逻辑

当执行结束时trigger须要肯定下一个状态码,在JobStore.triggeredJobComplete处依此来断定trigger的生命周期

AbstractTrigger.executionComplete

RAMJobStore.triggeredJobComplete

相关文章
相关标签/搜索