微服务的概念能够说给程序设计打开了一个新世界,带来了众多的优势,可是也将一些以往容易处理的问题变得复杂,例如:缓存、事务、定时任务等。缓存能够用中间件例如redis、memcached等,事务有诸多分布式事务框架解决,定时任务也有分布式的解决方案,例如quartz、elastic job等,今天我要讲的是就是定时任务。java
既然已经有成熟的分布式定时任务框架,我要讲的东西并非用另外一种设计去实现相同的功能,而是从不一样的角度去解决分布式定时任务的问题。web
这个问题来起源于一个小功能,咱们有一个发送短信的微服务,须要获取短信的状态报告,状态报告对于短信发送不是同步的,短信提交到服务商,服务商要提交运营商发送以后才能生成状态报告,所以有必定的延迟,须要异步获取,而且服务商提供的接口有频率限制,所以须要作一个定时任务,且须要单点执行,那么问题来了,由于这一个功能我就须要引入一个定时任务框架吗,总感受有点大材小用的意思。redis
以前咱们的定时任务处理既有用过quartz,也用过elastic job,可是只为这样一个小功能就引入一个框架,再加上配置又得好半天,想一想都不划算。spring
例如要用quartz,要建立一堆数据库表,但表里面只存储了一个任务信息。数据库
用elastic job吧,还要使用zookeeper,即使用lite版,也须要一堆配置,远比我写业务的时间要长。缓存
我只想简简单单的写逻辑!!!websocket
谈分布式解决方案大体总离不开中间件,联想到上次解决websocket的分布式方案(参见Spring Cloud 微服务架构下的 WebSocket 解决方案)使用到的Spring Cloud Stream,大概有了思路:架构
根据上一步的方案,须要确认一些细节,以及一些特殊的状况,例如定时任务多是由微服务集群中单个实例执行,也可能存在集体执行(例如更新内存中的缓存),还可能存在分区执行。app
客户端(须要定时任务的为服务端)须要创建如下消息队列:框架
客户端与服务端须要经过惟一的任务id来确认须要执行的定时任务
服务端(任务分发微服务)须要根据状况将消息推送到不一样的队列,不能直接使用Spring Cloud Stream,须要使用rabbitmq
服务端自己也是分布式的,所以须要一个定时任务框架用于任务触发,我这里选择了quartz
Spring Cloud Stream的基本知识我再也不复述了,Spring Cloud 微服务架构下的 WebSocket 解决方案中有讲解。
data class ScheduleTask(
/** 任务的id,全局惟一,与客户端的taskId彻底匹配 */
var taskId: String = "",
/** 定时任务的cron 表达式 */
var cron: String = "",
/** 关联应用 */
var appId: Int = 0,
/** 任务描述 */
var description: String = "",
/** 接收任务的分区 */
var zone: String? = null,
/** 调度方式,广播到集群或单例执行,默认单例 */
var dispatchMode: DispatchMode = DispatchMode.Singleton,
/** 是否启用 */
var enabled: Boolean = true,
/** 任务的数据库记录 id,自增 */
var id: Int = -1)
复制代码
使用quartz进行任务调度
private fun scheduleJob(task: ScheduleTask) {
val job = JobBuilder.newJob(TaskEmitterJob::class.java)
.withIdentity(task.taskId, task.appId.toString())
.withDescription(task.description)
.storeDurably()
.requestRecovery()
.usingJobData("id", task.id)
.usingJobData("taskId", task.taskId)
.build()
val trigger = TriggerBuilder.newTrigger()
.withIdentity(task.taskId, task.appId.toString())
.withSchedule(CronScheduleBuilder.cronSchedule(task.cron))
.forJob(job)
.build()
scheduler.addJob(job, true, true)
if (scheduler.checkExists(trigger.key)) {
scheduler.rescheduleJob(trigger.key, trigger)
} else {
scheduler.scheduleJob(trigger)
}
}
复制代码
ScheduleTask是持久化的,插入的时候同时向quartz插入任务,更新的时候也要向quartz更新,删除的时候同时删除
class TaskEmitterJob : Job {
companion object {
private val log = LogFactory.getLog(TaskEmitterJob::class.java)
}
override fun execute(context: JobExecutionContext) {
try {
val taskId = context.jobDetail.jobDataMap["taskId"] as String
log.info("任务分发:$taskId")
val service = ScheduleCenterApplication.context.getBean(ScheduleTaskService::class.java)
service.launch(taskId)
} catch (e: Exception) {
log.error("任务失败$[taskId]", e)
}
}
}
复制代码
/** * 发布定时任务事件 */
fun launch(task: ScheduleTask) {
val exchange = when (task.dispatchMode) {
Cluster -> "aegisScheduleCluster"
Singleton -> "aegisScheduleSingleton"
}
val routingKey = when (task.dispatchMode) {
Cluster -> exchange
Singleton -> "$exchange.${task.appName}"
}
val executeTaskInfo = ScheduleTaskInfo(task.taskId, task.appName!!)
amqpTemplate.convertAndSend(exchange, routingKey,
executeTaskInfo)
taskExecuteRecordDAO.save(
TaskExecuteRecord(executeTaskInfo.uid, task.id, Date())
)
}
复制代码
@FunctionalInterface
interface ScheduledJob {
/** * 执行定时任务 */
fun execute(properties: Map<String, Any>)
/** * 获取定时任务id * @return 定时任务id,对应任务分发中心ScheduleTask的taskId */
fun getId(): String
}
复制代码
/**
* 接收单例任务
*/
@StreamListener(SINGLETON_INPUT)
fun acceptGroupTask(taskInfo: ScheduleTaskInfo) {
if (taskInfo.app == application) {
val receivedTime = Date()
val job = jobsProvider.ifAvailable?.firstOrNull {
it.getId() == taskInfo.id
}
job?.execute(taskInfo.properties ?: mapOf())
singletonOutput.send(GenericMessage(
ConfirmInfo(taskInfo.id, taskInfo.uid, job != null, receivedTime, Date())
))
}
}
复制代码
集群全体执行任务与单例任务的区别只在stream的配置,一个须要声明binding的group,一个不须要,这属于Spring Cloud Stream的知识范畴,能够本身看官方文档或查看我前面提到的文档,若是有不懂的能够私聊我。
/** * 定时任务信息的事件流接口 * @author 吴昊 * @since 0.1.0 */
interface AegisScheduleClient {
companion object {
const val CLUSTER_INPUT = "aegisScheduleClusterInput"
const val SINGLETON_INPUT = "aegisScheduleSingletonInput"
const val CONFIRM_OUTPUT = "aegisScheduleGroupOutput"
}
/** * * @return */
@Input(CLUSTER_INPUT)
fun scheduleInput(): SubscribableChannel
/** * * @return */
@Input(SINGLETON_INPUT)
fun singletonScheduleInput(): SubscribableChannel
/** * * @return */
@Output(CONFIRM_OUTPUT)
fun confirmOutput(): MessageChannel
}
复制代码
最后再加上服务端确认消息的接收代码:
@StreamListener(CONFIRM_INPUT)
fun acceptGroupTask(confirmInfo: ConfirmInfo) {
LOG.info("接收到确认消息:$confirmInfo")
scheduleTaskService.confirm(confirmInfo)
}
复制代码
主要的代码已经所有放上来了,总体思路也很简单,后面仍有不少须要优化的地方,例如消息推送失败,或者确认消息未送达等等,于总体设计并无多大的影响了。
这样在微服务端若是须要添加定时任务,只须要
至于在任务中心添加任务,主题代码有了,实现个简单管理界面很容易对不对,也就几个字段的输入。
最后附上管理界面的截图:
任务列表
任务详情
个人其余文章: