基于DAG实现的任务编排框架&平台

最近在作的工做比较须要一个支持任务编排工做流的框架或者平台,这里记录下实现上的一些思路。前端

任务编排工做流

任务编排是什么意思呢,顾名思义就是能够把"任务"这个原子单位按照本身的方式进行编排,任务之间可能互相依赖。复杂一点的编排以后就能造成一个 workflow 工做流了。咱们但愿这个工做流按照咱们编排的方式去执行每一个原子 task 任务。以下图所示,咱们但愿先并发运行 Task A 和 Task C,Task A 执行完后串行运行 Task B,在并发等待 Task B 和 C 都结束后运行 Task D,这样就完成了一个典型的任务编排工做流。node

DAG 有向无环图

首先咱们了解图这个数据结构,每一个元素称为顶点 vertex,顶点之间的连线称为边 edge。像咱们画的这种带箭头关系的称为有向图,箭头关系之间能造成一个环的成为有环图,反之称为无环图。显然运用在咱们任务编排工做流上,最合适的是 DAG 有向无环图。数据库

咱们在代码里怎么存储图呢,有两种数据结构:邻接矩阵和邻接表。后端

下图表示一个有向图的邻接矩阵,例如 x->y 的边,只需将 Array[x][y]标识为 1 便可。数据结构

此外咱们也能够使用邻接表来存储,这种存储方式较好地弥补了邻接矩阵浪费空间的缺点,但相对来讲邻接矩阵能更快地判断连通性。并发

通常在代码实现上,咱们会选择邻接矩阵,这样咱们在判断两点之间是否有边更方便点。框架

一个任务编排框架

了解了 DAG 的基本知识后咱们能够来简单实现一下。首先是存储结构,咱们的 Dag 表示一整个图,Node 表示各个顶点,每一个顶点有其 parents 和 children:ide

//Dagpublic final class DefaultDag<T, R> implements Dag<T, R> {	private Map<T, Node<T, R>> nodes = new HashMap<T, Node<T, R>>();
    ...
}//Nodepublic final class Node<T, R> {	/**
	 * incoming dependencies for this node
	 */private Set<Node<T, R>> parents = new LinkedHashSet<Node<T, R>>();/**
     * outgoing dependencies for this node
     */private Set<Node<T, R>> children = new LinkedHashSet<Node<T, R>>();
    ...
}复制代码

画两个顶点,以及为这两个顶点连边操做以下:this

public void addDependency(final T evalFirstNode, final T evalLaterNode) {
		Node<T, R> firstNode = createNode(evalFirstNode);
		Node<T, R> afterNode = createNode(evalLaterNode);

		addEdges(firstNode, afterNode);
	}   private Node<T, R> createNode(final T value) {
		Node<T, R> node = new Node<T, R>(value);		return node;
	}	private void addEdges(final Node<T, R> firstNode, final Node<T, R> afterNode) {		if (!firstNode.equals(afterNode)) {
			firstNode.getChildren().add(afterNode);
			afterNode.getParents().add(firstNode);
		}
	}复制代码

到如今咱们其实已经把基础数据结构写好了,但咱们做为一个任务编排框架最终是须要线程去执行的,咱们把它和线程池一块儿给包装一下。线程

//任务编排线程池public class DefaultDexecutor <T, R> {//执行线程,和2种重试线程private final ExecutorService<T, R> executionEngine;	private final ExecutorService immediatelyRetryExecutor;	private final ScheduledExecutorService scheduledRetryExecutor;//执行状态private final ExecutorState<T, R> state;
    ...
}//执行状态public class DefaultExecutorState<T, R> {//底层图数据结构private final Dag<T, R> graph;//已完成private final Collection<Node<T, R>> processedNodes;//未完成private final Collection<Node<T, R>> unProcessedNodes;//错误taskprivate final Collection<ExecutionResult<T, R>> erroredTasks;//执行结果private final Collection<ExecutionResult<T, R>> executionResults;
}复制代码

能够看到咱们的线程包括执行线程池,2 种重试线程池。咱们使用 ExecutorState 来保存一些整个任务工做流执行过程当中的一些状态记录,包括已完成和未完成的 task,每一个 task 执行的结果等。同时它也依赖咱们底层的图数据结构 DAG。

接下来咱们要作的事其实很简单,就是 BFS 这整个 DAG 数据结构,而后提交到线程池中去执行就能够了,过程当中注意一些节点状态的保持,结果的保存便可。

仍是以上图为例,值得说的一点是在 Task D 这个点须要有一个并发等待的操做,即 Task D 须要依赖 Task B 和 Task C 执行结束后再往下执行。这里有不少办法,我选择了共享变量的方式来完成并发等待。遍历工做流中被递归的方法的伪代码以下:

private void doProcessNodes(final Set<Node<T, R>> nodes) {		for (Node<T, R> node : nodes) {//共享变量 并发等待if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents())) {
            Task<T, R> task = newTask(node);this.executionEngine.submit(task);
            ...
            ExecutionResult<T, R> executionResult = this.executionEngine.proce***esult();if (executionResult.isSuccess()) {
			    state.markProcessingDone(processedNode);
		    }//继续执行孩子节点doExecute(processedNode.getChildren());
            ...
        }
    }
}复制代码

这样咱们基本完成了这个任务编排框架的工做,如今咱们能够以下来进行示例图中的任务编排以及执行:

DefaultExecutor<String, String> executor = newTaskExecutor();
executor.addDependency("A", "B");
executor.addDependency("B", "D");
executor.addDependency("C", "D");
executor.execute();复制代码

任务编排平台化

好了如今咱们已经有一款任务编排框架了,但不少时候咱们想要可视化、平台化,让使用者更加无脑。

框架与平台最大的区别在哪里?是可拖拽的可视化输入么?我以为这个的复杂度更多在前端。而对于后端平台来说,与框架最大的区别是数据的持久化。

对于 DAG 的顶点来讲,咱们须要将每一个节点 Task 的信息给持久化到关系数据库中,包括 Task 的状态、输出结果等。而对于 DAG 的边来讲,咱们也得用数据库来存储各 Task 之间的方向关系。此外,在遍历执行 DAG 的整个过程当中的中间状态数据,咱们也得搬运到数据库中。

首先咱们能够设计一个 workflow 表,来表示一个工做流。接着咱们设计一个 task 表,来表示一个执行单元。task 表主要字段以下,这里主要是 task_parents 的设计,它是一个 string,存储 parents 的 taskId,多个由分隔符分隔。

task_id
workflow_id
task_name
task_status
result
task_parents复制代码

依赖是上图这个例子,对比框架来讲,咱们首先得将其存储到数据库中去,最终可能获得以下数据:

task_id  workflow_id  task_name  task_status  result  task_parents
  1          1           A           0                    -1
  2          1           B           0                    1
  3          1           C           0                    -1
  4          1           D           0                    2,3复制代码

能够看到,这样也能很好地存储 DAG 数据,和框架中代码的输入方式差异并非很大。

接下来咱们要作的是遍历执行整个 workflow,这边和框架的差异也不大。首先咱们能够利用select * from task where workflow_id = 1 and task_parents = -1来获取初始化节点 Task A 和 Task C,将其提交到咱们的线程池中。

接着对应框架代码中的doExecute(processedNode.getChildren());,咱们使用select * from task where task_parents like %3%,就能够获得 Task C 的孩子节点 Task D,这里使用了模糊查询是由于咱们的 task_parents 多是由多个父亲的 taskId 与分隔号组合而成的字符串。查询到孩子节点后,继续提交到线程池便可。

别忘了咱们在 Task D 这边还有一个并发等待的操做,对应框架代码中的if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents()))。这边咱们只要判断select count(1) from task where task_id in (2,3) and status != 1的个数为 0 便可,即保证 parents task 所有成功。

另外值得注意的是 task 的重试。在框架中,失败 task 的重试能够是当即使用当前线程重试或者放到一个定时线程池中去重试。而在平台上,咱们的重试基本上来自于用户在界面上的点击,即主线程。

至此,咱们已经将任务编排框架的功能基本平台化了。做为一个任务编排平台,可拖拽编排的可视化输入、整个工做流状态的可视化展现、任务的可人工重试都是其优势。

相关文章
相关标签/搜索