在对调度系统架构说明以前,咱们先来认识一下调度系统经常使用的名词前端
DAG: 全称Directed Acyclic Graph,简称DAG。工做流中的Task任务以有向无环图的形式组装起来,从入度为零的节点进行拓扑遍历,直到无后继节点为止。举例以下图: java
流程定义:经过拖拽任务节点并创建任务节点的关联所造成的可视化DAGgit
流程实例:流程实例是流程定义的实例化,能够经过手动启动或定时调度生成github
任务实例:任务实例是流程定义中任务节点的实例化,标识着具体的任务执行状态算法
任务类型: 目前支持有SHELL、SQL、SUB_PROCESS、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT,同时计划支持动态插件扩展,注意:其中子 SUB_PROCESS 也是一个单独的流程定义,是能够单独启动执行的数据库
调度方式: 系统支持基于cron表达式的定时调度和手动调度。命令类型支持:启动工做流、从当前节点开始执行、恢复被容错的工做流、恢复暂停流程、从失败节点开始执行、补数、调度、重跑、暂停、中止、恢复等待线程。其中 恢复被容错的工做流 和 恢复等待线程 两种命令类型是由调度内部控制使用,外部没法调用json
定时调度:系统采用 quartz 分布式调度器,并同时支持cron表达式可视化的生成api
依赖:系统不仅仅支持 DAG 简单的前驱和后继节点之间的依赖,同时还提供任务依赖节点,支持流程间的自定义任务依赖网络
优先级 :支持流程实例和任务实例的优先级,若是流程实例和任务实例的优先级不设置,则默认是先进先出架构
邮件告警:支持 SQL任务 查询结果邮件发送,流程实例运行结果邮件告警及容错告警通知
失败策略:对于并行运行的任务,若是有任务失败,提供两种失败策略处理方式,继续是指无论并行运行任务的状态,直到流程失败结束。结束是指一旦发现失败任务,则同时Kill掉正在运行的并行任务,流程失败结束
补数:补历史数据,支持区间并行和串行两种补数方式
MasterServer
MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。 MasterServer服务启动时向Zookeeper注册临时节点,经过监听Zookeeper临时节点变化来进行容错处理。
Distributed Quartz分布式调度组件,主要负责定时任务的启停操做,当quartz调起任务后,Master内部会有线程池具体负责处理任务的后续操做
MasterSchedulerThread是一个扫描线程,定时扫描数据库中的 command 表,根据不一样的命令类型进行不一样的业务操做
MasterExecThread主要是负责DAG任务切分、任务提交监控、各类不一样命令类型的逻辑处理
MasterTaskExecThread主要负责任务的持久化
WorkerServer
WorkerServer也采用分布式无中心设计理念,WorkerServer主要负责任务的执行和提供日志服务。WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。
FetchTaskThread主要负责不断从Task Queue中领取任务,并根据不一样任务类型调用TaskScheduleThread对应执行器。
LoggerServer是一个RPC服务,提供日志分片查看、刷新和下载等功能
ZooKeeper
ZooKeeper服务,系统中的MasterServer和WorkerServer节点都经过ZooKeeper来进行集群管理和容错。另外系统还基于ZooKeeper进行事件监听和分布式锁。 咱们也曾经基于Redis实现过队列,不过咱们但愿EasyScheduler依赖到的组件尽可能地少,因此最后仍是去掉了Redis实现。
Task Queue
提供任务队列的操做,目前队列也是基于Zookeeper来实现。因为队列中存的信息较少,没必要担忧队列里数据过多的状况,实际上咱们压测过百万级数据存队列,对系统稳定性和性能没影响。
Alert
提供告警相关接口,接口主要包括告警两种类型的告警数据的存储、查询和通知功能。其中通知功能又有邮件通知和**SNMP(暂未实现)**两种。
API
API接口层,主要负责处理前端UI层的请求。该服务统一提供RESTful api向外部提供请求服务。 接口包括工做流的建立、定义、查询、修改、发布、下线、手工启动、中止、暂停、恢复、从该节点开始执行等等。
UI
系统的前端页面,提供系统的各类可视化操做界面,详见**系统使用手册**部分。
中心化的设计理念比较简单,分布式集群中的节点按照角色分工,大致上分为两种角色:
中心化思想设计存在的问题:
在去中心化设计里,一般没有Master/Slave的概念,全部的角色都是同样的,地位是平等的,全球互联网就是一个典型的去中心化的分布式系统,联网的任意节点设备down机,都只会影响很小范围的功能。
去中心化设计的核心设计在于整个分布式系统中不存在一个区别于其余节点的”管理者”,所以不存在单点故障问题。但因为不存在” 管理者”节点因此每一个节点都须要跟其余节点通讯才获得必需要的机器信息,而分布式系统通讯的不可靠行,则大大增长了上述功能的实现难度。
实际上,真正去中心化的分布式系统并很少见。反而动态中心化分布式系统正在不断涌出。在这种架构下,集群中的管理者是被动态选择出来的,而不是预置的,而且集群在发生故障的时候,集群的节点会自发的举行"会议"来选举新的"管理者"去主持工做。最典型的案例就是ZooKeeper及Go语言实现的Etcd。
EasyScheduler的去中心化是Master/Worker注册到Zookeeper中,实现Master集群和Worker集群无中心,并使用Zookeeper分布式锁来选举其中的一台Master或Worker为“管理者”来执行任务。
EasyScheduler使用ZooKeeper分布式锁来实现同一时刻只有一台Master执行Scheduler,或者只有一台Worker执行任务的提交。
对于启动新Master来打破僵局,彷佛有点差强人意,因而咱们提出了如下三种方案来下降这种风险:
注意:Master Scheduler线程在获取Command的时候是FIFO的方式执行的。
因而咱们选择了第三种方式来解决线程不足的问题。
容错分为服务宕机容错和任务重试,服务宕机容错又分为Master容错和Worker容错两种状况
服务容错设计依赖于ZooKeeper的Watcher机制,实现原理如图:
其中Master监控其余Master和Worker的目录,若是监听到remove事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错。
Master Scheduler线程一旦发现任务实例为” 须要容错”状态,则接管任务并进行从新提交。
注意:因为” 网络抖动”可能会使得节点短期内失去和ZooKeeper的心跳,从而发生节点的remove事件。对于这种状况,咱们使用最简单的方式,那就是节点一旦和ZooKeeper发生超时链接,则直接将Master或Worker服务停掉。
这里首先要区分任务失败重试、流程失败恢复、流程失败重跑的概念:
接下来讲正题,咱们将工做流中的任务节点分了两种类型。
一种是业务节点,这种节点都对应一个实际的脚本或者处理语句,好比Shell节点,MR节点、Spark节点、依赖节点等。
还有一种是逻辑节点,这种节点不作实际的脚本或语句处理,只是整个流程流转的逻辑处理,好比子流程节等。
每个业务节点均可以配置失败重试的次数,当该任务节点失败,会自动重试,直到成功或者超过配置的重试次数。逻辑节点不支持失败重试。可是逻辑节点里的任务支持重试。
若是工做流中有任务失败达到最大重试次数,工做流就会失败中止,失败的工做流能够手动进行重跑操做或者流程恢复操做
在早期调度设计中,若是没有优先级设计,采用公平调度设计的话,会遇到先行提交的任务可能会和后继提交的任务同时完成的状况,而不能作到设置流程或者任务的优先级,所以咱们对此进行了从新设计,目前咱们设计以下:
具体实现是根据任务实例的json解析优先级,而后把流程实例优先级_流程实例id_任务优先级_任务id信息保存在ZooKeeper任务队列中,当从任务队列获取的时候,经过字符串比较便可得出最须要优先执行的任务
- 任务的优先级也分为5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。以下图
复制代码
因为Web(UI)和Worker不必定在同一台机器上,因此查看日志不能像查询本地文件那样。有两种方案:
将日志放到ES搜索引擎上
经过gRPC通讯获取远程日志信息
介于考虑到尽量的EasyScheduler的轻量级性,因此选择了gRPC实现远程访问日志信息。
/** * task log appender */
public class TaskLogAppender extends FileAppender<ILoggingEvent {
...
@Override
protected void append(ILoggingEvent event) {
if (currentlyActiveFile == null){
currentlyActiveFile = getFile();
}
String activeFile = currentlyActiveFile;
// thread name: taskThreadName-processDefineId_processInstanceId_taskInstanceId
String threadName = event.getThreadName();
String[] threadNameArr = threadName.split("-");
// logId = processDefineId_processInstanceId_taskInstanceId
String logId = threadNameArr[1];
...
super.subAppend(event);
}
}
复制代码
以/流程定义id/流程实例id/任务实例id.log的形式生成日志
过滤匹配以TaskLogInfo开始的线程名称:
TaskLogFilter实现以下:
/** * task log filter */
public class TaskLogFilter extends Filter<ILoggingEvent {
@Override
public FilterReply decide(ILoggingEvent event) {
if (event.getThreadName().startsWith("TaskLogInfo-")){
return FilterReply.ACCEPT;
}
return FilterReply.DENY;
}
}
复制代码
本文从调度出发,初步介绍了大数据分布式工做流调度系统--EasyScheduler的架构原理及实现思路。