前段时间,公司要改造现有的单节点调度为分布式任务调度,而后就研究了目前市面上主流的开源分布式任务调度框架,用起来就一个感受:麻烦!特别是以前在一个类里写了好多个调度任务,改造起来更加麻烦。我这人又比较懒,总感受用了别人写好的工具还要改一大堆,内心就有点不舒服。因而我就想本身写一个框架,毕竟本身以为分布式任务调度在全部分布式系统中是最简单的,由于通常公司任务调度自己不可能同时调度海量的任务,很大的并发,改形成分布式主要仍是为了分散任务到多个节点,以便同一时间处理更多的任务。后面有一天,我在公司前台取快递,看到这样一个现象:咱们好几个同事(包括我)在前台那从头至尾看快递是否是本身的,是本身的就取走,不是就忽略,而后我就收到了启发。这个场景类比到分布式调度系统中,咱们能够认为是快递公司或者快递员已经把每一个快递按照咱们名字电话分好了快递,咱们只须要取走本身的就好了。可是从另一个角度看,也能够理解成咱们每一个人都是从头至尾看了全部快递,而后按照某种约定的规则,若是是本身的快递就拿走,不是本身的就忽略继续看下一个。若是把快递想象成任务,一堆人去拿一堆快递也能够很顺利的拿到各自的快递,那么一堆节点本身去取任务是否是也能够很好的处理各自的任务呢?java
传统的分布式任务调度都有一个调度中心,这个调度中心也都要部署称多节点的集群,避免单点故障,而后还有一堆执行器,执行器负责执行调度中心分发的任务。按照上面的启发,个人思路是放弃中心式的调度中心直接由各个执行器节点去公共的地方按照约定的规则去取任务,而后执行。设计示意图以下node
有人可能怀疑那任务db库不是有单点问题吗,我想反问下,难道其余的分布式任务调度框架没有这个问题吗?针对数据库单点咱们能够单独相似业务库那样考虑高可用方案,这里不是这篇文章的讨论重点。很明显咱们重点放在执行节点那里到底怎么保证高可用,单个任务不会被多个节点同时执行,单个节点执行到一半忽然失联了,这个任务怎么办等复杂的问题。后续咱们使用未经修饰的代码的方式一一解决这个问题(未经修饰主要是没有优化结构流水帐式的代码风格,主要是不少人包括我本身看别人源码时老是感受晕头转向的,仿佛置身迷宫般,看起来特别费劲,多是我本身境界未到吧)git
既然省略了集中式的调度,那么既然叫任务调度很明显必需要有调度的过程,否则多个节点去抢一个任务怎么避免冲突呢?我这里解决方式是:首先先明确一个任务的几种状态:待执行,执行中,有异常,已完成。每一个节点起一个线程一直去查很快就要开始执行的待执行任务,而后遍历这些任务,使用乐观锁的方式先更新这个任务的版本号(版本号+1)和状态(变成执行中),若是更新成功就放入节点本身的延时队列中等待执行。因为每一个节点的线程都是去数据库查待执行的任务,很明显变成执行中的任务下次就不会被其余节点再查询到了,至于对于那些在本节点更新状态以前就查到的待执行任务也会通过乐观锁尝试后更新失败从而跳过这个任务,这样就能够避免一个任务同时被多个节点重复执行。关键代码以下:程序员
package com.rdpaas.task.scheduler; import com.rdpaas.task.common.*; import com.rdpaas.task.config.EasyJobConfig; import com.rdpaas.task.repository.NodeRepository; import com.rdpaas.task.repository.TaskRepository; import com.rdpaas.task.strategy.Strategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Date; import java.util.List; import java.util.concurrent.*; /** * 任务调度器 * @author rongdi * @date 2019-03-13 21:15 */ @Component public class TaskExecutor { private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class); @Autowired private TaskRepository taskRepository; @Autowired private NodeRepository nodeRepository; @Autowired private EasyJobConfig config;/** * 建立任务到期延时队列 */ private DelayQueue<DelayItem<Task>> taskQueue = new DelayQueue<>(); /** * 能够明确知道最多只会运行2个线程,直接使用系统自带工具就能够了 */ private ExecutorService bossPool = Executors.newFixedThreadPool(2); /** * 声明工做线程池 */ private ThreadPoolExecutor workerPool; @PostConstruct public void init() { /** * 自定义线程池,初始线程数量corePoolSize,线程池等待队列大小queueSize,当初始线程都有任务,而且等待队列满后 * 线程数量会自动扩充最大线程数maxSize,当新扩充的线程空闲60s后自动回收.自定义线程池是由于Executors那几个线程工具 * 各有各的弊端,不适合生产使用 */ workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize())); /** * 执行待处理任务加载线程 */ bossPool.execute(new Loader()); /** * 执行任务调度线程 */ bossPool.execute(new Boss()); } class Loader implements Runnable { @Override public void run() { for(;;) { try { /** * 查找还有指定时间(单位秒)开始的主任务列表 */ List<Task> tasks = taskRepository.listPeddingTasks(config.getFetchDuration()); if(tasks == null || tasks.isEmpty()) { continue; } for(Task task:tasks) { task.setStatus(TaskStatus.DOING); task.setNodeId(config.getNodeId()); /** * 使用乐观锁尝试更新状态,若是更新成功,其余节点就不会更新成功。若是在查询待执行任务列表 * 和当前这段时间有节点已经更新了这个任务,version必然和查出来时候的version不同了,这里更新 * 必然会返回0了 */ int n = taskRepository.updateWithVersion(task); Date nextStartTime = task.getNextStartTime(); if(n == 0 || nextStartTime == null) { continue; } /** * 封装成延时对象放入延时队列 */ task = taskRepository.get(task.getId()); DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task); taskQueue.offer(delayItem); } Thread.sleep(config.getFetchPeriod()); } catch(Exception e) { logger.error("fetch task list failed,cause by:{}", e); } } } } class Boss implements Runnable { @Override public void run() { for (;;) { try { /** * 时间到了就能够从延时队列拿出任务对象,而后交给worker线程池去执行 */ DelayItem<Task> item = taskQueue.take(); if(item != null && item.getItem() != null) { Task task = item.getItem(); workerPool.execute(new Worker(task)); } } catch (Exception e) { logger.error("fetch task failed,cause by:{}", e); } } } } class Worker implements Runnable { private Task task; public Worker(Task task) { this.task = task; } @Override public void run() { logger.info("Begin to execute task:{}",task.getId()); TaskDetail detail = null; try { //开始任务 detail = taskRepository.start(task); if(detail == null) return; //执行任务 task.getInvokor().invoke(); //完成任务 finish(task,detail); logger.info("finished execute task:{}",task.getId()); } catch (Exception e) { logger.error("execute task:{} error,cause by:{}",task.getId(), e); try { taskRepository.fail(task,detail,e.getCause().getMessage()); } catch(Exception e1) { logger.error("fail task:{} error,cause by:{}",task.getId(), e); } } } } /** * 完成子任务,若是父任务失败了,子任务不会执行 * @param task * @param detail * @throws Exception */ private void finish(Task task,TaskDetail detail) throws Exception { //查看是否有子类任务 List<Task> childTasks = taskRepository.getChilds(task.getId()); if(childTasks == null || childTasks.isEmpty()) { //当没有子任务时完成父任务 taskRepository.finish(task,detail); return; } else { for (Task childTask : childTasks) { //开始任务 TaskDetail childDetail = null; try { //将子任务状态改为执行中 childTask.setStatus(TaskStatus.DOING); childTask.setNodeId(config.getNodeId()); //开始子任务 childDetail = taskRepository.startChild(childTask,detail); //使用乐观锁更新下状态,否则这里可能和恢复线程产生并发问题 int n = taskRepository.updateWithVersion(childTask); if (n > 0) { //再从数据库取一下,避免上面update修改后version不一样步 childTask = taskRepository.get(childTask.getId()); //执行子任务 childTask.getInvokor().invoke(); //完成子任务 finish(childTask, childDetail); } } catch (Exception e) { logger.error("execute child task error,cause by:{}", e); try { taskRepository.fail(childTask, childDetail, e.getCause().getMessage()); } catch (Exception e1) { logger.error("fail child task error,cause by:{}", e); } } } /** * 当有子任务时完成子任务后再完成父任务 */ taskRepository.finish(task,detail); } } }
如上所述,能够保证一个任务同一个时间只会被一个节点调度执行。这时候若是部署多个节点,正常应该能够很顺利的将任务库中的任务都执行到,就像一堆人去前台取快递同样,能够很顺利的拿走全部快递。毕竟对于每一个快递不是本身的就是其余人的,本身的快递也不会是其余人的。可是这里的调度和取快递有一点不同,取快递的每一个人都知道怎么去区分到底哪一个快递是本身的。这里的调度彻底没这个概念,彻底是哪一个节点运气好使用乐观锁更新了这个任务状态就是哪一个节点的。总的来讲区别就是须要一个约定的规则,快递是否是本身的,直接看快递上的名字和手机号码就知道了。任务到底该不应本身执行咱们也能够出一个这种规则,明确哪些任务那些应该是哪些节点能够执行,从而避免无谓的锁竞争。这里能够借鉴负载均衡的那些策略,目前我想实现以下规则:github
1) id_hash : 按照任务自增id的对节点个数取余,余数值和当前节点的实时序号匹配,能够匹配就能够拿走执行,不然请自觉忽略掉这个任务算法
2) least_count:最少执行任务的节点优先去取任务spring
3) weight:按照节点权重去取任务数据库
4) default: 默认先到先得,没有其它规则springboot
根据上面规则也能够说是任务的负载均衡策略能够知道除了默认规则,其他规则都须要知道全局的节点信息,好比节点执行次数,节点序号,节点权重等,因此咱们须要给节点添加一个心跳,隔一个心跳周期上报一下本身的信息到数据库,心跳核心代码以下:并发
/** * 建立节点心跳延时队列 */ private DelayQueue<DelayItem<Node>> heartBeatQueue = new DelayQueue<>(); /** * 能够明确知道最多只会运行2个线程,直接使用系统自带工具 */ private ExecutorService bossPool = Executors.newFixedThreadPool(2);
@PostConstruct public void init() { /** * 若是恢复线程开关是开着,而且心跳开关也是开着 */ if(config.isRecoverEnable() && config.isHeartBeatEnable()) { /** * 初始化一个节点到心跳队列,延时为0,用来注册节点 */ heartBeatQueue.offer(new DelayItem<>(0,new Node(config.getNodeId()))); /** * 执行心跳线程 */ bossPool.execute(new HeartBeat()); /** * 执行异常恢复线程 */ bossPool.execute(new Recover()); } }
class HeartBeat implements Runnable { @Override public void run() { for(;;) { try { /** * 时间到了就能够从延时队列拿出节点对象,而后更新时间和序号, * 最后再新建一个超时时间为心跳时间的节点对象放入延时队列,造成循环的心跳 */ DelayItem<Node> item = heartBeatQueue.take(); if(item != null && item.getItem() != null) { Node node = item.getItem(); handHeartBeat(node); } heartBeatQueue.offer(new DelayItem<>(config.getHeartBeatSeconds() * 1000,new Node(config.getNodeId()))); } catch (Exception e) { logger.error("task heart beat error,cause by:{} ",e); } } } } /** * 处理节点心跳 * @param node */ private void handHeartBeat(Node node) { if(node == null) { return; } /** * 先看看数据库是否存在这个节点 * 若是不存在:先查找下一个序号,而后设置到node对象中,最后插入 * 若是存在:直接根据nodeId更新当前节点的序号和时间 */ Node currNode= nodeRepository.getByNodeId(node.getNodeId()); if(currNode == null) { node.setRownum(nodeRepository.getNextRownum()); nodeRepository.insert(node); } else { nodeRepository.updateHeartBeat(node.getNodeId()); } }
数据库有了节点信息后,咱们就能够实现各类花式的取任务的策略了,代码以下:
/** * 抽象的策略接口 * @author rongdi * @date 2019-03-16 12:36 */ public interface Strategy { /** * 默认策略 */ String DEFAULT = "default"; /** * 按任务ID hash取余再和本身节点序号匹配 */ String ID_HASH = "id_hash"; /** * 最少执行次数 */ String LEAST_COUNT = "least_count"; /** * 按节点权重 */ String WEIGHT = "weight"; public static Strategy choose(String key) { switch(key) { case ID_HASH: return new IdHashStrategy(); case LEAST_COUNT: return new LeastCountStrategy(); case WEIGHT: return new WeightStrategy(); default: return new DefaultStrategy(); } } public boolean accept(List<Node> nodes,Task task,Long myNodeId); }
/** * 按照任务ID hash方式针对有效节点个数取余,而后余数+1后和各个节点的顺序号匹配, * 这种方式效果其实等同于轮询,由于任务id是自增的 * @author rongdi * @date 2019-03-16 */ public class IdHashStrategy implements Strategy { /** * 这里的nodes集合必然不会为空,外面调度那判断了,并且是按照nodeId的升序排列的 */ @Override public boolean accept(List<Node> nodes, Task task, Long myNodeId) { int size = nodes.size(); long taskId = task.getId(); /** * 找到本身的节点 */ Node myNode = nodes.stream().filter(node -> node.getNodeId() == myNodeId).findFirst().get(); return myNode == null ? false : (taskId % size) + 1 == myNode.getRownum(); } }
/** * 最少处理任务次数策略,也就是每次任务来了,看看本身是否是处理任务次数最少的,是就能够消费这个任务 * @author rongdi * @date 2019-03-16 21:56 */ public class LeastCountStrategy implements Strategy { @Override public boolean accept(List<Node> nodes, Task task, Long myNodeId) { /** * 获取次数最少的那个节点,这里能够类比成先按counts升序排列而后取第一个元素 * 而后是本身就返回true */ Optional<Node> min = nodes.stream().min((o1, o2) -> o1.getCounts().compareTo(o2.getCounts())); return min.isPresent()? min.get().getNodeId() == myNodeId : false; } }
/** * 按权重的分配策略,方案以下,假如 * 节点序号 1 ,2 ,3 ,4 * 节点权重 2 ,3 ,3 ,2 * 则取余后 0,1 | 2,3,4 | 5,6,7 | 8,9 * 序号1能够消费按照权重的和取余后小于2的 * 序号2能够消费按照权重的和取余后大于等于2小于2+3的 * 序号3能够消费按照权重的和取余后大于等于2+3小于2+3+3的 * 序号3能够消费按照权重的和取余后大于等于2+3+3小于2+3+3+2的 * 总结:本节点能够消费的按照权重的和取余后大于等于前面节点的权重和小于包括本身的权重和的这个范围 * 不知道有没有大神有更好的算法思路 * @author rongdi * @date 2019-03-16 23:16 */ public class WeightStrategy implements Strategy { @Override public boolean accept(List<Node> nodes, Task task, Long myNodeId) { Node myNode = nodes.stream().filter(node -> node.getNodeId() == myNodeId).findFirst().get(); if(myNode == null) { return false; } /** * 计算本节点序号前面的节点的权重和 */ int preWeightSum = nodes.stream().filter(node -> node.getRownum() < myNode.getRownum()).collect(Collectors.summingInt(Node::getWeight)); /** * 计算所有权重的和 */ int weightSum = nodes.stream().collect(Collectors.summingInt(Node::getWeight)); /** * 计算对权重和取余的余数 */ int remainder = (int)(task.getId() % weightSum); return remainder >= preWeightSum && remainder < preWeightSum + myNode.getWeight(); } }
而后咱们再改造下调度类
/** * 获取任务的策略 */ private Strategy strategy; @PostConstruct public void init() { /** * 根据配置选择一个节点获取任务的策略 */ strategy = Strategy.choose(config.getNodeStrategy()); /** * 自定义线程池,初始线程数量corePoolSize,线程池等待队列大小queueSize,当初始线程都有任务,而且等待队列满后 * 线程数量会自动扩充最大线程数maxSize,当新扩充的线程空闲60s后自动回收.自定义线程池是由于Executors那几个线程工具 * 各有各的弊端,不适合生产使用 */ workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize())); /** * 执行待处理任务加载线程 */ bossPool.execute(new Loader()); /** * 执行任务调度线程 */ bossPool.execute(new Boss()); } class Loader implements Runnable { @Override public void run() { for(;;) { try { /** * 先获取可用的节点列表 */ List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2); if(nodes == null || nodes.isEmpty()) { continue; } /** * 查找还有指定时间(单位秒)开始的主任务列表 */ List<Task> tasks = taskRepository.listPeddingTasks(config.getFetchDuration()); if(tasks == null || tasks.isEmpty()) { continue; } for(Task task:tasks) { boolean accept = strategy.accept(nodes, task, config.getNodeId()); /** * 不应本身拿就不要抢 */ if(!accept) { continue; } task.setStatus(TaskStatus.DOING); task.setNodeId(config.getNodeId()); /** * 使用乐观锁尝试更新状态,若是更新成功,其余节点就不会更新成功。若是在查询待执行任务列表 * 和当前这段时间有节点已经更新了这个任务,version必然和查出来时候的version不同了,这里更新 * 必然会返回0了 */ int n = taskRepository.updateWithVersion(task); Date nextStartTime = task.getNextStartTime(); if(n == 0 || nextStartTime == null) { continue; } /** * 封装成延时对象放入延时队列 */ task = taskRepository.get(task.getId()); DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task); taskQueue.offer(delayItem); } Thread.sleep(config.getFetchPeriod()); } catch(Exception e) { logger.error("fetch task list failed,cause by:{}", e); } } } }
如上能够经过各类花式的负载策略来平衡各个节点获取的任务,同时也能够显著下降各个节点对同一个任务的竞争。可是还有个问题,假如某个节点拿到了任务更新成了执行中,执行到一半,没执行完也没发生异常,忽然这个节点因为各类缘由挂了,那么这时候这个任务永远没有机会再执行了。这就是传说中的占着茅坑不拉屎。解决这个问题能够用最终一致系统常见的方法,异常恢复线程。在这种场景下只须要检测一下指定心跳超时时间(好比默认3个心跳周期)下没有更新心跳时间的节点所属的未完成任务,将这些任务状态从新恢复成待执行,而且下次执行时间改为当前就能够了。核心代码以下:
class Recover implements Runnable { @Override public void run() { for (;;) { try { /** * 查找须要恢复的任务,这里界定须要恢复的任务是任务还没完成,而且所属执行节点超过3个 * 心跳周期没有更新心跳时间。因为这些任务因为当时执行节点没有来得及执行完就挂了,因此 * 只须要把状态再改回待执行,而且下次执行时间改为当前时间,让任务再次被调度一次 */ List<Task> tasks = taskRepository.listRecoverTasks(config.getHeartBeatSeconds() * 3); if(tasks == null || tasks.isEmpty()) { return; } /** * 先获取可用的节点列表 */ List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2); if(nodes == null || nodes.isEmpty()) { return; } long maxNodeId = nodes.get(nodes.size() - 1).getNodeId(); for (Task task : tasks) { /** * 每一个节点有一个恢复线程,为了不没必要要的竞争,从可用节点找到一个最靠近任务所属节点的节点 */ long currNodeId = chooseNodeId(nodes,maxNodeId,task.getNodeId()); long myNodeId = config.getNodeId(); /** * 若是不应当前节点处理直接跳过 */ if(currNodeId != myNodeId) { continue; } /** * 直接将任务状态改为待执行,而且节点改为当前节点 */ task.setStatus(TaskStatus.PENDING); task.setNextStartTime(new Date()); task.setNodeId(config.getNodeId()); taskRepository.updateWithVersion(task); } Thread.sleep(config.getRecoverSeconds() * 1000); } catch (Exception e) { logger.error("Get next task failed,cause by:{}", e); } } } } /** * 选择下一个节点 * @param nodes * @param maxNodeId * @param nodeId * @return */ private long chooseNodeId(List<Node> nodes,long maxNodeId,long nodeId) { if(nodes.size() == 0 || nodeId >= maxNodeId) { return nodes.get(0).getNodeId(); } return nodes.stream().filter(node -> node.getNodeId() > nodeId).findFirst().get().getNodeId(); }
如上为了不每一个节点的异常恢复线程对同一个任务作无谓的竞争,每一个异常任务只能被任务所属节点ID的下一个正常节点去恢复。这样处理后就能确保就算出现了上面那种任务没执行完节点挂了的状况,一段时间后也能够自动恢复。总的来讲上面那些不考虑优化应该能够作为一个还不错的任务调度框架了。若是大家觉得这样就完了,我只能说抱歉了,还有,哈哈!前面提到我是嫌弃其它任务调度用起来麻烦,特别是习惯用spring的注解写调度的,那些极可能一个类里写了n个带有@Scheduled注解的调度方法,这样改造起来更加麻烦,我是但愿作到以下方式就能够直接整合到分布式任务调度里:
/** * 测试调度功能 * @author rongdi * @date 2019-03-17 16:54 */ @Component public class SchedulerTest { @Scheduled(cron = "0/10 * * * * ?") public void test1() throws InterruptedException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Thread.sleep(2000); System.out.println("当前时间1:"+sdf.format(new Date())); } @Scheduled(cron = "0/20 * * * * ?",parent = "test1") public void test2() throws InterruptedException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Thread.sleep(2000); System.out.println("当前时间2:"+sdf.format(new Date())); } @Scheduled(cron = "0/10 * * * * ?",parent = "test2") public void test3() throws InterruptedException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Thread.sleep(2000); System.out.println("当前时间3:"+sdf.format(new Date())); } @Scheduled(cron = "0/10 * * * * ?",parent = "test3") public void test4() throws InterruptedException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Thread.sleep(2000); System.out.println("当前时间4:"+sdf.format(new Date())); } }
为了达到上述目标,咱们还须要在spring启动后加载自定义的注解(名称和spring的同样),代码以下
/** * spring容器启动完后,加载自定义注解 * @author rongdi * @date 2019-03-15 21:07 */ @Component public class ContextRefreshedListener implements ApplicationListener<ContextRefreshedEvent> { @Autowired private TaskExecutor taskExecutor; /** * 用来保存方法名/任务名和任务插入后数据库的ID的映射,用来处理子任务新增用 */ private Map<String,Long> taskIdMap = new HashMap<>(); @Override public void onApplicationEvent(ContextRefreshedEvent event) { /** * 判断根容器为Spring容器,防止出现调用两次的状况(mvc加载也会触发一次) */ if(event.getApplicationContext().getParent()==null){ /** * 判断调度开关是否打开 * 若是打开了:加载调度注解并将调度添加到调度管理中 */ ApplicationContext context = event.getApplicationContext(); Map<String,Object> beans = context.getBeansWithAnnotation(org.springframework.scheduling.annotation.EnableScheduling.class); if(beans == null) { return; } /** * 用来存放被调度注解修饰的方法名和Method的映射 */ Map<String,Method> methodMap = new HashMap<>(); /** * 查找全部直接或者间接被Component注解修饰的类,由于无论Service,Controller等都包含了Component,也就是 * 只要是被归入了spring容器管理的类必然直接或者间接的被Component修饰 */ Map<String,Object> allBeans = context.getBeansWithAnnotation(org.springframework.stereotype.Component.class); Set<Map.Entry<String,Object>> entrys = allBeans.entrySet(); /** * 遍历bean和里面的method找到被Scheduled注解修饰的方法,而后将任务放入任务调度里 */ for(Map.Entry entry:entrys){ Object obj = entry.getValue(); Class clazz = obj.getClass(); Method[] methods = clazz.getMethods(); for(Method m:methods) { if(m.isAnnotationPresent(Scheduled.class)) { methodMap.put(clazz.getName() + Delimiters.DOT + m.getName(),m); } } } /** * 处理Sheduled注解 */ handleSheduledAnn(methodMap); /** * 因为taskIdMap只是启动spring完成后使用一次,这里能够直接清空 */ taskIdMap.clear(); } } /** * 循环处理方法map中的全部Method * @param methodMap */ private void handleSheduledAnn(Map<String,Method> methodMap) { if(methodMap == null || methodMap.isEmpty()) { return; } Set<Map.Entry<String,Method>> entrys = methodMap.entrySet(); /** * 遍历bean和里面的method找到被Scheduled注解修饰的方法,而后将任务放入任务调度里 */ for(Map.Entry<String,Method> entry:entrys){ Method m = entry.getValue(); try { handleSheduledAnn(methodMap,m); } catch (Exception e) { e.printStackTrace(); continue; } } } /** * 递归添加父子任务 * @param methodMap * @param m * @throws Exception */ private void handleSheduledAnn(Map<String,Method> methodMap,Method m) throws Exception { Class<?> clazz = m.getDeclaringClass(); String name = m.getName(); Scheduled sAnn = m.getAnnotation(Scheduled.class); String cron = sAnn.cron(); String parent = sAnn.parent(); /** * 若是parent为空,说明该方法表明的任务是根任务,则添加到任务调度器中,而且保存在全局map中 * 若是parent不为空,则表示是子任务,子任务须要知道父任务的id * 先根据parent里面表明的方法全名或者方法名(父任务方法和子任务方法在同一个类直接能够用方法名, * 否则要带上类的全名)从taskIdMap获取父任务ID * 若是找不到父任务ID,先根据父方法全名在methodMap找到父任务的method对象,调用本方法递归下去 * 若是找到父任务ID,则添加子任务 */ if(StringUtils.isEmpty(parent)) { if(!taskIdMap.containsKey(clazz.getName() + Delimiters.DOT + name)) { Long taskId = taskExecutor.addTask(name, cron, new Invocation(clazz, name, new Class[]{}, new Object[]{})); taskIdMap.put(clazz.getName() + Delimiters.DOT + name, taskId); } } else { String parentMethodName = parent.lastIndexOf(Delimiters.DOT) == -1 ? clazz.getName() + Delimiters.DOT + parent : parent; Long parentTaskId = taskIdMap.get(parentMethodName); if(parentTaskId == null) { Method parentMethod = methodMap.get(parentMethodName); handleSheduledAnn(methodMap,parentMethod); /** * 递归回来必定要更新一下这个父任务ID */ parentTaskId = taskIdMap.get(parentMethodName); } if(parentTaskId != null && !taskIdMap.containsKey(clazz.getName() + Delimiters.DOT + name)) { Long taskId = taskExecutor.addChildTask(parentTaskId, name, cron, new Invocation(clazz, name, new Class[]{}, new Object[]{})); taskIdMap.put(clazz.getName() + Delimiters.DOT + name, taskId); } } } }
上述代码就完成了spring初始化完成后加载了本身的自定义任务调度的注解,而且也受spring的调度开关@EnableScheduling的控制,实现无缝整合到spring或者springboot中去,达到了我这种的懒人的要求。
好了其实写这个框架差很少就用了5天业余时间,估计会有一些隐藏的坑,不过明显的坑我本身都解决了,开源出来的目的既是为了抛砖引玉,也为了广大屌丝程序员提供一种新的思路,但愿对你们有所帮助,同时也但愿你们多帮忙找找bug,一块儿来完善这个东西,大神们请忽略。文笔很差,主要是很久没写做文了,请你们多多担待。详细的流水帐式的源码加上长篇大论式的汉语注释尽情查看:https://github.com/rongdi/easy-job