以前写过一篇:本身动手实现分布式任务调度框架原本是用来闲来分享一下本身的思惟方式,时至今日发现竟然有些人正在使用了,本着对代码负责任的态度,对代码部分已知bug进行了修改,并增长了若干功能,如当即启动,实时中止等功能,新增长的功能会在这一篇作详细的说明。html
提到分布式任务调度,市面上自己已经有一些框架工具可使用,可是我的以为功能作的都太丰富,架构都过于复杂,因此才有了我重复造轮子。我的喜欢把复杂的问题简单化,利用有限的资源实现竟可能多的功能。由于有几个朋友问部署方式,这里再次强调下:个人这个服务能够直接打成jar放在本身本地仓库,而后依赖进去,或者直接copy代码过去,当成本身项目的一部分就能够了。也就是说跟随大家本身的项目启动,因此我这里也没有写界面。下面先谈谈怎么基于上次的代码实现任务当即启动吧!前端
调度和本身服务整合后部署图抽象成以下:java
用户在前端点击当即请求按钮,经过各类负载均衡软件或者设备,到达某台机器的某个带有本调度框架的服务,而后进行具体的执行,也就是说这个当即启动就是一个最多见最简单的请求,没有过多复杂的问题(好比多节点会不会重复执行这些)。最简单的办法,当用户请求过来直接用一个线程或者线程池执行用户点的那个任务的逻辑代码就好了,固然我这里没有那么粗暴,现有的调度代码资源以下:node
package com.rdpaas.task.scheduler; import com.rdpaas.task.common.Invocation; import com.rdpaas.task.common.Node; import com.rdpaas.task.common.NotifyCmd; import com.rdpaas.task.common.Task; import com.rdpaas.task.common.TaskDetail; import com.rdpaas.task.common.TaskStatus; import com.rdpaas.task.config.EasyJobConfig; import com.rdpaas.task.repository.NodeRepository; import com.rdpaas.task.repository.TaskRepository; import com.rdpaas.task.strategy.Strategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 任务调度器 * @author rongdi * @date 2019-03-13 21:15 */ @Component public class TaskExecutor { private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class); @Autowired private TaskRepository taskRepository; @Autowired private NodeRepository nodeRepository; @Autowired private EasyJobConfig config; /** * 建立任务到期延时队列 */ private DelayQueue<DelayItem<Task>> taskQueue = new DelayQueue<>(); /** * 能够明确知道最多只会运行2个线程,直接使用系统自带工具就能够了 */ private ExecutorService bossPool = Executors.newFixedThreadPool(2); /** * 正在执行的任务的Future */ private Map<Long,Future> doingFutures = new HashMap<>(); /** * 声明工做线程池 */ private ThreadPoolExecutor workerPool; /** * 获取任务的策略 */ private Strategy strategy; @PostConstruct public void init() { /** * 根据配置选择一个节点获取任务的策略 */ strategy = Strategy.choose(config.getNodeStrategy()); /** * 自定义线程池,初始线程数量corePoolSize,线程池等待队列大小queueSize,当初始线程都有任务,而且等待队列满后 * 线程数量会自动扩充最大线程数maxSize,当新扩充的线程空闲60s后自动回收.自定义线程池是由于Executors那几个线程工具 * 各有各的弊端,不适合生产使用 */ workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize())); /** * 执行待处理任务加载线程 */ bossPool.execute(new Loader()); /** * 执行任务调度线程 */ bossPool.execute(new Boss()); } class Loader implements Runnable { @Override public void run() { for(;;) { try { /** * 先获取可用的节点列表 */ List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2); if(nodes == null || nodes.isEmpty()) { continue; } /** * 查找还有指定时间(单位秒)才开始的主任务列表 */ List<Task> tasks = taskRepository.listNotStartedTasks(config.getFetchDuration()); if(tasks == null || tasks.isEmpty()) { continue; } for(Task task:tasks) { boolean accept = strategy.accept(nodes, task, config.getNodeId()); /** * 不应本身拿就不要抢 */ if(!accept) { continue; } /** * 先设置成待执行 */ task.setStatus(TaskStatus.PENDING); task.setNodeId(config.getNodeId()); /** * 使用乐观锁尝试更新状态,若是更新成功,其余节点就不会更新成功。若是其它节点也正在查询未完成的 * 任务列表和当前这段时间有节点已经更新了这个任务,version必然和查出来时候的version不同了,这里更新 * 必然会返回0了 */ int n = taskRepository.updateWithVersion(task); Date nextStartTime = task.getNextStartTime(); if(n == 0 || nextStartTime == null) { continue; } /** * 封装成延时对象放入延时队列,这里再查一次是由于上面乐观锁已经更新了版本,会致使后面结束任务更新不成功 */ task = taskRepository.get(task.getId()); DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task); taskQueue.offer(delayItem); } Thread.sleep(config.getFetchPeriod()); } catch(Exception e) { logger.error("fetch task list failed,cause by:{}", e); } } } } class Boss implements Runnable { @Override public void run() { for (;;) { try { /** * 时间到了就能够从延时队列拿出任务对象,而后交给worker线程池去执行 */ DelayItem<Task> item = taskQueue.take(); if(item != null && item.getItem() != null) { Task task = item.getItem(); /** * 真正开始执行了设置成执行中 */ task.setStatus(TaskStatus.DOING); /** * loader线程中已经使用乐观锁控制了,这里不必了 */ taskRepository.update(task); /** * 提交到线程池 */ Future future = workerPool.submit(new Worker(task)); /** * 暂存在doingFutures */ doingFutures.put(task.getId(),future); } } catch (Exception e) { logger.error("fetch task failed,cause by:{}", e); } } } } class Worker implements Callable<String> { private Task task; public Worker(Task task) { this.task = task; } @Override public String call() { logger.info("Begin to execute task:{}",task.getId()); TaskDetail detail = null; try { //开始任务 detail = taskRepository.start(task); if(detail == null) return null; //执行任务 task.getInvokor().invoke(); //完成任务 finish(task,detail); logger.info("finished execute task:{}",task.getId()); /** * 执行完后删了 */ doingFutures.remove(task.getId()); } catch (Exception e) { logger.error("execute task:{} error,cause by:{}",task.getId(), e); try { taskRepository.fail(task,detail,e.getCause().getMessage()); } catch(Exception e1) { logger.error("fail task:{} error,cause by:{}",task.getId(), e); } } return null; } } /** * 完成子任务,若是父任务失败了,子任务不会执行 * @param task * @param detail * @throws Exception */ private void finish(Task task,TaskDetail detail) throws Exception { //查看是否有子类任务 List<Task> childTasks = taskRepository.getChilds(task.getId()); if(childTasks == null || childTasks.isEmpty()) { //当没有子任务时完成父任务 taskRepository.finish(task,detail); return; } else { for (Task childTask : childTasks) { //开始任务 TaskDetail childDetail = null; try { //将子任务状态改为执行中 childTask.setStatus(TaskStatus.DOING); childTask.setNodeId(config.getNodeId()); //开始子任务 childDetail = taskRepository.startChild(childTask,detail); //使用乐观锁更新下状态,否则这里可能和恢复线程产生并发问题 int n = taskRepository.updateWithVersion(childTask); if (n > 0) { //再从数据库取一下,避免上面update修改后version不一样步 childTask = taskRepository.get(childTask.getId()); //执行子任务 childTask.getInvokor().invoke(); //完成子任务 finish(childTask, childDetail); } } catch (Exception e) { logger.error("execute child task error,cause by:{}", e); try { taskRepository.fail(childTask, childDetail, e.getCause().getMessage()); } catch (Exception e1) { logger.error("fail child task error,cause by:{}", e); } } } /** * 当有子任务时完成子任务后再完成父任务 */ taskRepository.finish(task,detail); } } /** * 添加任务 * @param name * @param cronExp * @param invockor * @return * @throws Exception */ public long addTask(String name, String cronExp, Invocation invockor) throws Exception { Task task = new Task(name,cronExp,invockor); return taskRepository.insert(task); } /** * 添加子任务 * @param pid * @param name * @param cronExp * @param invockor * @return * @throws Exception */ public long addChildTask(Long pid,String name, String cronExp, Invocation invockor) throws Exception { Task task = new Task(name,cronExp,invockor); task.setPid(pid); return taskRepository.insert(task); } }
上面主要就是三组线程,Loader负责加载将要执行的任务放入本地的任务队列,Boss线程负责取出任务队列的任务,而后分配Worker线程池的一个线程去执行。由上面的代码能够看到若是要当即执行,其实只须要把一个延时为0的任务放入任务队列,等着Boss线程去取而后分配给worker执行就能够实现了,代码以下:git
/** * 当即执行任务,就是设置一下延时为0加入任务队列就行了,这个能够外部直接调用 * @param taskId * @return */ public boolean startNow(Long taskId) { Task task = taskRepository.get(taskId); task.setStatus(TaskStatus.DOING); taskRepository.update(task); DelayItem<Task> delayItem = new DelayItem<Task>(0L, task); return taskQueue.offer(delayItem); }
启动不用再多说,下面介绍一下中止任务,根据面向对象的思惟,用户要想中止一个任务,最终执行中止任务的就是正在执行任务的那个节点。中止任务有两种状况,第一种任务没有正在运行如何中止,第二种是任务正在运行如何中止。第一种其实直接改变一下任务对象的状态为中止就好了,没必要多说。下面主要考虑如何中止正在运行的任务,细心的朋友可能已经发现上面代码和以前那一篇代码有点区别,以前用的Runnble做为线程实现接口,这个用了Callable,其实在java中中止线程池中正在运行的线程最经常使用的就是直接调用future的cancel方法了,要想获取到这个future对象就须要将之前实现Runnbale改为实现Callable,而后提交到线程池由execute改为submit就能够了,而后每次提交到线程池获得的future对象使用taskId一块儿保存在一个map中,方便根据taskId随时找到。固然任务执行完后要及时删除这个map里的任务,以避免常驻其中致使内存溢出。中止任务的请求流程以下程序员
图仍是原来的图,可是这时候状况不同了,由于中止任务的时候假如当前正在执行这个任务的节点处于服务1,负载均衡是不知道要去把你引到服务1的,他可能会引入到服务2,那就悲剧了,因此通用的作法就是中止请求过来无论落到哪一个节点上,那个节点就往一个公用的mq上发一个带有中止任务业务含义的消息,各个节点订阅这个消息,而后判断都判断任务在不在本身这里执行,若是在就执行中止操做。可是这样势必让咱们的调度服务又要依赖一个外部的消息队列服务,就算很方便的就能够引入一个外部的消息队列,可是你真的能够驾驭的了吗,消息丢了咋办,重复发送了咋办,消息服务挂了咋办,网络断了咋办,又引入了一大堆问题,那我是否是又要写n篇文章来分别解决这些问题。每每现实倒是就是这么残酷,你解决了一个问题,引入了更多的问题,这就是为何bug永远改不完的道理了。固然这不是个人风格,个人风格是利用有限的资源作尽量多的事情(多是因为我工做的企业都是那种资源贫瘠的,养成了我这种习惯,土豪公司的程序员请绕道,哈哈)。github
简化一下问题:目前的问题就是如何让正在执行任务的节点知道,而后中止正在执行的这个任务,其实就是这个中止通知如何实现。这难免让我想起了12306网站上买票,其实咱们做为老百姓多么但愿12306能够在有票的时候发个短信通知一下咱们,而后咱们上去抢,可是现实倒是,你要么使用软件一直刷,要么是本身隔一段时间上去瞄一下有没有票。若是把有票了给咱们发短信通知定义为异步通知,那么这种咱们要隔一段时间本身去瞄一下的方式就是同步轮训。这两种方式都能达到告知的目的,关键的区别在于你到底有没有时间去一直去瞄,不过相比于能够回家,这些时间都是值得的。我的认为软件的设计其实就是一个权衡是否值得的过程。若是约定了不使用外部消息队列这种异步通知的方式,那么咱们只能使用同步轮训的方式了。不过正好咱们的任务调度自己已经有一个心跳机制,没隔一段时间就去更新一下节点状态,若是咱们把用户的中止请求做为命令信息更新到每一个节点的上,而后随着心跳获取到这个节点的信息,而后判断这个命令,作相应的处理是否是就能够完美解决这个问题。值得吗?很明显是值得的,咱们只是在心跳逻辑上加一个小小的反作用就实现了通知功能了。代码以下spring
package com.rdpaas.task.common; /** * @author rongdi * @date 2019/11/26 */ public enum NotifyCmd { //没有通知,默认状态 NO_NOTIFY(0), //开启任务(Task) START_TASK(1), //修改任务(Task) EDIT_TASK(2), //中止任务(Task) STOP_TASK(3); int id; NotifyCmd(int id) { this.id = id; } public int getId() { return id; } public static NotifyCmd valueOf(int id) { switch (id) { case 1: return START_TASK; case 2: return EDIT_TASK; case 3: return STOP_TASK; default: return NO_NOTIFY; } } }
package com.rdpaas.task.handles; import com.rdpaas.task.common.NotifyCmd; import com.rdpaas.task.utils.SpringContextUtil; /** * @author: rongdi * @date: */ public interface NotifyHandler<T> { static NotifyHandler chooseHandler(NotifyCmd notifyCmd) { return SpringContextUtil.getByTypeAndName(NotifyHandler.class,notifyCmd.toString()); } public void update(T t); }
package com.rdpaas.task.handles; import com.rdpaas.task.scheduler.TaskExecutor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author: rongdi * @date: */ @Component("STOP_TASK") public class StopTaskHandler implements NotifyHandler<Long> { @Autowired private TaskExecutor taskExecutor; @Override public void update(Long taskId) { taskExecutor.stop(taskId); } }
class HeartBeat implements Runnable { @Override public void run() { for(;;) { try { /** * 时间到了就能够从延时队列拿出节点对象,而后更新时间和序号, * 最后再新建一个超时时间为心跳时间的节点对象放入延时队列,造成循环的心跳 */ DelayItem<Node> item = heartBeatQueue.take(); if(item != null && item.getItem() != null) { Node node = item.getItem(); handHeartBeat(node); } heartBeatQueue.offer(new DelayItem<>(config.getHeartBeatSeconds() * 1000,new Node(config.getNodeId()))); } catch (Exception e) { logger.error("task heart beat error,cause by:{} ",e); } } } } /** * 处理节点心跳 * @param node */ private void handHeartBeat(Node node) { if(node == null) { return; } /** * 先看看数据库是否存在这个节点 * 若是不存在:先查找下一个序号,而后设置到node对象中,最后插入 * 若是存在:直接根据nodeId更新当前节点的序号和时间 */ Node currNode= nodeRepository.getByNodeId(node.getNodeId()); if(currNode == null) { node.setRownum(nodeRepository.getNextRownum()); nodeRepository.insert(node); } else { nodeRepository.updateHeartBeat(node.getNodeId()); NotifyCmd cmd = currNode.getNotifyCmd(); String notifyValue = currNode.getNotifyValue(); if(cmd != null && cmd != NotifyCmd.NO_NOTIFY) { /** * 借助心跳作一下通知的事情,好比及时中止正在执行的任务 * 根据指令名称查找Handler */ NotifyHandler handler = NotifyHandler.chooseHandler(currNode.getNotifyCmd()); if(handler == null || StringUtils.isEmpty(notifyValue)) { return; } /** * 执行操做 */ handler.update(Long.valueOf(notifyValue)); } } }
最终的任务调度代码以下:数据库
package com.rdpaas.task.scheduler; import com.rdpaas.task.common.Invocation; import com.rdpaas.task.common.Node; import com.rdpaas.task.common.NotifyCmd; import com.rdpaas.task.common.Task; import com.rdpaas.task.common.TaskDetail; import com.rdpaas.task.common.TaskStatus; import com.rdpaas.task.config.EasyJobConfig; import com.rdpaas.task.repository.NodeRepository; import com.rdpaas.task.repository.TaskRepository; import com.rdpaas.task.strategy.Strategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 任务调度器 * @author rongdi * @date 2019-03-13 21:15 */ @Component public class TaskExecutor { private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class); @Autowired private TaskRepository taskRepository; @Autowired private NodeRepository nodeRepository; @Autowired private EasyJobConfig config; /** * 建立任务到期延时队列 */ private DelayQueue<DelayItem<Task>> taskQueue = new DelayQueue<>(); /** * 能够明确知道最多只会运行2个线程,直接使用系统自带工具就能够了 */ private ExecutorService bossPool = Executors.newFixedThreadPool(2); /** * 正在执行的任务的Future */ private Map<Long,Future> doingFutures = new HashMap<>(); /** * 声明工做线程池 */ private ThreadPoolExecutor workerPool; /** * 获取任务的策略 */ private Strategy strategy; @PostConstruct public void init() { /** * 根据配置选择一个节点获取任务的策略 */ strategy = Strategy.choose(config.getNodeStrategy()); /** * 自定义线程池,初始线程数量corePoolSize,线程池等待队列大小queueSize,当初始线程都有任务,而且等待队列满后 * 线程数量会自动扩充最大线程数maxSize,当新扩充的线程空闲60s后自动回收.自定义线程池是由于Executors那几个线程工具 * 各有各的弊端,不适合生产使用 */ workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize())); /** * 执行待处理任务加载线程 */ bossPool.execute(new Loader()); /** * 执行任务调度线程 */ bossPool.execute(new Boss()); } class Loader implements Runnable { @Override public void run() { for(;;) { try { /** * 先获取可用的节点列表 */ List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2); if(nodes == null || nodes.isEmpty()) { continue; } /** * 查找还有指定时间(单位秒)才开始的主任务列表 */ List<Task> tasks = taskRepository.listNotStartedTasks(config.getFetchDuration()); if(tasks == null || tasks.isEmpty()) { continue; } for(Task task:tasks) { boolean accept = strategy.accept(nodes, task, config.getNodeId()); /** * 不应本身拿就不要抢 */ if(!accept) { continue; } /** * 先设置成待执行 */ task.setStatus(TaskStatus.PENDING); task.setNodeId(config.getNodeId()); /** * 使用乐观锁尝试更新状态,若是更新成功,其余节点就不会更新成功。若是其它节点也正在查询未完成的 * 任务列表和当前这段时间有节点已经更新了这个任务,version必然和查出来时候的version不同了,这里更新 * 必然会返回0了 */ int n = taskRepository.updateWithVersion(task); Date nextStartTime = task.getNextStartTime(); if(n == 0 || nextStartTime == null) { continue; } /** * 封装成延时对象放入延时队列,这里再查一次是由于上面乐观锁已经更新了版本,会致使后面结束任务更新不成功 */ task = taskRepository.get(task.getId()); DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task); taskQueue.offer(delayItem); } Thread.sleep(config.getFetchPeriod()); } catch(Exception e) { logger.error("fetch task list failed,cause by:{}", e); } } } } class Boss implements Runnable { @Override public void run() { for (;;) { try { /** * 时间到了就能够从延时队列拿出任务对象,而后交给worker线程池去执行 */ DelayItem<Task> item = taskQueue.take(); if(item != null && item.getItem() != null) { Task task = item.getItem(); /** * 真正开始执行了设置成执行中 */ task.setStatus(TaskStatus.DOING); /** * loader线程中已经使用乐观锁控制了,这里不必了 */ taskRepository.update(task); /** * 提交到线程池 */ Future future = workerPool.submit(new Worker(task)); /** * 暂存在doingFutures */ doingFutures.put(task.getId(),future); } } catch (Exception e) { logger.error("fetch task failed,cause by:{}", e); } } } } class Worker implements Callable<String> { private Task task; public Worker(Task task) { this.task = task; } @Override public String call() { logger.info("Begin to execute task:{}",task.getId()); TaskDetail detail = null; try { //开始任务 detail = taskRepository.start(task); if(detail == null) return null; //执行任务 task.getInvokor().invoke(); //完成任务 finish(task,detail); logger.info("finished execute task:{}",task.getId()); /** * 执行完后删了 */ doingFutures.remove(task.getId()); } catch (Exception e) { logger.error("execute task:{} error,cause by:{}",task.getId(), e); try { taskRepository.fail(task,detail,e.getCause().getMessage()); } catch(Exception e1) { logger.error("fail task:{} error,cause by:{}",task.getId(), e); } } return null; } } /** * 完成子任务,若是父任务失败了,子任务不会执行 * @param task * @param detail * @throws Exception */ private void finish(Task task,TaskDetail detail) throws Exception { //查看是否有子类任务 List<Task> childTasks = taskRepository.getChilds(task.getId()); if(childTasks == null || childTasks.isEmpty()) { //当没有子任务时完成父任务 taskRepository.finish(task,detail); return; } else { for (Task childTask : childTasks) { //开始任务 TaskDetail childDetail = null; try { //将子任务状态改为执行中 childTask.setStatus(TaskStatus.DOING); childTask.setNodeId(config.getNodeId()); //开始子任务 childDetail = taskRepository.startChild(childTask,detail); //使用乐观锁更新下状态,否则这里可能和恢复线程产生并发问题 int n = taskRepository.updateWithVersion(childTask); if (n > 0) { //再从数据库取一下,避免上面update修改后version不一样步 childTask = taskRepository.get(childTask.getId()); //执行子任务 childTask.getInvokor().invoke(); //完成子任务 finish(childTask, childDetail); } } catch (Exception e) { logger.error("execute child task error,cause by:{}", e); try { taskRepository.fail(childTask, childDetail, e.getCause().getMessage()); } catch (Exception e1) { logger.error("fail child task error,cause by:{}", e); } } } /** * 当有子任务时完成子任务后再完成父任务 */ taskRepository.finish(task,detail); } } /** * 添加任务 * @param name * @param cronExp * @param invockor * @return * @throws Exception */ public long addTask(String name, String cronExp, Invocation invockor) throws Exception { Task task = new Task(name,cronExp,invockor); return taskRepository.insert(task); } /** * 添加子任务 * @param pid * @param name * @param cronExp * @param invockor * @return * @throws Exception */ public long addChildTask(Long pid,String name, String cronExp, Invocation invockor) throws Exception { Task task = new Task(name,cronExp,invockor); task.setPid(pid); return taskRepository.insert(task); } /** * 当即执行任务,就是设置一下延时为0加入任务队列就行了,这个能够外部直接调用 * @param taskId * @return */ public boolean startNow(Long taskId) { Task task = taskRepository.get(taskId); task.setStatus(TaskStatus.DOING); taskRepository.update(task); DelayItem<Task> delayItem = new DelayItem<Task>(0L, task); return taskQueue.offer(delayItem); } /** * 当即中止正在执行的任务,留给外部调用的方法 * @param taskId * @return */ public boolean stopNow(Long taskId) { Task task = taskRepository.get(taskId); if(task == null) { return false; } /** * 该任务不是正在执行,直接修改task状态为已完成便可 */ if(task.getStatus() != TaskStatus.DOING) { task.setStatus(TaskStatus.STOP); taskRepository.update(task); return true; } /** * 该任务正在执行,使用节点配合心跳发布停用通知 */ int n = nodeRepository.updateNotifyInfo(NotifyCmd.STOP_TASK,String.valueOf(taskId)); return n > 0; } /** * 当即中止正在执行的任务,这个不须要本身调用,是给心跳线程调用 * @param taskId * @return */ public boolean stop(Long taskId) { Task task = taskRepository.get(taskId); /** * 不是本身节点的任务,本节点不能执行停用 */ if(task == null || !config.getNodeId().equals(task.getNodeId())) { return false; } /** * 拿到正在执行任务的future,而后强制停用,并删除doingFutures的任务 */ Future future = doingFutures.get(taskId); boolean flag = future.cancel(true); if(flag) { doingFutures.remove(taskId); /** * 修改状态为已停用 */ task.setStatus(TaskStatus.STOP); taskRepository.update(task); } /** * 重置通知信息,避免重复执行停用通知 */ nodeRepository.resetNotifyInfo(NotifyCmd.STOP_TASK); return flag; } }
好吧,其实实现很简单,关键在于思路,不BB了,详细代码见:https://github.com/rongdi/easy-job 在下告辞!网络