系统中有各类定时任务,须要知足如下要求:html
Java API中的Timer提供了多种定时实现,对照需求:java
ScheduledExecutorService与Timer相似,对照需求:git
spring提供了开箱即用的轻量版定时任务schedule,以注解的形式使用,支持cron表达式,用起来可谓十分方便顺手
但针对这里的需求:github
Quartz的强大已经深得人心,使用Quartz实现这里的需求也是绰绰有余,针对第6条分布式集群Quartz也提出了解决方案redis
但在Quartz官方文档中出现了如下两段提示spring
Never run clustering on separate machines, unless their clocks are synchronized using some form of time-sync service (daemon) that runs very regularly (the clocks must be within a second of each other). See http://www.boulder.nist.gov/t... if you are unfamiliar with how to do this.
Never start (scheduler.start()) a non-clustered instance against the same set of database tables that any other instance is running (start()ed) against. You may get serious data corruption, and will definitely experience erratic behavior.
其中须要关注的一点是,各节点机器之间服务器时间偏差不能超过1秒。虽然服务器时间同步实现起来并不困难,但针对贵公司云主机环境近1年来的表现,只能说不信任,Quartz只能作保底方案。数据库
jesque是resque的java版实现,jesque的定位主要是延迟任务及简单的定时任务,不支持cron表达式,对照需求:springboot
综上服务器
需求编号 | Timer | ScheduledExecutorService | Spring Schedule | Quartz | jesque |
---|---|---|---|---|---|
1 | √ | √ | x | √ | √ |
2 | x | x | x | √ | x |
3 | x | x | √ | √ | x |
4 | √ | √ | √ | √ | √ |
5 | √ | √ | ? | √ | √ |
6 | x | x | x | √ | √ |
尽管Quartz秒杀当前需求,但鉴于贵公司云主机环境的表现,同时考虑到没有SQL数据库环境,故从新撸一套,奉上源码 https://github.com/manerfan/m...app
基本思路为,将任务数据存放到redis,定时获取redis中的任务数据,利用反射建立任务实例并执行。这里分别用到了redis中的有序集合及哈希表
建立/更新任务流程:
获取/执行任务流程:
@JsonInclude(JsonInclude.Include.NON_EMPTY) class JobEntity( var uuid: String = UUID.randomUUID().toString(), var name: String? = null, var className: String, var args: Array<Any?>? = null, @JsonInclude(JsonInclude.Include.NON_EMPTY) var vars: Map<String, Any?>? = null, @JsonDeserialize(using = DateTimeDeserializer::class) @JsonSerialize(using = DateTimeSerializer::class) var startedAt: DateTime? = null, @JsonDeserialize(using = DateTimeDeserializer::class) @JsonSerialize(using = DateTimeSerializer::class) var endedAt: DateTime? = null, cron: String? = null, // 优先使用此参数 var fixedRate: Long? = null ) { /** * cron表达式解析器 */ @JsonIgnore private var cronGenerator: CronSequenceGenerator? = cron?.let { CronSequenceGenerator(it, TimeZone.getTimeZone("Asia/Shanghai")) } /** * cron表达式 */ var cron = cron set(cron) { field = cron cronGenerator = cron?.let { CronSequenceGenerator(it, TimeZone.getTimeZone("Asia/Shanghai")) } } /** * 记录下次须要执行的时间 */ var nextScheduledAt: Long = -1 private set /** * 计算并更新下次执行时间 * 若指定endedAt且下次执行时间晚于endedAt,则说明任务已结束,并返回false * * @return 是否须要更新 | 是否已失效 */ fun updateNextScheduledAt(timestamp: Long = System.currentTimeMillis()): Boolean { val limit = startedAt?.let { max(it.millis, timestamp) } ?: timestamp nextScheduledAt = when { null != cronGenerator -> cronGenerator!!.next(Date(limit)).time null != fixedRate -> limit + fixedRate!! else -> nextScheduledAt } return endedAt?.let { it.millis > nextScheduledAt } ?: true } }
其中
className
真正任务实例的类路径args
对应className
类构造函数参数vars
对应className
类中的属性-值
startedAt
任务的开始时间,不指定则当即开始endedAt
任务的结束时间,不指定则永久执行
cron
任务执行cron表达式fixedRate
以该速率(毫秒)循环执行(若指定了cron,则该参数失效)
updateNextScheduledAt
函数用于计算任务下次执行时间,若下次执行时间晚于endedAt则说明任务已结束
/** * 添加任务Job * * 计算并更新job.nextScheduledAt * 若指定endedAt且nextScheduledAt晚于endedAt,则说明任务已结束,直接返回 * 反之,将更新后的job存入redis * * @param job 任务 * * @return job */ fun add(job: JobEntity): JobEntity { if (!job.updateNextScheduledAt(now)) { logger.warn("Job is Arrived! {}", job.toString()) // Update to DB return job } val connection = jobTemplate.connectionFactory.connection try { connection.multi() connection.hSet(hKey, job.uuid.toByteArray(), objectMapper.writeValueAsBytes(job)) connection.zAdd(zKey, job.nextScheduledAt.toDouble(), job.uuid.toByteArray()) connection.exec() } finally { connection.close() } return job } /** * 更新任务 * * 1. 删除任务 * 2. 计算并更新job.nextScheduledAt * 若指定endedAt且nextScheduledAt晚于endedAt,则说明任务已结束,直接返回 * 反之,将更新后的job存入redis * * @param job 任务 * * @return job */ fun update(job: JobEntity): JobEntity { delete(job.uuid) return add(job) } /** * 删除任务 * * 1. 从Hash中删除 * 2. 从SortedSet中删除 * * @param uuid 任务uuid */ fun delete(uuid: String) { val connection = jobTemplate.connectionFactory.connection try { connection.multi() connection.hDel(hKey, uuid.toByteArray()) connection.zRem(zKey, uuid.toByteArray()) connection.exec() } finally { connection.close() } }
其中
add
计算任务下次执行时间,若晚于结束时间则直接放回,反之更新到redis中update
计算任务下次执行时间,若晚于结束时间则从redis中删除,反之更新到redis中delete
将任务从redis中删除
/** * 任务Job执行器 * * 每隔1秒运行一次 * 1. 从SortedSet中将score在(0,now)之间的uuid取出 * 2. 从Hash中将uuid对应的job取出 * 3. 解析job,计算job的nextScheduledAt,并将job回写到redis中 * 4. 执行job */ @Scheduled(fixedRate = 1000) // 不使用cron是为了使集群中各节点执行时间随机分散开 fun schedule() { /** * SortedSet(有序集合)中,member为job.uuid,score为job.nextScheduledAt * 将score在 (0, now) 之间的uuid取出 * 其对应的便是如今须要执行的job */ var connection = jobTemplate.connectionFactory.connection var keys: Set<ByteArray>? try { val now = System.currentTimeMillis().toDouble() connection.multi() connection.zRangeByScore(zKey, 0.0, now) // 将score在(0,now)之间的uuid取出 connection.zRemRangeByScore(zKey, 0.0, now) // 同时从redis中删除 keys = connection.exec()[0] as? Set<ByteArray> } finally { connection.close() } if (ObjectUtils.isEmpty(keys)) { return } /** * Hash(哈希表)中,field为job.uuid,value为job * 经过uuid将对应job取出 */ connection = jobTemplate.connectionFactory.connection var values: List<ByteArray>? try { connection.multi() connection.hMGet(hKey, *keys!!.toTypedArray()) // 将uuid对应的job取出 connection.hDel(hKey, *keys.toTypedArray()) // 同时从redis中删除 values = connection.exec()[0] as? List<ByteArray> } finally { connection.close() } if (ObjectUtils.isEmpty(values)) { return } // 解析jobs并回写到redis中 val jobs = values!!.map { try { // 计算job的nextScheduledAt,并将其回写到redis中 add(objectMapper.readValue(it, JobEntity::class.java)) } catch (e: Exception) { logger.warn("JSON Parse Error {} {}", it.toString(), e.message) null } } // 执行jobs jobs.filterNotNull().forEach { var job = ReflectionUtils.createObject(Class.forName(it.className), it.args, it.vars) when (job) { is Runnable -> executorService.submit(job) else -> logger.warn("Job Must Implement Runnable {}", job) } } }
这里使用spring schedule,每1秒执行一次
首先,从有序集合中获取score小于当前时间的任务ID,并删除
其次,根据任务ID从哈希表中取出任务实体Job,并删除
以后,利用反射,根据Job中的className
args
vargs
建立任务实例,并放入线程池执行任务
最后,计算更新任务Job下次执行时间,若任务未过时,则将其更新到redis中,等待下次执行
示例代码见 https://github.com/manerfan/m...