org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
初始化RAMJobStore、SimpleThreadPool、QuartzScheduler、JobFactory、SchedulerPlugin、JobRunShellFactory、QuartzSchedulerResources 等对象,返回StdScheduler实例。java
QuartzScheduler核心: 线程对象QuartzSchedulerThread。随着Scheduler实例化而被建立并扔进线程池执行。
该线程就是调度线程,主要任务就是不停的从JobStore中获取即将被触发的触发器来执行。
缓存
JobDetail result = JobBuilder.newJob(Class <? extends Job> jobClass).withIdentity(String name).build();
TriggerBuilder.newTrigger().withIdentity(String name).withSchedule(ScheduleBuilder schedBuilder).build()
根据count生成WorkerThread并保存在availWorkers 中;
当使用线程池时,经过synchronized 来控制并发。从availWorkers 移出第一个WorkerThread使用并保存到busyWorkers中 ;若是没有空闲线程则wait。并发
与JDK提供的线程池有很大的不一样, 没有缓存队列、最大线程数拒绝策略等。 经过阻塞wait直到有空闲workers再执行。若是Shutdow后,还有任务被提交执行,则直接新实例化WorkerThreadui
public class SimpleThreadPool implements ThreadPool { private int count = -1; // 线程个数 private List<WorkerThread> workers; //只是维护初始化的workerThread集合 private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>(); //可用线程 private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>(); //繁忙线程 ...... public void initialize() throws SchedulerConfigException { ...... // create the worker threads and start them Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator(); while(workerThreads.hasNext()) { WorkerThread wt = workerThreads.next(); wt.start(); availWorkers.add(wt); } } public boolean runInThread(Runnable runnable) { if (runnable == null) { return false; } synchronized (nextRunnableLock) { handoffPending = true; // Wait until a worker thread is available while ((availWorkers.size() < 1) && !isShutdown) { try { nextRunnableLock.wait(500); } catch (InterruptedException ignore) { } } if (!isShutdown) { WorkerThread wt = (WorkerThread)availWorkers.removeFirst(); busyWorkers.add(wt); wt.run(runnable); } else { // If the thread pool is going down, execute the Runnable // within a new additional worker thread (no thread from the pool). WorkerThread wt = new WorkerThread(this, threadGroup, "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable); busyWorkers.add(wt); workers.add(wt); wt.start(); } nextRunnableLock.notifyAll(); handoffPending = false; } return true; }
控制单个Runnable对象的执行过程。
this
成员变量有个Object类型对象做为lock,WorkerThread实例化后run()过程当中若是尚未注入runnable且执行标记run==false时循环调用:lock.wait(500);spa
当SimpleThreadPool.runThread(Runnable) 调用WorkerThread.run(Runnable)时,注入执行Runnable并 lock.notifyAll()。
线程
quartz核心处理过程, 经过synchronized(sigLock ) 来控制线程并发。code
run()对象
执行会优先断定线程池<SimpleThreadPool>中是否有空闲有效的WorkerThread,没有则阻塞。blog
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
有空闲有效工做线程时从JobStore中获取指定时间内<默认30s>要执行的的Trigger列表。
triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
若是triggers中的第一个trigger的NextFireTime距离当前时间大于2ms, 则等待直到<2ms。
从JobStore中获取TriggerFiredResult列表(绑定了JobDetail和OperableTrigger<CronTrigger>间的关系)。
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
依每个TriggerFiredResult初始化JobRunShell。PropertySettingJobFactory经过TriggerFiredBundle获取到JobDetail, 初始化Job实例对象 ,反射设置好属性 最后封装到执行上下文中JobExecutionContextImpl。
最终执行实际任务的对象。
从JobExecutionContextImpl中获取必要的JobDetail、trigger、Job等信息,并执行:job.execute(jec) ---->本身的业务逻辑
当执行结束时trigger须要肯定下一个状态码,在JobStore.triggeredJobComplete处依此来断定trigger的生命周期
AbstractTrigger.executionComplete
RAMJobStore.triggeredJobComplete