前段时间领导让将一个老项目中的定时发送短信的中定时任务独立出来,实现一个可公用的定时任务平台,且须要支持集群环境. 基于以上须要实现的功能有: 1. 定时任务管理:包括任务的crud, 任务的暂停、恢复 2. 任务可持久化
1. pom.xml文件中加入所需jar包(这里spring相关的jar就不展现了)java
<dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.2.3</version> </dependency>
2. quartz.properties 配置文件spring
# Configure Main Scheduler Properties =========================================== #能够为任何值,用在jdbc Jobstore中来惟一标识实例,集群中必须相同 org.quartz.scheduler.instanceName: MyClusteredScheduler #AUTO 基于主机和时间戳来产生成实例ID,集群中的每个实例都必须有一个惟一的“instance id”, 应该有相同的“scheduler instance name” org.quartz.scheduler.instanceId: AUTO #禁用quartz软件更新 org.quartz.scheduler.skipUpdateCheck: true #Configure ThreadPool 执行任务线程池配置============================================== #线程池类型,执行任务的线程 org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool #线程数量 org.quartz.threadPool.threadCount: 2 #线程优先级 org.quartz.threadPool.threadPriority: 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true # Configure JobStore 任务存储方式 ==================================================== org.quartz.jobStore.misfireThreshold: 60000 #两种存储方式:基于内存的RAMJobStore和基于数据库的JobStoreSuppot #(包括JobStoreTX和JobStoreCMT两种实现,JobStoreCMT是依赖于容器来进行事务管理,而JobStoreTX是本身管理事务) #这里的属性为JobStoreTX,将任务持久化到数据库, #由于集群中节点依赖数据库来传播Scheduler实例的状态,意味着在集群里必须使用JobStoreTX或是JobStoreCMT做为job存储 #org.quartz.simpl.RAMJobStore org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX #JobStoreSupport 使用一个驱动代理来操做 trigger 和 job 的数据存储 org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.oracle.OracleDelegate #若要设置为true,则将JobDataMaps中的值看成string org.quartz.jobStore.useProperties=false #你就告诉了Scheduler实例要它参与到一个集群当中。这一属性会贯穿于调度框架的始终,用于修改集群环境中操做的默认行为。 org.quartz.jobStore.isClustered=true #属性定义了Scheduler实例检入到数据库中的频率(单位:毫秒)。默认值是 15000 (即15 秒)。 org.quartz.jobStore.clusterCheckinInterval = 20000 org.quartz.jobStore.isClustered=true #这是 JobStore 能处理的错过触发的 Trigger 的最大数量。 #处理太多(超过两打) 很快会致使数据库表被锁定够长的时间,这样就妨碍了触发别的(还未错过触发) trigger 执行的性能。 org.quartz.jobStore.maxMisfiresToHandleAtATime = 1 org.quartz.jobStore.misfireThreshold = 120000 org.quartz.jobStore.txIsolationLevelSerializable = false #当事件的JVM终止后,在调度器上也将此事件终止 org.quartz.plugin.shutdownHook.class: org.quartz.plugins.management.ShutdownHookPlugin org.quartz.plugin.shutdownHook.cleanShutdown: true #============================================================================ # Other Example Delegates 其余的数据库驱动管理委托类 #============================================================================ #org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.DB2v6Delegate #org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.DB2v7Delegate #org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.DriverDelegate #org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.HSQLDBDelegate #org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.MSSQLDelegate #org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.PointbaseDelegate #org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.PostgreSQLDelegate #org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate #org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.WebLogicDelegate #org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.oracle.OracleDelegate #org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.oracle.WebLogicOracleDelegate
3. springContext-quartz.xml 与spring整合sql
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.3.xsd "> <!-- 调度器 --> <bean id="clusterScheduler" lazy-init="false" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> <!--由于集群环境下须要将任务状态持久化 这里须要为调度器指定一个数据源, 同时须要将quartz框架相关的几个表在你的数据库中初始化好,去官网下载quartz的压缩包quartz-2.2.3-distribution.tar.gz,解压以后在docs\dbTables文件下就能够找到与之相对应的sql文件 --> <property name="dataSource" ref="dataSourceRWORR"/> <property name="configLocation" value="classpath:conf/quartz.properties" /> <property name="applicationContextSchedulerContextKey" value="applicationContext"/> <property name="autoStartup" value="true"></property> </bean> <!--quartz定时任务管理类 QuartzManager: 该类由本身实现,须要为该类注入上文中定义的调度器:clusterScheduler 有关定时器的全部操做都在该类中实现 --> <bean id="quartzManager" class="com.fotic.common.quartz.QuartzManager" init-method="init"> <property name="clusterScheduler" ref="clusterScheduler"></property> </bean> </beans>
4. 编写定时器管理类:QuartzManager.java数据库
package com.fotic.common.quartz; import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; import org.quartz.Job; import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.JobKey; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.TriggerKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /* * @Description: 定时器管理 * @author dgq * @date 2018年4月25日 */ public class QuartzManager{ private static Logger logger = LoggerFactory.getLogger(QuartzManager.class); private Scheduler clusterScheduler;//项目启动时注入 public void init() throws SchedulerException{ /*JobKey jobKey = JobKey.jobKey("sms_job", "sms_job_group"); JobDetail jobDetail = clusterScheduler.getJobDetail(jobKey);//xml中配置了 List<? extends Trigger> tigge1 = clusterScheduler.getTriggersOfJob(jobKey); System.out.println(tigge1.size()); Trigger trigger3 = tigge1.get(0); TriggerKey triggerKey = TriggerKey.triggerKey("sms_trigger", "sms_trigger_group"); Trigger trigger = clusterScheduler.getTrigger(triggerKey); TriggerState triggerState2 = clusterScheduler.getTriggerState(triggerKey); Set<String> pausedTriggerGroups = clusterScheduler.getPausedTriggerGroups(); clusterScheduler.scheduleJob(trigger);*/ /*TriggerKey triggerKey = TriggerKey.triggerKey("sms_trigger", "sms_trigger_group"); clusterScheduler.getListenerManager().addTriggerListener(new MyTriggerListeners(), KeyMatcher.keyEquals(triggerKey));*/ try { this.addJob("push_overduedata", "com.fotic.management.sms.job.PushOverdueDataJob", "0 55 17 * * ?"); this.addJob("sms_send", "com.fotic.sms.job.SmsSendJob", "0/5 * * ? * *"); } catch (Exception e) { e.printStackTrace(); } } /** * 启动定时器 */ public boolean start(){ try { clusterScheduler.start(); return true; } catch ( SchedulerException e) { e.printStackTrace(); return false; } } /** * 关闭调度器 * @return */ public boolean shutdown(){ try { clusterScheduler.shutdown(true); return true; } catch (Exception e) { logger.info("定时器中止失败................"); e.printStackTrace(); return false; } } public void setClusterScheduler(Scheduler clusterScheduler) { this.clusterScheduler = clusterScheduler; } /** * 新增一个job * @param jobName job名称 * @param jobClass job类,该类必须继承: org.quartz.job * @param cronExpression "0/5 * * ? * *" * @throws ClassNotFoundException * @throws SchedulerException */ public void addJob(String jobName, String jobClassPath, String cronExpression) throws ClassNotFoundException, SchedulerException{ JobKey jobKey = new JobKey(jobName+"_job", jobName+"_group"); JobDetail jobDetail = clusterScheduler.getJobDetail(jobKey); if(!clusterScheduler.checkExists(jobKey)){ @SuppressWarnings("unchecked") Class<? extends Job> targetJob = (Class<? extends Job>) Class.forName(jobClassPath); jobDetail = JobBuilder .newJob(targetJob) .withIdentity(jobKey) .build(); Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(jobName+"_trigger", jobName+"_trigger_group") .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)) .build(); clusterScheduler.scheduleJob(jobDetail, trigger); } } /** * 暂停定时任务 * @param allPushMessage * @throws Exception */ public void pauseJob(String jobName) throws Exception { JobKey jobKey = JobKey.jobKey(jobName+"_job", jobName+"_group"); try { clusterScheduler.pauseJob(jobKey); } catch (SchedulerException e) { logger.info("暂停定时任务失败"+e); throw new Exception("暂停定时任务失败"); } } /** * 恢复任务 * @param * @param * @param * @throws Exception */ public void resumeJob(String jobName) throws Exception { JobKey jobKey = JobKey.jobKey(jobName+"_job", jobName+"_group"); try { clusterScheduler.resumeJob(jobKey); } catch (SchedulerException e) { logger.info("恢复定时任务失败"+e); throw new Exception("恢复定时任务失败"); } } /** * 删除任务 * @param jobName * @throws Exception */ public void deleteJob(String jobName) throws Exception { JobKey jobKey = JobKey.jobKey(jobName+"_job", jobName+"_group"); try { clusterScheduler.deleteJob(jobKey); } catch (SchedulerException e) { logger.info("删除定时任务失败"+e); throw new Exception("删除定时任务失败"); } } /** * 修改一个触发器的触发规则cron Expression * @param triggerName * @param cron * @return */ public boolean updateTrigger(String triggerName, String cron){ try { CronTrigger oldtrigger = (CronTrigger) clusterScheduler.getTrigger(TriggerKey.triggerKey(triggerName+"_trigger", triggerName+"_trigger_group")); TriggerBuilder<CronTrigger> tb = oldtrigger.getTriggerBuilder(); Trigger newTrigger = tb.withSchedule(CronScheduleBuilder.cronSchedule(cron)).build(); clusterScheduler.rescheduleJob(oldtrigger.getKey(), newTrigger); return true; } catch (Exception e) { logger.info("修改定触发器失败................"); e.printStackTrace(); return false; } } }
**5. 提供一个我实现好的job:服务器
实现一个job很简单,只需实现org.quartz.Job接口便可,org.quartz.Job接口只有一个 execute()方法。这个方法就是定时任务执行时调用的方法,这里就是咱们业务代码的入口。 由于job须要持久化,因此也必须实现Serializable类。因为quartz的bean是由本身管理的没有交友spring ioc管理,因此没法经过相似@Autowired这样的注解注入bean。可是咱们在springContext-quartz.xml中配置了applicationContextSchedulerContextKey,就能够经过execute 方法的JobExecutionContext参数获取到ApplicationContext,那么也就能够同bean name拿到bea了
**oracle
package com.fotic.management.sms.job; import java.io.Serializable; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.SchedulerContext; import org.quartz.SchedulerException; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; import com.fotic.management.sms.service.impl.SmsServiceImpl; /* * @Description: 按期向短信平台推送数据 * @author dgq * @date 2018年4月25日 */ @Service public class PushOverdueDataJob implements Job, Serializable{ private static final long serialVersionUID = -6605766126594260962L; @Override public void execute(JobExecutionContext context) throws JobExecutionException { try { SchedulerContext schedulerContext = context.getScheduler().getContext(); ApplicationContext applicationContext = (ApplicationContext) schedulerContext.get("applicationContext"); SmsServiceImpl ss = (SmsServiceImpl)applicationContext.getBean("smsServiceImpl");//获取spring bean实例 ss.sendSms(); } catch (SchedulerException e) { e.printStackTrace(); } } }
结尾
项目打包集群部署就能够看到同一个job并不会在两个服务器中同时被触发。我这里在本地用
Apache搭建了一个简单的集群,测试没问,这里就不提供Apache集群的搭建代码了。核心代码都在上面了,以此记录。app