RxJS源码解析(六)——Scheduler

image

DevUI是一支兼具设计视角和工程视角的团队,服务于华为云 DevCloud平台和华为内部数个中后台系统,服务于设计师和前端工程师。
官方网站:[devui.design
]( https://devui.design/)Ng组件库ng-devui(欢迎Star)
官方交流群:添加DevUI小助手(微信号:devui-official)
DevUIHelper插件:DevUIHelper-LSP(欢迎Star)

引言

在这以前,我一直都没有讲过 Scheduler 的做用,那么本章就开始讲解 Scheduler 的设计思路和基本结构。RxJS 的存在是为了处理异步 IO,而异步 IO 所包含的一系列 API 确定也是要通过进一步的封装才能让 RxJS 中的异步操做使用。前端

能够看到,它主要仍是根据 JS 的所可以提供的异步能力来设计这些基本结构。git

  • AsyncScheduler:异步调度器,使用 setInterval 实现。
  • QueueScheduler:队列异步调度器,继承了 AsyncScheduler,可是 QueueAction 是一种链式结构,使得调度以迭代器的形式进行。
  • AnimationFrameScheduler:使用 reqeustAnimationFrame 实现了帧调度器。
  • AsapScheduler:使用 Promise.resolve().then() 实现的微任务调度器。

SchedulerLike 、 Scheduler & Action

首先,SchedulerLike 提供了如下两个接口。github

export interface SchedulerLike {
  // 标记当前时间
  now(): number;


  // 开启调度的基础接口
  schedule<T>(
    work: (this: SchedulerAction<T>, state?: T) => void,
    delay?: number,
    state?: T
  ): Subscription;
}

Scheduler 则实现了这些接口。算法

export class Scheduler implements SchedulerLike {


  // 获取当前时间戳
  public static now: () => number = () => Date.now();


  constructor(
    private SchedulerAction: typeof Action,
    now: () => number = Scheduler.now
) {
    this.now = now;
  }


  public now: () => number;
  // 直接调用 action 的 schedule
  public schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay: number = 0, state?: T): Subscription {
    return new this.SchedulerAction<T>(this, work).schedule(state, delay);
  }
}

Scheduler 为后续的继承它的调度器定义了建立方式,经过传入一个 Action 工厂,使得内部能够构造特定的 Action 。而 Action 继承了 Subscription,意味着 Action 其实是一种的订阅器。segmentfault

export class Action<T> extends Subscription {
  constructor(scheduler: Scheduler, work: (this: SchedulerAction<T>, state?: T) => void) {
    super();
  }
  // Action 开始调度
  public schedule(state?: T, delay: number = 0): Subscription {
    return this;
  }
}

上面的设计是一种名为 Template Method 的设计模式,这种方法有效地约束了后续的不一样的 Scheduler 的实现。设计模式

定义一个操做中的算法的骨架,而将一些步骤延迟到子类中。它使得子类能够不改变一个算法的结构便可重定义该算法的某些特定步骤。

异步调度器

先来了解一下 Scheduler 的子类 AsyncScheduler,余下全部的 Scheduler 都会继承它。在这里,先不急着进行源码分析,咱们须要先为了弄清楚调度器的运行原理,了解调度器是如何对异步 API 进行封装的。微信

首先,调度器自己也是基于观察者模式来进行设计,可是它又独立于 Rxjs 的 Observable。通常来讲, AsyncScheduler 是这样调用的。前端工程师

const scheduler = AsyncScheduler(AsyncAction);
const subscription = async.schedule(function (counter) {
    console.log(counter);
    // this 绑定了 AsyncAction
    this.schedule(counter + 1, 1000);
}, 1000, 1);


// subscription.unsubscribe();

它的调用栈是这样的。less

AsyncScheduler.schedule
AsyncAction.schedule
AsyncAction.requestAsyncId
listOnTimeout // 原生事件
processTimers // 原生事件
AsyncScheduler.flush
AsyncAction.execute
AsyncAction.\_execute
AsyncAction.work

AsyncAction.schedule

跟着调用栈分析源码来溯源,在 AsyncScheduler 的 schedule 方法中,它先构造了 AsyncAction ,而后调用它的 schedule 。在这个方法中,其实是对 Action 的内部状态进行更新,因此此处关注的地方就是在于 schedule 如何触发异步 API。异步

class AsyncAction<T> extends Action<T> {
  constructor(
    protected scheduler: AsyncScheduler,
    protected work: (this: SchedulerAction<T>, state?: T) => void
  ) {
    super(scheduler, work);
  }


  public schedule(state?: T, delay: number = 0): Subscription {
    if (this.closed) {
      return this;
    }
    this.state = state;
    const id = this.id;
    const scheduler = this.scheduler;
    // 须要对相应的异步 API 进行取消操做
    if (id != null) {
      this.id = this.recycleAsyncId(scheduler, id, delay);
    }
    this.pending = true;
    this.delay = delay;
    // 从新配置异步 API
    this.id = this.id || this.requestAsyncId(scheduler, this.id, delay);


    return this;
  }
}

能够看到,从 scheduler 传入的回调函数最终会被 Action 持有,因此调用栈最终执行的 work 实际上就是回调函数。

AsyncAction.requestAsyncId

requestAsyncId 是调用异步 API 的方法,这个方法在 AsyncAction 最终触发了 setInterval 这一异步 API。那么实际上,根据 Template Method 的设计,全部继承 AsyncAction 的 Action 都会经过这个方法实现相对应的异步 API 。

至于 AsyncAction 为何会使用 setInterval 而不是 setTimeout,源代码里是这样说明的。

Actions only execute once by default, unless rescheduled from within the scheduled callback. This allows us to implement single and repeat actions via the same code path, without adding API surface area, as well as mimic traditional recursion but across asynchronous boundaries. However, JS runtimes and timers distinguish between intervals achieved by serial setTimeout calls vs. a single setInterval call. An interval of serial setTimeout calls can be individufenally delayed, which delays scheduling the next setTimeout, and so on. setInterval attempts to guarantee the interval callback will be invoked more precisely to the interval period, regardless of load. Therefore, we use setInterval to schedule single and repeat actions. If the action reschedules itself with the same delay, the interval is not canceled. If the action doesn't reschedule, or reschedules with a different delay, the interval will be canceled after scheduled callback execution.

对于某一个 Action 来讲,除非它在调度的回调中被从新调度,那么它默认只会执行一次。这样的方式可使得咱们经过统一的代码实现调度单一或重复的 Actions,而无需添加 API,而且能够模仿传统递归来扩展异步。然而, JS 的运行时或者计时器分别经过串行的 setTimout 或者是单个 setInterval 来获取调用的定时器。串行的 setTimout 定时器能够单独延迟,这样作会延迟c下一个 setTimout 的调度,以此类推。而 setInterval 则无论程序运行的负载如何,它老是尝试去确保每一次定时器的回调更加精准的安排到合适的间隔时间。所以,咱们使用 setInterval 来安排单一或重复的 Actions,若是 action 以相同的时延调度自己,那么当前定时器不会被取消。若是 action 只没有从新调度或者以不一样的时延从新调度,则安排的回调执行后,改定时器会被取消。

class AsyncAction<T> extends Action<T> {
  protected requestAsyncId(
    scheduler: AsyncScheduler,
    id?: any,
    delay: number = 0
  ): any {
    // 绑定 scheduler,而且把当前的 AsyncAction 看成参数传入。
    return setInterval(scheduler.flush.bind(scheduler, this), delay);
  }
}

AsyncScheduler.flush

因此,在 AsyncScheduler 中,新增的 flush 方法其实是为 setInterval 服务的,它做为异步 API 的回调函数,主要步骤以下。

  • 若是存在运行中的 Action ,它会保存所用调用它的 Action。
  • 若是不存在运行中的 Action,它会执行全部调用队列中的 Action.execute
  • 处理 Action.execute 的运行错误。
export class AsyncScheduler extends Scheduler {


  public flush(action: AsyncAction<any>): void {


    const {actions} = this;


    if (this.active) {
      // 使用了一个队列保存全部输入的 Actions
      actions.push(action);
      return;
    }


    let error: any;
    this.active = true;
    // 默认 action 也是队列中的一员
    // 将全部队列中的 Action 进行调用。
    do {
      if (error = action.execute(action.state, action.delay)) {
        break;
      }
    } while (action = actions.shift());


    this.active = false;


    // 出现错误时,取消全部未运行 action 的订阅
    if (error) {
      // 注意,此处不会重复取消订阅,由于执行错误的Action会先退出队列,再执行循环。
      while (action = actions.shift()) {
        action.unsubscribe();
      }
      throw error;
    }
  }
}

AsyncAction.execute

上述的 flush 调用了 action 的 execute 方法。该方法也是经过处理 action 的内部状态来得到执行结果,其中会调用 _execute 这一内部方法,这个内部方法主要做用是调用 AsyncAction.work ,并处理它出现的异常。

class AsyncAction<T> extends Action<T> {
  public execute(state: T, delay: number): any {
    if (this.closed) {
      return new Error('executing a cancelled action');
    }
    this.pending = false;
    // 获取异常错误
    const error = this.\_execute(state, delay);
    if (error) {
      return error;
    } else if (this.pending === false && this.id != null) {
      this.id = this.recycleAsyncId(this.scheduler, this.id, null);
    }
  }


  protected \_execute(state: T, delay: number): any {
    let errored: boolean = false;
    let errorValue: any = undefined;
    try {
      // work
      this.work(state);
    } catch (e) {
      errored = true;
      errorValue = !!e && e || new Error(e);
    }
    if (errored) {
      this.unsubscribe();
      return errorValue;
    }
  }
}

AsyncAction.recycleAsyncId

在分析到 Action.schedule 的时候,引用了源码内部的注释,其中有一句话很重要,那就是 “若是 action 以相同的时延调度自己,那么当前定时器不会被取消”,因此 recycleAsyncId 这个方法是须要处理这种状况。

class AsyncAction<T> extends Action<T> {
  protected recycleAsyncId(scheduler: AsyncScheduler, id: any, delay: number = 0): any {
    // this.delay === delay 处理了这种状况。
    if (delay !== null && this.delay === delay && this.pending === false) {
      return id;
    }
    // 取消当前的定时器
    clearInterval(id);
    return undefined;
  }
}

运用 Template Method

AsyncScheduler 能够说已经把全部的地基都打好了,它能够直接拿来用,也能够继承并重写一些相关的接口把相应的异步 API 进行替换。

队列调度器

队列调度器根据调用者传入的时延来决定使用同步方式的调度仍是 setInterval 方式的调度。

QueueScheduler 单纯继承了 AsyncScheduler,其主要实如今 QueueAction 中,经过重写 scheduleexecute 以及 requestAsyncId 等方法来实现这种功能。

export class QueueAction<T> extends AsyncAction<T> {
  public schedule(state?: T, delay: number = 0): Subscription {
    // delay > 0 ,执行异步调度
    if (delay > 0) {
      return super.schedule(state, delay);
    }
    this.delay = delay;
    this.state = state;
    // 不然直接执行同步调度
    this.scheduler.flush(this);
    return this;
  }


  public execute(state: T, delay: number): any {
    // 根据传入的 delay 判断是否直接执行 work (同步执行)
    return (delay > 0 || this.closed) ?
      super.execute(state, delay) :
      this.\_execute(state, delay) ;
  }


  protected requestAsyncId(scheduler: QueueScheduler, id?: any, delay: number = 0): any {
    // 根据传入的 delay 以及自己的 delay 来决定是否使用异步
    if ((delay !== null && delay > 0) || (delay === null && this.delay > 0)) {
      return super.requestAsyncId(scheduler, id, delay);
    }
    // delay 为 0,直接同步调度
    return scheduler.flush(this);
  }
}

帧调度器 与 微任务调度器

帧调度器根据调用者传入的时延来决定使用 requestAnimationFrame 仍是 setInterval ,微任务调度器则是根据时延来决定使用 Promise.reslove().then() 仍是 setInterval

二者的调用相似,以致于能够结合起来分析。

Action

它们的 action 方法均重写了requestAsyncIdrecycleAsyncId, 主要仍是为了处理不一样异步 API 。

protected requestAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any {
  if (delay !== null && delay > 0) {
    return super.requestAsyncId(scheduler, id, delay);
  }
  // 把当前action 加入到 actions 队列末端
  scheduler.actions.push(this);


  if (!scheduler.scheduled) {
      // AsapAction 的状况
      const scheduled = Immediate.setImmediate(scheduler.flush.bind(scheduler, null));


      // AnimationFrameAction 的状况
      const scheduled = requestAnimationFrame(scheduler.flush.bind(scheduler, null));


      scheduler.scheduled = scheduled;
  }
  return scheduler.scheduled;
}


protected recycleAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any {
  if ((delay !== null && delay > 0) || (delay === null && this.delay > 0)) {
    return super.recycleAsyncId(scheduler, id, delay);
  }
  if (scheduler.actions.length === 0) {
    // AsapAction
    Immediate.clearImmediate(id);
    // AnimationFrameAction
    cancelAnimationFrame(id);


    scheduler.scheduled = undefined;
  }
  return undefined;
}

Scheduler

它们的 flush,跟 AsyncScheduler 的 flush 实现思路差很少,依旧是轮询 actions 队列调用 action.execute ,只是它们的 flush 须要去处理额外的如下细节。

  • action 传入可能为空。
  • 处理 actions 的状态。
  • 清空 scheduled,使得 scheduler 可以进行下一次调度。
// export class AnimationFrameScheduler extends AsyncScheduler {
export class AsapScheduler extends AsyncScheduler {
  public flush(action?: AsyncAction<any>): void {
    this.active = true;
    this.scheduled = undefined;


    const {actions} = this;
    let error: any;
    let index: number = -1;
    // 此处顺序不能打乱,由于这样
    action = action || actions.shift()!;
    let count: number = actions.length;


    do {
      if (error = action.execute(action.state, action.delay)) {
        break;
      }
    } while (++index < count && (action = actions.shift()));


    this.active = false;


    if (error) {
      while (++index < count && (action = actions.shift())) {
        action.unsubscribe();
      }
      throw error;
    }
  }
}

Immediate

这里颇有意思的一点, AsapScheduler 并无直接经过 Promise.reslove().then() 来实现。而是把它封装成 Immediate,造成 setImmediateclearImmediate 两个 API ,这样就使得微任务的调用其余的定时 API 无异。

内部实现是经过一个 Map 保存标记当前的是第几个微任务,这里并不直接保存 Promise,由于 Promise 执行完毕后就自行释放了,因此它须要的只是一个标记。

let nextHandle = 1;
const RESOLVED = (() => Promise.resolve())();
const activeHandles: { \[key: number\]: any } = {};


function findAndClearHandle(handle: number): boolean {
  if (handle in activeHandles) {
    delete activeHandles\[handle\];
    return true;
  }
  return false;
}


export const Immediate = {
  setImmediate(cb: () => void): number {
    const handle = nextHandle++;
    activeHandles\[handle\] = true;
    RESOLVED.then(() => findAndClearHandle(handle) && cb());
    return handle;
  },


  clearImmediate(handle: number): void {
    findAndClearHandle(handle);
  },
};

总结

本篇分析了 RxJS 的调度器相关的一系列内容,经过封装 JS 异步 API ,调度器实现相对应的异步功能,加强了 RxJS 对异步 IO 的掌控。

加入咱们

咱们是DevUI团队,欢迎来这里和咱们一块儿打造优雅高效的人机设计/研发体系。招聘邮箱:muyang2@huawei.com

做者:zcx(公众号:Coder写字的地方)

原文连接:https://mp.weixin.qq.com/s/vG0aaQmDy7Cqfv0CwJ_d0Q

往期文章推荐

《RxJS 源码解析(五)—— Operator III》

《Web界面深色模式和主题化开发》

《手把手教你搭建一个灰度发布环境》

相关文章
相关标签/搜索