折腾了一周的 Java Quartz 集群任务调度,很遗憾没能搞定,网上的相关文章也少得可怜,在多节点(多进程)环境下 Quartz 彷佛没法动态增减任务,恼火。无奈之下本身撸了一个简单的任务调度器,结果只花了不到 2天时间,并且感受很是简单好用,代码量也很少,扩展性很好。java
实现一个分布式的任务调度器有几个关键的考虑点git
在深刻讲解实现方法以前,咱们先来看看这个调度器是如何使用的github
class Demo {
public static void main(String[] args) {
var redis = new RedisStore();
// sample 为任务分组名称
var store = new RedisTaskStore(redis, "sample");
// 5s 为任务锁寿命
var scheduler = new DistributedScheduler(store, 5);
// 注册一个单次任务
scheduler.register(Trigger.onceOfDelay(5), Task.of("once1", () -> {
System.out.println("once1");
}));
// 注册一个循环任务
scheduler.register(Trigger.periodOfDelay(5, 5), Task.of("period2", () -> {
System.out.println("period2");
}));
// 注册一个 CRON 任务
scheduler.register(Trigger.cronOfMinutes(1), Task.of("cron3", () -> {
System.out.println("cron3");
}));
// 设置全局版本号
scheduler.version(1);
// 注册监听器
scheduler.listener(ctx -> {
System.out.println(ctx.task().name() + " is complete");
});
// 启动调度器
scheduler.start();
}
}
复制代码
当代码升级任务须要增长减小时(或者变动调度时间),只须要递增全局版本号,现有的进程中的任务会自动被从新调度,那些没有被注册的任务(任务减小)会自动清除。新增的任务(新任务)在老代码的进程里是不会被调度的(没有新任务的代码没法调度),被清除的任务(老任务)在老代码的进程里会被取消调度。redis
好比咱们要取消 period2 任务,增长 period4 任务数据库
class Demo {
public static void main(String[] args) {
var redis = new RedisStore();
// sample 为任务分组名称
var store = new RedisTaskStore(redis, "sample");
// 5s 为任务锁寿命
var scheduler = new DistributedScheduler(store, 5);
// 注册一个单次任务
scheduler.register(Trigger.onceOfDelay(5), Task.of("once1", () -> {
System.out.println("once1");
}));
// 注册一个 CRON 任务
scheduler.register(Trigger.cronOfMinutes(1), Task.of("cron3", () -> {
System.out.println("cron3");
}));
// 注册一个循环任务
scheduler.register(Trigger.periodOfDelay(5, 10), Task.of("period4", () -> {
System.out.println("period4");
}));
// 递增全局版本号
scheduler.version(2);
// 注册监听器
scheduler.listener(ctx -> {
System.out.println(ctx.task().name() + " is complete");
});
// 启动调度器
scheduler.start();
}
}
复制代码
<dependency>
<groupId>it.sauronsoftware.cron4j</groupId>
<artifactId>cron4j</artifactId>
<version>2.2.5</version>
</dependency>
复制代码
这个开源的 library 包含了基础的 cron 表达式解析功能,它还提供了任务的调度功能,不过这里并不须要使用它的调度器。我只会用到它的表达式解析功能,以及一个简单的方法用来判断当前的时间是否匹配表达式(是否该运行任务了)。bash
咱们对 cron 的时间精度要求很低,1 分钟判断一次当前的时间是否到了该运行任务的时候就能够了。分布式
class SchedulingPattern {
// 表达式是否有效
boolean validate(String cronExpr);
// 是否应该运行任务了(一分钟判断一次)
boolean match(long nowTs);
}
复制代码
由于是分布式任务调度器,多进程环境下要控制同一个任务在调度的时间点只能有一个进程运行。使用 Redis 分布式锁很容易就能够搞定。锁须要保持必定的时间(好比默认 5s)。this
全部的进程都会在同一时间调度这个任务,可是只有一个进程能够抢到锁。由于分布式环境下时间的不一致性,不一样机器上的进程会有较小的时间差别窗口,锁必须保持一个窗口时间,这里我默认设置为 5s(可定制),这就要求不一样机器的时间差不能超过 5s,超出了这个值就会出现重复调度。spa
public boolean grabTask(String name) {
var holder = new Holder<Boolean>();
redis.execute(jedis -> {
var lockKey = keyFor("task_lock", name);
var ok = jedis.set(lockKey, "true", SetParams.setParams().nx().ex(lockAge));
holder.value(ok != null);
});
return holder.value();
}
复制代码
咱们给任务列表附上一个全局的版本号,当业务上须要增长或者减小调度任务时,经过变动版本号来触发进程的任务重加载。这个重加载的过程包含轮询全局版本号(Redis 的一个key),若是发现版本号变更,当即从新加载任务列表配置并从新调度全部的任务。线程
private void scheduleReload() {
// 1s 对比一次
this.scheduler.scheduleWithFixedDelay(() -> {
try {
if (this.reloadIfChanged()) {
this.rescheduleTasks();
}
} catch (Exception e) {
LOG.error("reloading tasks error", e);
}
}, 0, 1, TimeUnit.SECONDS);
}
复制代码
从新调度任务先要取消当前全部正在调度的任务,而后调度刚刚加载的全部任务。
private void rescheduleTasks() {
this.cancelAllTasks();
this.scheduleTasks();
}
private void cancelAllTasks() {
this.futures.forEach((name, future) -> {
LOG.warn("cancelling task {}", name);
future.cancel(false);
});
this.futures.clear();
}
复制代码
由于须要将任务持久化,因此设计了一套任务的序列化格式,这个也很简单,使用文本符号分割任务配置属性就行。
// 一次性任务(startTime)
ONCE@2019-04-29T15:26:29.946+0800
// 循环任务,(startTime,endTime,period),这里任务的结束时间是天荒地老
PERIOD@2019-04-29T15:26:29.949+0800|292278994-08-17T15:12:55.807+0800|5
// cron 任务,一分钟一次
CRON@*/1 * * * *
$ redis-cli
127.0.0.1:6379> hgetall sample_triggers
1) "task3"
2) "CRON@*/1 * * * *"
3) "task2"
4) "PERIOD@2019-04-29T15:26:29.949+0800|292278994-08-17T15:12:55.807+0800|5"
5) "task1"
6) "ONCE@2019-04-29T15:26:29.946+0800"
7) "task4"
8) "PERIOD@2019-04-29T15:26:29.957+0800|292278994-08-17T15:12:55.807+0800|10"
复制代码
时间调度会有一个单独的线程(单线程线程池),任务的运行由另一个线程池来完成(数量可定制)。
class DistributedScheduler {
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private ExecutorService executor = Executors.newFixedThreadPool(threads);
}
复制代码
之因此要将线程池分开,是为了不任务的执行(IO)影响了时间的精确调度。
Java 的内置调度器提供两种调度策略 FixedDelay 和 FixedRate。FixedDelay 保证同一个任务的连续两次运行有相等的时延(nextRun.startTime - lastRun.endTime),FixedRate 保证同一个任务的连续运行有肯定的间隔(nextRun.startTime - lastRun.startTime)。
FixedDelay 就比如你加班到深夜12点,能够次日12点再来上班(保证固定的休息时间),而 FixedRate 就没那么体贴了,次日你继续 9点过来上班。若是你不走运到次日 9 点了还在加班,那你今天就没有休息时间了,继续上班吧。
class ScheduledExecutorService {
void scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
void scheduleAtFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
复制代码
分布式调度器要求有精确的调度时间,因此必须采用 FixedRate 模式,保证多节点同一个任务在同一时间被争抢。若是采用 FixedDelay 模式,会致使不一样进程的调度时间错开了,分布式锁的默认 5s 时间窗口将起不到互斥做用。
互斥任务要求任务的单进程运行,无互斥任务就是没有加分布式锁的任务,能够多进程同时运行。默认须要互斥。
class Task {
/** * 是否须要考虑多进程互斥(true表示不互斥,多进程能同时跑) */
private boolean concurrent;
private String name;
private Runnable runner;
...
public static Task of(String name, Runnable runner) {
return new Task(name, false, runner);
}
public static Task concurrent(String name, Runnable runner) {
return new Task(name, true, runner);
}
}
复制代码
考虑到调度器的使用者可能须要对任务运行状态进行监控,这里增长了一个简单的回调接口,目前功能比较简单。能汇报运行结果(成功仍是异常)和运行的耗时
class TaskContext {
private Task task;
private long cost; // 运行时间
private boolean ok;
private Throwable e;
}
interface ISchedulerListener {
public void onComplete(TaskContext ctx);
}
复制代码
目前只实现了 Redis 和 Memory 形式的任务存储,扩展到 zk、etcd、关系数据库也是可行的,实现下面的接口便可。
interface ITaskStore {
public long getRemoteVersion();
public Map<String, String> getAllTriggers();
public void saveAllTriggers(long version, Map<String, String> triggers);
public boolean grabTask(String name);
}
复制代码