Monorepo 中的任务调度机制

前言

Monorepo 中的一个项目称为 project,对 project 进行的具体操做称为任务 task,好比 buildtest,能够狭义地理解为 npm scripts 中注册的操做,Monorepo 管理工具应当有能力调度这些任务。git

项目依赖图

先看一个 🌰,如上图所示,存在一个依赖关系较为复杂的 Monorepo,此时须要执行某个任务,例如 build,如何同时保证任务执行顺序以及任务执行效率(假设最大任务并行数为 N)?github

接下来就是枯燥乏味的作题过程,我们先把上面那张项目依赖图抽象成代码。shell

问题

interface Project {
  name: string;
  actions: { name: string; fn: () => Promise<void> }[];
  dependencyProjects: Project[];
}

const sleep = (s: number): Promise<void> =>
  new Promise((r) => setTimeout(r, s));

// Monorepo 中注册的全部项目
const projects: Project[] = [
  "@monorepo/a",
  "@monorepo/b",
  "@monorepo/c",
  "@monorepo/d",
  "@monorepo/x",
  "@monorepo/y",
  "@monorepo/z",
].map((name) => ({
  name,
  actions: [{ name: "build", fn: () => sleep(Math.random() * 1000) }],
  dependencyProjects: [],
}));

const [A, B, C, D, X, Y, Z] = projects;

A.dependencyProjects = [B];
B.dependencyProjects = [D];
C.dependencyProjects = [D, X, Y];
X.dependencyProjects = [Y, Z];

/** * 实现本方法,使得 build 行为按照正确的顺序执行,且保证执行效率 * @param projects 须要执行任务的 project 集合 * @param actionName 具体操做名称 * @param limit 任务最大并行数 */
function run(projects: Project[], actionName: string, limit: number) {
  // todo
}

run(projects, "build", 12);
复制代码

解题

很明显,project 之间存在依赖关系,那么任务之间也存在依赖关系,那么能够获得如下结论:npm

  1. 当前任务做为下游任务时,当前任务完成后,须要更新其上游任务的依赖任务,从其内移除当前任务
  2. 当前任务做为上游任务时,只有当前任务的下游任务都被清空(完成)时,当前任务才能够执行

因而 task 定义以下:json

interface Task {
  // 任务名 `${projectName}:{actionName}`
  name: string;
  // 当前任务依赖的任务,即当前任务的下游任务,当该 dependenciesSet 被清空,说明当前任务能够被执行
  dependencies: Set<Task>;
  // 依赖当前任务的任务,即当前任务的上游任务,当前任务完成后,须要更新其上游任务的 dependenciesSet(从其内移除当前任务)
  dependents: Set<Task>;
  // 具体任务执行函数
  fn: () => Promise<void>;
}
复制代码

初始化任务

根据 projects 参数,构造出项目对应的任务。bash

function run(projects: Project[], actionName: string, limit: number) {
  // 任务名与任务的映射
  const tasks = new Map<string, Task>();
  projects.forEach((project) =>
    tasks.set(getTaskName(project, actionName), {
      name: getTaskName(project, actionName),
      dependencies: new Set(),
      dependents: new Set(),
      fn: project.actions.find((a) => a.name === actionName)?.fn ?? noop,
    })
  );
}

// 获取任务名
function getTaskName(project: Project, actionName: string) {
  return `${project.name}:${actionName}`;
}

function noop(): Promise<void> {
  return new Promise((r) => r());
}
复制代码

补充 dependencies 与 dependents

假设存在 project1,对其进行如下操做:markdown

  1. 取到当前项目对应的任务 task1
  2. 获取当前任务对应的下游任务名 dependencyTaskNames(基于 project1.dependencyProjects)
  3. 遍历下游任务名 dependencyTaskName
  4. 取到下游任务(上一步初始化而来) dependencyTask
  5. 补充 task1 的 dependencies
  6. 补充 dependencyTask 的 dependents
function run(projects: Project[], actionName: string, limit: number) {
  // ...
  // project 与 project 对应 task 的下游任务名称
  function getDependencyTaskNames(project: Project): Set<string> {
    const dependencyTaskNames: Set<string> = new Set();
    // 遍历下游项目
    for (const dep of project.dependencyProjects) {
      // 搜集下游任务名
      dependencyTaskNames.add(getTaskName(dep, actionName));
    }

    return dependencyTaskNames;
  }

  projects.forEach((project) => {
    // 1. 获取当前项目对应的任务
    const task = tasks.get(getTaskName(project, actionName))!;
    // 2. 获取当前任务对应的下游任务名
    const dependencyTaskNames = getDependencyTaskNames(project);
    // 3. 遍历下游任务名
    for (const dependencyName of dependencyTaskNames) {
      // 4. 取到下游任务(上一步初始化而来)
      const dependency: Task = tasks.get(dependencyName)!;
      // 5. 补充当前任务的 dependencies
      task.dependencies.add(dependency);
      // 6. 补充下游任务的 dependents
      dependency.dependents.add(task);
    }
  });
}
复制代码

任务依赖图

并行执行任务

function run(projects: Project[], actionName: string, limit: number) {
  // ...
  const taskQueue: Task[] = [];
  for (const [, task] of tasks) {
    taskQueue.push(task);
  }
  runTasks(taskQueue, limit);
}

async function runTasks(taskQueue: Task[], limit: number) {
  let currentActiveTasks = 0;
  function getNextTask() {
    for (let i = 0; i < taskQueue.length; i++) {
      const task: Task = taskQueue[i];
      // 返回准备好执行的任务
      if (task.dependencies.size === 0) {
        return taskQueue.splice(i, 1)[0];
      }
    }
    return null;
  }

  function _run(task: Task): Promise<void> {
    return task.fn().then(() => {
      console.log("act success", task.name);
      currentActiveTasks--;
      // 当前任务执行完成,从其上游任务的 dependencies 中移除当前任务
      task.dependents.forEach((dependent: Task) => {
        dependent.dependencies.delete(task);
      });
      // 继续执行
      start();
    });
  }

  async function start() {
    let ctask: Task | null = null;
    const taskPromises: Promise<void>[] = [];
    while (currentActiveTasks < limit && (ctask = getNextTask())) {
      currentActiveTasks++;
      const task: Task = ctask;
      taskPromises.push(_run(task));
    }

    await Promise.all(taskPromises);
  }

  start();
}
复制代码

执行 run(projects, "build", 12),能够按照正确顺序输出结果。app

act success @monorepo/z:build
act success @monorepo/y:build
act success @monorepo/x:build
act success @monorepo/d:build
act success @monorepo/b:build
act success @monorepo/a:build
act success @monorepo/c:build
复制代码

关键路径长度

上文中的实现使得任务能够按照正确的顺序执行,可是在实际任务执行过程当中,最长的任务链限制了整个任务树的执行速度,效率不能获得保证。dom

关键路径长度:任务距离最远的根节点的距离。async

interface Task {
  name: string;
  dependencies: Set<Task>;
  dependents: Set<Task>;
  // 关联路径长度
  criticalPathLength?: number;
  fn: () => Promise<void>;
}

function run(projects: Project[], actionName: string, limit: number) {
  // ...
  const taskQueue: Task[] = [];
  for (const [, task] of tasks) {
    // 计算关键路径长度
    task.criticalPathLength = calculateCriticalPaths(task);
    taskQueue.push(task);
  }
  // 基于关键路径长度对任务进行降序排序
  taskQueue.sort((a, b) => b.criticalPathLength! - a.criticalPathLength!);
  runTasks(taskQueue, limit);
}

// 计算关键路径长度
function calculateCriticalPaths(task: Task): number {
  // 重复走到某一个任务了 直接返回值
  if (task.criticalPathLength !== undefined) {
    return task.criticalPathLength;
  }

  // 若是没有 dependents, 说明咱们是 "root",即 app 此类不被依赖的 project
  if (task.dependents.size === 0) {
    task.criticalPathLength = 0;
    return task.criticalPathLength;
  }

  // 递归向上取最大值 每次 +1
  const depsLengths: number[] = [];
  task.dependents.forEach((dep) =>
    depsLengths.push(calculateCriticalPaths(dep))
  );
  task.criticalPathLength = Math.max(...depsLengths) + 1;
  return task.criticalPathLength;
}
复制代码

criticalPathLength

Selecting subsets of projects

实际业务开发中,通常不须要构建 Monorepo 内所有的项目,在 应用级 Monorepo 优化方案 一文中介绍了使用 Monorepo 方式管理业务项目可能遇到的一些坑点以及相关解决方案,其中有这样一个问题:

发布速度慢

monorepo-1 若须要发布 app1,则全部 package 均会被构建,而非仅有 app1 依赖的 package1 与 package2 的 build 脚本被执行。

最终经过 Rush 提供的 Selecting subsets of projects 能力解决了以上问题。

具体命令以下:

# 只会执行 @monorepo/app1 及其依赖 package 的 build script
$ rush build -t @monorepo/app1
复制代码

-t PROJECT--to PROJECT,后面PROJECT参数为这次任务的终点项目包名,若不想包含终点项目,能够改成-T参数,即--to-except PROJECT,与之相似的可挑选出项目子集的参数还有:

  • -f PROJECT, --from PROJECT
  • -F PROJECT, --from-except PROJECT
  • -i PROJECT, --impacted-by PROJECT
  • -I PROJECT, --impacted-by-except PROJECT
  • -o PROJECT, --only PROJECT

若是不指定这些参数,那么默认会执行全部项目(rush.json 中注册过的项目)的对应的 npm scripts

固然,也能够指定多个参数,最差状况获取到的 subsets 是 projects 自己,与不指定参数表现一致(通常不会)。

实际应用场景:在 CI 阶段,会筛选出全部发生变动的项目及其会影响到的项目进行一次 build check,好比一次提交改动了 @monorepo/a 以及 @monorepo/b的源码,CI 就会执行如下命令:

$ rush build -t @monorepo/a -f @monorepo/a -t @monorepo/b -f @monorepo/b
复制代码

不用担忧某些 project 的任务会被重复执行,这种任务只是图里的一个入度不为零的点,挑选出 subsets 后,按照前面的任务调度机制执行任务便可,任务都是并行的,速度通常可接受。

除了内置的rush build等命令支持以上参数(默认执行 package.json 中的 build script),Rush 也将此能力开放给了自定义命令,也就是说你能够自定义一个 rush foo 命令,用于调用指定范围项目 package.json 中的 foo script,配合 Rush 的任务调度机制,任务能够保证执行顺序(若是须要的话),具体能够参考 custom_commands 一节。

Selecting subsets of projects 的具体实现不在本文讨论范围内。

结语

拓扑排序与关键路径,作了道题。🌚

相关文章
相关标签/搜索